[ffx] reduce the scope of threaded sub-tasks

The onet subtasks here were taking on the responsibility of all of
the overnet client IO, reducing multiplexing effectiveness of the
root reactor, causing a lot more cross-thread wakeups than necessary
in normal operation. Move the overnet tasks back onto the main
reactor thread and only perform the wait2() on the "blocking"
reactor.

Change-Id: I5060fb7cdad0e45868629465bda9310fe27af574
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/475740
Fuchsia-Auto-Submit: James Tucker <raggi@google.com>
Reviewed-by: Andrew Davies <awdavies@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/developer/ffx/daemon/src/onet.rs b/src/developer/ffx/daemon/src/onet.rs
index 984c74f..2b6d732 100644
--- a/src/developer/ffx/daemon/src/onet.rs
+++ b/src/developer/ffx/daemon/src/onet.rs
@@ -145,49 +145,47 @@
         HostPipeConnection::new_with_cmd(target, HostPipeChild::new, RETRY_DELAY)
     }
 
-    fn new_with_cmd<F>(
+    async fn new_with_cmd<F>(
         target: WeakTarget,
         cmd_func: impl FnOnce(Arc<Ascendd>, Vec<TargetAddr>) -> F + Send + Copy + 'static,
         relaunch_command_delay: Duration,
-    ) -> impl Future<Output = Result<(), String>> + Send
+    ) -> Result<(), String>
     where
         F: futures::Future<Output = Result<HostPipeChild>> + Send,
     {
-        Task::blocking(async move {
-            loop {
-                log::debug!("Spawning new host-pipe instance");
-                let target = target.upgrade().ok_or("parent Arc<> lost. exiting".to_owned())?;
-                let addrs = target.addrs().await;
-                let mut cmd =
-                    cmd_func(target.ascendd().clone(), addrs).await.map_err(|e| e.to_string())?;
+        loop {
+            log::debug!("Spawning new host-pipe instance");
+            let target = target.upgrade().ok_or("parent Arc<> lost. exiting".to_owned())?;
+            let addrs = target.addrs().await;
+            let mut cmd =
+                cmd_func(target.ascendd().clone(), addrs).await.map_err(|e| e.to_string())?;
 
-                // Attempts to run the command. If it exits successfully (disconnect due to peer
-                // dropping) then will set the target to disconnected state. If
-                // there was an error running the command for some reason, then
-                // continue and attempt to run the command again.
-                match cmd
-                    .wait()
-                    .map_err(|e| format!("host-pipe error running try-wait: {}", e.to_string()))
-                {
-                    Ok(_) => {
-                        target
-                            .update_connection_state(|s| match s {
-                                ConnectionState::Rcs(_) => ConnectionState::Disconnected,
-                                _ => s,
-                            })
-                            .await;
-                        log::debug!("rcs disconnected, exiting");
-                        return Ok(());
-                    }
-                    Err(e) => log::debug!("running cmd: {:#?}", e),
+            // Attempts to run the command. If it exits successfully (disconnect due to peer
+            // dropping) then will set the target to disconnected state. If
+            // there was an error running the command for some reason, then
+            // continue and attempt to run the command again.
+            match Task::blocking(async move { cmd.wait() })
+                .await
+                .map_err(|e| format!("host-pipe error running try-wait: {}", e.to_string()))
+            {
+                Ok(_) => {
+                    target
+                        .update_connection_state(|s| match s {
+                            ConnectionState::Rcs(_) => ConnectionState::Disconnected,
+                            _ => s,
+                        })
+                        .await;
+                    log::debug!("rcs disconnected, exiting");
+                    return Ok(());
                 }
-
-                // TODO(fxbug.dev/52038): Want an exponential backoff that
-                // is sync'd with an explicit "try to start this again
-                // anyway" channel using a select! between the two of them.
-                Timer::new(relaunch_command_delay).await;
+                Err(e) => log::debug!("running cmd: {:#?}", e),
             }
-        })
+
+            // TODO(fxbug.dev/52038): Want an exponential backoff that
+            // is sync'd with an explicit "try to start this again
+            // anyway" channel using a select! between the two of them.
+            Timer::new(relaunch_command_delay).await;
+        }
     }
 }