blob: 55b0ca5ac2a9cbd0bf09126b929000e8b02bc122 [file] [log] [blame]
extern crate futures;
use futures::prelude::*;
use futures::future;
use futures::unsync::oneshot::{channel, Canceled, spawn};
mod support;
use support::local_executor;
#[test]
fn smoke() {
let (tx, rx) = channel();
tx.send(33).unwrap();
assert_eq!(rx.wait().unwrap(), 33);
}
#[test]
fn canceled() {
let (_, rx) = channel::<()>();
assert_eq!(rx.wait().unwrap_err(), Canceled);
}
#[test]
fn poll_cancel() {
let (mut tx, _) = channel::<()>();
assert!(tx.poll_cancel().unwrap().is_ready());
}
#[test]
fn tx_complete_rx_unparked() {
let (tx, rx) = channel();
let res = rx.join(future::lazy(move || {
tx.send(55).unwrap();
Ok(11)
}));
assert_eq!(res.wait().unwrap(), (55, 11));
}
#[test]
fn tx_dropped_rx_unparked() {
let (tx, rx) = channel::<i32>();
let res = rx.join(future::lazy(move || {
let _tx = tx;
Ok(11)
}));
assert_eq!(res.wait().unwrap_err(), Canceled);
}
#[test]
fn is_canceled() {
let (tx, rx) = channel::<u32>();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
}
#[test]
fn spawn_sends_items() {
let core = local_executor::Core::new();
let future = future::ok::<_, ()>(1);
let rx = spawn(future, &core);
assert_eq!(core.run(rx).unwrap(), 1);
}
#[test]
fn spawn_kill_dead_stream() {
use std::thread;
use std::time::Duration;
use futures::future::Either;
use futures::sync::oneshot;
// a future which never returns anything (forever accepting incoming
// connections), but dropping it leads to observable side effects
// (like closing listening sockets, releasing limited resources,
// ...)
#[derive(Debug)]
struct Dead {
// when dropped you should get Err(oneshot::Canceled) on the
// receiving end
done: oneshot::Sender<()>,
}
impl Future for Dead {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::NotReady)
}
}
// need to implement a timeout for the test, as it would hang
// forever right now
let (timeout_tx, timeout_rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
let _ = timeout_tx.send(());
});
let core = local_executor::Core::new();
let (done_tx, done_rx) = oneshot::channel();
let future = Dead{done: done_tx};
let rx = spawn(future, &core);
let res = core.run(
Ok::<_, ()>(())
.into_future()
.then(move |_| {
// now drop the spawned future: maybe some timeout exceeded,
// or some connection on this end was closed by the remote
// end.
drop(rx);
// and wait for the spawned future to release its resources
done_rx
})
.select2(timeout_rx)
);
match res {
Err(Either::A((oneshot::Canceled, _))) => (),
Ok(Either::B(((), _))) => {
panic!("dead future wasn't canceled (timeout)");
},
_ => {
panic!("dead future wasn't canceled (unexpected result)");
},
}
}
#[test]
fn spawn_dont_kill_forgot_dead_stream() {
use std::thread;
use std::time::Duration;
use futures::future::Either;
use futures::sync::oneshot;
// a future which never returns anything (forever accepting incoming
// connections), but dropping it leads to observable side effects
// (like closing listening sockets, releasing limited resources,
// ...)
#[derive(Debug)]
struct Dead {
// when dropped you should get Err(oneshot::Canceled) on the
// receiving end
done: oneshot::Sender<()>,
}
impl Future for Dead {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::NotReady)
}
}
// need to implement a timeout for the test, as it would hang
// forever right now
let (timeout_tx, timeout_rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
let _ = timeout_tx.send(());
});
let core = local_executor::Core::new();
let (done_tx, done_rx) = oneshot::channel();
let future = Dead{done: done_tx};
let rx = spawn(future, &core);
let res = core.run(
Ok::<_, ()>(())
.into_future()
.then(move |_| {
// forget the spawned future: should keep running, i.e. hit
// the timeout below.
rx.forget();
// and wait for the spawned future to release its resources
done_rx
})
.select2(timeout_rx)
);
match res {
Err(Either::A((oneshot::Canceled, _))) => {
panic!("forgotten dead future was canceled");
},
Ok(Either::B(((), _))) => (), // reached timeout
_ => {
panic!("forgotten dead future was canceled (unexpected result)");
},
}
}