// blob: fb6fe48da8349bc16be8bb9ef73bf0244b953410 [file] [log] [blame]
#![doc(html_root_url = "https://docs.rs/want/0.0.6")]
#![deny(warnings)]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
//! A Futures channel-like utility to signal when a value is wanted.
//!
//! Futures are supposed to be lazy, and only start work once `Future::poll`
//! is called. The same is true of `Stream`s, but when using a channel as
//! a `Stream`, it can be hard to know if the receiver is ready for the next
//! value.
//!
//! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
//! work to be produced? Just because there is room in the channel buffer
//! doesn't mean the work would be used by the receiver.
//!
//! This is where something like `want` comes in. Added to a channel, you can
//! make sure that the `tx` only creates the message and sends it when the `rx`
//! has called `poll()` for it, and the buffer was empty.
extern crate futures;
#[macro_use]
extern crate log;
extern crate try_lock;
use std::fmt;
use std::mem;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
// SeqCst is the only ordering used, to ensure that accesses to the state and
// the TryLock are never re-ordered.
use std::sync::atomic::Ordering::SeqCst;
use futures::{Async, Poll};
use futures::task::{self, Task};
use try_lock::TryLock;
/// Create a new `want` channel, returning its two connected halves.
///
/// Both halves hold an `Arc` to the same `Inner`, which starts out in the
/// `Idle` state with no task parked.
pub fn new() -> (Giver, Taker) {
    let shared = Arc::new(Inner {
        state: AtomicUsize::new(usize::from(State::Idle)),
        task: TryLock::new(None),
    });
    let giver = Giver {
        inner: Arc::clone(&shared),
    };
    let taker = Taker { inner: shared };
    (giver, taker)
}
/// An entity that gives a value when wanted.
///
/// Created together with its `Taker` by [`new`]. Use
/// [`poll_want`](Giver::poll_want) to learn when the `Taker` wants a value,
/// and [`give`](Giver::give) to reset that want.
pub struct Giver {
    // State shared with the paired `Taker`.
    inner: Arc<Inner>,
}
/// An entity that wants a value.
///
/// Created together with its `Giver` by [`new`]. Dropping a `Taker` signals
/// `Closed` to the `Giver`, the same state that
/// [`cancel`](Taker::cancel) signals.
pub struct Taker {
    // State shared with the paired `Giver`.
    inner: Arc<Inner>,
}
/// A cloneable `Giver`.
///
/// It differs from `Giver` in that you cannot poll for `want`. It's only
/// usable as a cancellation watcher.
///
/// Obtained by calling [`Giver::shared`]; every clone observes the same
/// underlying state.
#[derive(Clone)]
pub struct SharedGiver {
    // State shared with the `Taker` and with every other clone.
    inner: Arc<Inner>,
}
/// The `Taker` has canceled its interest in a value.
///
/// Returned as the error of [`Giver::poll_want`] once the shared state is
/// `Closed`, i.e. after the `Taker` called `cancel()` or was dropped.
pub struct Closed {
    // Private unit field: keeps the struct from being built with a literal
    // outside this module.
    _inner: (),
}
/// The channel state, stored in `Inner::state` through its `usize` encoding
/// (see the `From` conversions below).
#[derive(Clone, Copy, Debug)]
enum State {
    /// Initial state set by `new()`; also restored by `Giver::give`.
    Idle,
    /// The `Taker` has called `want()`.
    Want,
    /// Set by `Giver::poll_want` when it found no want. Tells
    /// `Taker::signal` that a task may be parked and must be notified.
    Give,
    /// The `Taker` called `cancel()` or was dropped.
    Closed,
}
/// Encodes a `State` as the integer kept in the shared `AtomicUsize`.
impl From<State> for usize {
    fn from(state: State) -> Self {
        // Must stay the exact inverse of `From<usize> for State`.
        match state {
            State::Closed => 3,
            State::Give => 2,
            State::Want => 1,
            State::Idle => 0,
        }
    }
}
/// Decodes the integer kept in the shared `AtomicUsize` back into a `State`.
///
/// Only the values 0 through 3 are ever stored; any other value is treated
/// as an internal bug and panics.
impl From<usize> for State {
    fn from(raw: usize) -> Self {
        match raw {
            0 => State::Idle,
            1 => State::Want,
            2 => State::Give,
            3 => State::Closed,
            other => unreachable!("unknown state: {}", other),
        }
    }
}
/// State shared (through an `Arc`) by the `Giver`/`SharedGiver` side and
/// the `Taker`.
struct Inner {
    /// Current `State`, stored through its `usize` encoding.
    state: AtomicUsize,
    /// Task parked by `Giver::poll_want`, if any. `Taker::signal` takes it
    /// out and notifies it.
    task: TryLock<Option<Task>>,
}
// ===== impl Giver ======
impl Giver {
/// Poll whether the `Taker` has registered interest in another value.
///
/// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
/// - If the `Taker` has not called `want()` since last poll, this
/// returns `Async::NotReady`, and parks the current task to be notified
/// when the `Taker` does call `want()`.
/// - If the `Taker` has canceled (or dropped), this returns `Closed`.
///
/// After knowing that the Taker is wanting, the state can be reset by
/// calling [`give`](Giver::give).
pub fn poll_want(&mut self) -> Poll<(), Closed> {
loop {
let state = self.inner.state.load(SeqCst).into();
match state {
State::Want => {
trace!("poll_want: taker wants!");
return Ok(Async::Ready(()));
},
State::Closed => {
trace!("poll_want: closed");
return Err(Closed { _inner: () });
},
State::Idle | State::Give => {
// Taker doesn't want anything yet, so park.
if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) {
// While we have the lock, try to set to GIVE.
let old = self.inner.state.compare_and_swap(
state.into(),
State::Give.into(),
SeqCst,
);
// If it's still the first state (Idle or Give), park current task.
if old == state.into() {
let park = locked.as_ref()
.map(|t| !t.will_notify_current())
.unwrap_or(true);
if park {
let old = mem::replace(&mut *locked, Some(task::current()));
drop(locked);
old.map(|prev_task| {
// there was an old task parked here.
// it might be waiting to be notified,
// so poke it before dropping.
prev_task.notify();
});
}
return Ok(Async::NotReady)
}
// Otherwise, something happened! Go around the loop again.
} else {
// if we couldn't take the lock, then a Taker has it.
// The *ONLY* reason is because it is in the process of notifying us
// of its want.
//
// We need to loop again to see what state it was changed to.
}
},
}
}
}
/// Mark the state as idle, if the Taker currently is wanting.
///
/// Returns true if Taker was wanting, false otherwise.
#[inline]
pub fn give(&self) -> bool {
// only set to IDLE if it is still Want
self.inner.state.compare_and_swap(
State::Want.into(),
State::Idle.into(),
SeqCst,
) == State::Want.into()
}
/// Check if the `Taker` has called `want()` without parking a task.
///
/// This is safe to call outside of a futures task context, but other
/// means of being notified is left to the user.
#[inline]
pub fn is_wanting(&self) -> bool {
self.inner.state.load(SeqCst) == State::Want.into()
}
/// Check if the `Taker` has canceled interest without parking a task.
#[inline]
pub fn is_canceled(&self) -> bool {
self.inner.state.load(SeqCst) == State::Closed.into()
}
/// Converts this into a `SharedGiver`.
#[inline]
pub fn shared(self) -> SharedGiver {
SharedGiver {
inner: self.inner,
}
}
}
/// Prints only the current channel state.
impl fmt::Debug for Giver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let state = self.inner.state();
        let mut out = f.debug_struct("Giver");
        out.field("state", &state);
        out.finish()
    }
}
// ===== impl SharedGiver ======
impl SharedGiver {
    /// Report whether the `Taker` currently wants a value, without parking
    /// a task.
    ///
    /// This only loads the shared state, so it is safe to call outside of a
    /// futures task context; being notified of changes is left to the user.
    #[inline]
    pub fn is_wanting(&self) -> bool {
        let wanted: usize = State::Want.into();
        wanted == self.inner.state.load(SeqCst)
    }

    /// Report whether the `Taker` has canceled interest, without parking a
    /// task.
    #[inline]
    pub fn is_canceled(&self) -> bool {
        let closed: usize = State::Closed.into();
        closed == self.inner.state.load(SeqCst)
    }
}
/// Prints only the current channel state.
impl fmt::Debug for SharedGiver {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let state = self.inner.state();
        let mut out = f.debug_struct("SharedGiver");
        out.field("state", &state);
        out.finish()
    }
}
// ===== impl Taker ======
impl Taker {
    /// Tell the `Giver` that no further values are wanted.
    ///
    /// Use this to close the channel when the `Taker` itself cannot be
    /// dropped yet.
    #[inline]
    pub fn cancel(&mut self) {
        let closed = State::Closed;
        trace!("signal: {:?}", closed);
        self.signal(closed)
    }

    /// Tell the `Giver` that a value is wanted.
    ///
    /// In debug builds this asserts that the channel has not already been
    /// canceled.
    #[inline]
    pub fn want(&mut self) {
        debug_assert!(
            usize::from(State::Closed) != self.inner.state.load(SeqCst),
            "want called after cancel"
        );
        let wanted = State::Want;
        trace!("signal: {:?}", wanted);
        self.signal(wanted)
    }

    /// Store `state` and, if the `Giver` had marked itself as `Give`,
    /// notify the task it parked.
    #[inline]
    fn signal(&mut self, state: State) {
        let previous = State::from(self.inner.state.swap(state.into(), SeqCst));
        match previous {
            // Only the `Give` state means a Giver may be parked.
            State::Give => loop {
                let mut slot = match self.inner.task.try_lock_order(SeqCst, SeqCst) {
                    Some(guard) => guard,
                    // Couldn't take the lock, so a Giver has it; the only
                    // reason is that it is in the process of parking. Keep
                    // trying until the lock is ours, so the task it stores
                    // gets notified.
                    None => continue,
                };
                if let Some(parked) = slot.take() {
                    // Release the lock before notifying.
                    drop(slot);
                    trace!("signal found waiting giver, notifying");
                    parked.notify();
                }
                return;
            },
            State::Idle | State::Want | State::Closed => (),
        }
    }
}
/// Dropping the `Taker` closes the channel: it signals the same `Closed`
/// state as `cancel()`, without the trace log line.
impl Drop for Taker {
    #[inline]
    fn drop(&mut self) {
        // Swaps the state to `Closed`; `signal` notifies the parked task
        // if the previous state was `Give`.
        self.signal(State::Closed);
    }
}
/// Prints only the current channel state.
impl fmt::Debug for Taker {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let state = self.inner.state();
        let mut out = f.debug_struct("Taker");
        out.field("state", &state);
        out.finish()
    }
}
// ===== impl Closed ======
/// `Closed` carries no data, so its debug form is just the type name.
impl fmt::Debug for Closed {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // A field-less `debug_struct` writes only the name; do so directly.
        f.write_str("Closed")
    }
}
// ===== impl Inner ======
impl Inner {
#[inline]
fn state(&self) -> State {
self.state.load(SeqCst).into()
}
}
// Unit tests for the Giver/Taker handshake: readiness, cross-thread
// notification, cancellation, and a combined run alongside an mpsc channel.
#[cfg(test)]
mod tests {
    use std::thread;
    use futures::{Async, Stream};
    use futures::future::{poll_fn, Future};
    use futures::sync::{mpsc, oneshot};
    use super::*;
    // A want registered before polling makes `poll_want` ready immediately.
    #[test]
    fn want_ready() {
        let (mut gv, mut tk) = new();
        tk.want();
        assert!(gv.poll_want().unwrap().is_ready());
    }
    // A want issued from another thread completes the waiting `poll_want`;
    // afterwards `give()` resets the want exactly once.
    #[test]
    fn want_notify_0() {
        let (mut gv, mut tk) = new();
        let (tx, rx) = oneshot::channel();
        thread::spawn(move || {
            tk.want();
            // use a oneshot to keep this thread alive
            // until other thread was notified of want
            rx.wait().expect("rx");
        });
        poll_fn(|| {
            gv.poll_want()
        }).wait().expect("wait");
        assert!(gv.is_wanting(), "still wanting after poll_want success");
        assert!(gv.give(), "give is true when wanting");
        assert!(!gv.is_wanting(), "no longer wanting after give");
        assert!(!gv.is_canceled(), "give doesn't cancel");
        assert!(!gv.give(), "give is false if not wanting");
        tx.send(()).expect("tx");
    }
    /// This tests that if the Giver moves tasks after parking,
    /// it will still wake up the correct task.
    #[test]
    fn want_notify_moving_tasks() {
        use std::sync::Arc;
        use futures::executor::{spawn, Notify, NotifyHandle};
        // A `Notify` implementation that ignores every notification.
        struct WantNotify;
        impl Notify for WantNotify {
            fn notify(&self, _id: usize) {
            }
        }
        // Builds a fresh handle around the no-op notifier.
        fn n() -> NotifyHandle {
            Arc::new(WantNotify).into()
        }
        let (mut gv, mut tk) = new();
        let mut s = spawn(poll_fn(move || {
            gv.poll_want()
        }));
        // Register with t1 as the task::current()
        let t1 = n();
        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
        // Issue the want a little later, from another thread.
        thread::spawn(move || {
            thread::sleep(::std::time::Duration::from_millis(100));
            tk.want();
        });
        // And now, move to a ThreadNotify task.
        s.into_inner().wait().expect("poll_want");
    }
    // Cancellation is visible through `is_canceled` and `poll_want`, whether
    // it comes from `cancel()` or from dropping the Taker.
    #[test]
    fn cancel() {
        // explicit
        let (mut gv, mut tk) = new();
        assert!(!gv.is_canceled());
        tk.cancel();
        assert!(gv.is_canceled());
        assert!(gv.poll_want().is_err());
        // implicit
        let (mut gv, tk) = new();
        assert!(!gv.is_canceled());
        drop(tk);
        assert!(gv.is_canceled());
        assert!(gv.poll_want().is_err());
        // notifies
        let (mut gv, tk) = new();
        thread::spawn(move || {
            let _tk = tk;
            // and dropped
        });
        poll_fn(move || {
            gv.poll_want()
        }).wait().expect_err("wait");
    }
    // Runs the want handshake next to an mpsc channel: the receiver asks
    // for more via `want()` whenever the channel has nothing ready, and the
    // sender sends as long as `try_send` succeeds, then waits on
    // `poll_want`.
    #[test]
    fn stress() {
        // Number of rounds run one after another; each round spawns one rx
        // thread and one tx thread, and joins the tx thread.
        let nthreads = 5;
        // Number of values the receiver consumes per round.
        let nwants = 100;
        for _ in 0..nthreads {
            let (mut gv, mut tk) = new();
            let (mut tx, mut rx) = mpsc::channel(0);
            // rx thread
            thread::spawn(move || {
                let mut cnt = 0;
                poll_fn(move || {
                    while cnt < nwants {
                        let n = match rx.poll().expect("rx poll") {
                            Async::Ready(n) => n.expect("rx opt"),
                            Async::NotReady => {
                                // Nothing to receive yet: signal the want
                                // before yielding.
                                tk.want();
                                return Ok(Async::NotReady);
                            },
                        };
                        // Values must arrive in the order they were sent.
                        assert_eq!(cnt, n);
                        cnt += 1;
                    }
                    Ok::<_, ()>(Async::Ready(()))
                }).wait().expect("rx wait");
            });
            // tx thread
            thread::spawn(move || {
                let mut cnt = 0;
                let nsent = poll_fn(move || {
                    loop {
                        // Send until `try_send` reports an error.
                        while let Ok(()) = tx.try_send(cnt) {
                            cnt += 1;
                        }
                        match gv.poll_want() {
                            Ok(Async::Ready(_)) => (),
                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
                            // Closed: finish with the number of values sent.
                            Err(_) => return Ok(Async::Ready(cnt)),
                        }
                    }
                }).wait().expect("tx wait");
                assert_eq!(nsent, nwants);
            }).join().expect("thread join");
        }
    }
}