| use crate::{ |
| alias::{AsyncHeapCons, AsyncHeapProd, AsyncHeapRb}, |
| async_transfer, |
| traits::*, |
| }; |
| use alloc::vec::Vec; |
| use core::{ |
| marker::PhantomData, |
| sync::atomic::{AtomicUsize, Ordering}, |
| }; |
| use futures::task::{noop_waker_ref, AtomicWaker}; |
| #[cfg(feature = "std")] |
| use std::sync::Arc; |
| |
| #[test] |
| fn atomic_waker() { |
| let waker = AtomicWaker::new(); |
| assert!(waker.take().is_none()); |
| |
| waker.register(noop_waker_ref()); |
| assert!(waker.take().is_some()); |
| assert!(waker.take().is_none()); |
| |
| waker.register(noop_waker_ref()); |
| waker.wake(); |
| assert!(waker.take().is_none()); |
| } |
| |
| #[test] |
| fn send_sync() { |
| struct Check<T: Send + Sync>(PhantomData<T>); |
| let _ = Check::<AsyncHeapRb<i32>>(PhantomData); |
| let _ = Check::<AsyncHeapProd<i32>>(PhantomData); |
| let _ = Check::<AsyncHeapCons<i32>>(PhantomData); |
| } |
| |
| macro_rules! execute { |
| ( $( $tasks:expr ),* $(,)? ) => { |
| futures::executor::block_on(async { |
| futures::join!($($tasks),*) |
| }); |
| }; |
| } |
| |
| const COUNT: usize = 16; |
| |
| #[test] |
| fn push_pop() { |
| let (prod, cons) = AsyncHeapRb::<usize>::new(2).split(); |
| execute!( |
| async move { |
| let mut prod = prod; |
| for i in 0..COUNT { |
| prod.push(i).await.unwrap(); |
| } |
| }, |
| async move { |
| let mut cons = cons; |
| for i in 0..COUNT { |
| assert_eq!(cons.pop().await.unwrap(), i); |
| } |
| assert!(cons.pop().await.is_none()); |
| }, |
| ); |
| } |
| |
| #[test] |
| fn push_pop_slice() { |
| let (prod, cons) = AsyncHeapRb::<usize>::new(3).split(); |
| execute!( |
| async move { |
| let mut prod = prod; |
| let data = (0..COUNT).collect::<Vec<_>>(); |
| prod.push_exact(&data).await.unwrap(); |
| }, |
| async move { |
| let mut cons = cons; |
| let mut data = [0; COUNT + 1]; |
| let count = cons.pop_exact(&mut data).await.unwrap_err(); |
| assert_eq!(count, COUNT); |
| assert!(data.into_iter().take(COUNT).eq(0..COUNT)); |
| }, |
| ); |
| } |
| |
| #[test] |
| fn push_pop_vec() { |
| let (prod, cons) = AsyncHeapRb::<usize>::new(3).split(); |
| execute!( |
| async move { |
| let mut prod = prod; |
| let data = (0..COUNT).collect::<Vec<_>>(); |
| prod.push_exact(&data).await.unwrap(); |
| }, |
| async move { |
| let mut cons = cons; |
| let mut data = Vec::new(); |
| cons.pop_until_end(&mut data).await; |
| assert_eq!(data.len(), COUNT); |
| assert!(data.into_iter().eq(0..COUNT)); |
| }, |
| ); |
| } |
| |
| #[test] |
| fn sink_stream() { |
| use futures::{ |
| sink::SinkExt, |
| stream::{self, StreamExt}, |
| }; |
| let (prod, cons) = AsyncHeapRb::<usize>::new(2).split(); |
| execute!( |
| async move { |
| let mut prod = prod; |
| let mut input = stream::iter(0..COUNT).map(Ok); |
| prod.send_all(&mut input).await.unwrap(); |
| }, |
| async move { |
| let cons = cons; |
| assert_eq!( |
| cons.fold(0, |s, x| async move { |
| assert_eq!(s, x); |
| s + 1 |
| }) |
| .await, |
| COUNT |
| ); |
| }, |
| ); |
| } |
| |
| #[cfg(feature = "std")] |
| #[test] |
| fn read_write() { |
| use futures::{AsyncReadExt, AsyncWriteExt}; |
| let (prod, cons) = AsyncHeapRb::<u8>::new(3).split(); |
| let input = (0..255).cycle().take(COUNT); |
| let output = input.clone(); |
| execute!( |
| async move { |
| let mut prod = prod; |
| let data = input.collect::<Vec<_>>(); |
| prod.write_all(&data).await.unwrap(); |
| }, |
| async move { |
| let mut cons = cons; |
| let mut data = Vec::new(); |
| let count = cons.read_to_end(&mut data).await.unwrap(); |
| assert_eq!(count, COUNT); |
| assert!(data.into_iter().take(COUNT).eq(output)); |
| }, |
| ); |
| } |
| |
| #[test] |
| fn transfer() { |
| use futures::stream::StreamExt; |
| let (src_prod, src_cons) = AsyncHeapRb::<usize>::new(3).split(); |
| let (dst_prod, dst_cons) = AsyncHeapRb::<usize>::new(5).split(); |
| execute!( |
| async move { |
| let mut prod = src_prod; |
| assert!(prod.push_iter_all(0..COUNT).await); |
| }, |
| async move { |
| let mut src = src_cons; |
| let mut dst = dst_prod; |
| async_transfer(&mut src, &mut dst, None).await |
| }, |
| async move { |
| let cons = dst_cons; |
| assert_eq!( |
| cons.fold(0, |s, x| async move { |
| assert_eq!(s, x); |
| s + 1 |
| }) |
| .await, |
| COUNT |
| ); |
| }, |
| ); |
| } |
| |
| #[test] |
| fn wait() { |
| let (mut prod, mut cons) = AsyncHeapRb::<usize>::new(3).split(); |
| let stage = AtomicUsize::new(0); |
| execute!( |
| async { |
| prod.push(0).await.unwrap(); |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); |
| prod.push(1).await.unwrap(); |
| |
| prod.wait_vacant(2).await; |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2); |
| }, |
| async { |
| cons.wait_occupied(2).await; |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 1); |
| |
| cons.pop().await.unwrap(); |
| }, |
| ); |
| } |
| |
| #[cfg(feature = "std")] |
| #[test] |
| fn drop_close_prod() { |
| let (prod, mut cons) = AsyncHeapRb::<usize>::new(1).split(); |
| let stage = Arc::new(AtomicUsize::new(0)); |
| let stage_clone = stage.clone(); |
| let t0 = std::thread::spawn(move || { |
| execute!(async { |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); |
| drop(prod); |
| }); |
| }); |
| let t1 = std::thread::spawn(move || { |
| execute!(async { |
| cons.wait_occupied(1).await; |
| assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1); |
| assert!(cons.is_closed()); |
| }); |
| }); |
| t0.join().unwrap(); |
| t1.join().unwrap(); |
| } |
| |
| #[cfg(feature = "std")] |
| #[test] |
| fn drop_close_cons() { |
| let (mut prod, mut cons) = AsyncHeapRb::<usize>::new(1).split(); |
| let stage = Arc::new(AtomicUsize::new(0)); |
| let stage_clone = stage.clone(); |
| let t0 = std::thread::spawn(move || { |
| execute!(async { |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 0); |
| prod.push(0).await.unwrap(); |
| |
| prod.wait_vacant(1).await; |
| assert_eq!(stage.fetch_add(1, Ordering::SeqCst), 2); |
| assert!(prod.is_closed()); |
| }); |
| }); |
| let t1 = std::thread::spawn(move || { |
| execute!(async { |
| cons.wait_occupied(1).await; |
| assert_eq!(stage_clone.fetch_add(1, Ordering::SeqCst), 1); |
| drop(cons); |
| }); |
| }); |
| t0.join().unwrap(); |
| t1.join().unwrap(); |
| } |