WIP [fuchsia-async] use the tokio runtime

Change-Id: Ic049862abf17be06373b8a1a36fb9d1acd29c1c8
diff --git a/src/lib/fuchsia-async/BUILD.gn b/src/lib/fuchsia-async/BUILD.gn
index 07af96f..6baaa3a 100644
--- a/src/lib/fuchsia-async/BUILD.gn
+++ b/src/lib/fuchsia-async/BUILD.gn
@@ -74,12 +74,11 @@
     if (current_toolchain != unknown_wasm32_toolchain) {
       deps += [
         "//third_party/rust_crates:async-channel",
-        "//third_party/rust_crates:async-executor",
         "//third_party/rust_crates:async-io",
         "//third_party/rust_crates:async-net",
         "//third_party/rust_crates:blocking",
-        "//third_party/rust_crates:easy-parallel",
         "//third_party/rust_crates:futures-lite",
+        "//third_party/rust_crates:tokio",
       ]
       sources += [ "src/net/portable/udp.rs" ]
     }
diff --git a/src/lib/fuchsia-async/src/runtime/portable.rs b/src/lib/fuchsia-async/src/runtime/portable.rs
index f6f89de..19899b0 100644
--- a/src/lib/fuchsia-async/src/runtime/portable.rs
+++ b/src/lib/fuchsia-async/src/runtime/portable.rs
@@ -14,7 +14,7 @@
     /// task, call the cancel() method. To run a task to completion without
     /// retaining the Task handle, call the detach() method.
     #[derive(Debug)]
-    pub struct Task<T>(async_executor::Task<T>);
+    pub struct Task<T>(pub(crate) Option<tokio::task::JoinHandle<T>>);
 
     impl<T: 'static> Task<T> {
         /// spawn a new `Send` task onto the executor.
@@ -22,22 +22,28 @@
         where
             T: Send,
         {
-            Self(super::executor::spawn(fut))
+            Self(Some(super::executor::spawn(fut)))
         }
 
         /// spawn a new non-`Send` task onto the single threaded executor.
         pub fn local<'a>(fut: impl Future<Output = T> + 'static) -> Self {
-            Self(super::executor::local(fut))
+            Self(Some(super::executor::local(fut)))
         }
 
         /// detach the Task handle. The contained future will be polled until completion.
-        pub fn detach(self) {
-            self.0.detach()
+        pub fn detach(mut self) {
+            self.0.take();
         }
 
         /// cancel a task and wait for cancellation to complete.
         pub async fn cancel(self) -> Option<T> {
-            self.0.cancel().await
+            match self.0 {
+                None => None,
+                Some(join_handle) => {
+                    join_handle.abort();
+                    join_handle.await.ok()
+                }
+            }
         }
     }
 
@@ -45,8 +51,12 @@
         type Output = T;
 
         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            // TODO: spawning a task onto a task may leak, never resolving. NOTE(review): Ready(Err(_)) is also mapped to Pending below; confirm that cannot hang.
             use futures_lite::FutureExt;
-            self.0.poll(cx)
+            self.0.as_mut().map_or(Poll::Pending, |jh| match jh.poll(cx) {
+                Poll::Ready(Ok(r)) => Poll::Ready(r),
+                _ => Poll::Pending,
+            })
         }
     }
 
@@ -72,16 +82,16 @@
     /// should be assumed to be held until the returned future completes.
     ///
     /// For details on performance characteristics and edge cases, see [`blocking::unblock`].
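+    // TODO: redo the docs above; they still point at `blocking::unblock`, but this now calls `tokio::task::spawn_blocking`.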
     pub fn unblock<T: 'static + Send>(
         f: impl 'static + Send + FnOnce() -> T,
     ) -> impl 'static + Send + Future<Output = T> {
-        blocking::unblock(f)
+        crate::Task(Some(tokio::task::spawn_blocking(f)))
     }
 }
 
 pub mod executor {
     use crate::runtime::WakeupTime;
-    use easy_parallel::Parallel;
     use fuchsia_zircon_status as zx_status;
     use std::future::Future;
 
@@ -97,26 +107,24 @@
 
     pub(crate) fn spawn<T: 'static>(
         fut: impl Future<Output = T> + Send + 'static,
-    ) -> async_executor::Task<T>
+    ) -> tokio::task::JoinHandle<T>
     where
         T: Send,
     {
-        GLOBAL.spawn(fut)
+        tokio::task::spawn(fut)
     }
 
-    pub(crate) fn local<T>(fut: impl Future<Output = T> + 'static) -> async_executor::Task<T>
+    pub(crate) fn local<T>(fut: impl Future<Output = T> + 'static) -> tokio::task::JoinHandle<T>
     where
         T: 'static,
     {
-        LOCAL.with(|local| local.spawn(fut))
+        LOCAL.with(|local| local.spawn_local(fut))
     }
 
     thread_local! {
-        static LOCAL: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
+        static LOCAL: tokio::task::LocalSet = tokio::task::LocalSet::new();
     }
 
-    static GLOBAL: async_executor::Executor<'_> = async_executor::Executor::new();
-
     /// A multi-threaded executor.
     ///
     /// API-compatible with the Fuchsia variant.
@@ -124,13 +132,20 @@
     /// The current implementation of Executor does not isolate work
     /// (as the underlying executor is not yet capable of this).
     pub struct SendExecutor {
-        num_threads: usize,
+        runtime: tokio::runtime::Runtime,
     }
 
     impl SendExecutor {
         /// Create a new executor running with actual time.
         pub fn new(num_threads: usize) -> Result<Self, zx_status::Status> {
-            Ok(Self { num_threads })
+            Ok(Self {
+                runtime: tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(num_threads)
+                    .enable_all()
+                    .build()
+                    // TODO: the build error is dropped and always mapped to Status::IO; how to better report errors given the API constraints?
+                    .map_err(|_e| zx_status::Status::IO)?,
+            })
         }
 
         /// Run a single future to completion using multiple threads.
@@ -139,24 +154,7 @@
             F: Future + Send + 'static,
             F::Output: Send + 'static,
         {
-            let (signal, shutdown) = async_channel::unbounded::<()>();
-
-            let (_, res) = Parallel::new()
-                .each(0..self.num_threads, |_| {
-                    LOCAL.with(|local| {
-                        let _ = async_io::block_on(local.run(GLOBAL.run(shutdown.recv())));
-                    })
-                })
-                .finish(|| {
-                    LOCAL.with(|local| {
-                        async_io::block_on(local.run(GLOBAL.run(async {
-                            let res = main_future.await;
-                            drop(signal);
-                            res
-                        })))
-                    })
-                });
-            res
+            LOCAL.with(|local| local.block_on(&self.runtime, main_future))
         }
     }
 
@@ -179,7 +177,12 @@
         where
             F: Future,
         {
-            LOCAL.with(|local| async_io::block_on(GLOBAL.run(local.run(main_future))))
+            LOCAL.with(|local| {
+                local.block_on(
+                    &tokio::runtime::Builder::new_current_thread().build().unwrap(),
+                    main_future,
+                )
+            })
         }
     }
 
diff --git a/third_party/rust_crates/BUILD.gn b/third_party/rust_crates/BUILD.gn
index 0c239b3..b9c22c4 100644
--- a/third_party/rust_crates/BUILD.gn
+++ b/third_party/rust_crates/BUILD.gn
@@ -9734,6 +9734,7 @@
   deps = []
   deps += [ ":bytes-v1_0_1" ]
   deps += [ ":memchr-v2_4_0" ]
+  deps += [ ":num_cpus-v1_13_0" ]
   deps += [ ":pin-project-lite-v0_2_4" ]
 
   rustenv = []
@@ -9747,6 +9748,9 @@
     "--cfg=feature=\"default\"",
     "--cfg=feature=\"io-util\"",
     "--cfg=feature=\"memchr\"",
+    "--cfg=feature=\"num_cpus\"",
+    "--cfg=feature=\"rt\"",
+    "--cfg=feature=\"rt-multi-thread\"",
     "--cfg=feature=\"sync\"",
     "--cfg=tokio_track_caller",
   ]
diff --git a/third_party/rust_crates/Cargo.lock b/third_party/rust_crates/Cargo.lock
index 9688029..35e4a78 100644
--- a/third_party/rust_crates/Cargo.lock
+++ b/third_party/rust_crates/Cargo.lock
@@ -3675,6 +3675,7 @@
  "autocfg 1.0.0",
  "bytes",
  "memchr",
+ "num_cpus",
  "pin-project-lite",
 ]
 
diff --git a/third_party/rust_crates/Cargo.toml b/third_party/rust_crates/Cargo.toml
index 4758626..4dec8c8 100644
--- a/third_party/rust_crates/Cargo.toml
+++ b/third_party/rust_crates/Cargo.toml
@@ -145,7 +145,7 @@
 textwrap = "0.11.0"
 thiserror = "1.0.23"
 timebomb = "0.1.2"
-tokio = { version = "1.10.0", default-features = false }
+tokio = { version = "1.10.0", default-features = false, features = ["rt", "rt-multi-thread"] }
 toml = "0.5"
 toml_edit = "0.2.1"
 tracing = { version = "0.1.25", features = ["log"] }