blob: d67d48163abd481ac63ef7392fd3f357f0c602ef [file] [log] [blame]
//! Async broadcast channel
//!
//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
//! that implement [`Clone`].
//!
//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
//! among multiple threads.
//!
//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
//! closed, no more messages can be sent, but remaining messages can still be received.
//!
//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
//!
//! ## Examples
//!
//! ```rust
//! use async_broadcast::{broadcast, TryRecvError};
//! use futures_lite::{future::block_on, stream::StreamExt};
//!
//! block_on(async move {
//! let (s1, mut r1) = broadcast(2);
//! let s2 = s1.clone();
//! let mut r2 = r1.clone();
//!
//! // Send 2 messages from two different senders.
//! s1.broadcast(7).await.unwrap();
//! s2.broadcast(8).await.unwrap();
//!
//! // Channel is now at capacity so sending more messages will result in an error.
//! assert!(s2.try_broadcast(9).unwrap_err().is_full());
//! assert!(s1.try_broadcast(10).unwrap_err().is_full());
//!
//! // We can use `recv` method of the `Stream` implementation to receive messages.
//! assert_eq!(r1.next().await.unwrap(), 7);
//! assert_eq!(r1.recv().await.unwrap(), 8);
//! assert_eq!(r2.next().await.unwrap(), 7);
//! assert_eq!(r2.recv().await.unwrap(), 8);
//!
//! // All receiver got all messages so channel is now empty.
//! assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
//! assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
//!
//! // Drop both senders, which closes the channel.
//! drop(s1);
//! drop(s2);
//!
//! assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
//! assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
//! })
//! ```
#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, missing_doc_code_examples, unreachable_pub)]
#[cfg(doctest)]
mod doctests {
doc_comment::doctest!("../README.md");
}
use std::collections::VecDeque;
use std::error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use event_listener::{Event, EventListener};
use futures_core::stream::Stream;
/// Create a new broadcast channel.
///
/// The created channel has space to hold at most `cap` messages at a time.
///
/// # Panics
///
/// Capacity must be a positive number. If `cap` is zero, this function will panic.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
///
/// let (s, mut r1) = broadcast(1);
/// let mut r2 = r1.clone();
///
/// assert_eq!(s.broadcast(10).await, Ok(()));
/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
///
/// assert_eq!(r1.recv().await, Ok(10));
/// assert_eq!(r2.recv().await, Ok(10));
/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
/// # });
/// ```
pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
assert!(cap > 0, "capacity cannot be zero");
let inner = Arc::new(Mutex::new(Inner {
queue: VecDeque::with_capacity(cap),
receiver_count: 1,
sender_count: 1,
send_count: 0,
is_closed: false,
send_ops: Event::new(),
recv_ops: Event::new(),
}));
let s = Sender {
inner: inner.clone(),
capacity: cap,
};
let r = Receiver {
inner,
capacity: cap,
recv_count: 0,
listener: None,
};
(s, r)
}
#[derive(Debug)]
struct Inner<T> {
queue: VecDeque<(T, usize)>,
receiver_count: usize,
sender_count: usize,
send_count: usize,
is_closed: bool,
/// Send operations waiting while the channel is full.
send_ops: Event,
/// Receive operations waiting while the channel is empty and not closed.
recv_ops: Event,
}
impl<T> Inner<T> {
/// Closes the channel and notifies all waiting operations.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
fn close(&mut self) -> bool {
if self.is_closed {
return false;
}
self.is_closed = true;
// Notify all waiting senders and receivers.
self.send_ops.notify(usize::MAX);
self.recv_ops.notify(usize::MAX);
true
}
}
/// The sending side of the broadcast channel.
///
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
/// dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Sender::close()`].
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Mutex<Inner<T>>>,
capacity: usize,
}
impl<T> Sender<T> {
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// use async_broadcast::broadcast;
///
/// let (s, r) = broadcast::<i32>(5);
/// assert_eq!(s.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl<T: Clone> Sender<T> {
/// Broadcasts a message on the channel.
///
/// If the channel is full, this method waits until there is space for a message.
///
/// If the channel is closed, this method returns an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, SendError};
///
/// let (s, r) = broadcast(1);
///
/// assert_eq!(s.broadcast(1).await, Ok(()));
/// drop(r);
/// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
/// # });
/// ```
pub fn broadcast(&self, msg: T) -> Send<'_, T> {
Send {
sender: self,
listener: None,
msg: Some(msg),
}
}
/// Attempts to broadcast a message on the channel.
///
/// If the channel is full or closed, this method returns an error.
///
/// # Examples
///
/// ```
/// use async_broadcast::{broadcast, TrySendError};
///
/// let (s, r) = broadcast(1);
///
/// assert_eq!(s.try_broadcast(1), Ok(()));
/// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
///
/// drop(r);
/// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
/// ```
pub fn try_broadcast(&self, msg: T) -> Result<(), TrySendError<T>> {
let mut inner = self.inner.lock().unwrap();
if inner.is_closed {
return Err(TrySendError::Closed(msg));
} else if inner.queue.len() == self.capacity {
return Err(TrySendError::Full(msg));
}
let receiver_count = inner.receiver_count;
inner.queue.push_back((msg, receiver_count));
inner.send_count += 1;
// Notify all awaiting receive operations.
inner.recv_ops.notify(usize::MAX);
Ok(())
}
/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
///
/// The remaining messages can still be received.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, RecvError};
///
/// let (s, mut r) = broadcast(1);
/// s.broadcast(1).await.unwrap();
/// assert!(s.close());
///
/// assert_eq!(r.recv().await.unwrap(), 1);
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub fn close(&self) -> bool {
self.inner.lock().unwrap().close()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let mut inner = self.inner.lock().unwrap();
inner.sender_count -= 1;
if inner.sender_count == 0 {
inner.close();
}
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.inner.lock().unwrap().sender_count += 1;
Sender {
inner: self.inner.clone(),
capacity: self.capacity,
}
}
}
/// The receiving side of a channel.
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Mutex<Inner<T>>>,
capacity: usize,
recv_count: usize,
/// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
}
impl<T: Clone> Receiver<T> {
/// Receives a message from the channel.
///
/// If the channel is empty, this method waits until there is a message.
///
/// If the channel is closed, this method receives a message or returns an error if there are
/// no more messages.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, RecvError};
///
/// let (s, mut r1) = broadcast(1);
/// let mut r2 = r1.clone();
///
/// assert_eq!(s.broadcast(1).await, Ok(()));
/// drop(s);
///
/// assert_eq!(r1.recv().await, Ok(1));
/// assert_eq!(r1.recv().await, Err(RecvError));
/// assert_eq!(r2.recv().await, Ok(1));
/// assert_eq!(r2.recv().await, Err(RecvError));
/// # });
/// ```
pub fn recv(&mut self) -> Recv<'_, T> {
Recv {
receiver: self,
listener: None,
}
}
/// Attempts to receive a message from the channel.
///
/// If the channel is empty or closed, this method returns an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, TryRecvError};
///
/// let (s, mut r1) = broadcast(1);
/// let mut r2 = r1.clone();
/// assert_eq!(s.broadcast(1).await, Ok(()));
///
/// assert_eq!(r1.try_recv(), Ok(1));
/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
/// assert_eq!(r2.try_recv(), Ok(1));
/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
///
/// drop(s);
/// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
/// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
/// # });
/// ```
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
let mut inner = self.inner.lock().unwrap();
let msg_count = inner.send_count - self.recv_count;
if msg_count == 0 {
if inner.is_closed {
return Err(TryRecvError::Closed);
} else {
return Err(TryRecvError::Empty);
}
}
let len = inner.queue.len();
let msg = inner.queue[len - msg_count].0.clone();
inner.queue[len - msg_count].1 -= 1;
if inner.queue[len - msg_count].1 == 0 {
inner.queue.pop_front();
// Notify 1 awaiting senders that there is now room. If there is still room in the
// queue, the notified operation will notify another awaiting sender.
inner.send_ops.notify(1);
}
self.recv_count += 1;
Ok(msg)
}
/// Returns the channel capacity.
///
/// # Examples
///
/// ```
/// use async_broadcast::broadcast;
///
/// let (s, r) = broadcast::<i32>(5);
/// assert_eq!(r.capacity(), 5);
/// ```
pub fn capacity(&self) -> usize {
self.capacity
}
/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
///
/// The remaining messages can still be received.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, RecvError};
///
/// let (s, mut r) = broadcast(1);
/// s.broadcast(1).await.unwrap();
/// assert!(s.close());
///
/// assert_eq!(r.recv().await.unwrap(), 1);
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub fn close(&self) -> bool {
self.inner.lock().unwrap().close()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut inner = self.inner.lock().unwrap();
let msg_count = inner.send_count - self.recv_count;
let len = inner.queue.len();
for i in len - msg_count..len {
inner.queue[i].1 -= 1;
}
let mut poped = false;
while let Some((_, 0)) = inner.queue.front() {
inner.queue.pop_front();
if !poped {
poped = true;
}
}
if poped {
// Notify 1 awaiting senders that there is now room. If there is still room in the
// queue, the notified operation will notify another awaiting sender.
inner.send_ops.notify(1);
}
inner.receiver_count -= 1;
if inner.receiver_count == 0 {
inner.close();
}
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let mut inner = self.inner.lock().unwrap();
inner.receiver_count += 1;
Receiver {
inner: self.inner.clone(),
capacity: self.capacity,
recv_count: inner.send_count,
listener: None,
}
}
}
impl<T: Clone> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
futures_core::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
self.listener = {
let inner = self.inner.lock().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
let inner = self.inner.lock().unwrap();
inner.is_closed && inner.queue.is_empty()
}
}
/// An error returned from [`Sender::broadcast()`].
///
/// Received because the channel is closed.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
impl<T> SendError<T> {
/// Unwraps the message that couldn't be sent.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> error::Error for SendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SendError(..)")
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending into a closed channel")
}
}
/// An error returned from [`Sender::try_broadcast()`].
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
/// The channel is full but not closed.
Full(T),
/// The channel is closed.
Closed(T),
}
impl<T> TrySendError<T> {
/// Unwraps the message that couldn't be sent.
pub fn into_inner(self) -> T {
match self {
TrySendError::Full(t) => t,
TrySendError::Closed(t) => t,
}
}
/// Returns `true` if the channel is full but not closed.
pub fn is_full(&self) -> bool {
match self {
TrySendError::Full(_) => true,
TrySendError::Closed(_) => false,
}
}
/// Returns `true` if the channel is closed.
pub fn is_closed(&self) -> bool {
match self {
TrySendError::Full(_) => false,
TrySendError::Closed(_) => true,
}
}
}
impl<T> error::Error for TrySendError<T> {}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "Full(..)"),
TrySendError::Closed(..) => write!(f, "Closed(..)"),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "sending into a full channel"),
TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
}
}
}
/// An error returned from [`Receiver::recv()`].
///
/// Received because the channel is empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
impl error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "receiving from an empty and closed channel")
}
}
/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
/// The channel is empty but not closed.
Empty,
/// The channel is empty and closed.
Closed,
}
impl TryRecvError {
/// Returns `true` if the channel is empty but not closed.
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
TryRecvError::Closed => false,
}
}
/// Returns `true` if the channel is empty and closed.
pub fn is_closed(&self) -> bool {
match self {
TryRecvError::Empty => false,
TryRecvError::Closed => true,
}
}
}
impl error::Error for TryRecvError {}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TryRecvError::Empty => write!(f, "receiving from an empty channel"),
TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
}
}
}
/// A future returned by [`Sender::broadcast()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}
impl<'a, T> Unpin for Send<'a, T> {}
impl<'a, T: Clone> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
let msg = this.msg.take().unwrap();
// Attempt to send a message.
match this.sender.try_broadcast(msg) {
Ok(()) => {
let inner = this.sender.inner.lock().unwrap();
if inner.queue.len() < this.sender.capacity() {
// Not full still, so notify the next awaiting sender.
inner.send_ops.notify(1);
}
return Poll::Ready(Ok(()));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => this.msg = Some(m),
}
// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try sending again.
let inner = this.sender.inner.lock().unwrap();
this.listener = Some(inner.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}
Poll::Pending => return Poll::Pending,
}
}
}
}
}
}
/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<EventListener>,
}
impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T: Clone> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
this.listener = {
let inner = this.receiver.inner.lock().unwrap();
Some(inner.recv_ops.listen())
};
}
Some(l) => {
// Wait for a notification.
match Pin::new(l).poll(cx) {
Poll::Ready(_) => {
this.listener = None;
continue;
}
Poll::Pending => return Poll::Pending,
}
}
}
}
}
}