blob: 7839399d68029ba2ef4b1f0050927efb42ca5257 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![feature(await_macro, async_await)]
use {
futures::{
channel::mpsc::{unbounded, TrySendError, UnboundedSender},
Future, StreamExt,
},
std::pin::Pin,
};
/// A handle used to queue futures so that they are executed in the order they were sent.
///
/// Cloning a `SequentialSender` clones the inner channel sender, so every clone feeds the same
/// underlying channel.
#[derive(Clone)]
pub struct SequentialSender {
    /// The sending half of the channel over which futures are queued for execution. Futures are
    /// boxed and pinned so that differently-typed futures can travel over the same channel.
    sender: UnboundedSender<Pin<Box<dyn Future<Output = ()>>>>,
}
/// The result type for a call to `SequentialSender::send`. On error, the `TrySendError` contains
/// the future which failed to be sent.
pub type SendResult = Result<(), TrySendError<Pin<Box<dyn Future<Output = ()>>>>>;
impl SequentialSender {
/// Creates a new `SequentialSender` and a `Future` which drives the execution of sent futures.
///
/// # Example
///
/// ```
/// let (sender, future) = SequentialSender::new();
/// spawn_local(future);
/// sender.send(
/// async move {
/// println!("Running first future!");
/// },
/// ).expect("Sent the future.");
/// ```
///
/// # Returns
/// A new `SequentialSender` and a `Future` which executes the sent futures.
pub fn new() -> (SequentialSender, impl Future<Output = ()>) {
let (sender, mut receiver) = unbounded();
(
SequentialSender { sender: sender },
async move {
while let Some(next_future) = await!(receiver.next()) {
await!(next_future);
}
},
)
}
/// Sends a future for execution.
///
/// All sent futures are executed in the order they were sent.
///
/// # Parameters
/// - `future`: The future to be queued for execution.
///
/// # Returns
/// A `SendResult` will contain a `TrySendError` with the future which failed to be sent on
/// error.
pub fn send<Fut>(&self, future: Fut) -> SendResult
where
Fut: Future<Output = ()> + 'static,
{
self.sender.unbounded_send(Pin::from(Box::new(future)))
}
}
#[cfg(test)]
mod tests {
use {
super::SequentialSender,
fuchsia_async::spawn_local,
futures::{channel::mpsc::channel, channel::oneshot, task::Poll, SinkExt, StreamExt},
pin_utils::pin_mut,
};
/// Verifies that two futures which are able to complete immediately run in the same order in
/// which they were handed to the `SequentialSender`.
#[fuchsia_async::run_singlethreaded]
#[test]
async fn sequential_immediate() {
    let (sequential_sender, run_queue) = SequentialSender::new();

    // Each future reports over this channel, which lets the test observe the execution order.
    let (message_sender, message_receiver) = channel::<&str>(0);

    // Reporting handle and payload for the future that is sent first.
    let mut sender_for_first = message_sender.clone();
    let expected_first = "first";

    // Reporting handle and payload for the future that is sent second.
    let mut sender_for_second = message_sender.clone();
    let expected_second = "second";

    // Start driving the queue before anything has been placed on it.
    spawn_local(run_queue);

    let first_future = async move {
        await!(sender_for_first.send(expected_first)).expect("Could not send first completion.");
    };
    sequential_sender.send(first_future).expect("Could not send first future.");

    let second_future = async move {
        await!(sender_for_second.send(expected_second))
            .expect("Could not send second completion.");
    };
    sequential_sender.send(second_future).expect("Could not send second future.");

    // Wait for both futures to report in.
    let (reported_first, remaining) = await!(message_receiver.into_future());
    let (reported_second, _) = await!(remaining.into_future());

    // The reports must arrive in the order in which the futures were sent.
    assert_eq!(reported_first, Some(expected_first));
    assert_eq!(reported_second, Some(expected_second));
}
/// Verifies in-order execution when the first future is held back: it cannot finish until the
/// second future has already been sent, and the second must still not run before it.
#[test]
fn sequential_delayed() {
    let (sequential_sender, run_queue) = SequentialSender::new();
    pin_mut!(run_queue);

    // A gate inside the first future. It stays shut until after the second future has been
    // queued and the executor has had a chance to run, which demonstrates that the second future
    // does not start while the first one is still in flight.
    let (open_gate, gate) = oneshot::channel();

    // Each future reports over this channel, which lets the test observe the execution order.
    let (message_sender, message_receiver) = channel::<&str>(0);

    // The first future reports once when it starts and once more when it finishes.
    let mut sender_for_first = message_sender.clone();
    let first_started = "first";
    let first_finished = "first_complete";

    // The second future reports a single time, when it runs.
    let mut sender_for_second = message_sender.clone();
    let second_finished = "second";

    let mut exec = fuchsia_async::Executor::new().expect("failed to create an executor");

    // Queue both futures, in order, before the queue is driven at all.
    let first_future = async move {
        await!(sender_for_first.send(first_started)).expect("Could not send first started.");
        await!(gate).unwrap();
        await!(sender_for_first.send(first_finished)).expect("Could not send first completion.");
    };
    sequential_sender.send(first_future).expect("Could not send first future.");

    let second_future = async move {
        await!(sender_for_second.send(second_finished))
            .expect("Could not send second completion.");
    };
    sequential_sender.send(second_future).expect("Could not send second future.");

    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // The first report is expected to be the first future announcing that it has started.
    let (reported_first, message_receiver) =
        exec.run_singlethreaded(message_receiver.into_future());
    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // Open the gate so that the first future is able to run to completion.
    open_gate.send(()).unwrap();
    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // Collect the two remaining reports.
    let (reported_second, message_receiver) =
        exec.run_singlethreaded(message_receiver.into_future());
    let (reported_third, _) = exec.run_singlethreaded(message_receiver.into_future());

    // The first future must have started and finished before the second one reported.
    assert_eq!(reported_first, Some(first_started));
    assert_eq!(reported_second, Some(first_finished));
    assert_eq!(reported_third, Some(second_finished));
}
/// Verifies that ordering is preserved across clones: a future sent on a cloned
/// `SequentialSender` runs after a future sent earlier on the original.
#[test]
fn sequential_cloned() {
    let (sequential_sender, run_queue) = SequentialSender::new();
    let cloned_sequential_sender = sequential_sender.clone();
    pin_mut!(run_queue);

    // A gate inside the first future. It stays shut until after the second future has been
    // queued through the clone, so the first future cannot finish before then.
    let (open_gate, gate) = oneshot::channel();

    // Each future reports over this channel, which lets the test observe the execution order.
    let (message_sender, message_receiver) = channel::<&str>(0);

    // The first future reports once when it starts and once more when it finishes.
    let mut sender_for_first = message_sender.clone();
    let first_started = "first";
    let first_finished = "first_complete";

    // The second future reports a single time, when it runs.
    let mut sender_for_second = message_sender.clone();
    let second_finished = "second";

    let mut exec = fuchsia_async::Executor::new().expect("failed to create an executor");

    // Queue the first future on the original sender and the second one on its clone.
    let first_future = async move {
        await!(sender_for_first.send(first_started)).expect("Could not send first started.");
        await!(gate).unwrap();
        await!(sender_for_first.send(first_finished)).expect("Could not send first completion.");
    };
    sequential_sender.send(first_future).expect("Could not send first future.");

    let second_future = async move {
        await!(sender_for_second.send(second_finished))
            .expect("Could not send second completion.");
    };
    cloned_sequential_sender.send(second_future).expect("Could not send second future.");

    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // The first report is expected to be the first future announcing that it has started.
    let (reported_first, message_receiver) =
        exec.run_singlethreaded(message_receiver.into_future());
    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // Open the gate so that the first future is able to run to completion.
    open_gate.send(()).unwrap();
    assert_eq!(exec.run_until_stalled(&mut run_queue), Poll::Pending);

    // Collect the two remaining reports.
    let (reported_second, message_receiver) =
        exec.run_singlethreaded(message_receiver.into_future());
    let (reported_third, _) = exec.run_singlethreaded(message_receiver.into_future());

    // The first future must have started and finished before the second one reported.
    assert_eq!(reported_first, Some(first_started));
    assert_eq!(reported_second, Some(first_finished));
    assert_eq!(reported_third, Some(second_finished));
}
}