// blob: 14f9056e041a9864159225a5afc580d69da9ac22 [file] [log] [blame]
use crate::{
alias::{AsyncHeapCons, AsyncHeapProd, AsyncHeapRb},
async_transfer,
traits::*,
};
use alloc::vec::Vec;
use core::{
marker::PhantomData,
sync::atomic::{AtomicUsize, Ordering},
};
use futures::task::{noop_waker_ref, AtomicWaker};
#[cfg(feature = "std")]
use std::sync::Arc;
/// Sanity check of the `AtomicWaker` behaviour exercised below: a registered
/// waker can be taken once, and `wake` leaves nothing behind to take.
#[test]
fn atomic_waker() {
    let slot = AtomicWaker::new();
    // Takes whatever waker is currently stored and reports whether there was one.
    let had_waker = || slot.take().is_some();

    // Nothing has been registered yet.
    assert!(!had_waker());

    // A registered waker is handed out by the first `take` only.
    slot.register(noop_waker_ref());
    assert!(had_waker());
    assert!(!had_waker());

    // After `wake` there is no waker left to take.
    slot.register(noop_waker_ref());
    slot.wake();
    assert!(!had_waker());
}
/// Compile-time check that the ring buffer and both of its halves are
/// `Send + Sync`; the test passes simply by compiling.
#[test]
fn send_sync() {
    // Accepts only type markers whose `T` satisfies `Send + Sync`.
    fn require_send_sync<T: Send + Sync>(_marker: PhantomData<T>) {}

    require_send_sync(PhantomData::<AsyncHeapRb<i32>>);
    require_send_sync(PhantomData::<AsyncHeapProd<i32>>);
    require_send_sync(PhantomData::<AsyncHeapCons<i32>>);
}
/// Drives all given futures to completion together.
///
/// The futures are combined with `futures::join!` inside a single async block
/// that is run by `futures::executor::block_on`. The joined outputs are
/// discarded (the expansion ends with `;`), so this is meant to be used in
/// statement position. A trailing comma after the last future is accepted.
macro_rules! execute {
    ( $( $task:expr ),* $(,)? ) => {
        futures::executor::block_on(async {
            futures::join!( $( $task ),* )
        });
    };
}
/// Number of items the data-moving tests in this file send through a ring buffer.
const COUNT: usize = 16;
/// Pushes `0..COUNT` one item at a time through a ring buffer created with
/// `new(2)` and checks that the consumer pops the same values in order.
#[test]
fn push_pop() {
    let (tx, rx) = AsyncHeapRb::<usize>::new(2).split();

    let produce = async move {
        // Rebound inside the block so the producer is a local of this task
        // and is dropped as soon as the task body finishes.
        let mut tx = tx;
        for value in 0..COUNT {
            tx.push(value).await.unwrap();
        }
    };

    let consume = async move {
        let mut rx = rx;
        for expected in 0..COUNT {
            assert_eq!(rx.pop().await.unwrap(), expected);
        }
        // NOTE(review): presumably `None` because the producer has been
        // dropped by now and the buffer is empty — confirm against `pop` docs.
        assert!(rx.pop().await.is_none());
    };

    execute!(produce, consume);
}
/// Sends `0..COUNT` as one slice with `push_exact` and receives it with
/// `pop_exact` into a buffer that is one element too large.
#[test]
fn push_pop_slice() {
    let (tx, rx) = AsyncHeapRb::<usize>::new(3).split();

    let produce = async move {
        let mut tx = tx;
        let items: Vec<usize> = (0..COUNT).collect();
        tx.push_exact(&items).await.unwrap();
    };

    let consume = async move {
        let mut rx = rx;
        // Deliberately asks for `COUNT + 1` items while only `COUNT` are sent,
        // so `pop_exact` is expected to fail and report how many it received.
        let mut buf = [0; COUNT + 1];
        let received = rx.pop_exact(&mut buf).await.unwrap_err();
        assert_eq!(received, COUNT);
        // Only the first `COUNT` slots were filled; the last one is ignored.
        assert!(buf[..COUNT].iter().copied().eq(0..COUNT));
    };

    execute!(produce, consume);
}
/// Sends `0..COUNT` as one slice with `push_exact` and collects everything on
/// the consumer side into a `Vec` via `pop_until_end`.
#[test]
fn push_pop_vec() {
    let (tx, rx) = AsyncHeapRb::<usize>::new(3).split();

    let produce = async move {
        let mut tx = tx;
        let items: Vec<usize> = (0..COUNT).collect();
        tx.push_exact(&items).await.unwrap();
    };

    let consume = async move {
        let mut rx = rx;
        let mut collected = Vec::new();
        // NOTE(review): presumably returns once the producer is dropped and
        // the buffer is drained — confirm against `pop_until_end` docs.
        rx.pop_until_end(&mut collected).await;
        assert_eq!(collected.len(), COUNT);
        assert!(collected.iter().copied().eq(0..COUNT));
    };

    execute!(produce, consume);
}
/// Uses the producer through `SinkExt::send_all` and the consumer through
/// `StreamExt::fold`, checking that `0..COUNT` arrives in order.
#[test]
fn sink_stream() {
    use futures::{
        sink::SinkExt,
        stream::{self, StreamExt},
    };

    let (tx, rx) = AsyncHeapRb::<usize>::new(2).split();

    let produce = async move {
        let mut tx = tx;
        // `send_all` takes a stream of `Result`s, hence the `Ok` wrapping.
        let mut source = stream::iter(0..COUNT).map(Ok);
        tx.send_all(&mut source).await.unwrap();
    };

    let consume = async move {
        // The accumulator doubles as the next expected value, so the fold
        // checks ordering and counts the items at the same time.
        let seen = rx
            .fold(0, |next, item| async move {
                assert_eq!(next, item);
                next + 1
            })
            .await;
        assert_eq!(seen, COUNT);
    };

    execute!(produce, consume);
}
/// Writes `COUNT` bytes through the producer with `AsyncWriteExt::write_all`
/// and reads them back through the consumer with `AsyncReadExt::read_to_end`.
///
/// Only built with the `std` feature.
#[cfg(feature = "std")]
#[test]
fn read_write() {
    use futures::{AsyncReadExt, AsyncWriteExt};
    let (prod, cons) = AsyncHeapRb::<u8>::new(3).split();
    // The same byte sequence is used as the written input and, via the clone,
    // as the expected output.
    let input = (0..255).cycle().take(COUNT);
    let output = input.clone();
    execute!(
        async move {
            // Rebound inside the block so the producer is dropped when this
            // task finishes.
            let mut prod = prod;
            let data = input.collect::<Vec<_>>();
            prod.write_all(&data).await.unwrap();
        },
        async move {
            let mut cons = cons;
            let mut data = Vec::new();
            let count = cons.read_to_end(&mut data).await.unwrap();
            assert_eq!(count, COUNT);
            // Compare the whole buffer (no truncation to `COUNT`) so surplus
            // bytes would be caught, and use `assert_eq!` so a failure prints
            // both byte sequences.
            assert_eq!(data, output.collect::<Vec<u8>>());
        },
    );
}
/// Chains two ring buffers: a feeder fills the source buffer, `async_transfer`
/// moves items from the source consumer to the destination producer, and the
/// final task checks that `0..COUNT` comes out of the destination in order.
#[test]
fn transfer() {
    use futures::stream::StreamExt;

    let (src_tx, src_rx) = AsyncHeapRb::<usize>::new(3).split();
    let (dst_tx, dst_rx) = AsyncHeapRb::<usize>::new(5).split();

    let feed = async move {
        let mut tx = src_tx;
        assert!(tx.push_iter_all(0..COUNT).await);
    };

    let relay = async move {
        // Both halves are locals of this task, so they are dropped when the
        // transfer returns.
        let mut from = src_rx;
        let mut into = dst_tx;
        // `None` is passed as the third argument.
        // NOTE(review): presumably "no item limit" — confirm against
        // `async_transfer` docs.
        async_transfer(&mut from, &mut into, None).await
    };

    let drain = async move {
        // The accumulator is both the running count and the next expected value.
        let seen = dst_rx
            .fold(0, |next, item| async move {
                assert_eq!(next, item);
                next + 1
            })
            .await;
        assert_eq!(seen, COUNT);
    };

    execute!(feed, relay, drain);
}
/// Checks the interleaving produced by `wait_vacant` / `wait_occupied`.
///
/// A shared step counter records the order in which the two tasks reach
/// their checkpoints; each `fetch_add` returns the value before the increment,
/// so the asserted values 0, 1, 2 pin the expected sequence.
#[test]
fn wait() {
    let (mut tx, mut rx) = AsyncHeapRb::<usize>::new(3).split();
    let step = AtomicUsize::new(0);

    let producer_side = async {
        tx.push(0).await.unwrap();
        // First checkpoint: reached before the consumer side gets past its wait.
        assert_eq!(step.fetch_add(1, Ordering::SeqCst), 0);
        tx.push(1).await.unwrap();
        // NOTE(review): with two of the `new(3)` slots filled, this presumably
        // suspends until the consumer pops an item — confirm.
        tx.wait_vacant(2).await;
        // Third checkpoint: only reached after the consumer's checkpoint.
        assert_eq!(step.fetch_add(1, Ordering::SeqCst), 2);
    };

    let consumer_side = async {
        rx.wait_occupied(2).await;
        // Second checkpoint: both pushes have happened by now.
        assert_eq!(step.fetch_add(1, Ordering::SeqCst), 1);
        rx.pop().await.unwrap();
    };

    execute!(producer_side, consumer_side);
}
/// Dropping the producer on one thread must release a consumer that is
/// waiting on another thread, and the consumer must then report closed.
///
/// Only built with the `std` feature (needs threads and `Arc`).
#[cfg(feature = "std")]
#[test]
fn drop_close_prod() {
    let (tx, mut rx) = AsyncHeapRb::<usize>::new(1).split();
    // Shared step counter; `fetch_add` returns the value before the increment.
    let step = Arc::new(AtomicUsize::new(0));
    let step_for_waiter = Arc::clone(&step);

    let dropper = std::thread::spawn(move || {
        execute!(async {
            // Counted before the drop, so the waiter can only ever observe 1.
            assert_eq!(step.fetch_add(1, Ordering::SeqCst), 0);
            drop(tx);
        });
    });

    let waiter = std::thread::spawn(move || {
        execute!(async {
            // Nothing is ever pushed.
            // NOTE(review): presumably resolves only because the producer was
            // dropped — confirm against `wait_occupied` docs.
            rx.wait_occupied(1).await;
            assert_eq!(step_for_waiter.fetch_add(1, Ordering::SeqCst), 1);
            assert!(rx.is_closed());
        });
    });

    // Propagate any assertion failure from the worker threads.
    for handle in [dropper, waiter] {
        handle.join().unwrap();
    }
}
/// Dropping the consumer on one thread must release a producer that is
/// waiting for free space on another thread, and the producer must then
/// report closed.
///
/// Only built with the `std` feature (needs threads and `Arc`).
#[cfg(feature = "std")]
#[test]
fn drop_close_cons() {
    let (mut tx, mut rx) = AsyncHeapRb::<usize>::new(1).split();
    // Shared step counter; `fetch_add` returns the value before the increment.
    let step = Arc::new(AtomicUsize::new(0));
    let step_for_dropper = Arc::clone(&step);

    let pusher = std::thread::spawn(move || {
        execute!(async {
            // Counted before the push, so the other thread observes 1 once the
            // item has arrived.
            assert_eq!(step.fetch_add(1, Ordering::SeqCst), 0);
            tx.push(0).await.unwrap();
            // The item is never popped.
            // NOTE(review): presumably resolves only because the consumer was
            // dropped — confirm against `wait_vacant` docs.
            tx.wait_vacant(1).await;
            assert_eq!(step.fetch_add(1, Ordering::SeqCst), 2);
            assert!(tx.is_closed());
        });
    });

    let dropper = std::thread::spawn(move || {
        execute!(async {
            rx.wait_occupied(1).await;
            assert_eq!(step_for_dropper.fetch_add(1, Ordering::SeqCst), 1);
            drop(rx);
        });
    });

    // Propagate any assertion failure from the worker threads.
    for handle in [pusher, dropper] {
        handle.join().unwrap();
    }
}