blob: b3303bdce2106e5006cbb78e428541aeb8e272d7 [file] [log] [blame]
//! A single-producer, multi-consumer channel that only retains the *last* sent
//! value.
//!
//! This channel is useful for watching for changes to a value from multiple
//! points in the code base, for example, changes to configuration values.
//!
//! # Usage
//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and sender halves of the channel. The channel is created with an initial
//! value. `Receiver::poll` will always be ready upon creation and will yield
//! either this initial value or the latest value that has been sent by
//! `Sender`.
//!
//! Calls to [`Receiver::poll`] and [`Receiver::poll_ref`] will always yield
//! the latest value.
//!
//! # Examples
//!
//! ```
//! # extern crate futures;
//! extern crate tokio;
//!
//! use tokio::prelude::*;
//! use tokio::sync::watch;
//!
//! # tokio::run(futures::future::lazy(|| {
//! let (mut tx, rx) = watch::channel("hello");
//!
//! tokio::spawn(rx.for_each(|value| {
//! println!("received = {:?}", value);
//! Ok(())
//! }).map_err(|_| ()));
//!
//! tx.broadcast("world").unwrap();
//! # Ok(())
//! # }));
//! ```
//!
//! # Closing
//!
//! [`Sender::poll_close`] allows the producer to detect when all [`Sender`]
//! handles have been dropped. This indicates that there is no further interest
//! in the values being produced and work can be stopped.
//!
//! # Thread safety
//!
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
//! handles may be moved to separate threads and also used concurrently.
//!
//! [`Sender`]: struct.Sender.html
//! [`Receiver`]: struct.Receiver.html
//! [`channel`]: fn.channel.html
//! [`Sender::poll_close`]: struct.Sender.html#method.poll_close
//! [`Receiver::poll`]: struct.Receiver.html#method.poll
//! [`Receiver::poll_ref`]: struct.Receiver.html#method.poll_ref
use fnv::FnvHashMap;
use futures::task::AtomicTask;
use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`](fn.channel.html) function.
#[derive(Debug)]
pub struct Receiver<T> {
/// Pointer to the shared state
shared: Arc<Shared<T>>,
/// Pointer to the watcher's internal state
inner: Arc<WatchInner>,
/// Watcher ID.
id: u64,
/// Last observed version
ver: usize,
}
/// Sends values to the associated `Receiver`.
///
/// Instances are created by the [`channel`](fn.channel.html) function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Weak<Shared<T>>,
}
/// Returns a reference to the inner value
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long lived borrows could cause the produce half to block. It is recommended
/// to keep the borrow as short lived as possible.
#[derive(Debug)]
pub struct Ref<'a, T: 'a> {
inner: RwLockReadGuard<'a, T>,
}
pub mod error {
//! Watch error types
use std::fmt;
/// Error produced when receiving a value fails.
#[derive(Debug)]
pub struct RecvError {
pub(crate) _p: (),
}
/// Error produced when sending a value fails.
#[derive(Debug)]
pub struct SendError<T> {
pub(crate) inner: T,
}
// ===== impl RecvError =====
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
write!(fmt, "{}", self.description())
}
}
impl ::std::error::Error for RecvError {
fn description(&self) -> &str {
"channel closed"
}
}
// ===== impl SendError =====
impl<T: fmt::Debug> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use std::error::Error;
write!(fmt, "{}", self.description())
}
}
impl<T: fmt::Debug> ::std::error::Error for SendError<T> {
fn description(&self) -> &str {
"channel closed"
}
}
}
#[derive(Debug)]
struct Shared<T> {
/// The most recent value
value: RwLock<T>,
/// The current version
///
/// The lowest bit represents a "closed" state. The rest of the bits
/// represent the current version.
version: AtomicUsize,
/// All watchers
watchers: Mutex<Watchers>,
/// Task to notify when all watchers drop
cancel: AtomicTask,
}
#[derive(Debug)]
struct Watchers {
next_id: u64,
watchers: FnvHashMap<u64, Arc<WatchInner>>,
}
#[derive(Debug)]
struct WatchInner {
task: AtomicTask,
}
const CLOSED: usize = 1;
/// Create a new watch channel, returning the "send" and "receive" handles.
///
/// All values sent by `Sender` will become visible to the `Receiver` handles.
/// Only the last value sent is made available to the `Receiver` half. All
/// intermediate values are dropped.
///
/// # Examples
///
/// ```
/// # extern crate futures;
/// extern crate tokio;
///
/// use tokio::prelude::*;
/// use tokio::sync::watch;
///
/// # tokio::run(futures::future::lazy(|| {
/// let (mut tx, rx) = watch::channel("hello");
///
/// tokio::spawn(rx.for_each(|value| {
/// println!("received = {:?}", value);
/// Ok(())
/// }).map_err(|_| ()));
///
/// tx.broadcast("world").unwrap();
/// # Ok(())
/// # }));
/// ```
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
const INIT_ID: u64 = 0;
let inner = Arc::new(WatchInner::new());
// Insert the watcher
let mut watchers = FnvHashMap::with_capacity_and_hasher(0, Default::default());
watchers.insert(INIT_ID, inner.clone());
let shared = Arc::new(Shared {
value: RwLock::new(init),
version: AtomicUsize::new(2),
watchers: Mutex::new(Watchers {
next_id: INIT_ID + 1,
watchers,
}),
cancel: AtomicTask::new(),
});
let tx = Sender {
shared: Arc::downgrade(&shared),
};
let rx = Receiver {
shared,
inner,
id: INIT_ID,
ver: 0,
};
(tx, rx)
}
impl<T> Receiver<T> {
/// Returns a reference to the most recently sent value
///
/// Outstanding borrows hold a read lock. This means that long lived borrows
/// could cause the send half to block. It is recommended to keep the borrow
/// as short lived as possible.
///
/// # Examples
///
/// ```
/// # extern crate tokio;
/// # use tokio::sync::watch;
/// let (_, rx) = watch::channel("hello");
/// assert_eq!(*rx.get_ref(), "hello");
/// ```
pub fn get_ref(&self) -> Ref<T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }
}
/// Attempts to receive the latest value sent via the channel.
///
/// If a new, unobserved, value has been sent, a reference to it is
/// returned. If no new value has been sent, then `NotReady` is returned and
/// the current task is notified once a new value is sent.
///
/// Only the **most recent** value is returned. If the receiver is falling
/// behind the sender, intermediate values are dropped.
pub fn poll_ref(&mut self) -> Poll<Option<Ref<T>>, error::RecvError> {
// Make sure the task is up to date
self.inner.task.register();
let state = self.shared.version.load(SeqCst);
let version = state & !CLOSED;
if version != self.ver {
// Track the latest version
self.ver = version;
let inner = self.shared.value.read().unwrap();
return Ok(Some(Ref { inner }).into());
}
if CLOSED == state & CLOSED {
// The `Store` handle has been dropped.
return Ok(None.into());
}
Ok(Async::NotReady)
}
}
impl<T: Clone> Stream for Receiver<T> {
type Item = T;
type Error = error::RecvError;
fn poll(&mut self) -> Poll<Option<T>, error::RecvError> {
let item = try_ready!(self.poll_ref());
Ok(Async::Ready(item.map(|v_ref| v_ref.clone())))
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let inner = Arc::new(WatchInner::new());
let shared = self.shared.clone();
let id = {
let mut watchers = shared.watchers.lock().unwrap();
let id = watchers.next_id;
watchers.next_id += 1;
watchers.watchers.insert(id, inner.clone());
id
};
let ver = self.ver;
Receiver {
shared: shared,
inner,
id,
ver,
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut watchers = self.shared.watchers.lock().unwrap();
watchers.watchers.remove(&self.id);
}
}
impl WatchInner {
fn new() -> Self {
WatchInner {
task: AtomicTask::new(),
}
}
}
impl<T> Sender<T> {
/// Broadcast a new value via the channel, notifying all receivers.
pub fn broadcast(&mut self, value: T) -> Result<(), error::SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
// All `Watch` handles have been canceled
None => return Err(error::SendError { inner: value }),
};
// Replace the value
{
let mut lock = shared.value.write().unwrap();
*lock = value;
}
// Update the version. 2 is used so that the CLOSED bit is not set.
shared.version.fetch_add(2, SeqCst);
// Notify all watchers
notify_all(&*shared);
// Return the old value
Ok(())
}
/// Returns `Ready` when all receivers have dropped.
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
pub fn poll_close(&mut self) -> Poll<(), ()> {
match self.shared.upgrade() {
Some(shared) => {
shared.cancel.register();
Ok(Async::NotReady)
}
None => Ok(Async::Ready(())),
}
}
}
impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = error::SendError<T>;
fn start_send(&mut self, item: T) -> StartSend<T, error::SendError<T>> {
let _ = self.broadcast(item)?;
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), error::SendError<T>> {
Ok(().into())
}
}
/// Notify all watchers of a change
fn notify_all<T>(shared: &Shared<T>) {
let watchers = shared.watchers.lock().unwrap();
for watcher in watchers.watchers.values() {
// Notify the task
watcher.task.notify();
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if let Some(shared) = self.shared.upgrade() {
shared.version.fetch_or(CLOSED, SeqCst);
notify_all(&*shared);
}
}
}
// ===== impl Ref =====
impl<'a, T: 'a> ops::Deref for Ref<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.inner.deref()
}
}
// ===== impl Shared =====
impl<T> Drop for Shared<T> {
fn drop(&mut self) {
self.cancel.notify();
}
}