| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| pub mod task { |
| use core::task::{Context, Poll}; |
| use std::future::Future; |
| use std::pin::Pin; |
| |
| /// A handle to a task. |
| /// |
| /// A task can be polled for the output of the future it is executing. A |
| /// dropped task will be cancelled after dropping. To immediately cancel a |
| /// task, call the cancel() method. To run a task to completion without |
| /// retaining the Task handle, call the detach() method. |
| #[derive(Debug)] |
| pub struct Task<T>(async_executor::Task<T>); |
| |
| impl<T: 'static> Task<T> { |
| /// spawn a new `Send` task onto the executor. |
| pub fn spawn(fut: impl Future<Output = T> + Send + 'static) -> Self |
| where |
| T: Send, |
| { |
| Self(super::executor::spawn(fut)) |
| } |
| |
| /// spawn a new non-`Send` task onto the single threaded executor. |
| pub fn local<'a>(fut: impl Future<Output = T> + 'static) -> Self { |
| Self(super::executor::local(fut)) |
| } |
| |
| /// detach the Task handle. The contained future will be polled until completion. |
| pub fn detach(self) { |
| self.0.detach() |
| } |
| |
| /// cancel a task and wait for cancellation to complete. |
| pub async fn cancel(self) -> Option<T> { |
| self.0.cancel().await |
| } |
| } |
| |
| impl<T> Future for Task<T> { |
| type Output = T; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| use futures_lite::FutureExt; |
| self.0.poll(cx) |
| } |
| } |
| |
| /// Offload a blocking function call onto a different thread. |
| /// |
| /// This function can be called from an asynchronous function without blocking |
| /// it, returning a future that can be `.await`ed normally. The provided |
| /// function should contain at least one blocking operation, such as: |
| /// |
| /// - A synchronous syscall that does not yet have an async counterpart. |
| /// - A compute operation which risks blocking the executor for an unacceptable |
| /// amount of time. |
| /// |
| /// If neither of these conditions are satisfied, just call the function normally, |
| /// as synchronous functions themselves are allowed within an async context, |
| /// as long as they are not blocking. |
| /// |
| /// If you have an async function that may block, refactor the function such that |
| /// the blocking operations are offloaded onto the function passed to [`unblock`]. |
| /// |
| /// NOTE: Synchronous functions cannot be cancelled and may keep running after |
| /// the returned future is dropped. As a result, resources held by the function |
| /// should be assumed to be held until the returned future completes. |
| /// |
| /// For details on performance characteristics and edge cases, see [`blocking::unblock`]. |
| pub fn unblock<T: 'static + Send>( |
| f: impl 'static + Send + FnOnce() -> T, |
| ) -> impl 'static + Send + Future<Output = T> { |
| blocking::unblock(f) |
| } |
| } |
| |
| pub mod executor { |
| use crate::runtime::WakeupTime; |
| use easy_parallel::Parallel; |
| use fuchsia_zircon_status as zx_status; |
| use std::future::Future; |
| |
| pub use std::time::Duration; |
| /// A time relative to the executor's clock. |
| pub use std::time::Instant as Time; |
| |
| impl WakeupTime for Time { |
| fn into_time(self) -> Time { |
| self |
| } |
| } |
| |
| pub(crate) fn spawn<T: 'static>( |
| fut: impl Future<Output = T> + Send + 'static, |
| ) -> async_executor::Task<T> |
| where |
| T: Send, |
| { |
| GLOBAL.spawn(fut) |
| } |
| |
| pub(crate) fn local<T>(fut: impl Future<Output = T> + 'static) -> async_executor::Task<T> |
| where |
| T: 'static, |
| { |
| LOCAL.with(|local| local.spawn(fut)) |
| } |
| |
| thread_local! { |
| static LOCAL: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new(); |
| } |
| |
| static GLOBAL: async_executor::Executor<'_> = async_executor::Executor::new(); |
| |
| /// A multi-threaded executor. |
| /// |
| /// API-compatible with the Fuchsia variant. |
| /// |
| /// The current implementation of Executor does not isolate work |
| /// (as the underlying executor is not yet capable of this). |
| pub struct SendExecutor { |
| num_threads: usize, |
| } |
| |
| impl SendExecutor { |
| /// Create a new executor running with actual time. |
| pub fn new(num_threads: usize) -> Result<Self, zx_status::Status> { |
| Ok(Self { num_threads }) |
| } |
| |
| /// Run a single future to completion using multiple threads. |
| pub fn run<F>(&mut self, main_future: F) -> F::Output |
| where |
| F: Future + Send + 'static, |
| F::Output: Send + 'static, |
| { |
| let (signal, shutdown) = async_channel::unbounded::<()>(); |
| |
| let (_, res) = Parallel::new() |
| .each(0..self.num_threads, |_| { |
| LOCAL.with(|local| { |
| let _ = async_io::block_on(local.run(GLOBAL.run(shutdown.recv()))); |
| }) |
| }) |
| .finish(|| { |
| LOCAL.with(|local| { |
| async_io::block_on(local.run(GLOBAL.run(async { |
| let res = main_future.await; |
| drop(signal); |
| res |
| }))) |
| }) |
| }); |
| res |
| } |
| } |
| |
| /// A single-threaded executor. |
| /// |
| /// API-compatible with the Fuchsia variant with the exception of testing APIs. |
| /// |
| /// The current implementation of Executor does not isolate work |
| /// (as the underlying executor is not yet capable of this). |
| pub struct LocalExecutor {} |
| |
| impl LocalExecutor { |
| /// Create a new executor. |
| pub fn new() -> Result<Self, zx_status::Status> { |
| Ok(Self {}) |
| } |
| |
| /// Run a single future to completion on a single thread. |
| pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output |
| where |
| F: Future, |
| { |
| LOCAL.with(|local| async_io::block_on(GLOBAL.run(local.run(main_future)))) |
| } |
| } |
| |
| /// A single-threaded executor for testing. |
| /// |
| /// The current implementation of Executor does not isolate work |
| /// (as the underlying executor is not yet capable of this). |
| pub struct TestExecutor {} |
| |
| impl TestExecutor { |
| /// Create a new executor for testing. |
| pub fn new() -> Result<Self, zx_status::Status> { |
| Ok(Self {}) |
| } |
| |
| /// Run a single future to completion on a single thread. |
| pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output |
| where |
| F: Future, |
| { |
| LocalExecutor {}.run_singlethreaded(main_future) |
| } |
| } |
| } |
| |
| pub mod timer { |
| use crate::runtime::WakeupTime; |
| use futures::prelude::*; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| /// An asynchronous timer. |
| #[derive(Debug)] |
| #[must_use = "futures do nothing unless polled"] |
| pub struct Timer(async_io::Timer); |
| |
| impl Timer { |
| /// Create a new timer scheduled to fire at `time`. |
| pub fn new<WT>(time: WT) -> Self |
| where |
| WT: WakeupTime, |
| { |
| Timer(async_io::Timer::at(time.into_time())) |
| } |
| } |
| |
| impl Future for Timer { |
| type Output = (); |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| self.0.poll_unpin(cx).map(drop) |
| } |
| } |
| } |