blob: 45c19968765256231ad63bdde5701b189389bb9d [file] [log] [blame]
extern crate futures;
use std::sync::mpsc;
use std::thread;
use futures::prelude::*;
use futures::future::{lazy, ok};
use futures::sync::oneshot::*;
mod support;
use support::*;
#[test]
fn smoke_poll() {
let (mut tx, rx) = channel::<u32>();
let mut task = futures::executor::spawn(lazy(|| {
assert!(tx.poll_cancel().unwrap().is_not_ready());
assert!(tx.poll_cancel().unwrap().is_not_ready());
drop(rx);
assert!(tx.poll_cancel().unwrap().is_ready());
assert!(tx.poll_cancel().unwrap().is_ready());
ok::<(), ()>(())
}));
assert!(task.poll_future_notify(&notify_noop(), 0).unwrap().is_ready());
}
#[test]
fn cancel_notifies() {
let (tx, rx) = channel::<u32>();
let (tx2, rx2) = mpsc::channel();
WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
drop(rx);
rx2.recv().unwrap().unwrap();
}
struct WaitForCancel {
tx: Sender<u32>,
}
impl Future for WaitForCancel {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.tx.poll_cancel()
}
}
#[test]
fn cancel_lots() {
let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
let t = thread::spawn(move || {
for (tx, tx2) in rx {
WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
}
});
for _ in 0..20000 {
let (otx, orx) = channel::<u32>();
let (tx2, rx2) = mpsc::channel();
tx.send((otx, tx2)).unwrap();
drop(orx);
rx2.recv().unwrap().unwrap();
}
drop(tx);
t.join().unwrap();
}
#[test]
fn close() {
let (mut tx, mut rx) = channel::<u32>();
rx.close();
assert!(rx.poll().is_err());
assert!(tx.poll_cancel().unwrap().is_ready());
}
#[test]
fn close_wakes() {
let (tx, mut rx) = channel::<u32>();
let (tx2, rx2) = mpsc::channel();
let t = thread::spawn(move || {
rx.close();
rx2.recv().unwrap();
});
WaitForCancel { tx: tx }.wait().unwrap();
tx2.send(()).unwrap();
t.join().unwrap();
}
#[test]
fn is_canceled() {
let (tx, rx) = channel::<u32>();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
}
#[test]
fn cancel_sends() {
let (tx, rx) = mpsc::channel::<Sender<_>>();
let t = thread::spawn(move || {
for otx in rx {
let _ = otx.send(42);
}
});
for _ in 0..20000 {
let (otx, mut orx) = channel::<u32>();
tx.send(otx).unwrap();
orx.close();
// Not necessary to wrap in a task because the implementation of oneshot
// never calls `task::current()` if the channel has been closed already.
let _ = orx.poll();
}
drop(tx);
t.join().unwrap();
}
#[test]
fn spawn_sends_items() {
let core = local_executor::Core::new();
let future = ok::<_, ()>(1);
let rx = spawn(future, &core);
assert_eq!(core.run(rx).unwrap(), 1);
}
#[test]
fn spawn_kill_dead_stream() {
use std::thread;
use std::time::Duration;
use futures::future::Either;
use futures::sync::oneshot;
// a future which never returns anything (forever accepting incoming
// connections), but dropping it leads to observable side effects
// (like closing listening sockets, releasing limited resources,
// ...)
#[derive(Debug)]
struct Dead {
// when dropped you should get Err(oneshot::Canceled) on the
// receiving end
done: oneshot::Sender<()>,
}
impl Future for Dead {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::NotReady)
}
}
// need to implement a timeout for the test, as it would hang
// forever right now
let (timeout_tx, timeout_rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
let _ = timeout_tx.send(());
});
let core = local_executor::Core::new();
let (done_tx, done_rx) = oneshot::channel();
let future = Dead{done: done_tx};
let rx = spawn(future, &core);
let res = core.run(
Ok::<_, ()>(())
.into_future()
.then(move |_| {
// now drop the spawned future: maybe some timeout exceeded,
// or some connection on this end was closed by the remote
// end.
drop(rx);
// and wait for the spawned future to release its resources
done_rx
})
.select2(timeout_rx)
);
match res {
Err(Either::A((oneshot::Canceled, _))) => (),
Ok(Either::B(((), _))) => {
panic!("dead future wasn't canceled (timeout)");
},
_ => {
panic!("dead future wasn't canceled (unexpected result)");
},
}
}
#[test]
fn spawn_dont_kill_forgot_dead_stream() {
use std::thread;
use std::time::Duration;
use futures::future::Either;
use futures::sync::oneshot;
// a future which never returns anything (forever accepting incoming
// connections), but dropping it leads to observable side effects
// (like closing listening sockets, releasing limited resources,
// ...)
#[derive(Debug)]
struct Dead {
// when dropped you should get Err(oneshot::Canceled) on the
// receiving end
done: oneshot::Sender<()>,
}
impl Future for Dead {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::NotReady)
}
}
// need to implement a timeout for the test, as it would hang
// forever right now
let (timeout_tx, timeout_rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
let _ = timeout_tx.send(());
});
let core = local_executor::Core::new();
let (done_tx, done_rx) = oneshot::channel();
let future = Dead{done: done_tx};
let rx = spawn(future, &core);
let res = core.run(
Ok::<_, ()>(())
.into_future()
.then(move |_| {
// forget the spawned future: should keep running, i.e. hit
// the timeout below.
rx.forget();
// and wait for the spawned future to release its resources
done_rx
})
.select2(timeout_rx)
);
match res {
Err(Either::A((oneshot::Canceled, _))) => {
panic!("forgotten dead future was canceled");
},
Ok(Either::B(((), _))) => (), // reached timeout
_ => {
panic!("forgotten dead future was canceled (unexpected result)");
},
}
}