// blob: d2260c8e6f252ca2251ca4c8bc13b1f3ed1aae97 [file] [log] [blame]
use std::mem;
use futures::{Async, Future, Poll, Stream};
use futures::future::Shared;
use futures::sync::{mpsc, oneshot};
use super::Never;
/// Builds a connected drain pair: the `Signal` that starts a drain and the
/// `Watch` that observes it.
///
/// Two channels link the halves:
///
/// * a oneshot channel carrying the "start draining" notification from the
///   `Signal` to the `Watch` (the receiver is wrapped with `shared()` so that
///   the `Watch` can be cloned);
/// * a zero-capacity mpsc channel with item type `Never`, whose sender lives
///   in the `Watch` and whose receiver lives in the `Signal`.
pub fn channel() -> (Signal, Watch) {
    let (drained_tx, drained_rx) = mpsc::channel(0);
    let (tx, rx) = oneshot::channel();

    let signal = Signal { drained_rx, tx };
    let watch = Watch {
        drained_tx,
        rx: rx.shared(),
    };

    (signal, watch)
}
/// Handle that triggers a drain; created by `channel` and consumed by `drain`.
pub struct Signal {
/// Receiver of the `Never` channel whose sender is held by `Watch`; `drain`
/// moves it into the returned `Draining`.
drained_rx: mpsc::Receiver<Never>,
/// One-shot sender that `drain` fires to notify watchers.
tx: oneshot::Sender<()>,
}
/// Future returned by `Signal::drain`; it becomes ready once `drained_rx`
/// reports the end of its stream.
pub struct Draining {
/// Receiver taken from the `Signal`. It never yields an item, because a
/// `Never` value cannot exist (see the empty `match` in `poll`).
drained_rx: mpsc::Receiver<Never>,
}
/// Cloneable handle used to observe the drain signal; `watch` turns it into a
/// `Watching` future.
#[derive(Clone)]
pub struct Watch {
/// Sender half of the `Never` channel. Nothing in this file sends through
/// it; it is only held (and cloned along with the `Watch`).
drained_tx: mpsc::Sender<Never>,
/// Shared view of the one-shot drain notification, polled by `Watching`.
rx: Shared<oneshot::Receiver<()>>,
}
/// Future adapter returned by `Watch::watch`: it drives `future` and invokes
/// a callback on it once the drain signal is observed.
#[allow(missing_debug_implementations)]
pub struct Watching<F, FN> {
/// The wrapped future; polled on every `poll` and passed to the callback
/// when the drain fires.
future: F,
/// Holds the `on_drain` callback until it has been invoked.
state: State<FN>,
/// The `Watch` this adapter was built from; its `rx` is polled for the
/// drain signal.
watch: Watch,
}
/// Progress of a `Watching` future with respect to the drain signal.
enum State<F> {
/// Drain not yet observed; carries the callback to run when it is.
Watch(F),
/// The callback has been taken; from here on only the inner future is
/// polled.
Draining,
}
impl Signal {
pub fn drain(self) -> Draining {
let _ = self.tx.send(());
Draining {
drained_rx: self.drained_rx,
}
}
}
impl Future for Draining {
    type Item = ();
    type Error = ();

    /// Resolves once the `Never` stream reports its end; stays not-ready (or
    /// propagates the receiver's error) until then via `try_ready!`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let next = try_ready!(self.drained_rx.poll());

        if let Some(never) = next {
            // A `Never` value cannot exist, so this branch is statically
            // unreachable; the empty match proves that to the compiler.
            match never {}
        }

        // `None`: the stream has ended, so the drain is complete.
        Ok(Async::Ready(()))
    }
}
impl Watch {
    /// Wraps `future` in a `Watching` adapter.
    ///
    /// The adapter polls `future` as usual. The first time it observes the
    /// drain signal it calls `on_drain` once with mutable access to `future`,
    /// then keeps polling `future` until it completes.
    pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
    where
        F: Future,
        FN: FnOnce(&mut F),
    {
        let state = State::Watch(on_drain);

        Watching {
            future,
            state,
            watch: self,
        }
    }
}
impl<F, FN> Future for Watching<F, FN>
where
    F: Future,
    FN: FnOnce(&mut F),
{
    type Item = F::Item;
    type Error = F::Error;

    /// Checks for the drain signal (while the callback is still pending) and
    /// then polls the inner future, returning the inner future's result.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Move the callback out of `self.state`, leaving `Draining` behind.
        // An `FnOnce` has to be owned to be called, so it is taken eagerly and
        // put back only if the drain has not been signaled yet.
        if let State::Watch(on_drain) = mem::replace(&mut self.state, State::Draining) {
            match self.watch.rx.poll() {
                Ok(Async::NotReady) => {
                    // No drain yet: restore the callback for the next poll.
                    self.state = State::Watch(on_drain);
                }
                Ok(Async::Ready(_)) | Err(_) => {
                    // Drain has been triggered! An error from the shared
                    // receiver is treated the same way as a signal.
                    on_drain(&mut self.future);
                }
            }
        }

        self.future.poll()
    }
}
#[cfg(test)]
mod tests {
    use futures::{future, Async, Future, Poll};
    use super::*;

    /// Hand-driven future for the tests: it counts its polls, completes only
    /// after `finished` is set, and records in `draining` whether the drain
    /// callback ran.
    struct TestMe {
        draining: bool,
        finished: bool,
        poll_cnt: usize,
    }

    impl TestMe {
        /// A fresh instance: not draining, not finished, never polled.
        fn pending() -> TestMe {
            TestMe {
                draining: false,
                finished: false,
                poll_cnt: 0,
            }
        }
    }

    impl Future for TestMe {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            self.poll_cnt += 1;
            let outcome = if self.finished {
                Async::Ready(())
            } else {
                Async::NotReady
            };
            Ok(outcome)
        }
    }

    /// A single watcher: the callback fires on the first poll after the drain
    /// is signaled, and `Draining` completes only once the watcher is dropped.
    #[test]
    fn watch() {
        // NOTE(review): the body runs inside `future::lazy(..).wait()`,
        // presumably so the manual `poll` calls happen within a task context.
        future::lazy(|| {
            let (signal, watcher) = channel();
            let mut watching = watcher.watch(TestMe::pending(), |fut| {
                fut.draining = true;
            });

            assert_eq!(watching.future.poll_cnt, 0);

            // First poll should poll the inner future
            assert!(watching.poll().unwrap().is_not_ready());
            assert_eq!(watching.future.poll_cnt, 1);

            // Second poll should poll the inner future again
            assert!(watching.poll().unwrap().is_not_ready());
            assert_eq!(watching.future.poll_cnt, 2);

            let mut draining = signal.drain();
            // Drain signaled, but needs another poll to be noticed.
            assert!(!watching.future.draining);
            assert_eq!(watching.future.poll_cnt, 2);

            // Now, poll after drain has been signaled.
            assert!(watching.poll().unwrap().is_not_ready());
            assert_eq!(watching.future.poll_cnt, 3);
            assert!(watching.future.draining);

            // Draining is not ready until watcher completes
            assert!(draining.poll().unwrap().is_not_ready());

            // Finishing up the watch future
            watching.future.finished = true;
            assert!(watching.poll().unwrap().is_ready());
            assert_eq!(watching.future.poll_cnt, 4);
            drop(watching);

            assert!(draining.poll().unwrap().is_ready());

            Ok::<_, ()>(())
        })
        .wait()
        .unwrap();
    }

    /// Cloned watchers: `Draining` stays pending until every clone is gone.
    #[test]
    fn watch_clones() {
        future::lazy(|| {
            let (signal, watcher) = channel();

            let first = watcher.clone().watch(TestMe::pending(), |fut| {
                fut.draining = true;
            });
            let second = watcher.watch(TestMe::pending(), |fut| {
                fut.draining = true;
            });

            let mut draining = signal.drain();

            // Still 2 outstanding watchers
            assert!(draining.poll().unwrap().is_not_ready());

            // drop 1 for whatever reason
            drop(first);

            // Still not ready, 1 other watcher still pending
            assert!(draining.poll().unwrap().is_not_ready());

            drop(second);

            // Now all watchers are gone, draining is complete
            assert!(draining.poll().unwrap().is_ready());

            Ok::<_, ()>(())
        })
        .wait()
        .unwrap();
    }
}