| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::EHandle; |
| use futures::future::RemoteHandle; |
| use futures::prelude::*; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| /// A handle to a future that is owned and polled by the executor. |
| /// |
| /// Once a task is created, the executor will poll it until done, |
| /// even if the task handle itself is not polled. |
| /// |
| /// When a task is dropped its future will no longer be polled by the |
| /// executor. See [`Task::cancel`] for cancellation semantics. |
| /// |
| /// Polling (or attempting to extract the value from) a task after the |
| /// executor is dropped may trigger a panic. |
| #[must_use] |
| #[derive(Debug)] |
| pub struct Task<T> { |
| remote_handle: RemoteHandle<T>, |
| } |
| |
| impl Task<()> { |
| /// Detach this task so that it can run independently in the background. |
| /// |
| /// *Note*: this is usually not what you want. This API severs the control flow from the |
| /// caller, making it impossible to return values (including errors). If your goal is to run |
| /// multiple futures concurrently, consider using [`TaskGroup`] or other futures combinators |
| /// such as: |
| /// |
| /// * [`futures::future::join`] |
| /// * [`futures::future::select`] |
| /// * [`futures::select`] |
| /// |
| /// or their error-aware variants |
| /// |
| /// * [`futures::future::try_join`] |
| /// * [`futures::future::try_select`] |
| /// |
| /// or their stream counterparts |
| /// |
| /// * [`futures::stream::StreamExt::for_each`] |
| /// * [`futures::stream::StreamExt::for_each_concurrent`] |
| /// * [`futures::stream::TryStreamExt::try_for_each`] |
| /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`] |
| /// |
| /// can meet your needs. |
| pub fn detach(self) { |
| self.remote_handle.forget(); |
| } |
| } |
| |
| impl<T: Send> Task<T> { |
| /// Spawn a new task on the current executor. |
| /// |
| /// The task may be executed on any thread(s) owned by the current executor. |
| /// See [`Task::local`] for an equivalent that ensures locality. |
| /// |
| /// The passed future will live until either (a) the future completes, |
| /// (b) the returned [`Task`] is dropped while the executor is running, or |
| /// (c) the executor is destroyed; whichever comes first. |
| /// |
| /// # Panics |
| /// |
| /// `spawn` may panic if not called in the context of an executor (e.g. |
| /// within a call to `run` or `run_singlethreaded`). |
| #[cfg_attr(trace_level_logging, track_caller)] |
| pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> { |
| // Fuse is a combinator that will drop the underlying future as soon as it has been |
| // completed to ensure resources are reclaimed as soon as possible. That gives callers that |
| // await on the Task the guarantee that the future has been dropped. |
| // |
| // Note that it is safe to pass in a future that has already been fused. Double fusing |
| // a future does not change the expected behavior. |
| let future = future.fuse(); |
| let (future, remote_handle) = future.remote_handle(); |
| EHandle::local().spawn_detached(future); |
| Task { remote_handle } |
| } |
| } |
| |
| impl<T> Task<T> { |
| /// Spawn a new task on the thread local executor. |
| /// |
| /// The passed future will live until either (a) the future completes, |
| /// (b) the returned [`Task`] is dropped while the executor is running, or |
| /// (c) the executor is destroyed; whichever comes first. |
| /// |
| /// NOTE: This is not supported with a [`SendExecutor`] and will cause a |
| /// runtime panic. Use [`Task::spawn`] instead. |
| /// |
| /// # Panics |
| /// |
| /// `local` may panic if not called in the context of a local executor (e.g. |
| /// within a call to `run` or `run_singlethreaded`). |
| #[cfg_attr(trace_level_logging, track_caller)] |
| pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> { |
| // Fuse is a combinator that will drop the underlying future as soon as it has been |
| // completed to ensure resources are reclaimed as soon as possible. That gives callers that |
| // await on the Task the guarantee that the future has been dropped. |
| // |
| // Note that it is safe to pass in a future that has already been fused. Double fusing |
| // a future does not change the expected behavior. |
| let future = future.fuse(); |
| let (future, remote_handle) = future.remote_handle(); |
| EHandle::local().spawn_local_detached(future); |
| Task { remote_handle } |
| } |
| } |
| |
| impl<T: 'static> Task<T> { |
| /// Initiate cancellation of this task. |
| /// |
| /// Returns the tasks output if it was available prior to cancelation. |
| /// |
| /// NOTE: If `None` is returned, the underlying future may continue executing for a |
| /// short period before getting dropped. If so, do not assume any resources held |
| /// by the task's future are released. If `Some(..)` is returned, such resources |
| /// are guaranteed to be released. |
| pub async fn cancel(self) -> Option<T> { |
| self.remote_handle.now_or_never() |
| } |
| } |
| |
| impl<T: 'static> Future for Task<T> { |
| type Output = T; |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| self.remote_handle.poll_unpin(cx) |
| } |
| } |
| |
| /// Offload a blocking function call onto a different thread. |
| /// |
| /// This function can be called from an asynchronous function without blocking |
| /// it, returning a future that can be `.await`ed normally. The provided |
| /// function should contain at least one blocking operation, such as: |
| /// |
| /// - A synchronous syscall that does not yet have an async counterpart. |
| /// - A compute operation which risks blocking the executor for an unacceptable |
| /// amount of time. |
| /// |
| /// If neither of these conditions are satisfied, just call the function normally, |
| /// as synchronous functions themselves are allowed within an async context, |
| /// as long as they are not blocking. |
| /// |
| /// If you have an async function that may block, refactor the function such that |
| /// the blocking operations are offloaded onto the function passed to [`unblock`]. |
| /// |
| /// NOTE: |
| /// |
| /// - The input function should not interact with the executor. Attempting to do so |
| /// can cause runtime errors. This includes spawning, creating new executors, |
| /// passing futures between the input function and the calling context, and |
| /// in some cases constructing async-aware types (such as IO-, IPC- and timer objects). |
| /// - Synchronous functions cannot be cancelled and may keep running after |
| /// the returned future is dropped. As a result, resources held by the function |
| /// should be assumed to be held until the returned future completes. |
| /// - This function assumes panic=abort semantics, so if the input function panics, |
| /// the process aborts. Behavior for panic=unwind is not defined. |
| // TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of |
| // spawning new threads if this proves to be a bottleneck. |
| pub fn unblock<T: 'static + Send>( |
| f: impl 'static + Send + FnOnce() -> T, |
| ) -> impl 'static + Send + Future<Output = T> { |
| let (tx, rx) = futures::channel::oneshot::channel(); |
| std::thread::spawn(move || { |
| let _ = tx.send(f()); |
| }); |
| rx.map(|r| r.unwrap()) |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::super::executor::{LocalExecutor, SendExecutor}; |
| use super::*; |
| use std::sync::{Arc, Mutex}; |
| |
| /// This struct holds a thread-safe mutable boolean and |
| /// sets its value to true when dropped. |
| #[derive(Clone)] |
| struct SetsBoolTrueOnDrop { |
| value: Arc<Mutex<bool>>, |
| } |
| |
| impl SetsBoolTrueOnDrop { |
| fn new() -> (Self, Arc<Mutex<bool>>) { |
| let value = Arc::new(Mutex::new(false)); |
| let sets_bool_true_on_drop = Self { value: value.clone() }; |
| (sets_bool_true_on_drop, value) |
| } |
| } |
| |
| impl Drop for SetsBoolTrueOnDrop { |
| fn drop(&mut self) { |
| let mut lock = self.value.lock().unwrap(); |
| *lock = true; |
| } |
| } |
| |
| #[test] |
| #[should_panic] |
| fn spawn_from_unblock_fails() { |
| // no executor in the off-thread, so spawning fails |
| SendExecutor::new(2).run(async move { |
| unblock(|| { |
| let _ = Task::spawn(async {}); |
| }) |
| .await; |
| }); |
| } |
| |
| #[test] |
| fn future_destroyed_before_await_returns() { |
| LocalExecutor::new().run_singlethreaded(async { |
| let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new(); |
| |
| // Move the switch into a different thread. |
| // Once we return from this await, that switch should have been dropped. |
| unblock(move || { |
| let lock = sets_bool_true_on_drop.value.lock().unwrap(); |
| assert_eq!(*lock, false); |
| }) |
| .await; |
| |
| // Switch moved into the future should have been dropped at this point. |
| // The value of the boolean should now be true. |
| let lock = value.lock().unwrap(); |
| assert_eq!(*lock, true); |
| }); |
| } |
| } |