use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::thread::sleep; | |
use std::time::Duration; | |
use async_channel::{unbounded, RecvError, SendError, TryRecvError, TrySendError}; | |
use easy_parallel::Parallel; | |
use futures_lite::{future, prelude::*}; | |
fn ms(ms: u64) -> Duration { | |
Duration::from_millis(ms) | |
} | |
#[test] | |
fn smoke() { | |
let (s, r) = unbounded(); | |
s.try_send(7).unwrap(); | |
assert_eq!(r.try_recv(), Ok(7)); | |
future::block_on(s.send(8)).unwrap(); | |
assert_eq!(future::block_on(r.recv()), Ok(8)); | |
assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
} | |
#[test] | |
fn capacity() { | |
let (s, r) = unbounded::<()>(); | |
assert_eq!(s.capacity(), None); | |
assert_eq!(r.capacity(), None); | |
} | |
#[test] | |
fn len_empty_full() { | |
let (s, r) = unbounded(); | |
assert_eq!(s.len(), 0); | |
assert_eq!(s.is_empty(), true); | |
assert_eq!(s.is_full(), false); | |
assert_eq!(r.len(), 0); | |
assert_eq!(r.is_empty(), true); | |
assert_eq!(r.is_full(), false); | |
future::block_on(s.send(())).unwrap(); | |
assert_eq!(s.len(), 1); | |
assert_eq!(s.is_empty(), false); | |
assert_eq!(s.is_full(), false); | |
assert_eq!(r.len(), 1); | |
assert_eq!(r.is_empty(), false); | |
assert_eq!(r.is_full(), false); | |
future::block_on(r.recv()).unwrap(); | |
assert_eq!(s.len(), 0); | |
assert_eq!(s.is_empty(), true); | |
assert_eq!(s.is_full(), false); | |
assert_eq!(r.len(), 0); | |
assert_eq!(r.is_empty(), true); | |
assert_eq!(r.is_full(), false); | |
} | |
#[test] | |
fn try_recv() { | |
let (s, r) = unbounded(); | |
Parallel::new() | |
.add(move || { | |
assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
sleep(ms(1500)); | |
assert_eq!(r.try_recv(), Ok(7)); | |
sleep(ms(500)); | |
assert_eq!(r.try_recv(), Err(TryRecvError::Closed)); | |
}) | |
.add(move || { | |
sleep(ms(1000)); | |
future::block_on(s.send(7)).unwrap(); | |
}) | |
.run(); | |
} | |
#[test] | |
fn recv() { | |
let (s, r) = unbounded(); | |
Parallel::new() | |
.add(move || { | |
assert_eq!(future::block_on(r.recv()), Ok(7)); | |
sleep(ms(1000)); | |
assert_eq!(future::block_on(r.recv()), Ok(8)); | |
sleep(ms(1000)); | |
assert_eq!(future::block_on(r.recv()), Ok(9)); | |
assert_eq!(future::block_on(r.recv()), Err(RecvError)); | |
}) | |
.add(move || { | |
sleep(ms(1500)); | |
future::block_on(s.send(7)).unwrap(); | |
future::block_on(s.send(8)).unwrap(); | |
future::block_on(s.send(9)).unwrap(); | |
}) | |
.run(); | |
} | |
#[test] | |
fn try_send() { | |
let (s, r) = unbounded(); | |
for i in 0..1000 { | |
assert_eq!(s.try_send(i), Ok(())); | |
} | |
drop(r); | |
assert_eq!(s.try_send(777), Err(TrySendError::Closed(777))); | |
} | |
#[test] | |
fn send() { | |
let (s, r) = unbounded(); | |
for i in 0..1000 { | |
assert_eq!(future::block_on(s.send(i)), Ok(())); | |
} | |
drop(r); | |
assert_eq!(future::block_on(s.send(777)), Err(SendError(777))); | |
} | |
#[test] | |
fn send_after_close() { | |
let (s, r) = unbounded(); | |
future::block_on(s.send(1)).unwrap(); | |
future::block_on(s.send(2)).unwrap(); | |
future::block_on(s.send(3)).unwrap(); | |
drop(r); | |
assert_eq!(future::block_on(s.send(4)), Err(SendError(4))); | |
assert_eq!(s.try_send(5), Err(TrySendError::Closed(5))); | |
} | |
#[test] | |
fn recv_after_close() { | |
let (s, r) = unbounded(); | |
future::block_on(s.send(1)).unwrap(); | |
future::block_on(s.send(2)).unwrap(); | |
future::block_on(s.send(3)).unwrap(); | |
drop(s); | |
assert_eq!(future::block_on(r.recv()), Ok(1)); | |
assert_eq!(future::block_on(r.recv()), Ok(2)); | |
assert_eq!(future::block_on(r.recv()), Ok(3)); | |
assert_eq!(future::block_on(r.recv()), Err(RecvError)); | |
} | |
#[test] | |
fn len() { | |
let (s, r) = unbounded(); | |
assert_eq!(s.len(), 0); | |
assert_eq!(r.len(), 0); | |
for i in 0..50 { | |
future::block_on(s.send(i)).unwrap(); | |
assert_eq!(s.len(), i + 1); | |
} | |
for i in 0..50 { | |
future::block_on(r.recv()).unwrap(); | |
assert_eq!(r.len(), 50 - i - 1); | |
} | |
assert_eq!(s.len(), 0); | |
assert_eq!(r.len(), 0); | |
} | |
#[test] | |
fn receiver_count() { | |
let (s, r) = unbounded::<()>(); | |
let receiver_clones: Vec<_> = (0..20).map(|_| r.clone()).collect(); | |
assert_eq!(s.receiver_count(), 21); | |
assert_eq!(r.receiver_count(), 21); | |
drop(receiver_clones); | |
assert_eq!(s.receiver_count(), 1); | |
assert_eq!(r.receiver_count(), 1); | |
} | |
#[test] | |
fn sender_count() { | |
let (s, r) = unbounded::<()>(); | |
let sender_clones: Vec<_> = (0..20).map(|_| s.clone()).collect(); | |
assert_eq!(s.sender_count(), 21); | |
assert_eq!(r.sender_count(), 21); | |
drop(sender_clones); | |
assert_eq!(s.receiver_count(), 1); | |
assert_eq!(r.receiver_count(), 1); | |
} | |
#[test] | |
fn close_wakes_receiver() { | |
let (s, r) = unbounded::<()>(); | |
Parallel::new() | |
.add(move || { | |
assert_eq!(future::block_on(r.recv()), Err(RecvError)); | |
}) | |
.add(move || { | |
sleep(ms(1000)); | |
drop(s); | |
}) | |
.run(); | |
} | |
#[test] | |
fn spsc() { | |
const COUNT: usize = 100_000; | |
let (s, r) = unbounded(); | |
Parallel::new() | |
.add(move || { | |
for i in 0..COUNT { | |
assert_eq!(future::block_on(r.recv()), Ok(i)); | |
} | |
assert_eq!(future::block_on(r.recv()), Err(RecvError)); | |
}) | |
.add(move || { | |
for i in 0..COUNT { | |
future::block_on(s.send(i)).unwrap(); | |
} | |
}) | |
.run(); | |
} | |
#[test] | |
fn mpmc() { | |
const COUNT: usize = 25_000; | |
const THREADS: usize = 4; | |
let (s, r) = unbounded::<usize>(); | |
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); | |
Parallel::new() | |
.each(0..THREADS, |_| { | |
for _ in 0..COUNT { | |
let n = future::block_on(r.recv()).unwrap(); | |
v[n].fetch_add(1, Ordering::SeqCst); | |
} | |
}) | |
.each(0..THREADS, |_| { | |
for i in 0..COUNT { | |
future::block_on(s.send(i)).unwrap(); | |
} | |
}) | |
.run(); | |
assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
for c in v { | |
assert_eq!(c.load(Ordering::SeqCst), THREADS); | |
} | |
} | |
#[test] | |
fn mpmc_stream() { | |
const COUNT: usize = 25_000; | |
const THREADS: usize = 4; | |
let (s, r) = unbounded::<usize>(); | |
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); | |
let v = &v; | |
Parallel::new() | |
.each(0..THREADS, { | |
let mut r = r.clone(); | |
move |_| { | |
for _ in 0..COUNT { | |
let n = future::block_on(r.next()).unwrap(); | |
v[n].fetch_add(1, Ordering::SeqCst); | |
} | |
} | |
}) | |
.each(0..THREADS, |_| { | |
for i in 0..COUNT { | |
future::block_on(s.send(i)).unwrap(); | |
} | |
}) | |
.run(); | |
assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
for c in v { | |
assert_eq!(c.load(Ordering::SeqCst), THREADS); | |
} | |
} |