blob: a375ed5baf430682b49810c869d17923052705f7 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Module providing the TaskGroup type, which manages the life cycle of a group of asynchronous
//! tasks on a multi-threaded Fuchsia executor.
use failure::Fail;
use fuchsia_async as fasync;
use futures::channel::oneshot;
use futures::future::{BoxFuture, FusedFuture, RemoteHandle, Shared};
use futures::lock::Mutex;
use futures::prelude::*;
use futures::select;
use futures::stream::FuturesUnordered;
use std::sync::Arc;
/// TaskGroupError is the failure type for this module.
#[derive(Debug, Fail)]
pub enum TaskGroupError {
/// AlreadyCancelled is returned when cancellation is in progress or cancellation is complete.
#[fail(display = "task group is already cancelled")]
AlreadyCancelled,
}
/// TaskGroup manages spawning and gracefully terminating asynchronous tasks on a multi-threaded
/// Fuchsia executor, where individual tasks receive cancellation signals and are in control of
/// terminating themselves. This allows the TaskGroup owner to await for completion of the tasks.
/// TaskGroups can also be nested, allowing for independent cancellation of a child TaskGroup, or
/// cancelling parent and child TaskGroups together (through cancelling the parent).
/// Note that a dropped TaskGroup will cancel its own tasks, but *not* its childrens' tasks. Hence,
/// it is not recommended to rely on std::ops::drop, but rather to always use cancel() prior to
/// destruction. (This might be changed in the future).
///
/// A TaskGroup has three implicit states: (1) active, (2) cancellation in progress and (3)
/// complete. State (1) is reflected in the success of `spawn`, and (3) is reflected as the success
/// of the `cancel` method. The possible state transitions are (1)->(2) and (2)->(3); note that
/// once a TaskGroup is cancelled new tasks can never be spawned on it again.
#[derive(Clone)]
pub struct TaskGroup {
/// The cancellation future, which completes when the cancellation begins.
cancel_receiver: TaskGroupCancel,
/// Mutable state about this TaskGroup, see TaskGroupState.
state: Arc<Mutex<TaskGroupState>>,
}
/// TaskGroupCancel is a future which each task is able to poll for cancellation intent.
pub type TaskGroupCancel = Shared<oneshot::Receiver<()>>;
/// TaskGroupState represents the TaskGroup's mutable state.
enum TaskGroupState {
/// The TaskGroup is ready to spawn tasks, representing state (1).
Active {
/// cancel_sender is a one-time trigger, used to begin cancellation.
cancel_sender: oneshot::Sender<()>,
/// tasks is a collection of all tasks that have been spawned on this TaskGroup.
tasks: FuturesUnordered<RemoteHandle<()>>,
/// collection of all children that have been created directly from this TaskGroup.
children: Vec<TaskGroup>,
},
/// The TaskGroup is cancelled, representing state (2) and (3).
Cancelled,
}
impl TaskGroup {
/// Create a new blank task group, ready for task spawning.
pub fn new() -> Self {
let (sender, receiver) = oneshot::channel();
let state = TaskGroupState::Active {
cancel_sender: sender,
tasks: FuturesUnordered::new(),
children: Vec::new(),
};
Self { cancel_receiver: receiver.shared(), state: Arc::new(Mutex::new(state)) }
}
/// Spawn a task on the Fuchsia executor, with a handle to the cancellation future for this
/// TaskGroup. Tasks are themselves responsible for polling the future, and if they do not
/// complete as a response, the TaskGroup will not be able to cancel at all, so well-behaved
/// tasks should poll or select over the cancellation signal whenever it is able to terminate
/// itself gracefully.
///
/// If a TaskGroup cancellation is (2) already in progress or (3) completed,
/// `Err(TaskGroupError::AlreadyCancelled)` will be returned.
pub async fn spawn<F, Fut>(&self, f: F) -> Result<(), TaskGroupError>
where
F: (FnOnce(TaskGroupCancel) -> Fut) + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let mut state_lock = self.state.lock().await;
let state = &mut *state_lock;
match state {
TaskGroupState::Cancelled => Err(TaskGroupError::AlreadyCancelled),
TaskGroupState::Active { tasks, .. } => {
// TODO(dnordstrom): Poll for and throw away already completed tasks.
let cancel_receiver = self.cancel_receiver.clone();
let inner_fn = f(cancel_receiver);
let (remote, remote_handle) = inner_fn.remote_handle();
tasks.push(remote_handle);
fasync::spawn(remote);
Ok(())
}
}
}
/// Cancel ongoing tasks and await for their completion. Note that all tasks that were
/// successfully added will be driven to completion. If more tasks are attempted to be spawn
/// during the lifetime of this method, they will be rejected with an AlreadyCancelled error.
///
/// If this TaskGroup has children, they will be cancelled (concurrently, in no particular
/// order) before the tasks of this TaskGroup are cancelled.
///
/// If a TaskGroup cancellation is (2) already in progress or (3) completed,
/// `Err(TaskGroupError::AlreadyCancelled)` will be returned.
pub fn cancel<'a>(&'a self) -> BoxFuture<'a, Result<(), TaskGroupError>> {
// Since this method is recursive, we cannot use `async fn` directly, hence the BoxFuture.
let state = self.state.clone();
async move {
let state = {
let mut state_lock = state.lock().await;
std::mem::replace(&mut *state_lock, TaskGroupState::Cancelled)
};
match state {
TaskGroupState::Cancelled => Err(TaskGroupError::AlreadyCancelled),
TaskGroupState::Active { cancel_sender, tasks, children } => {
let cancel_children: FuturesUnordered<_> =
children.iter().map(|child| child.cancel()).collect();
let _ = cancel_children.collect::<Vec<_>>().await;
let _ = cancel_sender.send(());
tasks.collect::<()>().await;
Ok(())
}
}
}
.boxed()
}
/// Create a child TaskGroup that will be automatically cancelled when the parent is cancelled.
pub async fn create_child(&self) -> Result<Self, TaskGroupError> {
let mut state_lock = self.state.lock().await;
let state = &mut *state_lock;
match state {
TaskGroupState::Cancelled => Err(TaskGroupError::AlreadyCancelled),
TaskGroupState::Active { children, .. } => {
let child = Self::new();
children.push(child.clone());
Ok(child)
}
}
}
}
/// Returns a future which resolves either when cancel or fut is ready. If both are ready, cancel
/// takes precedence. If the provided future wins, its output is returned. If cancel wins, None
/// is returned.
// TODO(dnordstrom): Consider removing once there is a well-ordered select function or macro in
// futures-rs.
pub async fn cancel_or<Fut, T>(cancel: &TaskGroupCancel, mut fut: Fut) -> Option<T>
where
Fut: Future<Output = T> + FusedFuture + std::marker::Unpin,
{
let mut cancel = cancel.clone();
if cancel.peek().is_some() {
return None;
}
select! {
_ = cancel => None,
value = fut => Some(value),
}
}
#[cfg(test)]
mod test {
use super::*;
// A TaskGroup without tasks should be cancellable.
#[fuchsia_async::run_until_stalled(test)]
async fn empty_test() {
let tg = TaskGroup::new();
assert!(tg.cancel().await.is_ok());
assert!(tg.cancel().await.is_err());
}
// Tasks without await-points
#[fuchsia_async::run_until_stalled(test)]
async fn trivial_test() {
let tg = TaskGroup::new();
let fut = future::ready(());
assert!(tg.spawn(move |_cancel| fut).await.is_ok());
assert!(tg.spawn(|_cancel| async move {}).await.is_ok());
assert!(tg.cancel().await.is_ok());
// Can't cancel or spawn now
assert!(tg.spawn(|_cancel| async move {}).await.is_err());
assert!(tg.cancel().await.is_err());
}
#[fuchsia_async::run_until_stalled(test)]
async fn complete_before_cancel_test() {
// Checks that a task completes by itself without being cancelled
let (sender, receiver) = oneshot::channel();
let a_task = |_cancel| {
async {
sender.send(10).expect("sending failed");
}
};
let tg = TaskGroup::new();
tg.spawn(a_task).await.expect("spawning failed");
assert_eq!(receiver.await, Ok(10));
assert!(tg.cancel().await.is_ok());
}
#[fuchsia_async::run_until_stalled(test)]
async fn cancel_test() {
// Checks that a task completes by itself without being cancelled
let (sender, receiver) = oneshot::channel();
let a_task = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender.send(10).expect("sending failed");
}
};
let tg = TaskGroup::new();
tg.spawn(a_task).await.expect("spawning failed");
tg.cancel().await.expect("cancelling failed");
assert_eq!(receiver.await, Ok(10));
}
#[fuchsia_async::run_until_stalled(test)]
async fn clone_test() {
// Add a task to a task group, clone it, add another task through the cloned task group and
// finally cancel through the cloned task group.
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, receiver_2) = oneshot::channel();
let task_1 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_1.send(1).expect("sending failed");
}
};
let task_2 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_2.send(2).expect("sending failed");
}
};
let tg = TaskGroup::new();
assert!(tg.spawn(task_1).await.is_ok());
let tg_clone = tg.clone();
assert!(tg_clone.spawn(task_2).await.is_ok());
assert!(tg_clone.cancel().await.is_ok());
assert_eq!(receiver_1.await, Ok(1));
assert_eq!(receiver_2.await, Ok(2));
assert!(tg_clone.cancel().await.is_err());
assert!(tg.cancel().await.is_err());
assert!(tg.spawn(|_| future::ready(())).await.is_err());
}
#[fuchsia_async::run_until_stalled(test)]
async fn parent_cancels_children_test() {
// Create a child task group and cancel the parent.
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, receiver_2) = oneshot::channel();
let task_1 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_1.send(1).expect("sending failed");
}
};
let task_2 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_2.send(2).expect("sending failed");
}
};
let tg_parent = TaskGroup::new();
assert!(tg_parent.spawn(task_1).await.is_ok());
let tg_child = tg_parent.create_child().await.expect("failed creating child task group");
assert!(tg_child.spawn(task_2).await.is_ok());
assert!(tg_parent.cancel().await.is_ok());
assert_eq!(receiver_1.await, Ok(1));
assert_eq!(receiver_2.await, Ok(2));
assert!(tg_child.cancel().await.is_err());
assert!(tg_parent.spawn(|_| future::ready(())).await.is_err());
assert!(tg_child.spawn(|_| future::ready(())).await.is_err());
}
#[fuchsia_async::run_until_stalled(test)]
async fn child_does_not_cancel_parent_test() {
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, receiver_2) = oneshot::channel();
let task_1 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_1.send(1).expect("sending failed");
}
};
let task_2 = |cancel: TaskGroupCancel| {
async {
cancel.await.expect("cancel signal not delivered properly");
sender_2.send(2).expect("sending failed");
}
};
let tg_parent = TaskGroup::new();
assert!(tg_parent.spawn(task_1).await.is_ok());
let tg_child = tg_parent.create_child().await.expect("failed creating child task group");
assert!(tg_child.spawn(task_2).await.is_ok());
assert!(tg_child.cancel().await.is_ok());
assert_eq!(receiver_2.await, Ok(2));
assert!(tg_child.cancel().await.is_err());
// Verify we can create another child task group.
let tg_child_2 =
tg_parent.create_child().await.expect("failed creating child task group");
assert!(tg_child.spawn(|_| future::ready(())).await.is_err());
assert!(tg_child.create_child().await.is_err());
assert!(tg_parent.cancel().await.is_ok());
assert!(tg_child_2.cancel().await.is_err());
assert!(tg_parent.create_child().await.is_err());
assert_eq!(receiver_1.await, Ok(1));
assert!(tg_parent.spawn(|_| future::ready(())).await.is_err());
}
#[test]
fn stalled_test() {
// Checks that if a task doesn't complete, cancel stalls forever
let mut executor = fasync::Executor::new().expect("Failed to create executor");
let complete = |_cancel| future::ready(());
let never_complete = |_cancel| future::pending();
let tg = TaskGroup::new();
let tg_clone = tg.clone();
let spawn_fut = &mut async move {
tg_clone.spawn(complete).await.expect("spawning failed");
tg_clone.spawn(never_complete).await.expect("spawning failed");
}
.boxed();
let cancel_fut = &mut tg.cancel().boxed();
assert!(executor.run_until_stalled(spawn_fut).is_ready());
assert!(executor.run_until_stalled(cancel_fut).is_pending());
}
#[fuchsia_async::run_until_stalled(test)]
async fn cancel_or_test() {
// Check that cancel_or function resolves the correct future for common cases
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, receiver_2) = oneshot::channel();
let a_task = |cancel: TaskGroupCancel| {
async move {
// if fut is ready and cancel is pending, fut wins
assert_eq!(cancel_or(&cancel, future::ready(9000)).await, Some(9000));
// this send triggers the cancellation
sender_1.send(10).expect("sending failed");
// if fut is pending, and cancel is (or soon becomes) ready, cancel wins
assert!(cancel_or(&cancel, future::pending::<i64>()).await.is_none());
// if both fut and cancel is ready, cancel wins
assert!(cancel_or(&cancel, future::ready(9001)).await.is_none());
sender_2.send(20).expect("sending failed");
}
};
let tg = TaskGroup::new();
tg.spawn(a_task).await.expect("spawning failed");
assert_eq!(receiver_1.await, Ok(10));
tg.cancel().await.expect("cancelling failed");
assert_eq!(receiver_2.await, Ok(20));
}
}