| //! An async multi-producer multi-consumer channel. | |
| //! | |
| //! There are two kinds of channels: | |
| //! | |
| //! 1. [Bounded][`bounded()`] channel with limited capacity. | |
| //! 2. [Unbounded][`unbounded()`] channel with unlimited capacity. | |
| //! | |
| //! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared | |
| //! among multiple threads. | |
| //! | |
| //! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a | |
| //! channel is closed, no more messages can be sent, but remaining messages can still be received. | |
| //! | |
| //! The channel can also be closed manually by calling [`Sender::close()`] or | |
| //! [`Receiver::close()`]. | |
| //! | |
| //! # Examples | |
| //! | |
| //! ``` | |
| //! # futures_lite::future::block_on(async { | |
| //! let (s, r) = async_channel::unbounded(); | |
| //! | |
| //! assert_eq!(s.send("Hello").await, Ok(())); | |
| //! assert_eq!(r.recv().await, Ok("Hello")); | |
| //! # }); | |
| //! ``` | |
| #![forbid(unsafe_code)] | |
| #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] | |
| use std::error; | |
| use std::fmt; | |
| use std::future::Future; | |
| use std::pin::Pin; | |
| use std::process; | |
| use std::sync::atomic::{AtomicUsize, Ordering}; | |
| use std::sync::Arc; | |
| use std::task::{Context, Poll}; | |
| use std::usize; | |
| use concurrent_queue::{ConcurrentQueue, PopError, PushError}; | |
| use event_listener::{Event, EventListener}; | |
| use futures_core::stream::Stream; | |
| struct Channel<T> { | |
| /// Inner message queue. | |
| queue: ConcurrentQueue<T>, | |
| /// Send operations waiting while the channel is full. | |
| send_ops: Event, | |
| /// Receive operations waiting while the channel is empty and not closed. | |
| recv_ops: Event, | |
| /// Stream operations while the channel is empty and not closed. | |
| stream_ops: Event, | |
| /// The number of currently active `Sender`s. | |
| sender_count: AtomicUsize, | |
| /// The number of currently active `Receivers`s. | |
| receiver_count: AtomicUsize, | |
| } | |
| impl<T> Channel<T> { | |
| /// Closes the channel and notifies all blocked operations. | |
| /// | |
| /// Returns `true` if this call has closed the channel and it was not closed already. | |
| fn close(&self) -> bool { | |
| if self.queue.close() { | |
| // Notify all send operations. | |
| self.send_ops.notify(usize::MAX); | |
| // Notify all receive and stream operations. | |
| self.recv_ops.notify(usize::MAX); | |
| self.stream_ops.notify(usize::MAX); | |
| true | |
| } else { | |
| false | |
| } | |
| } | |
| } | |
| /// Creates a bounded channel. | |
| /// | |
| /// The created channel has space to hold at most `cap` messages at a time. | |
| /// | |
| /// # Panics | |
| /// | |
| /// Capacity must be a positive number. If `cap` is zero, this function will panic. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{bounded, TryRecvError, TrySendError}; | |
| /// | |
| /// let (s, r) = bounded(1); | |
| /// | |
| /// assert_eq!(s.send(10).await, Ok(())); | |
| /// assert_eq!(s.try_send(20), Err(TrySendError::Full(20))); | |
| /// | |
| /// assert_eq!(r.recv().await, Ok(10)); | |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
| /// # }); | |
| pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { | |
| assert!(cap > 0, "capacity cannot be zero"); | |
| let channel = Arc::new(Channel { | |
| queue: ConcurrentQueue::bounded(cap), | |
| send_ops: Event::new(), | |
| recv_ops: Event::new(), | |
| stream_ops: Event::new(), | |
| sender_count: AtomicUsize::new(1), | |
| receiver_count: AtomicUsize::new(1), | |
| }); | |
| let s = Sender { | |
| channel: channel.clone(), | |
| }; | |
| let r = Receiver { | |
| channel, | |
| listener: None, | |
| }; | |
| (s, r) | |
| } | |
| /// Creates an unbounded channel. | |
| /// | |
| /// The created channel can hold an unlimited number of messages. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, TryRecvError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// | |
| /// assert_eq!(s.send(10).await, Ok(())); | |
| /// assert_eq!(s.send(20).await, Ok(())); | |
| /// | |
| /// assert_eq!(r.recv().await, Ok(10)); | |
| /// assert_eq!(r.recv().await, Ok(20)); | |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
| /// # }); | |
| pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { | |
| let channel = Arc::new(Channel { | |
| queue: ConcurrentQueue::unbounded(), | |
| send_ops: Event::new(), | |
| recv_ops: Event::new(), | |
| stream_ops: Event::new(), | |
| sender_count: AtomicUsize::new(1), | |
| receiver_count: AtomicUsize::new(1), | |
| }); | |
| let s = Sender { | |
| channel: channel.clone(), | |
| }; | |
| let r = Receiver { | |
| channel, | |
| listener: None, | |
| }; | |
| (s, r) | |
| } | |
| /// The sending side of a channel. | |
| /// | |
| /// Senders can be cloned and shared among threads. When all senders associated with a channel are | |
| /// dropped, the channel becomes closed. | |
| /// | |
| /// The channel can also be closed manually by calling [`Sender::close()`]. | |
| pub struct Sender<T> { | |
| /// Inner channel state. | |
| channel: Arc<Channel<T>>, | |
| } | |
| impl<T> Sender<T> { | |
| /// Attempts to send a message into the channel. | |
| /// | |
| /// If the channel is full or closed, this method returns an error. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// use async_channel::{bounded, TrySendError}; | |
| /// | |
| /// let (s, r) = bounded(1); | |
| /// | |
| /// assert_eq!(s.try_send(1), Ok(())); | |
| /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); | |
| /// | |
| /// drop(r); | |
| /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3))); | |
| /// ``` | |
| pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { | |
| match self.channel.queue.push(msg) { | |
| Ok(()) => { | |
| // Notify a single blocked receive operation. If the notified operation then | |
| // receives a message or gets canceled, it will notify another blocked receive | |
| // operation. | |
| self.channel.recv_ops.notify(1); | |
| // Notify all blocked streams. | |
| self.channel.stream_ops.notify(usize::MAX); | |
| Ok(()) | |
| } | |
| Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)), | |
| Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)), | |
| } | |
| } | |
| /// Sends a message into the channel. | |
| /// | |
| /// If the channel is full, this method waits until there is space for a message. | |
| /// | |
| /// If the channel is closed, this method returns an error. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, SendError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// | |
| /// assert_eq!(s.send(1).await, Ok(())); | |
| /// drop(r); | |
| /// assert_eq!(s.send(2).await, Err(SendError(2))); | |
| /// # }); | |
| /// ``` | |
| pub fn send(&self, msg: T) -> Send<'_, T> { | |
| Send { | |
| sender: self, | |
| listener: None, | |
| msg: Some(msg), | |
| } | |
| } | |
| /// Closes the channel. | |
| /// | |
| /// Returns `true` if this call has closed the channel and it was not closed already. | |
| /// | |
| /// The remaining messages can still be received. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, RecvError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// assert_eq!(s.send(1).await, Ok(())); | |
| /// assert!(s.close()); | |
| /// | |
| /// assert_eq!(r.recv().await, Ok(1)); | |
| /// assert_eq!(r.recv().await, Err(RecvError)); | |
| /// # }); | |
| /// ``` | |
| pub fn close(&self) -> bool { | |
| self.channel.close() | |
| } | |
| /// Returns `true` if the channel is closed. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, RecvError}; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert!(!s.is_closed()); | |
| /// | |
| /// drop(r); | |
| /// assert!(s.is_closed()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_closed(&self) -> bool { | |
| self.channel.queue.is_closed() | |
| } | |
| /// Returns `true` if the channel is empty. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// | |
| /// assert!(s.is_empty()); | |
| /// s.send(1).await; | |
| /// assert!(!s.is_empty()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_empty(&self) -> bool { | |
| self.channel.queue.is_empty() | |
| } | |
| /// Returns `true` if the channel is full. | |
| /// | |
| /// Unbounded channels are never full. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::bounded; | |
| /// | |
| /// let (s, r) = bounded(1); | |
| /// | |
| /// assert!(!s.is_full()); | |
| /// s.send(1).await; | |
| /// assert!(s.is_full()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_full(&self) -> bool { | |
| self.channel.queue.is_full() | |
| } | |
| /// Returns the number of messages in the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// assert_eq!(s.len(), 0); | |
| /// | |
| /// s.send(1).await; | |
| /// s.send(2).await; | |
| /// assert_eq!(s.len(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn len(&self) -> usize { | |
| self.channel.queue.len() | |
| } | |
| /// Returns the channel capacity if it's bounded. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// use async_channel::{bounded, unbounded}; | |
| /// | |
| /// let (s, r) = bounded::<i32>(5); | |
| /// assert_eq!(s.capacity(), Some(5)); | |
| /// | |
| /// let (s, r) = unbounded::<i32>(); | |
| /// assert_eq!(s.capacity(), None); | |
| /// ``` | |
| pub fn capacity(&self) -> Option<usize> { | |
| self.channel.queue.capacity() | |
| } | |
| /// Returns the number of receivers for the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert_eq!(s.receiver_count(), 1); | |
| /// | |
| /// let r2 = r.clone(); | |
| /// assert_eq!(s.receiver_count(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn receiver_count(&self) -> usize { | |
| self.channel.receiver_count.load(Ordering::SeqCst) | |
| } | |
| /// Returns the number of senders for the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert_eq!(s.sender_count(), 1); | |
| /// | |
| /// let s2 = s.clone(); | |
| /// assert_eq!(s.sender_count(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn sender_count(&self) -> usize { | |
| self.channel.sender_count.load(Ordering::SeqCst) | |
| } | |
| } | |
| impl<T> Drop for Sender<T> { | |
| fn drop(&mut self) { | |
| // Decrement the sender count and close the channel if it drops down to zero. | |
| if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 { | |
| self.channel.close(); | |
| } | |
| } | |
| } | |
| impl<T> fmt::Debug for Sender<T> { | |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
| write!(f, "Sender {{ .. }}") | |
| } | |
| } | |
| impl<T> Clone for Sender<T> { | |
| fn clone(&self) -> Sender<T> { | |
| let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed); | |
| // Make sure the count never overflows, even if lots of sender clones are leaked. | |
| if count > usize::MAX / 2 { | |
| process::abort(); | |
| } | |
| Sender { | |
| channel: self.channel.clone(), | |
| } | |
| } | |
| } | |
| /// The receiving side of a channel. | |
| /// | |
| /// Receivers can be cloned and shared among threads. When all receivers associated with a channel | |
| /// are dropped, the channel becomes closed. | |
| /// | |
| /// The channel can also be closed manually by calling [`Receiver::close()`]. | |
| /// | |
| /// Receivers implement the [`Stream`] trait. | |
| pub struct Receiver<T> { | |
| /// Inner channel state. | |
| channel: Arc<Channel<T>>, | |
| /// Listens for a send or close event to unblock this stream. | |
| listener: Option<EventListener>, | |
| } | |
| impl<T> Receiver<T> { | |
| /// Attempts to receive a message from the channel. | |
| /// | |
| /// If the channel is empty or closed, this method returns an error. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, TryRecvError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// assert_eq!(s.send(1).await, Ok(())); | |
| /// | |
| /// assert_eq!(r.try_recv(), Ok(1)); | |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); | |
| /// | |
| /// drop(s); | |
| /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed)); | |
| /// # }); | |
| /// ``` | |
| pub fn try_recv(&self) -> Result<T, TryRecvError> { | |
| match self.channel.queue.pop() { | |
| Ok(msg) => { | |
| // Notify a single blocked send operation. If the notified operation then sends a | |
| // message or gets canceled, it will notify another blocked send operation. | |
| self.channel.send_ops.notify(1); | |
| Ok(msg) | |
| } | |
| Err(PopError::Empty) => Err(TryRecvError::Empty), | |
| Err(PopError::Closed) => Err(TryRecvError::Closed), | |
| } | |
| } | |
| /// Receives a message from the channel. | |
| /// | |
| /// If the channel is empty, this method waits until there is a message. | |
| /// | |
| /// If the channel is closed, this method receives a message or returns an error if there are | |
| /// no more messages. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, RecvError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// | |
| /// assert_eq!(s.send(1).await, Ok(())); | |
| /// drop(s); | |
| /// | |
| /// assert_eq!(r.recv().await, Ok(1)); | |
| /// assert_eq!(r.recv().await, Err(RecvError)); | |
| /// # }); | |
| /// ``` | |
| pub fn recv(&self) -> Recv<'_, T> { | |
| Recv { | |
| receiver: self, | |
| listener: None, | |
| } | |
| } | |
| /// Closes the channel. | |
| /// | |
| /// Returns `true` if this call has closed the channel and it was not closed already. | |
| /// | |
| /// The remaining messages can still be received. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, RecvError}; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// assert_eq!(s.send(1).await, Ok(())); | |
| /// | |
| /// assert!(r.close()); | |
| /// assert_eq!(r.recv().await, Ok(1)); | |
| /// assert_eq!(r.recv().await, Err(RecvError)); | |
| /// # }); | |
| /// ``` | |
| pub fn close(&self) -> bool { | |
| self.channel.close() | |
| } | |
| /// Returns `true` if the channel is closed. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::{unbounded, RecvError}; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert!(!r.is_closed()); | |
| /// | |
| /// drop(s); | |
| /// assert!(r.is_closed()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_closed(&self) -> bool { | |
| self.channel.queue.is_closed() | |
| } | |
| /// Returns `true` if the channel is empty. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// | |
| /// assert!(s.is_empty()); | |
| /// s.send(1).await; | |
| /// assert!(!s.is_empty()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_empty(&self) -> bool { | |
| self.channel.queue.is_empty() | |
| } | |
| /// Returns `true` if the channel is full. | |
| /// | |
| /// Unbounded channels are never full. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::bounded; | |
| /// | |
| /// let (s, r) = bounded(1); | |
| /// | |
| /// assert!(!r.is_full()); | |
| /// s.send(1).await; | |
| /// assert!(r.is_full()); | |
| /// # }); | |
| /// ``` | |
| pub fn is_full(&self) -> bool { | |
| self.channel.queue.is_full() | |
| } | |
| /// Returns the number of messages in the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded(); | |
| /// assert_eq!(r.len(), 0); | |
| /// | |
| /// s.send(1).await; | |
| /// s.send(2).await; | |
| /// assert_eq!(r.len(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn len(&self) -> usize { | |
| self.channel.queue.len() | |
| } | |
| /// Returns the channel capacity if it's bounded. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// use async_channel::{bounded, unbounded}; | |
| /// | |
| /// let (s, r) = bounded::<i32>(5); | |
| /// assert_eq!(r.capacity(), Some(5)); | |
| /// | |
| /// let (s, r) = unbounded::<i32>(); | |
| /// assert_eq!(r.capacity(), None); | |
| /// ``` | |
| pub fn capacity(&self) -> Option<usize> { | |
| self.channel.queue.capacity() | |
| } | |
| /// Returns the number of receivers for the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert_eq!(r.receiver_count(), 1); | |
| /// | |
| /// let r2 = r.clone(); | |
| /// assert_eq!(r.receiver_count(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn receiver_count(&self) -> usize { | |
| self.channel.receiver_count.load(Ordering::SeqCst) | |
| } | |
| /// Returns the number of senders for the channel. | |
| /// | |
| /// # Examples | |
| /// | |
| /// ``` | |
| /// # futures_lite::future::block_on(async { | |
| /// use async_channel::unbounded; | |
| /// | |
| /// let (s, r) = unbounded::<()>(); | |
| /// assert_eq!(r.sender_count(), 1); | |
| /// | |
| /// let s2 = s.clone(); | |
| /// assert_eq!(r.sender_count(), 2); | |
| /// # }); | |
| /// ``` | |
| pub fn sender_count(&self) -> usize { | |
| self.channel.sender_count.load(Ordering::SeqCst) | |
| } | |
| } | |
| impl<T> Drop for Receiver<T> { | |
| fn drop(&mut self) { | |
| // Decrement the receiver count and close the channel if it drops down to zero. | |
| if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { | |
| self.channel.close(); | |
| } | |
| } | |
| } | |
| impl<T> fmt::Debug for Receiver<T> { | |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
| write!(f, "Receiver {{ .. }}") | |
| } | |
| } | |
| impl<T> Clone for Receiver<T> { | |
| fn clone(&self) -> Receiver<T> { | |
| let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed); | |
| // Make sure the count never overflows, even if lots of receiver clones are leaked. | |
| if count > usize::MAX / 2 { | |
| process::abort(); | |
| } | |
| Receiver { | |
| channel: self.channel.clone(), | |
| listener: None, | |
| } | |
| } | |
| } | |
| impl<T> Stream for Receiver<T> { | |
| type Item = T; | |
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
| loop { | |
| // If this stream is listening for events, first wait for a notification. | |
| if let Some(listener) = self.listener.as_mut() { | |
| futures_core::ready!(Pin::new(listener).poll(cx)); | |
| self.listener = None; | |
| } | |
| loop { | |
| // Attempt to receive a message. | |
| match self.try_recv() { | |
| Ok(msg) => { | |
| // The stream is not blocked on an event - drop the listener. | |
| self.listener = None; | |
| return Poll::Ready(Some(msg)); | |
| } | |
| Err(TryRecvError::Closed) => { | |
| // The stream is not blocked on an event - drop the listener. | |
| self.listener = None; | |
| return Poll::Ready(None); | |
| } | |
| Err(TryRecvError::Empty) => {} | |
| } | |
| // Receiving failed - now start listening for notifications or wait for one. | |
| match self.listener.as_mut() { | |
| None => { | |
| // Create a listener and try sending the message again. | |
| self.listener = Some(self.channel.stream_ops.listen()); | |
| } | |
| Some(_) => { | |
| // Go back to the outer loop to poll the listener. | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| impl<T> futures_core::stream::FusedStream for Receiver<T> { | |
| fn is_terminated(&self) -> bool { | |
| self.channel.queue.is_closed() && self.channel.queue.is_empty() | |
| } | |
| } | |
/// The error produced by [`Sender::send()`].
///
/// It is returned because the channel is closed, and it carries the message that could not be
/// sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    /// Prints the type name with the payload elided, so no `T: Debug` bound is required.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
/// The error produced by [`Sender::try_send()`].
///
/// Both variants carry the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(msg) | TrySendError::Closed(msg) => msg,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    /// Prints the variant name with the payload elided, so no `T: Debug` bound is required.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TrySendError::Full(_) => "Full(..)",
            TrySendError::Closed(_) => "Closed(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(_) => "sending into a full channel",
            TrySendError::Closed(_) => "sending into a closed channel",
        };
        f.write_str(description)
    }
}
/// The error produced by [`Receiver::recv()`].
///
/// It is returned because the channel is empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving from an empty and closed channel")
    }
}
/// The error produced by [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, TryRecvError::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TryRecvError::Closed)
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TryRecvError::Empty => "receiving from an empty channel",
            TryRecvError::Closed => "receiving from an empty and closed channel",
        };
        f.write_str(description)
    }
}
| /// A future returned by [`Sender::send()`]. | |
| #[derive(Debug)] | |
| #[must_use = "futures do nothing unless .awaited"] | |
| pub struct Send<'a, T> { | |
| sender: &'a Sender<T>, | |
| listener: Option<EventListener>, | |
| msg: Option<T>, | |
| } | |
| impl<'a, T> Unpin for Send<'a, T> {} | |
| impl<'a, T> Future for Send<'a, T> { | |
| type Output = Result<(), SendError<T>>; | |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
| let mut this = Pin::new(self); | |
| loop { | |
| let msg = this.msg.take().unwrap(); | |
| // Attempt to send a message. | |
| match this.sender.try_send(msg) { | |
| Ok(()) => { | |
| // If the capacity is larger than 1, notify another blocked send operation. | |
| match this.sender.channel.queue.capacity() { | |
| Some(1) => {} | |
| Some(_) | None => this.sender.channel.send_ops.notify(1), | |
| } | |
| return Poll::Ready(Ok(())); | |
| } | |
| Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), | |
| Err(TrySendError::Full(m)) => this.msg = Some(m), | |
| } | |
| // Sending failed - now start listening for notifications or wait for one. | |
| match &mut this.listener { | |
| None => { | |
| // Start listening and then try receiving again. | |
| this.listener = Some(this.sender.channel.send_ops.listen()); | |
| } | |
| Some(l) => { | |
| // Wait for a notification. | |
| match Pin::new(l).poll(cx) { | |
| Poll::Ready(_) => { | |
| this.listener = None; | |
| continue; | |
| } | |
| Poll::Pending => return Poll::Pending, | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| /// A future returned by [`Receiver::recv()`]. | |
| #[derive(Debug)] | |
| #[must_use = "futures do nothing unless .awaited"] | |
| pub struct Recv<'a, T> { | |
| receiver: &'a Receiver<T>, | |
| listener: Option<EventListener>, | |
| } | |
| impl<'a, T> Unpin for Recv<'a, T> {} | |
| impl<'a, T> Future for Recv<'a, T> { | |
| type Output = Result<T, RecvError>; | |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
| let mut this = Pin::new(self); | |
| loop { | |
| // Attempt to receive a message. | |
| match this.receiver.try_recv() { | |
| Ok(msg) => { | |
| // If the capacity is larger than 1, notify another blocked receive operation. | |
| // There is no need to notify stream operations because all of them get | |
| // notified every time a message is sent into the channel. | |
| match this.receiver.channel.queue.capacity() { | |
| Some(1) => {} | |
| Some(_) | None => this.receiver.channel.recv_ops.notify(1), | |
| } | |
| return Poll::Ready(Ok(msg)); | |
| } | |
| Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), | |
| Err(TryRecvError::Empty) => {} | |
| } | |
| // Receiving failed - now start listening for notifications or wait for one. | |
| match &mut this.listener { | |
| None => { | |
| // Start listening and then try receiving again. | |
| this.listener = Some(this.receiver.channel.recv_ops.listen()); | |
| } | |
| Some(l) => { | |
| // Wait for a notification. | |
| match Pin::new(l).poll(cx) { | |
| Poll::Ready(_) => { | |
| this.listener = None; | |
| continue; | |
| } | |
| Poll::Pending => return Poll::Pending, | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } |