blob: 13e526d48e8a0390553cacd263de77edf64d4350 [file] [log] [blame]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::sync::oneshot;
use tokio_test::*;
use std::future::Future;
use std::pin::Pin;
trait AssertSend: Send {}
impl AssertSend for oneshot::Sender<i32> {}
impl AssertSend for oneshot::Receiver<i32> {}
#[test]
fn send_recv() {
let (tx, rx) = oneshot::channel();
let mut rx = task::spawn(rx);
assert_pending!(rx.poll());
assert_ok!(tx.send(1));
assert!(rx.is_woken());
let val = assert_ready_ok!(rx.poll());
assert_eq!(val, 1);
}
#[tokio::test]
async fn async_send_recv() {
let (tx, rx) = oneshot::channel();
assert_ok!(tx.send(1));
assert_eq!(1, assert_ok!(rx.await));
}
#[test]
fn close_tx() {
let (tx, rx) = oneshot::channel::<i32>();
let mut rx = task::spawn(rx);
assert_pending!(rx.poll());
drop(tx);
assert!(rx.is_woken());
assert_ready_err!(rx.poll());
}
#[test]
fn close_rx() {
// First, without checking poll_closed()
//
let (tx, _) = oneshot::channel();
assert_err!(tx.send(1));
// Second, via poll_closed();
let (tx, rx) = oneshot::channel();
let mut tx = task::spawn(tx);
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
drop(rx);
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(tx.into_inner().send(1));
}
#[tokio::test]
async fn async_rx_closed() {
let (mut tx, rx) = oneshot::channel::<()>();
tokio::spawn(async move {
drop(rx);
});
tx.closed().await;
}
#[test]
fn explicit_close_poll() {
// First, with message sent
let (tx, rx) = oneshot::channel();
let mut rx = task::spawn(rx);
assert_ok!(tx.send(1));
rx.close();
let value = assert_ready_ok!(rx.poll());
assert_eq!(value, 1);
// Second, without the message sent
let (tx, rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(tx.into_inner().send(1));
assert_ready_err!(rx.poll());
// Again, but without sending the value this time
let (tx, rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
let mut rx = task::spawn(rx);
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_ready_err!(rx.poll());
}
#[test]
fn explicit_close_try_recv() {
// First, with message sent
let (tx, mut rx) = oneshot::channel();
assert_ok!(tx.send(1));
rx.close();
let val = assert_ok!(rx.try_recv());
assert_eq!(1, val);
// Second, without the message sent
let (tx, mut rx) = oneshot::channel::<i32>();
let mut tx = task::spawn(tx);
assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
rx.close();
assert!(tx.is_woken());
assert!(tx.is_closed());
assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
assert_err!(rx.try_recv());
}
#[test]
#[should_panic]
fn close_try_recv_poll() {
let (_tx, rx) = oneshot::channel::<i32>();
let mut rx = task::spawn(rx);
rx.close();
assert_err!(rx.try_recv());
let _ = rx.poll();
}
#[test]
fn drops_tasks() {
let (mut tx, mut rx) = oneshot::channel::<i32>();
let mut tx_task = task::spawn(());
let mut rx_task = task::spawn(());
assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx)));
assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
drop(tx);
drop(rx);
assert_eq!(1, tx_task.waker_ref_count());
assert_eq!(1, rx_task.waker_ref_count());
}
#[test]
fn receiver_changes_task() {
let (tx, mut rx) = oneshot::channel();
let mut task1 = task::spawn(());
let mut task2 = task::spawn(());
assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
assert_eq!(2, task1.waker_ref_count());
assert_eq!(1, task2.waker_ref_count());
assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
assert_eq!(1, task1.waker_ref_count());
assert_eq!(2, task2.waker_ref_count());
assert_ok!(tx.send(1));
assert!(!task1.is_woken());
assert!(task2.is_woken());
assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
}
#[test]
fn sender_changes_task() {
let (mut tx, rx) = oneshot::channel::<i32>();
let mut task1 = task::spawn(());
let mut task2 = task::spawn(());
assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx)));
assert_eq!(2, task1.waker_ref_count());
assert_eq!(1, task2.waker_ref_count());
assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx)));
assert_eq!(1, task1.waker_ref_count());
assert_eq!(2, task2.waker_ref_count());
drop(rx);
assert!(!task1.is_woken());
assert!(task2.is_woken());
assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
}