blob: 0009a7b54d4d861f666e6c89d26a706fddbdc0cf [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Additional functionality for use with asynchronous channels (futures::channel::mpsc).
use {
core::{
pin::Pin,
task::{Context, Poll},
},
futures::{channel::mpsc, ready, Future},
};
/// Extends the functionality of a channel to include `try_send_fut`.
///
/// The extension exists so that a failed send hands the unsent item back to the caller
/// instead of dropping it.
pub trait TrySend<Item> {
/// Returns a future that will complete successfully when the item has been buffered in the
/// channel or unsuccessfully when the receiving end has been dropped.
///
/// The future resolves to `Ok(())` on success and to `Err(item)` on failure, so the item is
/// returned to the sender if it could not be sent. This is distinct from the
/// functionality found in the `futures` library which will consume the item regardless of
/// whether the item was successfully sent.
///
/// The returned future mutably borrows the sender for as long as it is alive.
///
/// NOTE: even in the event of successful completion, there is no guarantee that the receiver
/// has consumed the message. It is possible for the receiver to drop its end of the channel
/// without consuming all queued items.
fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>;
}
/// Implements `TrySend` for `mpsc::Sender` by wrapping the sender and the item in a
/// `TrySendFut`.
impl<Item> TrySend<Item> for mpsc::Sender<Item> {
fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> {
// Nothing is sent here: this only bundles `item` with the sender. The actual send is
// attempted when the returned future is polled.
TrySendFut::new(item, self)
}
}
/// A `Future` that represents an ongoing send over a channel.
///
/// It completes with `Ok(())` when the send is complete, or with `Err(item)` — handing the
/// unsent item back — when the send failed due to channel closure.
#[must_use = "futures do nothing unless polled"]
pub struct TrySendFut<'a, Item> {
    /// The item waiting to be sent.
    ///
    /// Always constructed as `Some`. `poll` takes the value out while it attempts a send and
    /// puts it back before retrying, so it is `None` only once the future has completed.
    /// Polling a completed future can therefore panic.
    item: Option<Item>,
    /// The sending half of the channel, mutably borrowed for the lifetime of the future.
    channel: &'a mut mpsc::Sender<Item>,
}
/// Private construction of a `TrySendFut`; callers obtain one through
/// `TrySend::try_send_fut`.
impl<'a, Item> TrySendFut<'a, Item> {
    /// Builds a future that will try to send `item` over `channel` once polled.
    ///
    /// The item is stored as `Some`, which is the state `poll` expects on its first call.
    fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self {
        let item = Some(item);
        Self { item, channel }
    }
}
// `poll` reaches `item` and `channel` through `Pin<&mut Self>` by plain field access and only
// ever moves `item` by value, so nothing in this type relies on a stable address. Declaring
// `Unpin` unconditionally keeps that access valid even when `Item` itself is not `Unpin`.
impl<'a, Item> Unpin for TrySendFut<'a, Item> {}
impl<'a, Item> Future for TrySendFut<'a, Item> {
    /// `Ok(())` once the item has been handed to the channel, `Err(item)` if the channel is
    /// disconnected.
    type Output = Result<(), Item>;

    /// Drives the send: waits for the channel to report readiness, then attempts the send,
    /// retrying if the channel turns out to be full after all.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports readiness (or disconnection) after this future has
    /// already completed, because the item is gone by then. Also panics if the channel
    /// reports an error kind other than "disconnected" or "full".
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `TrySendFut` is `Unpin`, so a plain mutable reference can be taken out of the pin.
        let this = self.get_mut();
        loop {
            // Returns `Poll::Pending` (with the waker registered by the channel) until the
            // channel has room or is disconnected.
            let readiness = ready!(this.channel.poll_ready(cx));

            // The item is `Some` on construction and is put back at the bottom of this loop
            // before every retry, so it can only be missing if the future already completed.
            let pending_item = this.item.take().expect("Cannot poll without `Some` item");

            if let Err(err) = readiness {
                // `mpsc::Sender::poll_ready` only errors on disconnection.
                assert!(err.is_disconnected(), "{}", err);
                return Poll::Ready(Err(pending_item));
            }

            let err = match this.channel.try_send(pending_item) {
                Ok(()) => return Poll::Ready(Ok(())),
                Err(err) => err,
            };

            if err.is_disconnected() {
                // Recover the item from the error so the caller gets it back.
                return Poll::Ready(Err(err.into_inner()));
            }

            // We raced with a competing sender; restore the item and wait for readiness
            // again on the next loop iteration.
            assert!(err.is_full(), "{}", err);
            this.item = Some(err.into_inner());
        }
    }
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        fuchsia_async as fasync,
        futures::{future::join, StreamExt},
    };

    /// A send that is driven alongside a receive completes and delivers the item.
    #[fasync::run_until_stalled(test)]
    async fn item_future_completes_on_receive() {
        // The payload is a `Vec` on purpose: it is not `Copy`, which matches the main use
        // case of this API (items that cannot be implicitly copied).
        let (mut tx, mut rx) = mpsc::channel(0);

        let (sent, received) = join(tx.try_send_fut(vec![1]), rx.next()).await;

        assert_eq!(sent, Ok(()));
        assert_eq!(received, Some(vec![1]));
    }

    /// When the receiver is gone, the future fails and gives the item back.
    #[fasync::run_until_stalled(test)]
    async fn item_future_errors_on_receiver_closed() {
        let (mut tx, rx) = mpsc::channel(0);

        // Dropping the receiving end forces the send to fail.
        drop(rx);

        let sent = tx.try_send_fut(vec![1]).await;
        assert_eq!(sent, Err(vec![1]));
    }

    /// A send stays pending while the channel is full and completes once room is made.
    #[test]
    fn item_future_pending_on_buffer_full() {
        let mut exec = fasync::TestExecutor::new();
        let (mut tx, mut rx) = mpsc::channel(0);

        // The first item is accepted right away.
        let first_send = exec.run_singlethreaded(tx.try_send_fut(vec![1]));
        assert_eq!(first_send, Ok(()));

        // The second item cannot be sent while the first is still sitting in the channel.
        let mut second_send = tx.try_send_fut(vec![2]);
        let second_send_poll = exec.run_until_stalled(&mut second_send);
        assert_eq!(second_send_poll, Poll::Pending);

        // Receiving the first item frees up room.
        let first_receive_poll = exec.run_until_stalled(&mut rx.next());
        assert_eq!(first_receive_poll, Poll::Ready(Some(vec![1])));

        // The stalled send can now complete.
        let second_send_poll = exec.run_until_stalled(&mut second_send);
        assert_eq!(second_send_poll, Poll::Ready(Ok(())));

        // The second item then arrives at the receiver.
        let second_receive_poll = exec.run_until_stalled(&mut rx.next());
        assert_eq!(second_receive_poll, Poll::Ready(Some(vec![2])));
    }
}