blob: 854d0bd49d1f387853ed40b48fca305be17de250 [file] [log] [blame]
//! An async multi-producer multi-consumer channel, where each message can be received by only
//! one of all existing consumers.
//!
//! There are two kinds of channels:
//!
//! 1. [Bounded][`bounded()`] channel with limited capacity.
//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
//!
//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
//! among multiple threads.
//!
//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
//! channel is closed, no more messages can be sent, but remaining messages can still be received.
//!
//! The channel can also be closed manually by calling [`Sender::close()`] or
//! [`Receiver::close()`].
//!
//! # Examples
//!
//! ```
//! # futures_lite::future::block_on(async {
//! let (s, r) = async_channel::unbounded();
//!
//! assert_eq!(s.send("Hello").await, Ok(()));
//! assert_eq!(r.recv().await, Ok("Hello"));
//! # });
//! ```
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::usize;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use event_listener::{Event, EventListener};
use futures_core::stream::Stream;
struct Channel<T> {
/// Inner message queue.
queue: ConcurrentQueue<T>,
/// Send operations waiting while the channel is full.
send_ops: Event,
/// Receive operations waiting while the channel is empty and not closed.
recv_ops: Event,
/// Stream operations while the channel is empty and not closed.
stream_ops: Event,
/// The number of currently active `Sender`s.
sender_count: AtomicUsize,
/// The number of currently active `Receivers`s.
receiver_count: AtomicUsize,
}
impl<T> Channel<T> {
/// Closes the channel and notifies all blocked operations.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
fn close(&self) -> bool {
if self.queue.close() {
// Notify all send operations.
self.send_ops.notify(usize::MAX);
// Notify all receive and stream operations.
self.recv_ops.notify(usize::MAX);
self.stream_ops.notify(usize::MAX);
true
} else {
false
}
}
}
/// Creates a bounded channel.
///
/// The created channel has space to hold at most `cap` messages at a time.
///
/// # Panics
///
/// Capacity must be a positive number. If `cap` is zero, this function will panic.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{bounded, TryRecvError, TrySendError};
///
/// let (s, r) = bounded(1);
///
/// assert_eq!(s.send(10).await, Ok(()));
/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
///
/// assert_eq!(r.recv().await, Ok(10));
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
/// # });
/// ```
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
assert!(cap > 0, "capacity cannot be zero");
let channel = Arc::new(Channel {
queue: ConcurrentQueue::bounded(cap),
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
let s = Sender {
channel: channel.clone(),
};
let r = Receiver {
channel,
listener: None,
};
(s, r)
}
/// Creates an unbounded channel.
///
/// The created channel can hold an unlimited number of messages.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, TryRecvError};
///
/// let (s, r) = unbounded();
///
/// assert_eq!(s.send(10).await, Ok(()));
/// assert_eq!(s.send(20).await, Ok(()));
///
/// assert_eq!(r.recv().await, Ok(10));
/// assert_eq!(r.recv().await, Ok(20));
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
/// # });
/// ```
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let channel = Arc::new(Channel {
queue: ConcurrentQueue::unbounded(),
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});
let s = Sender {
channel: channel.clone(),
};
let r = Receiver {
channel,
listener: None,
};
(s, r)
}
/// The sending side of a channel.
///
/// Senders can be cloned and shared among threads. When all senders associated with a channel are
/// dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Sender::close()`].
pub struct Sender<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,
}
impl<T> Sender<T> {
/// Attempts to send a message into the channel.
///
/// If the channel is full or closed, this method returns an error.
///
/// # Examples
///
/// ```
/// use async_channel::{bounded, TrySendError};
///
/// let (s, r) = bounded(1);
///
/// assert_eq!(s.try_send(1), Ok(()));
/// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
///
/// drop(r);
/// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
/// ```
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
match self.channel.queue.push(msg) {
Ok(()) => {
// Notify a blocked receive operation. If the notified operation gets canceled,
// it will notify another blocked receive operation.
self.channel.recv_ops.notify_additional(1);
// Notify all blocked streams.
self.channel.stream_ops.notify(usize::MAX);
Ok(())
}
Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
}
}
/// Sends a message into the channel.
///
/// If the channel is full, this method waits until there is space for a message.
///
/// If the channel is closed, this method returns an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, SendError};
///
/// let (s, r) = unbounded();
///
/// assert_eq!(s.send(1).await, Ok(()));
/// drop(r);
/// assert_eq!(s.send(2).await, Err(SendError(2)));
/// # });
/// ```
pub fn send(&self, msg: T) -> Send<'_, T> {
Send {
sender: self,
listener: None,
msg: Some(msg),
}
}
/// Sends a message into this channel using the blocking strategy.
///
/// If the channel is full, this method will block until there is room.
/// If the channel is closed, this method returns an error.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
/// this method will block the current thread until the message is sent.
///
/// This method should not be used in an asynchronous context. It is intended
/// to be used such that a channel can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in deadlocks.
///
/// # Examples
///
/// ```
/// use async_channel::{unbounded, SendError};
///
/// let (s, r) = unbounded();
///
/// assert_eq!(s.send_blocking(1), Ok(()));
/// drop(r);
/// assert_eq!(s.send_blocking(2), Err(SendError(2)));
/// ```
pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
self.send(msg).wait()
}
/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
///
/// The remaining messages can still be received.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded();
/// assert_eq!(s.send(1).await, Ok(()));
/// assert!(s.close());
///
/// assert_eq!(r.recv().await, Ok(1));
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub fn close(&self) -> bool {
self.channel.close()
}
/// Returns `true` if the channel is closed.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded::<()>();
/// assert!(!s.is_closed());
///
/// drop(r);
/// assert!(s.is_closed());
/// # });
/// ```
pub fn is_closed(&self) -> bool {
self.channel.queue.is_closed()
}
/// Returns `true` if the channel is empty.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// assert!(s.is_empty());
/// s.send(1).await;
/// assert!(!s.is_empty());
/// # });
/// ```
pub fn is_empty(&self) -> bool {
self.channel.queue.is_empty()
}
/// Returns `true` if the channel is full.
///
/// Unbounded channels are never full.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::bounded;
///
/// let (s, r) = bounded(1);
///
/// assert!(!s.is_full());
/// s.send(1).await;
/// assert!(s.is_full());
/// # });
/// ```
pub fn is_full(&self) -> bool {
self.channel.queue.is_full()
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded();
/// assert_eq!(s.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(s.len(), 2);
/// # });
/// ```
pub fn len(&self) -> usize {
self.channel.queue.len()
}
/// Returns the channel capacity if it's bounded.
///
/// # Examples
///
/// ```
/// use async_channel::{bounded, unbounded};
///
/// let (s, r) = bounded::<i32>(5);
/// assert_eq!(s.capacity(), Some(5));
///
/// let (s, r) = unbounded::<i32>();
/// assert_eq!(s.capacity(), None);
/// ```
pub fn capacity(&self) -> Option<usize> {
self.channel.queue.capacity()
}
/// Returns the number of receivers for the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded::<()>();
/// assert_eq!(s.receiver_count(), 1);
///
/// let r2 = r.clone();
/// assert_eq!(s.receiver_count(), 2);
/// # });
/// ```
pub fn receiver_count(&self) -> usize {
self.channel.receiver_count.load(Ordering::SeqCst)
}
/// Returns the number of senders for the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded::<()>();
/// assert_eq!(s.sender_count(), 1);
///
/// let s2 = s.clone();
/// assert_eq!(s.sender_count(), 2);
/// # });
/// ```
pub fn sender_count(&self) -> usize {
self.channel.sender_count.load(Ordering::SeqCst)
}
/// Downgrade the sender to a weak reference.
pub fn downgrade(&self) -> WeakSender<T> {
WeakSender {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
// Decrement the sender count and close the channel if it drops down to zero.
if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Sender {{ .. }}")
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of sender clones are leaked.
if count > usize::MAX / 2 {
process::abort();
}
Sender {
channel: self.channel.clone(),
}
}
}
/// The receiving side of a channel.
///
/// Receivers can be cloned and shared among threads. When all receivers associated with a channel
/// are dropped, the channel becomes closed.
///
/// The channel can also be closed manually by calling [`Receiver::close()`].
///
/// Receivers implement the [`Stream`] trait.
pub struct Receiver<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,
/// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
}
impl<T> Receiver<T> {
/// Attempts to receive a message from the channel.
///
/// If the channel is empty, or empty and closed, this method returns an error.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, TryRecvError};
///
/// let (s, r) = unbounded();
/// assert_eq!(s.send(1).await, Ok(()));
///
/// assert_eq!(r.try_recv(), Ok(1));
/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
///
/// drop(s);
/// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
/// # });
/// ```
pub fn try_recv(&self) -> Result<T, TryRecvError> {
match self.channel.queue.pop() {
Ok(msg) => {
// Notify a blocked send operation. If the notified operation gets canceled, it
// will notify another blocked send operation.
self.channel.send_ops.notify_additional(1);
Ok(msg)
}
Err(PopError::Empty) => Err(TryRecvError::Empty),
Err(PopError::Closed) => Err(TryRecvError::Closed),
}
}
/// Receives a message from the channel.
///
/// If the channel is empty, this method waits until there is a message.
///
/// If the channel is closed, this method receives a message or returns an error if there are
/// no more messages.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded();
///
/// assert_eq!(s.send(1).await, Ok(()));
/// drop(s);
///
/// assert_eq!(r.recv().await, Ok(1));
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub fn recv(&self) -> Recv<'_, T> {
Recv {
receiver: self,
listener: None,
}
}
/// Receives a message from the channel using the blocking strategy.
///
/// If the channel is empty, this method waits until there is a message.
/// If the channel is closed, this method receives a message or returns an error if there are
/// no more messages.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
/// this method will block the current thread until the message is sent.
///
/// This method should not be used in an asynchronous context. It is intended
/// to be used such that a channel can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in deadlocks.
///
/// # Examples
///
/// ```
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded();
///
/// assert_eq!(s.send_blocking(1), Ok(()));
/// drop(s);
///
/// assert_eq!(r.recv_blocking(), Ok(1));
/// assert_eq!(r.recv_blocking(), Err(RecvError));
/// ```
pub fn recv_blocking(&self) -> Result<T, RecvError> {
self.recv().wait()
}
/// Closes the channel.
///
/// Returns `true` if this call has closed the channel and it was not closed already.
///
/// The remaining messages can still be received.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded();
/// assert_eq!(s.send(1).await, Ok(()));
///
/// assert!(r.close());
/// assert_eq!(r.recv().await, Ok(1));
/// assert_eq!(r.recv().await, Err(RecvError));
/// # });
/// ```
pub fn close(&self) -> bool {
self.channel.close()
}
/// Returns `true` if the channel is closed.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::{unbounded, RecvError};
///
/// let (s, r) = unbounded::<()>();
/// assert!(!r.is_closed());
///
/// drop(s);
/// assert!(r.is_closed());
/// # });
/// ```
pub fn is_closed(&self) -> bool {
self.channel.queue.is_closed()
}
/// Returns `true` if the channel is empty.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded();
///
/// assert!(s.is_empty());
/// s.send(1).await;
/// assert!(!s.is_empty());
/// # });
/// ```
pub fn is_empty(&self) -> bool {
self.channel.queue.is_empty()
}
/// Returns `true` if the channel is full.
///
/// Unbounded channels are never full.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::bounded;
///
/// let (s, r) = bounded(1);
///
/// assert!(!r.is_full());
/// s.send(1).await;
/// assert!(r.is_full());
/// # });
/// ```
pub fn is_full(&self) -> bool {
self.channel.queue.is_full()
}
/// Returns the number of messages in the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded();
/// assert_eq!(r.len(), 0);
///
/// s.send(1).await;
/// s.send(2).await;
/// assert_eq!(r.len(), 2);
/// # });
/// ```
pub fn len(&self) -> usize {
self.channel.queue.len()
}
/// Returns the channel capacity if it's bounded.
///
/// # Examples
///
/// ```
/// use async_channel::{bounded, unbounded};
///
/// let (s, r) = bounded::<i32>(5);
/// assert_eq!(r.capacity(), Some(5));
///
/// let (s, r) = unbounded::<i32>();
/// assert_eq!(r.capacity(), None);
/// ```
pub fn capacity(&self) -> Option<usize> {
self.channel.queue.capacity()
}
/// Returns the number of receivers for the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded::<()>();
/// assert_eq!(r.receiver_count(), 1);
///
/// let r2 = r.clone();
/// assert_eq!(r.receiver_count(), 2);
/// # });
/// ```
pub fn receiver_count(&self) -> usize {
self.channel.receiver_count.load(Ordering::SeqCst)
}
/// Returns the number of senders for the channel.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_channel::unbounded;
///
/// let (s, r) = unbounded::<()>();
/// assert_eq!(r.sender_count(), 1);
///
/// let s2 = s.clone();
/// assert_eq!(r.sender_count(), 2);
/// # });
/// ```
pub fn sender_count(&self) -> usize {
self.channel.sender_count.load(Ordering::SeqCst)
}
/// Downgrade the receiver to a weak reference.
pub fn downgrade(&self) -> WeakReceiver<T> {
WeakReceiver {
channel: self.channel.clone(),
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// Decrement the receiver count and close the channel if it drops down to zero.
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
}
}
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Receiver {{ .. }}")
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Receiver<T> {
let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
// Make sure the count never overflows, even if lots of receiver clones are leaked.
if count > usize::MAX / 2 {
process::abort();
}
Receiver {
channel: self.channel.clone(),
listener: None,
}
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
if let Some(listener) = self.listener.as_mut() {
futures_core::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Create a listener and try sending the message again.
self.listener = Some(self.channel.stream_ops.listen());
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
impl<T> futures_core::stream::FusedStream for Receiver<T> {
fn is_terminated(&self) -> bool {
self.channel.queue.is_closed() && self.channel.queue.is_empty()
}
}
/// A [`Sender`] that prevents the channel from not being closed.
///
/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
/// to be upgraded into a [`Sender`] through the `upgrade` method.
#[derive(Clone)]
pub struct WeakSender<T> {
channel: Arc<Channel<T>>,
}
impl<T> WeakSender<T> {
/// Upgrade the [`WeakSender`] into a [`Sender`].
pub fn upgrade(&self) -> Option<Sender<T>> {
if self.channel.queue.is_closed() {
None
} else {
let old_count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
if old_count == 0 {
// Channel was closed while we were incrementing the count.
self.channel.sender_count.store(0, Ordering::Release);
None
} else if old_count > usize::MAX / 2 {
// Make sure the count never overflows, even if lots of sender clones are leaked.
process::abort();
} else {
Some(Sender {
channel: self.channel.clone(),
})
}
}
}
}
impl<T> fmt::Debug for WeakSender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WeakSender {{ .. }}")
}
}
/// A [`Receiver`] that prevents the channel from not being closed.
///
/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
/// to be upgraded into a [`Receiver`] through the `upgrade` method.
#[derive(Clone)]
pub struct WeakReceiver<T> {
channel: Arc<Channel<T>>,
}
impl<T> WeakReceiver<T> {
/// Upgrade the [`WeakReceiver`] into a [`Receiver`].
pub fn upgrade(&self) -> Option<Receiver<T>> {
if self.channel.queue.is_closed() {
None
} else {
let old_count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
if old_count == 0 {
// Channel was closed while we were incrementing the count.
self.channel.receiver_count.store(0, Ordering::Release);
None
} else if old_count > usize::MAX / 2 {
// Make sure the count never overflows, even if lots of receiver clones are leaked.
process::abort();
} else {
Some(Receiver {
channel: self.channel.clone(),
listener: None,
})
}
}
}
}
impl<T> fmt::Debug for WeakReceiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "WeakReceiver {{ .. }}")
}
}
/// An error returned from [`Sender::send()`].
///
/// Received because the channel is closed.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
impl<T> SendError<T> {
/// Unwraps the message that couldn't be sent.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> error::Error for SendError<T> {}
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SendError(..)")
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending into a closed channel")
}
}
/// An error returned from [`Sender::try_send()`].
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
/// The channel is full but not closed.
Full(T),
/// The channel is closed.
Closed(T),
}
impl<T> TrySendError<T> {
/// Unwraps the message that couldn't be sent.
pub fn into_inner(self) -> T {
match self {
TrySendError::Full(t) => t,
TrySendError::Closed(t) => t,
}
}
/// Returns `true` if the channel is full but not closed.
pub fn is_full(&self) -> bool {
match self {
TrySendError::Full(_) => true,
TrySendError::Closed(_) => false,
}
}
/// Returns `true` if the channel is closed.
pub fn is_closed(&self) -> bool {
match self {
TrySendError::Full(_) => false,
TrySendError::Closed(_) => true,
}
}
}
impl<T> error::Error for TrySendError<T> {}
impl<T> fmt::Debug for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "Full(..)"),
TrySendError::Closed(..) => write!(f, "Closed(..)"),
}
}
}
impl<T> fmt::Display for TrySendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TrySendError::Full(..) => write!(f, "sending into a full channel"),
TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
}
}
}
/// An error returned from [`Receiver::recv()`].
///
/// Received because the channel is empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;
impl error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "receiving from an empty and closed channel")
}
}
/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
/// The channel is empty but not closed.
Empty,
/// The channel is empty and closed.
Closed,
}
impl TryRecvError {
/// Returns `true` if the channel is empty but not closed.
pub fn is_empty(&self) -> bool {
match self {
TryRecvError::Empty => true,
TryRecvError::Closed => false,
}
}
/// Returns `true` if the channel is empty and closed.
pub fn is_closed(&self) -> bool {
match self {
TryRecvError::Empty => false,
TryRecvError::Closed => true,
}
}
}
impl error::Error for TryRecvError {}
impl fmt::Display for TryRecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
TryRecvError::Empty => write!(f, "receiving from an empty channel"),
TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
}
}
}
/// A future returned by [`Sender::send()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
msg: Option<T>,
}
impl<'a, T> Send<'a, T> {
/// Run this future with the given `Strategy`.
fn run_with_strategy<S: Strategy>(
&mut self,
cx: &mut S::Context,
) -> Poll<Result<(), SendError<T>>> {
loop {
let msg = self.msg.take().unwrap();
// Attempt to send a message.
match self.sender.try_send(msg) {
Ok(()) => return Poll::Ready(Ok(())),
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
Err(TrySendError::Full(m)) => self.msg = Some(m),
}
// Sending failed - now start listening for notifications or wait for one.
match self.listener.take() {
None => {
// Start listening and then try sending again.
self.listener = Some(self.sender.channel.send_ops.listen());
}
Some(l) => {
// Poll using the given strategy
if let Err(l) = S::poll(l, cx) {
self.listener = Some(l);
return Poll::Pending;
}
}
}
}
}
/// Run using the blocking strategy.
fn wait(mut self) -> Result<(), SendError<T>> {
match self.run_with_strategy::<Blocking>(&mut ()) {
Poll::Ready(res) => res,
Poll::Pending => unreachable!(),
}
}
}
impl<'a, T> Unpin for Send<'a, T> {}
impl<'a, T> Future for Send<'a, T> {
type Output = Result<(), SendError<T>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.run_with_strategy::<NonBlocking<'_>>(cx)
}
}
/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Recv<'a, T> {
receiver: &'a Receiver<T>,
listener: Option<EventListener>,
}
impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T> Recv<'a, T> {
/// Run this future with the given `Strategy`.
fn run_with_strategy<S: Strategy>(
&mut self,
cx: &mut S::Context,
) -> Poll<Result<T, RecvError>> {
loop {
// Attempt to receive a message.
match self.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match self.listener.take() {
None => {
// Start listening and then try receiving again.
self.listener = Some(self.receiver.channel.recv_ops.listen());
}
Some(l) => {
// Poll using the given strategy.
if let Err(l) = S::poll(l, cx) {
self.listener = Some(l);
return Poll::Pending;
}
}
}
}
}
/// Run with the blocking strategy.
fn wait(mut self) -> Result<T, RecvError> {
match self.run_with_strategy::<Blocking>(&mut ()) {
Poll::Ready(res) => res,
Poll::Pending => unreachable!(),
}
}
}
impl<'a, T> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.run_with_strategy::<NonBlocking<'_>>(cx)
}
}
/// A strategy used to poll an `EventListener`.
trait Strategy {
/// Context needed to be provided to the `poll` method.
type Context;
/// Polls the given `EventListener`.
///
/// Returns the `EventListener` back if it was not completed; otherwise,
/// returns `Ok(())`.
fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>;
}
/// Non-blocking strategy for use in asynchronous code.
struct NonBlocking<'a>(&'a mut ());
impl<'a> Strategy for NonBlocking<'a> {
type Context = Context<'a>;
fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> {
match Pin::new(&mut evl).poll(cx) {
Poll::Ready(()) => Ok(()),
Poll::Pending => Err(evl),
}
}
}
/// Blocking strategy for use in synchronous code.
struct Blocking;
impl Strategy for Blocking {
type Context = ();
fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> {
evl.wait();
Ok(())
}
}