[ffx] reduce the scope of threaded sub-tasks
The onet subtasks here were taking on the responsibility of all of
the overnet client IO, reducing multiplexing effectiveness of the
root reactor, causing a lot more cross-thread wakeups than necessary
in normal operation. Move the overnet tasks back onto the main
reactor thread and only perform the wait2() on the "blocking"
reactor.
Change-Id: I5060fb7cdad0e45868629465bda9310fe27af574
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/475740
Fuchsia-Auto-Submit: James Tucker <raggi@google.com>
Reviewed-by: Andrew Davies <awdavies@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/developer/ffx/daemon/src/onet.rs b/src/developer/ffx/daemon/src/onet.rs
index 984c74f..2b6d732 100644
--- a/src/developer/ffx/daemon/src/onet.rs
+++ b/src/developer/ffx/daemon/src/onet.rs
@@ -145,49 +145,47 @@
HostPipeConnection::new_with_cmd(target, HostPipeChild::new, RETRY_DELAY)
}
- fn new_with_cmd<F>(
+ async fn new_with_cmd<F>(
target: WeakTarget,
cmd_func: impl FnOnce(Arc<Ascendd>, Vec<TargetAddr>) -> F + Send + Copy + 'static,
relaunch_command_delay: Duration,
- ) -> impl Future<Output = Result<(), String>> + Send
+ ) -> Result<(), String>
where
F: futures::Future<Output = Result<HostPipeChild>> + Send,
{
- Task::blocking(async move {
- loop {
- log::debug!("Spawning new host-pipe instance");
- let target = target.upgrade().ok_or("parent Arc<> lost. exiting".to_owned())?;
- let addrs = target.addrs().await;
- let mut cmd =
- cmd_func(target.ascendd().clone(), addrs).await.map_err(|e| e.to_string())?;
+ loop {
+ log::debug!("Spawning new host-pipe instance");
+ let target = target.upgrade().ok_or("parent Arc<> lost. exiting".to_owned())?;
+ let addrs = target.addrs().await;
+ let mut cmd =
+ cmd_func(target.ascendd().clone(), addrs).await.map_err(|e| e.to_string())?;
- // Attempts to run the command. If it exits successfully (disconnect due to peer
- // dropping) then will set the target to disconnected state. If
- // there was an error running the command for some reason, then
- // continue and attempt to run the command again.
- match cmd
- .wait()
- .map_err(|e| format!("host-pipe error running try-wait: {}", e.to_string()))
- {
- Ok(_) => {
- target
- .update_connection_state(|s| match s {
- ConnectionState::Rcs(_) => ConnectionState::Disconnected,
- _ => s,
- })
- .await;
- log::debug!("rcs disconnected, exiting");
- return Ok(());
- }
- Err(e) => log::debug!("running cmd: {:#?}", e),
+ // Attempts to run the command. If it exits successfully (disconnect due to peer
+ // dropping) then will set the target to disconnected state. If
+ // there was an error running the command for some reason, then
+ // continue and attempt to run the command again.
+ match Task::blocking(async move { cmd.wait() })
+ .await
+ .map_err(|e| format!("host-pipe error running try-wait: {}", e.to_string()))
+ {
+ Ok(_) => {
+ target
+ .update_connection_state(|s| match s {
+ ConnectionState::Rcs(_) => ConnectionState::Disconnected,
+ _ => s,
+ })
+ .await;
+ log::debug!("rcs disconnected, exiting");
+ return Ok(());
}
-
- // TODO(fxbug.dev/52038): Want an exponential backoff that
- // is sync'd with an explicit "try to start this again
- // anyway" channel using a select! between the two of them.
- Timer::new(relaunch_command_delay).await;
+ Err(e) => log::debug!("running cmd: {:#?}", e),
}
- })
+
+ // TODO(fxbug.dev/52038): Want an exponential backoff that
+ // is sync'd with an explicit "try to start this again
+ // anyway" channel using a select! between the two of them.
+ Timer::new(relaunch_command_delay).await;
+ }
}
}