WIP [fuchsia-async] use the tokio runtime
Change-Id: Ic049862abf17be06373b8a1a36fb9d1acd29c1c8
diff --git a/src/lib/fuchsia-async/BUILD.gn b/src/lib/fuchsia-async/BUILD.gn
index 07af96f..6baaa3a 100644
--- a/src/lib/fuchsia-async/BUILD.gn
+++ b/src/lib/fuchsia-async/BUILD.gn
@@ -74,12 +74,11 @@
if (current_toolchain != unknown_wasm32_toolchain) {
deps += [
"//third_party/rust_crates:async-channel",
- "//third_party/rust_crates:async-executor",
"//third_party/rust_crates:async-io",
"//third_party/rust_crates:async-net",
"//third_party/rust_crates:blocking",
- "//third_party/rust_crates:easy-parallel",
"//third_party/rust_crates:futures-lite",
+ "//third_party/rust_crates:tokio",
]
sources += [ "src/net/portable/udp.rs" ]
}
diff --git a/src/lib/fuchsia-async/src/runtime/portable.rs b/src/lib/fuchsia-async/src/runtime/portable.rs
index f6f89de..19899b0 100644
--- a/src/lib/fuchsia-async/src/runtime/portable.rs
+++ b/src/lib/fuchsia-async/src/runtime/portable.rs
@@ -14,7 +14,7 @@
/// task, call the cancel() method. To run a task to completion without
/// retaining the Task handle, call the detach() method.
#[derive(Debug)]
- pub struct Task<T>(async_executor::Task<T>);
+ pub struct Task<T>(pub(crate) Option<tokio::task::JoinHandle<T>>);
impl<T: 'static> Task<T> {
/// spawn a new `Send` task onto the executor.
@@ -22,22 +22,28 @@
where
T: Send,
{
- Self(super::executor::spawn(fut))
+ Self(Some(super::executor::spawn(fut)))
}
/// spawn a new non-`Send` task onto the single threaded executor.
pub fn local<'a>(fut: impl Future<Output = T> + 'static) -> Self {
- Self(super::executor::local(fut))
+ Self(Some(super::executor::local(fut)))
}
/// detach the Task handle. The contained future will be polled until completion.
- pub fn detach(self) {
- self.0.detach()
+ pub fn detach(mut self) {
+ self.0.take();
}
/// cancel a task and wait for cancellation to complete.
pub async fn cancel(self) -> Option<T> {
- self.0.cancel().await
+ match self.0 {
+ None => None,
+ Some(join_handle) => {
+ join_handle.abort();
+ join_handle.await.ok()
+ }
+ }
}
}
@@ -45,8 +51,26 @@
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // TODO: spawning a task onto a task may leak, never resolving
use futures_lite::FutureExt;
- self.0.poll(cx)
+ let polled = match self.0.as_mut() {
+ Some(jh) => jh.poll(cx),
+ None => return Poll::Pending,
+ };
+ match polled {
+ Poll::Ready(Ok(r)) => Poll::Ready(r),
+ Poll::Ready(Err(e)) => {
+ // A completed future must not be polled again, so drop the handle first.
+ self.0 = None;
+ if e.is_panic() {
+ // Surface the task's panic to the awaiter instead of hanging forever.
+ std::panic::resume_unwind(e.into_panic());
+ }
+ // Aborted task: it never yields a value, so this future stays pending.
+ Poll::Pending
+ }
+ Poll::Pending => Poll::Pending,
+ }
}
}
@@ -72,16 +82,16 @@
/// should be assumed to be held until the returned future completes.
///
/// For details on performance characteristics and edge cases, see [`blocking::unblock`].
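+ // TODO: rewrite the doc comment above; it still refers to `blocking::unblock`, but this now calls `tokio::task::spawn_blocking`.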
pub fn unblock<T: 'static + Send>(
f: impl 'static + Send + FnOnce() -> T,
) -> impl 'static + Send + Future<Output = T> {
- blocking::unblock(f)
+ crate::Task(Some(tokio::task::spawn_blocking(f)))
}
}
pub mod executor {
use crate::runtime::WakeupTime;
- use easy_parallel::Parallel;
use fuchsia_zircon_status as zx_status;
use std::future::Future;
@@ -97,26 +107,24 @@
pub(crate) fn spawn<T: 'static>(
fut: impl Future<Output = T> + Send + 'static,
- ) -> async_executor::Task<T>
+ ) -> tokio::task::JoinHandle<T>
where
T: Send,
{
- GLOBAL.spawn(fut)
+ tokio::task::spawn(fut)
}
- pub(crate) fn local<T>(fut: impl Future<Output = T> + 'static) -> async_executor::Task<T>
+ pub(crate) fn local<T>(fut: impl Future<Output = T> + 'static) -> tokio::task::JoinHandle<T>
where
T: 'static,
{
- LOCAL.with(|local| local.spawn(fut))
+ LOCAL.with(|local| local.spawn_local(fut))
}
thread_local! {
- static LOCAL: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
+ static LOCAL: tokio::task::LocalSet = tokio::task::LocalSet::new();
}
- static GLOBAL: async_executor::Executor<'_> = async_executor::Executor::new();
-
/// A multi-threaded executor.
///
/// API-compatible with the Fuchsia variant.
@@ -124,13 +132,20 @@
/// The current implementation of Executor does not isolate work
/// (as the underlying executor is not yet capable of this).
pub struct SendExecutor {
- num_threads: usize,
+ runtime: tokio::runtime::Runtime,
}
impl SendExecutor {
/// Create a new executor running with actual time.
pub fn new(num_threads: usize) -> Result<Self, zx_status::Status> {
- Ok(Self { num_threads })
+ Ok(Self {
+ runtime: tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(num_threads.max(1)) // tokio panics on 0 worker threads; clamp to at least one
+ .enable_all()
+ .build()
+ // TODO: how to better report errors given the API constraints?
+ .map_err(|_e| zx_status::Status::IO)?,
+ })
}
/// Run a single future to completion using multiple threads.
@@ -139,24 +154,7 @@
F: Future + Send + 'static,
F::Output: Send + 'static,
{
- let (signal, shutdown) = async_channel::unbounded::<()>();
-
- let (_, res) = Parallel::new()
- .each(0..self.num_threads, |_| {
- LOCAL.with(|local| {
- let _ = async_io::block_on(local.run(GLOBAL.run(shutdown.recv())));
- })
- })
- .finish(|| {
- LOCAL.with(|local| {
- async_io::block_on(local.run(GLOBAL.run(async {
- let res = main_future.await;
- drop(signal);
- res
- })))
- })
- });
- res
+ LOCAL.with(|local| local.block_on(&self.runtime, main_future))
}
}
@@ -179,7 +177,12 @@
where
F: Future,
{
- LOCAL.with(|local| async_io::block_on(GLOBAL.run(local.run(main_future))))
+ LOCAL.with(|local| {
+ local.block_on(
+ &tokio::runtime::Builder::new_current_thread().build().unwrap(),
+ main_future,
+ )
+ })
}
}
diff --git a/third_party/rust_crates/BUILD.gn b/third_party/rust_crates/BUILD.gn
index 0c239b3..b9c22c4 100644
--- a/third_party/rust_crates/BUILD.gn
+++ b/third_party/rust_crates/BUILD.gn
@@ -9734,6 +9734,7 @@
deps = []
deps += [ ":bytes-v1_0_1" ]
deps += [ ":memchr-v2_4_0" ]
+ deps += [ ":num_cpus-v1_13_0" ]
deps += [ ":pin-project-lite-v0_2_4" ]
rustenv = []
@@ -9747,6 +9748,9 @@
"--cfg=feature=\"default\"",
"--cfg=feature=\"io-util\"",
"--cfg=feature=\"memchr\"",
+ "--cfg=feature=\"num_cpus\"",
+ "--cfg=feature=\"rt\"",
+ "--cfg=feature=\"rt-multi-thread\"",
"--cfg=feature=\"sync\"",
"--cfg=tokio_track_caller",
]
diff --git a/third_party/rust_crates/Cargo.lock b/third_party/rust_crates/Cargo.lock
index 9688029..35e4a78 100644
--- a/third_party/rust_crates/Cargo.lock
+++ b/third_party/rust_crates/Cargo.lock
@@ -3675,6 +3675,7 @@
"autocfg 1.0.0",
"bytes",
"memchr",
+ "num_cpus",
"pin-project-lite",
]
diff --git a/third_party/rust_crates/Cargo.toml b/third_party/rust_crates/Cargo.toml
index 4758626..4dec8c8 100644
--- a/third_party/rust_crates/Cargo.toml
+++ b/third_party/rust_crates/Cargo.toml
@@ -145,7 +145,7 @@
textwrap = "0.11.0"
thiserror = "1.0.23"
timebomb = "0.1.2"
-tokio = { version = "1.10.0", default-features = false }
+tokio = { version = "1.10.0", default-features = false, features = ["rt", "rt-multi-thread"] }
toml = "0.5"
toml_edit = "0.2.1"
tracing = { version = "0.1.25", features = ["log"] }