| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! Additional functionality for use with asynchronous channels (futures::channel::mpsc). |
| |
| use { |
| core::{ |
| pin::Pin, |
| task::{Context, Poll}, |
| }, |
| futures::{channel::mpsc, ready, Future}, |
| }; |
| |
| /// Extends the functionality of a channel to include `try_send_fut`. |
| pub trait TrySend<Item> { |
| /// Returns a future that will complete successfully when the item has been buffered in the |
| /// channel or unsuccessfully when the receiving end has been dropped. |
| /// |
| /// The item is returned to the sender if it could not be sent. This is distinct from the |
| /// functionality found in the `futures` library which will consume the item regardless of |
| /// whether the item was successfully sent. |
| /// |
| /// NOTE: even in the event of successful completion, there is no guarantee that the receiver |
| /// has consumed the message. It is possible for the receiver to drop its end of the channel |
| /// without consuming all queued items. |
| fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>; |
| } |
| |
| impl<Item> TrySend<Item> for mpsc::Sender<Item> { |
| fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> { |
| TrySendFut::new(item, self) |
| } |
| } |
| |
| /// A Future that represents an ongoing send over a channel. |
| /// It completes when the send is complete or the send failed due to channel closure. |
| #[must_use] |
| pub struct TrySendFut<'a, Item> { |
| // item must always be constructed with Some value to prevent a panic. |
| item: Option<Item>, |
| channel: &'a mut mpsc::Sender<Item>, |
| } |
| |
| /// Returns a future that will complete successfully when the PeerTask has received the |
| /// message or unsuccessfully when the PeerTask has dropped its receiver. |
| impl<'a, Item> TrySendFut<'a, Item> { |
| /// Construct a new `TrySendFut`. |
| fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self { |
| Self { item: Some(item), channel } |
| } |
| } |
| |
| impl<'a, Item> Unpin for TrySendFut<'a, Item> {} |
| |
| impl<'a, Item> Future for TrySendFut<'a, Item> { |
| type Output = Result<(), Item>; |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| loop { |
| let ready = ready!(self.channel.poll_ready(cx)); |
| // `self.item` cannot be `None` because it is initialized as `Some`, |
| // and the value is reinserted before continuing below. |
| let item = self.item.take().expect("Cannot poll without `Some` item"); |
| match ready { |
| Err(e) => { |
| // `mpsc::Sender::poll_ready` only errors on disconnection. |
| assert!(e.is_disconnected(), "{}", e); |
| return Poll::Ready(Err(item)); |
| } |
| Ok(()) => {} |
| } |
| match self.channel.try_send(item) { |
| Ok(()) => return Poll::Ready(Ok(())), |
| Err(e) => { |
| if e.is_disconnected() { |
| return Poll::Ready(Err(e.into_inner())); |
| } else { |
| // We raced with a competing sender; reset `self.item` |
| // and wait again. |
| assert!(e.is_full(), "{}", e); |
| self.item = Some(e.into_inner()); |
| continue; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| fuchsia_async as fasync, |
| futures::{future::join, StreamExt}, |
| }; |
| |
| #[fasync::run_until_stalled(test)] |
| async fn item_future_completes_on_receive() { |
| // Vec is chosen as the item type because it is !Copy and the primary use of this |
| // method is for items that are not implicitly copiable. |
| let (mut sender, mut receiver) = mpsc::channel(0); |
| let (send_result, receive_result) = |
| join(sender.try_send_fut(vec![1]), receiver.next()).await; |
| assert_eq!(send_result, Ok(())); |
| assert_eq!(receive_result, Some(vec![1])); |
| } |
| |
| #[fasync::run_until_stalled(test)] |
| async fn item_future_errors_on_receiver_closed() { |
| let (mut sender, receiver) = mpsc::channel(0); |
| // Drop receiving end to force an error. |
| drop(receiver); |
| let send_result = sender.try_send_fut(vec![1]).await; |
| assert_eq!(send_result, Err(vec![1])); |
| } |
| |
| #[test] |
| fn item_future_pending_on_buffer_full() { |
| let mut exec = fasync::TestExecutor::new(); |
| let (mut sender, mut receiver) = mpsc::channel(0); |
| |
| let send_result = exec.run_singlethreaded(sender.try_send_fut(vec![1])); |
| assert_eq!(send_result, Ok(())); |
| |
| // Send a second item while the first is still in the channel. |
| let mut send_fut = sender.try_send_fut(vec![2]); |
| let send_poll_result = exec.run_until_stalled(&mut send_fut); |
| assert_eq!(send_poll_result, Poll::Pending); |
| |
| // Consume the first item; |
| let receive_poll_result = exec.run_until_stalled(&mut receiver.next()); |
| assert_eq!(receive_poll_result, Poll::Ready(Some(vec![1]))); |
| |
| // Now there is room in the channel for the second item |
| let send_poll_result = exec.run_until_stalled(&mut send_fut); |
| assert_eq!(send_poll_result, Poll::Ready(Ok(()))); |
| |
| // The second item is received |
| let receive_poll_result = exec.run_until_stalled(&mut receiver.next()); |
| assert_eq!(receive_poll_result, Poll::Ready(Some(vec![2]))); |
| } |
| } |