// blob: f02d90aa56d8d48ad8349c3b898d80ac970950e9 [file] [log] [blame]
#![allow(clippy::redundant_clone)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
use std::sync::Arc;
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
#[test]
fn send_recv_with_buffer() {
let (tx, rx) = mpsc::channel::<i32>(16);
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
// Using poll_ready / try_send
assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
tx.try_send(1).unwrap();
// Without poll_ready
tx.try_send(2).unwrap();
drop(tx);
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
assert!(val.is_none());
}
#[test]
fn disarm() {
let (tx, rx) = mpsc::channel::<i32>(2);
let mut tx1 = task::spawn(tx.clone());
let mut tx2 = task::spawn(tx.clone());
let mut tx3 = task::spawn(tx.clone());
let mut tx4 = task::spawn(tx);
let mut rx = task::spawn(rx);
// We should be able to `poll_ready` two handles without problem
assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));
// But a third should not be ready
assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
// Using one of the reserved slots should allow a new handle to become ready
tx1.try_send(1).unwrap();
// We also need to receive for the slot to be free
let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
// Now there's a free slot!
assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
// Dropping a ready handle should also open up a slot
drop(tx2);
assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
// Explicitly disarming a handle should also open a slot
assert!(tx3.disarm());
assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
// Disarming a non-armed sender does not free up a slot
assert!(!tx3.disarm());
assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
}
#[tokio::test]
async fn send_recv_stream_with_buffer() {
use tokio::stream::StreamExt;
let (mut tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
#[tokio::test]
async fn async_send_recv_with_buffer() {
let (mut tx, mut rx) = mpsc::channel(16);
tokio::spawn(async move {
assert_ok!(tx.send(1).await);
assert_ok!(tx.send(2).await);
});
assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
assert_eq!(None, rx.recv().await);
}
#[test]
fn start_send_past_cap() {
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
assert_ok!(tx1.try_send(()));
t1.enter(|cx, _| {
assert_pending!(tx1.poll_ready(cx));
});
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_some());
assert!(t2.is_woken());
assert!(!t1.is_woken());
drop(tx2);
let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
assert!(val.is_none());
}
#[test]
#[should_panic]
fn buffer_gteq_one() {
mpsc::channel::<i32>(0);
}
#[test]
fn send_recv_unbounded() {
let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// Using `try_send`
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(1));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some(2));
drop(tx);
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
#[tokio::test]
async fn async_send_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
});
assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
assert_eq!(None, rx.recv().await);
}
#[tokio::test]
async fn send_recv_stream_unbounded() {
use tokio::stream::StreamExt;
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
tokio::spawn(async move {
assert_ok!(tx.send(1));
assert_ok!(tx.send(2));
});
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
#[test]
fn no_t_bounds_buffer() {
struct NoImpls;
let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::channel(100);
// sender should be Debug even though T isn't Debug
println!("{:?}", tx);
// same with Receiver
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().try_send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
#[test]
fn no_t_bounds_unbounded() {
struct NoImpls;
let mut t1 = task::spawn(());
let (tx, mut rx) = mpsc::unbounded_channel();
// sender should be Debug even though T isn't Debug
println!("{:?}", tx);
// same with Receiver
println!("{:?}", rx);
// and sender should be Clone even though T isn't Clone
assert!(tx.clone().send(NoImpls).is_ok());
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_some());
}
#[test]
fn send_recv_buffer_limited() {
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(1);
// Run on a task context
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
// Send first message
assert_ok!(tx.try_send(1));
// Not ready
assert_pending!(tx.poll_ready(cx));
// Send second message
assert_err!(tx.try_send(1337));
});
t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(1), val);
});
assert!(t1.is_woken());
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
assert_ok!(tx.try_send(2));
// Not ready
assert_pending!(tx.poll_ready(cx));
});
t2.enter(|cx, _| {
// Take the value
let val = assert_ready!(rx.poll_recv(cx));
assert_eq!(Some(2), val);
});
t1.enter(|cx, _| {
assert_ready_ok!(tx.poll_ready(cx));
});
}
#[test]
fn recv_close_gets_none_idle() {
let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel::<i32>(10);
rx.close();
t1.enter(|cx, _| {
let val = assert_ready!(rx.poll_recv(cx));
assert!(val.is_none());
assert_ready_err!(tx.poll_ready(cx));
});
}
#[test]
fn recv_close_gets_none_reserved() {
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
let mut t3 = task::spawn(());
let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
rx.close();
assert!(t2.is_woken());
t2.enter(|cx, _| {
assert_ready_err!(tx2.poll_ready(cx));
});
t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
assert!(!t1.is_woken());
assert!(!t2.is_woken());
assert_ok!(tx1.try_send(123));
assert!(t3.is_woken());
t3.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert_eq!(v, Some(123));
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
#[test]
fn tx_close_gets_none() {
let mut t1 = task::spawn(());
let (_, mut rx) = mpsc::channel::<i32>(10);
// Run on a task context
t1.enter(|cx, _| {
let v = assert_ready!(rx.poll_recv(cx));
assert!(v.is_none());
});
}
#[test]
fn try_send_fail() {
let mut t1 = task::spawn(());
let (mut tx, mut rx) = mpsc::channel(1);
tx.try_send("hello").unwrap();
// This should fail
match assert_err!(tx.try_send("fail")) {
TrySendError::Full(..) => {}
_ => panic!(),
}
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("hello"));
assert_ok!(tx.try_send("goodbye"));
drop(tx);
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert_eq!(val, Some("goodbye"));
let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
assert!(val.is_none());
}
#[test]
fn drop_tx_with_permit_releases_permit() {
let mut t1 = task::spawn(());
let mut t2 = task::spawn(());
// poll_ready reserves capacity, ensure that the capacity is released if tx
// is dropped w/o sending a value.
let (mut tx1, _rx) = mpsc::channel::<i32>(1);
let mut tx2 = tx1.clone();
assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
t2.enter(|cx, _| {
assert_pending!(tx2.poll_ready(cx));
});
drop(tx1);
assert!(t2.is_woken());
assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
}
#[test]
fn dropping_rx_closes_channel() {
let mut t1 = task::spawn(());
let (mut tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
assert_ok!(tx.try_send(msg.clone()));
drop(rx);
assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
assert_eq!(1, Arc::strong_count(&msg));
}
#[test]
fn dropping_rx_closes_channel_for_try() {
let (mut tx, rx) = mpsc::channel(100);
let msg = Arc::new(());
tx.try_send(msg.clone()).unwrap();
drop(rx);
{
let err = assert_err!(tx.try_send(msg.clone()));
match err {
TrySendError::Closed(..) => {}
_ => panic!(),
}
}
assert_eq!(1, Arc::strong_count(&msg));
}
#[test]
fn unconsumed_messages_are_dropped() {
let msg = Arc::new(());
let (mut tx, rx) = mpsc::channel(100);
tx.try_send(msg.clone()).unwrap();
assert_eq!(2, Arc::strong_count(&msg));
drop((tx, rx));
assert_eq!(1, Arc::strong_count(&msg));
}
#[test]
fn try_recv() {
let (mut tx, mut rx) = mpsc::channel(1);
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
_ => panic!(),
}
tx.try_send(42).unwrap();
match rx.try_recv() {
Ok(42) => {}
_ => panic!(),
}
drop(tx);
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => panic!(),
}
}
#[test]
fn try_recv_unbounded() {
let (tx, mut rx) = mpsc::unbounded_channel();
match rx.try_recv() {
Err(TryRecvError::Empty) => {}
_ => panic!(),
}
tx.send(42).unwrap();
match rx.try_recv() {
Ok(42) => {}
_ => panic!(),
}
drop(tx);
match rx.try_recv() {
Err(TryRecvError::Closed) => {}
_ => panic!(),
}
}