// blob: b18d66d7cd039b8ffbe443c7838225b10760c38d [file] [log] [blame]
//! Tests for channel selection using the `Select` struct.
extern crate crossbeam_channel;
extern crate crossbeam_utils;
use std::any::Any;
use std::cell::Cell;
use std::thread;
use std::time::{Duration, Instant};
use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
use crossbeam_utils::thread::scope;
/// Converts a count of milliseconds into a [`Duration`].
fn ms(ms: u64) -> Duration {
    // Split the input into whole seconds plus a sub-second remainder in nanoseconds.
    let whole_secs = ms / 1_000;
    let leftover_nanos = (ms % 1_000) as u32 * 1_000_000;
    Duration::new(whole_secs, leftover_nanos)
}
/// Selects over two unbounded receivers and checks that the channel holding a message is the one
/// reported as ready, first for `r1` and then for `r2`.
#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    // Only the first channel holds a message at this point.
    s1.send(1).unwrap();
    let mut sel = Select::new();
    let on_first = sel.recv(&r1);
    let on_second = sel.recv(&r2);
    let chosen = sel.select();
    let idx = chosen.index();
    if idx == on_first {
        assert_eq!(chosen.recv(&r1), Ok(1));
    } else if idx == on_second {
        panic!();
    } else {
        unreachable!();
    }

    // The first channel was drained above; now only the second one holds a message.
    s2.send(2).unwrap();
    let mut sel = Select::new();
    let on_first = sel.recv(&r1);
    let on_second = sel.recv(&r2);
    let chosen = sel.select();
    let idx = chosen.index();
    if idx == on_first {
        panic!();
    } else if idx == on_second {
        assert_eq!(chosen.recv(&r2), Ok(2));
    } else {
        unreachable!();
    }
}
/// Registers five receivers of which only the last holds a message, and checks that the last one
/// is the operation selected.
#[test]
fn smoke2() {
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();
    s5.send(5).unwrap();

    let mut sel = Select::new();
    let idle_a = sel.recv(&r1);
    let idle_b = sel.recv(&r2);
    let idle_c = sel.recv(&r3);
    let idle_d = sel.recv(&r4);
    let loaded = sel.recv(&r5);

    let chosen = sel.select();
    let idx = chosen.index();
    if idx == loaded {
        assert_eq!(chosen.recv(&r5), Ok(5));
    } else if idx == idle_a || idx == idle_b || idx == idle_c || idx == idle_d {
        // None of the empty, still-connected channels may be picked.
        panic!();
    } else {
        unreachable!();
    }
}
/// Checks that `select_timeout` reports a receiver whose sender was dropped, and that completing
/// the receive on it yields an error.
#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        // The first sender is dropped right away; the second channel gets a message 500ms later.
        scope.spawn(|_| {
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == on_r1 {
            assert!(chosen.recv(&r1).is_err());
        } else if idx == on_r2 {
            panic!();
        } else {
            unreachable!();
        }

        // Take the delayed message off the second channel before leaving the scope.
        r2.recv().unwrap();
    })
    .unwrap();

    // Same selection again, now with the second channel empty and the first still disconnected.
    let mut sel = Select::new();
    let on_r1 = sel.recv(&r1);
    let on_r2 = sel.recv(&r2);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let idx = chosen.index();
    if idx == on_r1 {
        assert!(chosen.recv(&r1).is_err());
    } else if idx == on_r2 {
        panic!();
    } else {
        unreachable!();
    }

    scope(|scope| {
        // The second sender is dropped while the main thread is already waiting on `r2`.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        let mut sel = Select::new();
        let on_r2 = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() == on_r2 {
            assert!(chosen.recv(&r2).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
/// Walks `try_select` through several situations: nothing ready, a dropped sender, a pending
/// message, a drained channel, and a selector with no operations registered.
#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    // Both channels are empty and both senders are alive: the attempt must fail.
    let mut sel = Select::new();
    let _ = sel.recv(&r1);
    let _ = sel.recv(&r2);
    let attempt = sel.try_select();
    if attempt.is_ok() {
        panic!();
    }

    // With the first sender gone, the first receive must be selected and must fail.
    drop(s1);
    let mut sel = Select::new();
    let on_r1 = sel.recv(&r1);
    let on_r2 = sel.recv(&r2);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let idx = chosen.index();
    if idx == on_r1 {
        assert!(chosen.recv(&r1).is_err());
    } else if idx == on_r2 {
        panic!();
    } else {
        unreachable!();
    }

    // A queued message on the second channel is picked up immediately.
    s2.send(2).unwrap();
    let mut sel = Select::new();
    let on_r2 = sel.recv(&r2);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == on_r2 {
        assert_eq!(chosen.recv(&r2), Ok(2));
    } else {
        unreachable!();
    }

    // The second channel was just drained, so the next attempt must fail.
    let mut sel = Select::new();
    let _ = sel.recv(&r2);
    let attempt = sel.try_select();
    if attempt.is_ok() {
        panic!();
    }

    // A selector with no operations must fail as well.
    let mut sel = Select::new();
    let attempt = sel.try_select();
    if attempt.is_ok() {
        panic!();
    }
}
/// Exercises `select_timeout`: it must give up when nothing arrives in time, succeed once a
/// message shows up, and time out when no operations are registered at all.
#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        // The message is sent after 1500ms, i.e. later than the first 1000ms wait below.
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        if let Ok(op) = sel.select_timeout(ms(1000)) {
            let idx = op.index();
            if idx == on_r1 || idx == on_r2 {
                panic!();
            } else {
                unreachable!();
            }
        }

        // A second 1000ms wait must end with the message on `r2`.
        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == on_r1 {
            panic!();
        } else if idx == on_r2 {
            assert_eq!(chosen.recv(&r2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        // The sender is dropped 500ms in, while the main thread waits on an empty selector.
        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        let mut empty = Select::new();
        let waited = empty.select_timeout(ms(1000));
        if waited.is_ok() {
            unreachable!();
        }

        // After the timeout, the receive on `r` must be selectable and must fail.
        let mut sel = Select::new();
        let on_r = sel.recv(&r);
        let chosen = match sel.try_select() {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() == on_r {
            assert!(chosen.recv(&r).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
/// Checks that an operation on a channel whose other half was dropped is selected by both
/// `try_select` and `select_timeout`, and that completing it returns an error.
#[test]
fn default_when_disconnected() {
    // Receiver whose sender is dropped immediately, non-blocking attempt.
    let (_, r) = unbounded::<i32>();
    let mut sel = Select::new();
    let on_recv = sel.recv(&r);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == on_recv {
        assert!(chosen.recv(&r).is_err());
    } else {
        unreachable!();
    }

    // Same receiver setup, with a timeout.
    let (_, r) = unbounded::<i32>();
    let mut sel = Select::new();
    let on_recv = sel.recv(&r);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == on_recv {
        assert!(chosen.recv(&r).is_err());
    } else {
        unreachable!();
    }

    // Zero-capacity sender whose receiver is dropped immediately, non-blocking attempt.
    let (s, _) = bounded::<i32>(0);
    let mut sel = Select::new();
    let on_send = sel.send(&s);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == on_send {
        assert!(chosen.send(&s, 0).is_err());
    } else {
        unreachable!();
    }

    // Same sender setup, with a timeout.
    let (s, _) = bounded::<i32>(0);
    let mut sel = Select::new();
    let on_send = sel.send(&s);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() == on_send {
        assert!(chosen.send(&s, 0).is_err());
    } else {
        unreachable!();
    }
}
/// Measures how long an empty selector takes to fail: `try_select` within 50ms, and
/// `select_timeout(500ms)` within 450ms to 550ms.
#[test]
fn default_only() {
    let began = Instant::now();
    let mut sel = Select::new();
    let outcome = sel.try_select();
    assert!(outcome.is_err());
    let elapsed = Instant::now() - began;
    assert!(elapsed <= ms(50));

    let began = Instant::now();
    let mut sel = Select::new();
    let outcome = sel.select_timeout(ms(500));
    assert!(outcome.is_err());
    let elapsed = Instant::now() - began;
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
/// Checks that a selection waiting on zero-capacity channels completes once another thread
/// performs the matching send or receive 500ms later.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    // Receiving side: the helper thread sends on the second channel.
    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == on_r1 {
            panic!();
        } else if idx == on_r2 {
            assert_eq!(chosen.recv(&r2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    // Sending side: the helper thread receives from the first channel.
    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut sel = Select::new();
        let on_s1 = sel.send(&s1);
        let on_s2 = sel.send(&s2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == on_s1 {
            chosen.send(&s1, 1).unwrap();
        } else if idx == on_s2 {
            panic!();
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
/// Runs two selections over a receive on `r1` and a send on `s2` while a helper thread performs
/// the matching send and receive.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        // Counterpart: send 1 on the first channel, then expect 2 from the second.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        for _round in 0..2 {
            let mut sel = Select::new();
            let on_recv = sel.recv(&r1);
            let on_send = sel.send(&s2);
            let chosen = sel.select();
            let idx = chosen.index();
            if idx == on_recv {
                assert_eq!(chosen.recv(&r1), Ok(1));
            } else if idx == on_send {
                chosen.send(&s2, 2).unwrap();
            } else {
                unreachable!();
            }
        }
    })
    .unwrap();
}
/// Repeats 20 rounds in which two threads poll with non-blocking operations while a third thread
/// performs one timed selection and then drops `s_end`.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            // Poller A: keeps trying to send on `s1`, stopping once that send or a receive on
            // `r_end` is selected.
            scope.spawn(|_| loop {
                let mut sel = Select::new();
                let on_send = sel.send(&s1);
                if let Ok(op) = sel.try_select() {
                    if op.index() == on_send {
                        let _ = op.send(&s1, 1);
                        break;
                    } else {
                        unreachable!();
                    }
                }

                let mut sel = Select::new();
                let on_end = sel.recv(&r_end);
                if let Ok(op) = sel.try_select() {
                    if op.index() == on_end {
                        let _ = op.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Poller B: keeps trying to receive from `r2`, stopping once a value arrives or a
            // receive on `r_end` is selected.
            scope.spawn(|_| loop {
                match r2.try_recv() {
                    Ok(x) => {
                        assert_eq!(x, 2);
                        break;
                    }
                    Err(_) => {}
                }

                let mut sel = Select::new();
                let on_end = sel.recv(&r_end);
                if let Ok(op) = sel.try_select() {
                    if op.index() == on_end {
                        let _ = op.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // After 500ms, try to pair with one of the pollers for up to a second, then drop
            // `s_end`.
            scope.spawn(|_| {
                thread::sleep(ms(500));

                let mut sel = Select::new();
                let on_recv = sel.recv(&r1);
                let on_send = sel.send(&s2);
                if let Ok(op) = sel.select_timeout(ms(1000)) {
                    let idx = op.index();
                    if idx == on_recv {
                        assert_eq!(op.recv(&r1), Ok(1));
                    } else if idx == on_send {
                        assert!(op.send(&s2, 2).is_ok());
                    } else {
                        unreachable!();
                    }
                }

                drop(s_end);
            });
        })
        .unwrap();
    }
}
/// Selects over two receivers while a helper thread clones and drops a sender clone, then sends
/// on the original sender.
#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            // Wait for the go-ahead from the main thread.
            r3.recv().unwrap();
            drop(s1.clone());
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            // Wait for the main thread to report that its selection finished.
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let chosen = sel.select();
        let idx = chosen.index();
        if idx == on_r1 {
            drop(chosen.recv(&r1));
        } else if idx == on_r2 {
            drop(chosen.recv(&r2));
        } else {
            unreachable!();
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
/// Checks that cloning and dropping a sender clone does not make a blocked selection pick that
/// channel; only the later message on the second channel may be selected.
#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut sel = Select::new();
            let on_r1 = sel.recv(&r1);
            let on_r2 = sel.recv(&r2);
            let chosen = sel.select();
            let idx = chosen.index();
            if idx == on_r1 {
                panic!();
            } else if idx == on_r2 {
                drop(chosen.recv(&r2));
            } else {
                unreachable!();
            }
        });

        // Sleep 500ms before touching the senders.
        thread::sleep(ms(500));
        drop(s1.clone());
        s2.send(()).unwrap();
    })
    .unwrap();
}
/// Selects a receive on a channel that already holds a message.
#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    let mut sel = Select::new();
    let on_recv = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() == on_recv {
        drop(chosen.recv(&r));
    } else {
        unreachable!();
    }
}
/// Selects a receive on a channel that holds one message and whose senders are all gone: the
/// message is delivered first, and only afterwards is the channel seen as disconnected.
#[test]
fn preflight2() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);

    let mut sel = Select::new();
    let on_recv = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() == on_recv {
        assert_eq!(chosen.recv(&r), Ok(()));
    } else {
        unreachable!();
    }

    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}
/// Selects a receive on a channel that has been drained and whose senders are all gone; the
/// completed receive must return an error.
#[test]
fn preflight3() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);
    r.recv().unwrap();

    let mut sel = Select::new();
    let on_recv = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() == on_recv {
        assert!(chosen.recv(&r).is_err());
    } else {
        unreachable!();
    }
}
/// Registers two receives and two sends on the same unbounded channel and keeps selecting until
/// each of the four operations has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    // One flag per registered operation, set once that operation has been selected.
    let mut seen = [false; 4];

    while seen.iter().any(|&done| !done) {
        let mut sel = Select::new();
        let recv_a = sel.recv(&r);
        let recv_b = sel.recv(&r);
        let send_a = sel.send(&s);
        let send_b = sel.send(&s);

        let chosen = sel.select();
        let idx = chosen.index();
        let slot = if idx == recv_a {
            assert!(chosen.recv(&r).is_ok());
            0
        } else if idx == recv_b {
            assert!(chosen.recv(&r).is_ok());
            1
        } else if idx == send_a {
            assert!(chosen.send(&s, 0).is_ok());
            2
        } else if idx == send_b {
            assert!(chosen.send(&s, 0).is_ok());
            3
        } else {
            unreachable!()
        };
        seen[slot] = true;
    }
}
/// Performs four selections nested inside one another on a single unbounded channel: send 0,
/// receive 0, send 1, receive 1.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let send_zero = sel.send(&s);
    let op = sel.select();
    if op.index() == send_zero {
        assert!(op.send(&s, 0).is_ok());

        let mut sel = Select::new();
        let recv_zero = sel.recv(&r);
        let op = sel.select();
        if op.index() == recv_zero {
            assert_eq!(op.recv(&r), Ok(0));

            let mut sel = Select::new();
            let send_one = sel.send(&s);
            let op = sel.select();
            if op.index() == send_one {
                assert!(op.send(&s, 1).is_ok());

                let mut sel = Select::new();
                let recv_one = sel.recv(&r);
                let op = sel.select();
                if op.index() == recv_one {
                    assert_eq!(op.recv(&r), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
/// Receives 10,000 pairs of values through selection over an unbounded and a bounded channel,
/// acknowledging every received value on a third channel.
#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Producer: send `i` on each data channel in turn, waiting for an acknowledgement after
        // each send.
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let on_r1 = sel.recv(&r1);
                let on_r2 = sel.recv(&r2);
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_r1 {
                    assert_eq!(chosen.recv(&r1), Ok(i));
                } else if ix == on_r2 {
                    assert_eq!(chosen.recv(&r2), Ok(i));
                } else {
                    unreachable!();
                }

                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
/// Sends 10,000 pairs of values through selection over two zero-capacity channels while a
/// consumer thread receives from them in a fixed order.
#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Consumer: expect `i` on the first channel, then on the second, then one signal on the
        // third channel.
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let on_s1 = sel.send(&s1);
                let on_s2 = sel.send(&s2);
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_s1 {
                    assert!(chosen.send(&s1, i).is_ok());
                } else if ix == on_s2 {
                    assert!(chosen.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }

            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
/// Runs 10,000 rounds of selection over one receive and one send on zero-capacity channels
/// against a peer thread doing the matching send and receive.
#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Peer: send `i`, expect `i` back, then wait for one signal on the third channel.
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let on_recv = sel.recv(&r1);
                let on_send = sel.send(&s2);
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_eq!(chosen.recv(&r1), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }

            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
/// Transfers 20 values between two threads through a channel of capacity 2, with both sides
/// retrying `select_timeout(100ms)` until their operation goes through.
///
/// Each side sleeps 500ms before every even-numbered item, so the retries regularly time out
/// before succeeding.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        // Sender: retry until item `i` has been sent.
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // A plain `loop` replaces the former `while !done` over a flag that was never
                // updated; `break` is the only exit.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        // Timed out: try again.
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert!(oper.send(&s, i).is_ok());
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });

        // Receiver: retry until item `i` has been received, checking values arrive in order.
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        // Timed out: try again.
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert_eq!(oper.recv(&r), Ok(i));
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });
    })
    .unwrap();
}
/// Registers a send and a receive on the same channel in one selector: on a zero-capacity channel
/// neither may be selected, while on an unbounded channel the send must be.
#[test]
fn send_recv_same_channel() {
    // Zero capacity: the selection is expected to time out.
    let (s, r) = bounded::<i32>(0);
    let mut sel = Select::new();
    let on_send = sel.send(&s);
    let on_recv = sel.recv(&r);
    if let Ok(op) = sel.select_timeout(ms(100)) {
        let ix = op.index();
        if ix == on_send || ix == on_recv {
            panic!();
        } else {
            unreachable!();
        }
    }

    // Unbounded: the send must be selected and must succeed.
    let (s, r) = unbounded::<i32>();
    let mut sel = Select::new();
    let on_send = sel.send(&s);
    let on_recv = sel.recv(&r);
    let chosen = match sel.select_timeout(ms(100)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let ix = chosen.index();
    if ix == on_send {
        assert!(chosen.send(&s, 0).is_ok());
    } else if ix == on_recv {
        panic!();
    } else {
        unreachable!();
    }
}
/// Spawns 44 threads that each select between receiving from and sending on one shared
/// zero-capacity channel, and checks that no thread receives its own value.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let on_recv = sel.recv(r);
                let on_send = sel.send(s);
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    // Once all threads have finished, the channel must be empty.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
/// Same as `matching`, but with 55 threads plus one extra send from the main thread.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let on_recv = sel.recv(r);
                let on_send = sel.send(s);
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }

        // Extra send from the main thread, using a value no spawned thread uses as its index.
        s.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
/// Passes the receiving half of a freshly created channel through the current channel 1000
/// times, for capacities 0, 1 and 2, with every send and receive going through a selection.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            // Producer: ship the next receiver over the current sender, then switch to the new
            // sender.
            scope.spawn(move |_| {
                let mut current = s;

                for _ in 0..COUNT {
                    let (next_tx, next_rx) = bounded(cap);
                    let payload: T = Box::new(Some(next_rx));

                    {
                        let mut sel = Select::new();
                        let on_send = sel.send(&current);
                        let chosen = sel.select();
                        if chosen.index() == on_send {
                            assert!(chosen.send(&current, payload).is_ok());
                        } else {
                            unreachable!();
                        }
                    }

                    current = next_tx;
                }
            });

            // Consumer: unwrap the received box back into a receiver and continue with it.
            scope.spawn(move |_| {
                let mut current = r;

                for _ in 0..COUNT {
                    let next = {
                        let mut sel = Select::new();
                        let on_recv = sel.recv(&current);
                        let chosen = sel.select();
                        if chosen.index() != on_recv {
                            unreachable!();
                        }

                        let mut boxed = chosen.recv(&current).unwrap();
                        let inner = boxed
                            .downcast_mut::<Option<Receiver<T>>>()
                            .unwrap()
                            .take()
                            .unwrap();
                        inner
                    };
                    current = next;
                }
            });
        })
        .unwrap();
    }
}
/// Checks, for capacity-1 bounded and then unbounded channels, that `try_select` never comes back
/// empty in the scenario below.
///
/// In every iteration the spawned thread sends on `s1` and then selects over `r1` and `r2`,
/// while the main thread sends on `s2` and then tries to take the message from `r1`.
#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread at the start of the round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let on_r1 = sel.recv(&r1);
                    let on_r2 = sel.recv(&r2);
                    let chosen = match sel.try_select() {
                        Ok(op) => op,
                        Err(_) => unreachable!(),
                    };
                    let ix = chosen.index();
                    if ix == on_r1 {
                        assert!(chosen.recv(&r1).is_ok());
                    } else if ix == on_r2 {
                        assert!(chosen.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    // Rendezvous at the end of the round, then clear any leftover message.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
/// Checks, for capacity-1 bounded and then unbounded channels, that `select_timeout` with a zero
/// timeout never comes back empty in the scenario below.
///
/// In every iteration the spawned thread sends on `s1` and then selects over `r1` and `r2`,
/// while the main thread sends on `s2` and then tries to take the message from `r1`.
#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread at the start of the round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let on_r1 = sel.recv(&r1);
                    let on_r2 = sel.recv(&r2);
                    let chosen = match sel.select_timeout(ms(0)) {
                        Ok(op) => op,
                        Err(_) => unreachable!(),
                    };
                    let ix = chosen.index();
                    if ix == on_r1 {
                        assert!(chosen.recv(&r1).is_ok());
                    } else if ix == on_r2 {
                        assert!(chosen.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    // Rendezvous at the end of the round, then clear any leftover message.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
/// Selects 10,000 times over four receivers (a pre-filled bounded channel, a pre-filled unbounded
/// channel, an `after(0)` channel and a `tick(0)` channel) and checks that each one is chosen at
/// least `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Pre-fill both plain channels with one message per selection round.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    // Number of times each of the four operations was chosen.
    let mut hits = [0usize; 4];

    for _ in 0..COUNT {
        // Fresh timer channels are created for every round.
        let timer = after(ms(0));
        let ticker = tick(ms(0));

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let on_timer = sel.recv(&timer);
        let on_ticker = sel.recv(&ticker);

        let chosen = sel.select();
        let idx = chosen.index();
        if idx == on_r1 {
            chosen.recv(&r1).unwrap();
            hits[0] += 1;
        } else if idx == on_r2 {
            chosen.recv(&r2).unwrap();
            hits[1] += 1;
        } else if idx == on_timer {
            chosen.recv(&timer).unwrap();
            hits[2] += 1;
        } else if idx == on_ticker {
            chosen.recv(&ticker).unwrap();
            hits[3] += 1;
        } else {
            unreachable!();
        }
    }

    let floor = COUNT / hits.len() / 2;
    assert!(hits.iter().all(|&count| count >= floor));
}
/// Selects 10,000 times over receives on an unbounded, a capacity-1 and a zero-capacity channel
/// fed by a selecting sender thread, and checks that each receive is chosen at least
/// `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        // Feeder: offers a send on the first two channels only while they are empty, and always
        // offers a send on the third.
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();
                let on_s1 = if s1.is_empty() {
                    Some(sel.send(&s1))
                } else {
                    None
                };
                let on_s2 = if s2.is_empty() {
                    Some(sel.send(&s2))
                } else {
                    None
                };
                let on_s3 = sel.send(&s3);

                let chosen = sel.select();
                let idx = chosen.index();
                if Some(idx) == on_s1 {
                    assert!(chosen.send(&s1, ()).is_ok());
                } else if Some(idx) == on_s2 {
                    assert!(chosen.send(&s2, ()).is_ok());
                } else if idx == on_s3 {
                    assert!(chosen.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        // Number of times each of the three receives was chosen.
        let mut hits = [0usize; 3];

        for _ in 0..COUNT {
            let mut sel = Select::new();
            let on_r1 = sel.recv(&r1);
            let on_r2 = sel.recv(&r2);
            let on_r3 = sel.recv(&r3);

            let chosen = sel.select();
            let idx = chosen.index();
            if idx == on_r1 {
                chosen.recv(&r1).unwrap();
                hits[0] += 1;
            } else if idx == on_r2 {
                chosen.recv(&r2).unwrap();
                hits[1] += 1;
            } else if idx == on_r3 {
                chosen.recv(&r3).unwrap();
                hits[2] += 1;
            } else {
                unreachable!();
            }
        }

        let floor = COUNT / hits.len() / 50;
        assert!(hits.iter().all(|&count| count >= floor));
    })
    .unwrap();
}
/// Shares one prepared selector by reference with 20 threads; each thread clones it locally and
/// selects on the clone.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut template = Select::new();
    let on_recv = template.recv(r);
    let on_send = template.send(s);
    // Only a shared reference to the selector is moved into the threads.
    let shared = &template;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut local = shared.clone();
                let chosen = local.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
/// Clones one prepared selector on the main thread for each of 20 threads and moves each clone
/// into its thread, where it is used to select.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut template = Select::new();
    let on_recv = template.recv(r);
    let on_send = template.send(s);

    scope(|scope| {
        for i in 0..THREADS {
            // The clone is made here and then owned by the spawned thread.
            let mut owned = template.clone();
            scope.spawn(move |_| {
                let chosen = owned.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
/// Builds a single selector with one receive and one send and reuses it for 20,000 selections
/// against a peer thread doing the matching send and receive.
#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Peer: send `i`, expect `i` back, then wait for one signal on the third channel.
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        // The operations are registered once, outside the loops.
        let mut sel = Select::new();
        let on_recv = sel.recv(&r1);
        let on_send = sel.send(&s2);

        for i in 0..COUNT {
            for _ in 0..2 {
                let chosen = sel.select();
                let ix = chosen.index();
                if ix == on_recv {
                    assert_eq!(chosen.recv(&r1), Ok(i));
                } else if ix == on_send {
                    assert!(chosen.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }

            s3.send(()).unwrap();
        }
    })
    .unwrap();
}