blob: 3caa4b5d5cb5e7290b345176c36664c25954752c [file] [log] [blame]
use super::chan;
use futures::{Poll, Sink, StartSend, Stream};
use loom::sync::atomic::AtomicUsize;
use std::fmt;
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the
/// [`unbounded_channel`](fn.unbounded_channel.html) function.
///
/// The handle can be cloned, and it implements `Sink`; values can also be
/// pushed directly with `try_send`.
pub struct UnboundedSender<T> {
/// The channel sender, parameterized by this module's `Semaphore` type.
chan: chan::Tx<T, Semaphore>,
}
// Implemented by hand rather than derived so that no `T: Clone` bound is
// imposed: only the inner channel handle is cloned.
impl<T> Clone for UnboundedSender<T> {
    /// Returns a new sender wrapping a clone of the inner channel handle.
    fn clone(&self) -> Self {
        let chan = self.chan.clone();
        UnboundedSender { chan }
    }
}
impl<T> fmt::Debug for UnboundedSender<T> {
    /// Renders the sender as a struct named `UnboundedSender` with its
    /// single `chan` field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("UnboundedSender");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Receive values from the associated `UnboundedSender`.
///
/// Instances are created by the
/// [`unbounded_channel`](fn.unbounded_channel.html) function.
///
/// Values are received through the `Stream` implementation; `close` shuts
/// the receiving half without dropping it.
pub struct UnboundedReceiver<T> {
/// The channel receiver, parameterized by this module's `Semaphore` type.
chan: chan::Rx<T, Semaphore>,
}
impl<T> fmt::Debug for UnboundedReceiver<T> {
    /// Renders the receiver as a struct named `UnboundedReceiver` with its
    /// single `chan` field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("UnboundedReceiver");
        out.field("chan", &self.chan);
        out.finish()
    }
}
/// Error returned by the `UnboundedSender`.
///
/// This is the `SinkError` of the sender's `Sink` implementation, produced
/// when the underlying `try_send` fails. It carries no data (the unit field
/// only keeps the type from being constructed outside this module) and
/// displays as "channel closed".
#[derive(Debug)]
pub struct UnboundedSendError(());
/// Returned by `UnboundedSender::try_send` when the channel has been closed.
///
/// Wraps a value of type `T`, which can be taken back out with
/// `into_inner`. NOTE(review): presumably this is the message that could
/// not be sent -- confirm against `chan::Tx::try_send`.
#[derive(Debug)]
pub struct UnboundedTrySendError<T>(T);
/// Error returned by `UnboundedReceiver`.
///
/// This is the `Error` type of the receiver's `Stream` implementation; any
/// error reported by the underlying channel's `recv` is replaced by it. It
/// carries no data and displays as "channel closed".
#[derive(Debug)]
pub struct UnboundedRecvError(());
/// Create an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
/// Returns the `(sender, receiver)` pair for a freshly created channel.
///
/// A `send` on this channel will always succeed as long as the receive half
/// has not been closed. If the receiver falls behind, messages will be
/// buffered without limit.
///
/// **Note** that the amount of available system memory is an implicit bound
/// on the channel. An unbounded channel can cause the process to run out of
/// memory, in which case the process will be aborted.
pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    // The semaphore slot is a plain counter starting at zero.
    let (raw_tx, raw_rx) = chan::channel(AtomicUsize::new(0));

    (UnboundedSender::new(raw_tx), UnboundedReceiver::new(raw_rx))
}
/// No capacity
///
/// The semaphore type plugged into `chan::Tx` / `chan::Rx` for unbounded
/// channels: a bare `AtomicUsize`, which `unbounded_channel` initializes to
/// zero. NOTE(review): how `chan` interprets this value is defined outside
/// this file -- presumably it never refuses a send for lack of capacity.
type Semaphore = AtomicUsize;
impl<T> UnboundedReceiver<T> {
    /// Wraps a raw channel receive handle in the public receiver type.
    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Self {
        Self { chan }
    }

    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// After this call no further messages can be sent on the channel, but
    /// the receiver can still drain the messages that are already buffered.
    pub fn close(&mut self) {
        self.chan.close();
    }
}
impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;
    type Error = UnboundedRecvError;

    /// Polls the underlying channel for the next value.
    ///
    /// A successful poll result is passed through unchanged; any error
    /// reported by the channel is replaced with `UnboundedRecvError`.
    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
        match self.chan.recv() {
            Ok(state) => Ok(state),
            Err(_) => Err(UnboundedRecvError(())),
        }
    }
}
impl<T> UnboundedSender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
UnboundedSender { chan }
}
/// Attempts to send a message on this `UnboundedSender` without blocking.
pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
self.chan.try_send(message)?;
Ok(())
}
}
impl<T> Sink for UnboundedSender<T> {
    type SinkItem = T;
    type SinkError = UnboundedSendError;

    /// Sends `msg` via `try_send`.
    ///
    /// On success the sink always reports `AsyncSink::Ready`. On failure the
    /// `try_send` error (and the value it carries) is discarded and
    /// `UnboundedSendError` is returned instead.
    fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
        use futures::AsyncSink;

        match self.try_send(msg) {
            Ok(()) => Ok(AsyncSink::Ready),
            Err(_) => Err(UnboundedSendError(())),
        }
    }

    /// Always reports completion immediately; this method does no work.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        use futures::Async;

        Ok(Async::Ready(()))
    }

    /// Always reports completion immediately.
    ///
    /// This is a no-op: it does not touch the underlying channel.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        use futures::Async;

        Ok(Async::Ready(()))
    }
}
// ===== impl UnboundedSendError =====

impl fmt::Display for UnboundedSendError {
    /// Writes the fixed message "channel closed".
    ///
    /// Width and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("channel closed")
    }
}

impl ::std::error::Error for UnboundedSendError {
    /// Returns the same fixed message that `Display` prints.
    fn description(&self) -> &str {
        "channel closed"
    }
}
// ===== impl TrySendError =====
impl<T> UnboundedTrySendError<T> {
/// Get the inner value.
pub fn into_inner(self) -> T {
self.0
}
}
// `Display` writes its message directly rather than going through
// `Error::description`, so it needs no bound on `T`: the error can be
// displayed even when the wrapped value does not implement `Debug`.
impl<T> fmt::Display for UnboundedTrySendError<T> {
    /// Writes the fixed message "channel closed"; the wrapped value is not
    /// included in the output.
    ///
    /// Width and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("channel closed")
    }
}

// `Error` requires `Debug`, and the derived `Debug` for this type requires
// `T: Debug`, so the bound stays here.
impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {
    /// Returns the same fixed message that `Display` prints.
    fn description(&self) -> &str {
        "channel closed"
    }
}
impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
    /// Converts the channel's `(value, error)` pair into an
    /// `UnboundedTrySendError` wrapping `value`.
    ///
    /// # Panics
    ///
    /// Panics if the channel error is anything other than
    /// `chan::TrySendError::Closed`; this conversion treats any other error
    /// as a broken invariant.
    fn from(pair: (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
        let (value, err) = pair;
        assert_eq!(chan::TrySendError::Closed, err);
        UnboundedTrySendError(value)
    }
}
// ===== impl UnboundedRecvError =====

impl fmt::Display for UnboundedRecvError {
    /// Writes the fixed message "channel closed".
    ///
    /// Width and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("channel closed")
    }
}

impl ::std::error::Error for UnboundedRecvError {
    /// Returns the same fixed message that `Display` prints.
    fn description(&self) -> &str {
        "channel closed"
    }
}