[ffx] Fix possible source of flakiness in doctor tests.

Multiply: ffx_doctor_lib_test
Bug: 60385
Change-Id: Ib3cabd95e8734e22bf4db7eea66de554e8efa0fc
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/434315
Commit-Queue: Jordon Wing <jwing@google.com>
Reviewed-by: Matthew Boetger <boetger@google.com>
Reviewed-by: Andrew Davies <awdavies@google.com>
Testability-Review: Matthew Boetger <boetger@google.com>
diff --git a/src/developer/ffx/plugins/doctor/src/lib.rs b/src/developer/ffx/plugins/doctor/src/lib.rs
index 1ab2ffb..9c76cb1 100644
--- a/src/developer/ffx/plugins/doctor/src/lib.rs
+++ b/src/developer/ffx/plugins/doctor/src/lib.rs
@@ -264,7 +264,7 @@
 mod test {
     use {
         super::*,
-        async_std::task,
+        async_std::sync::{Arc, Mutex},
         async_trait::async_trait,
         fidl::endpoints::{spawn_local_stream_handler, Request, ServerEnd, ServiceMarker},
         fidl_fuchsia_developer_bridge::{
@@ -274,10 +274,10 @@
             IdentifyHostResponse, RemoteControlMarker, RemoteControlRequest,
         },
         fuchsia_async as fasync,
+        futures::channel::oneshot::{self, Receiver},
+        futures::future::Shared,
         futures::{Future, FutureExt, TryFutureExt, TryStreamExt},
         std::io::BufWriter,
-        std::ops::Add,
-        std::sync::{Arc, Mutex},
     };
 
     const NODENAME: &str = "fake-nodename";
@@ -321,8 +321,8 @@
             };
         }
 
-        fn assert_no_leftover_calls(&self) {
-            let state = self.state_manager.lock().unwrap();
+        async fn assert_no_leftover_calls(&self) {
+            let state = self.state_manager.lock().await;
             assert!(
                 state.kill_results.is_empty(),
                 format!("too few calls to kill_all. remaining entries: {:?}", state.kill_results)
@@ -351,25 +351,25 @@
     #[async_trait]
     impl DaemonManager for FakeDaemonManager {
         async fn kill_all(&self) -> Result<bool> {
-            let mut state = self.state_manager.lock().unwrap();
+            let mut state = self.state_manager.lock().await;
             assert!(!state.kill_results.is_empty(), "too many calls to kill_all");
             state.kill_results.remove(0)
         }
 
         async fn is_running(&self) -> bool {
-            let mut state = self.state_manager.lock().unwrap();
+            let mut state = self.state_manager.lock().await;
             assert!(!state.daemons_running_results.is_empty(), "too many calls to is_running");
             state.daemons_running_results.remove(0)
         }
 
         async fn spawn(&self) -> Result<()> {
-            let mut state = self.state_manager.lock().unwrap();
+            let mut state = self.state_manager.lock().await;
             assert!(!state.spawn_results.is_empty(), "too many calls to spawn");
             state.spawn_results.remove(0)
         }
 
         async fn find_and_connect(&self) -> Result<DaemonProxy> {
-            let mut state = self.state_manager.lock().unwrap();
+            let mut state = self.state_manager.lock().await;
             assert!(
                 !state.find_and_connect_results.is_empty(),
                 "too many calls to find_and_connect"
@@ -434,81 +434,88 @@
             },
         );
     }
-    fn serve_unresponsive_rcs(server_end: ServerEnd<RemoteControlMarker>) {
-        serve_stream::<RemoteControlMarker, _, _>(
-            server_end.into_stream().unwrap(),
-            move |req| async move {
+    fn serve_unresponsive_rcs(
+        server_end: ServerEnd<RemoteControlMarker>,
+        waiter: Shared<Receiver<()>>,
+    ) {
+        serve_stream::<RemoteControlMarker, _, _>(server_end.into_stream().unwrap(), move |req| {
+            let waiter = waiter.clone();
+            async move {
                 match req {
                     RemoteControlRequest::IdentifyHost { responder: _ } => {
-                        task::sleep(DEFAULT_RETRY_DELAY.add(Duration::from_millis(1000))).await;
+                        waiter.await.unwrap();
                     }
                     _ => panic!("Unexpected request: {:?}", req),
                 }
-            },
-        );
+            }
+        });
     }
 
-    fn setup_responsive_daemon_server_with_targets() -> DaemonProxy {
-        spawn_local_stream_handler(move |req| async move {
-            match req {
-                DaemonRequest::GetRemoteControl { remote, target, responder } => {
-                    if target == NODENAME {
-                        serve_responsive_rcs(remote);
-                    } else if target == UNRESPONSIVE_NODENAME {
-                        serve_unresponsive_rcs(remote);
-                    } else {
-                        panic!("got unexpected target string: '{}'", target);
+    fn setup_responsive_daemon_server_with_targets(waiter: Shared<Receiver<()>>) -> DaemonProxy {
+        spawn_local_stream_handler(move |req| {
+            let waiter = waiter.clone();
+            async move {
+                match req {
+                    DaemonRequest::GetRemoteControl { remote, target, responder } => {
+                        if target == NODENAME {
+                            serve_responsive_rcs(remote);
+                        } else if target == UNRESPONSIVE_NODENAME {
+                            serve_unresponsive_rcs(remote, waiter);
+                        } else {
+                            panic!("got unexpected target string: '{}'", target);
+                        }
+                        responder.send(&mut Ok(())).unwrap();
                     }
-                    responder.send(&mut Ok(())).unwrap();
-                }
-                DaemonRequest::EchoString { value, responder } => {
-                    responder.send(&value).unwrap();
-                }
-                DaemonRequest::ListTargets { value, responder } => {
-                    if !value.is_empty() && value != NODENAME && value != UNRESPONSIVE_NODENAME {
-                        responder.send(&mut vec![].drain(..)).unwrap();
-                    } else if value == NODENAME {
-                        responder
-                            .send(
-                                &mut vec![Target {
-                                    nodename: Some(NODENAME.to_string()),
-                                    addresses: Some(vec![]),
-                                    age_ms: Some(0),
-                                    rcs_state: Some(RemoteControlState::Unknown),
-                                    target_type: Some(TargetType::Unknown),
-                                    target_state: Some(TargetState::Unknown),
-                                }]
-                                .drain(..),
-                            )
-                            .unwrap();
-                    } else {
-                        responder
-                            .send(
-                                &mut vec![
-                                    Target {
+                    DaemonRequest::EchoString { value, responder } => {
+                        responder.send(&value).unwrap();
+                    }
+                    DaemonRequest::ListTargets { value, responder } => {
+                        if !value.is_empty() && value != NODENAME && value != UNRESPONSIVE_NODENAME
+                        {
+                            responder.send(&mut vec![].drain(..)).unwrap();
+                        } else if value == NODENAME {
+                            responder
+                                .send(
+                                    &mut vec![Target {
                                         nodename: Some(NODENAME.to_string()),
                                         addresses: Some(vec![]),
                                         age_ms: Some(0),
                                         rcs_state: Some(RemoteControlState::Unknown),
                                         target_type: Some(TargetType::Unknown),
                                         target_state: Some(TargetState::Unknown),
-                                    },
-                                    Target {
-                                        nodename: Some(UNRESPONSIVE_NODENAME.to_string()),
-                                        addresses: Some(vec![]),
-                                        age_ms: Some(0),
-                                        rcs_state: Some(RemoteControlState::Unknown),
-                                        target_type: Some(TargetType::Unknown),
-                                        target_state: Some(TargetState::Unknown),
-                                    },
-                                ]
-                                .drain(..),
-                            )
-                            .unwrap();
+                                    }]
+                                    .drain(..),
+                                )
+                                .unwrap();
+                        } else {
+                            responder
+                                .send(
+                                    &mut vec![
+                                        Target {
+                                            nodename: Some(NODENAME.to_string()),
+                                            addresses: Some(vec![]),
+                                            age_ms: Some(0),
+                                            rcs_state: Some(RemoteControlState::Unknown),
+                                            target_type: Some(TargetType::Unknown),
+                                            target_state: Some(TargetState::Unknown),
+                                        },
+                                        Target {
+                                            nodename: Some(UNRESPONSIVE_NODENAME.to_string()),
+                                            addresses: Some(vec![]),
+                                            age_ms: Some(0),
+                                            rcs_state: Some(RemoteControlState::Unknown),
+                                            target_type: Some(TargetType::Unknown),
+                                            target_state: Some(TargetState::Unknown),
+                                        },
+                                    ]
+                                    .drain(..),
+                                )
+                                .unwrap();
+                        }
                     }
-                }
-                _ => {
-                    assert!(false, format!("got unexpected request: {:?}", req));
+                    _ => {
+                        assert!(false, format!("got unexpected request: {:?}", req));
+                    }
                 }
             }
         })
@@ -535,20 +542,23 @@
         .unwrap()
     }
 
-    fn setup_daemon_server_echo_times_out() -> DaemonProxy {
-        spawn_local_stream_handler(move |req| async move {
-            match req {
-                DaemonRequest::GetRemoteControl { remote: _, target: _, responder: _ } => {
-                    panic!("unexpected daemon call");
-                }
-                DaemonRequest::EchoString { value: _, responder: _ } => {
-                    task::sleep(DEFAULT_RETRY_DELAY.add(Duration::from_millis(10))).await;
-                }
-                DaemonRequest::ListTargets { value: _, responder: _ } => {
-                    panic!("unexpected daemon call");
-                }
-                _ => {
-                    assert!(false, format!("got unexpected request: {:?}", req));
+    fn setup_daemon_server_echo_hangs(waiter: Shared<Receiver<()>>) -> DaemonProxy {
+        spawn_local_stream_handler(move |req| {
+            let waiter = waiter.clone();
+            async move {
+                match req {
+                    DaemonRequest::GetRemoteControl { remote: _, target: _, responder: _ } => {
+                        panic!("unexpected daemon call");
+                    }
+                    DaemonRequest::EchoString { value: _, responder: _ } => {
+                        waiter.await.unwrap();
+                    }
+                    DaemonRequest::ListTargets { value: _, responder: _ } => {
+                        panic!("unexpected daemon call");
+                    }
+                    _ => {
+                        assert!(false, format!("got unexpected request: {:?}", req));
+                    }
                 }
             }
         })
@@ -605,7 +615,7 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
@@ -641,7 +651,7 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
@@ -693,22 +703,28 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_two_tries_no_daemon_running_echo_timeout() {
+        let (tx, rx) = oneshot::channel::<()>();
+
         let fake = FakeDaemonManager::new(
             vec![false, true],
             vec![Ok(false), Ok(true)],
             vec![Ok(())],
-            vec![Ok(setup_daemon_server_echo_times_out()), Ok(setup_responsive_daemon_server())],
+            vec![
+                Ok(setup_daemon_server_echo_hangs(rx.shared())),
+                Ok(setup_responsive_daemon_server()),
+            ],
         );
 
         let mut output = String::new();
         {
             let mut writer = unsafe { BufWriter::new(output.as_mut_vec()) };
             doctor(&mut writer, &fake, "", 2, DEFAULT_RETRY_DELAY, false).await.unwrap();
+            tx.send(()).unwrap();
         }
 
         print_full_output(&output);
@@ -737,22 +753,25 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_finds_target_connects_to_rcs() {
+        let (tx, rx) = oneshot::channel::<()>();
+
         let fake = FakeDaemonManager::new(
             vec![true],
             vec![],
             vec![],
-            vec![Ok(setup_responsive_daemon_server_with_targets())],
+            vec![Ok(setup_responsive_daemon_server_with_targets(rx.shared()))],
         );
 
         let mut output = String::new();
         {
             let mut writer = unsafe { BufWriter::new(output.as_mut_vec()) };
             doctor(&mut writer, &fake, "", 1, DEFAULT_RETRY_DELAY, false).await.unwrap();
+            tx.send(()).unwrap();
         }
 
         print_full_output(&output);
@@ -784,22 +803,25 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_finds_target_with_filter() {
+        let (tx, rx) = oneshot::channel::<()>();
+
         let fake = FakeDaemonManager::new(
             vec![true],
             vec![],
             vec![],
-            vec![Ok(setup_responsive_daemon_server_with_targets())],
+            vec![Ok(setup_responsive_daemon_server_with_targets(rx.shared()))],
         );
 
         let mut output = String::new();
         {
             let mut writer = unsafe { BufWriter::new(output.as_mut_vec()) };
             doctor(&mut writer, &fake, &NODENAME, 2, DEFAULT_RETRY_DELAY, false).await.unwrap();
+            tx.send(()).unwrap();
         }
 
         print_full_output(&output);
@@ -823,16 +845,18 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
     async fn test_invalid_filter_finds_no_targets() {
+        let (tx, rx) = oneshot::channel::<()>();
+
         let fake = FakeDaemonManager::new(
             vec![true],
             vec![],
             vec![],
-            vec![Ok(setup_responsive_daemon_server_with_targets())],
+            vec![Ok(setup_responsive_daemon_server_with_targets(rx.shared()))],
         );
 
         let mut output = String::new();
@@ -841,6 +865,7 @@
             doctor(&mut writer, &fake, &NON_EXISTENT_NODENAME, 1, DEFAULT_RETRY_DELAY, false)
                 .await
                 .unwrap();
+            tx.send(()).unwrap();
         }
 
         print_full_output(&output);
@@ -861,7 +886,7 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 
     #[fasync::run_singlethreaded(test)]
@@ -900,6 +925,6 @@
             ],
         );
 
-        fake.assert_no_leftover_calls();
+        fake.assert_no_leftover_calls().await;
     }
 }