[mock_cobalt] check errors and avoid spawn_local

...preferring returning futures explicitly so that they can be
scheduled by application code in main and in tests.

Test:
  fx run-test netstack_integration_tests -t \
    netstack_cobalt_integration_test

Change-Id: I5f287efe618406579f80ded4387ec7a899671ad8
diff --git a/src/cobalt/bin/testing/mock_cobalt/src/main.rs b/src/cobalt/bin/testing/mock_cobalt/src/main.rs
index 83430f1..bf42c02 100644
--- a/src/cobalt/bin/testing/mock_cobalt/src/main.rs
+++ b/src/cobalt/bin/testing/mock_cobalt/src/main.rs
@@ -62,234 +62,253 @@
 
 /// Create a new Logger. Accepts all `project_id` values.
 async fn run_cobalt_service(
-    mut stream: cobalt::LoggerFactoryRequestStream,
+    stream: cobalt::LoggerFactoryRequestStream,
     loggers: LoggersHandle,
-) -> Result<(), Error> {
-    while let Some(event) = stream.try_next().await? {
-        if let CreateLoggerFromProjectId { project_id, logger, responder } = event {
-            let log =
-                loggers.lock().await.entry(project_id).or_insert_with(Default::default).clone();
-            fasync::spawn_local(handle_cobalt_logger(logger.into_stream()?, log));
-            responder.send(cobalt::Status::Ok)?;
-        } else {
-            unimplemented!(
-                "Logger factory request of type {:?} not supported by mock cobalt service",
-                event
-            );
-        }
-    }
-    Ok(())
+) -> Result<(), fidl::Error> {
+    stream
+        .try_for_each_concurrent(None, |event| async {
+            if let CreateLoggerFromProjectId { project_id, logger, responder } = event {
+                let log =
+                    loggers.lock().await.entry(project_id).or_insert_with(Default::default).clone();
+                let handler = handle_cobalt_logger(logger.into_stream()?, log);
+                let () = responder.send(cobalt::Status::Ok)?;
+                handler.await
+            } else {
+                unimplemented!(
+                    "Logger factory request of type {:?} not supported by mock cobalt service",
+                    event
+                );
+            }
+        })
+        .await
 }
 
 /// Accepts all incoming log requests and records them in an in-memory store
-async fn handle_cobalt_logger(mut stream: cobalt::LoggerRequestStream, log: EventsLogHandle) {
+async fn handle_cobalt_logger(
+    stream: cobalt::LoggerRequestStream,
+    log: EventsLogHandle,
+) -> Result<(), fidl::Error> {
     use cobalt::LoggerRequest::*;
-    while let Some(Ok(event)) = stream.next().await {
-        let mut log = log.lock().await;
-        let log_state = match event {
-            LogEvent { metric_id, event_code, responder } => {
-                let state = &mut log.log_event;
-                state
-                    .log
-                    .push(CobaltEvent::builder(metric_id).with_event_code(event_code).as_event());
-                let _ = responder.send(cobalt::Status::Ok);
-                state
+    stream
+        .try_for_each_concurrent(None, |event| async {
+            let mut log = log.lock().await;
+            let log_state = match event {
+                LogEvent { metric_id, event_code, responder } => {
+                    let state = &mut log.log_event;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id).with_event_code(event_code).as_event(),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogEventCount {
+                    metric_id,
+                    event_code,
+                    component,
+                    period_duration_micros,
+                    count,
+                    responder,
+                } => {
+                    let state = &mut log.log_event_count;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id)
+                            .with_event_code(event_code)
+                            .with_component(component)
+                            .as_count_event(period_duration_micros, count),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogElapsedTime { metric_id, event_code, component, elapsed_micros, responder } => {
+                    let state = &mut log.log_elapsed_time;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id)
+                            .with_event_code(event_code)
+                            .with_component(component)
+                            .as_elapsed_time(elapsed_micros),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogFrameRate { metric_id, event_code, component, fps, responder } => {
+                    let state = &mut log.log_frame_rate;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id)
+                            .with_event_code(event_code)
+                            .with_component(component)
+                            .as_frame_rate(fps),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogMemoryUsage { metric_id, event_code, component, bytes, responder } => {
+                    let state = &mut log.log_memory_usage;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id)
+                            .with_event_code(event_code)
+                            .with_component(component)
+                            .as_memory_usage(bytes),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogIntHistogram { metric_id, event_code, component, histogram, responder } => {
+                    let state = &mut log.log_int_histogram;
+                    state.log.push(
+                        CobaltEvent::builder(metric_id)
+                            .with_event_code(event_code)
+                            .with_component(component)
+                            .as_int_histogram(histogram),
+                    );
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogCobaltEvent { event, responder } => {
+                    let state = &mut log.log_cobalt_event;
+                    state.log.push(event);
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                LogCobaltEvents { mut events, responder } => {
+                    let state = &mut log.log_cobalt_events;
+                    state.log.append(&mut events);
+                    let () = responder.send(cobalt::Status::Ok)?;
+                    state
+                }
+                e => unimplemented!("Event {:?} is not supported by the mock cobalt server", e),
+            };
+            while let Some(hanging_get_state) = log_state.hanging.pop() {
+                let mut last_observed = hanging_get_state.last_observed.lock().await;
+                let events = (&mut log_state.log)
+                    .iter()
+                    .skip(*last_observed)
+                    .take(MAX_QUERY_LENGTH)
+                    .map(Clone::clone)
+                    .collect();
+                *last_observed = log_state.log.len();
+                let () = hanging_get_state.responder.send(&mut Ok((events, false)))?;
             }
-            LogEventCount {
-                metric_id,
-                event_code,
-                component,
-                period_duration_micros,
-                count,
-                responder,
-            } => {
-                let state = &mut log.log_event_count;
-                state.log.push(
-                    CobaltEvent::builder(metric_id)
-                        .with_event_code(event_code)
-                        .with_component(component)
-                        .as_count_event(period_duration_micros, count),
-                );
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogElapsedTime { metric_id, event_code, component, elapsed_micros, responder } => {
-                let state = &mut log.log_elapsed_time;
-                state.log.push(
-                    CobaltEvent::builder(metric_id)
-                        .with_event_code(event_code)
-                        .with_component(component)
-                        .as_elapsed_time(elapsed_micros),
-                );
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogFrameRate { metric_id, event_code, component, fps, responder } => {
-                let state = &mut log.log_frame_rate;
-                state.log.push(
-                    CobaltEvent::builder(metric_id)
-                        .with_event_code(event_code)
-                        .with_component(component)
-                        .as_frame_rate(fps),
-                );
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogMemoryUsage { metric_id, event_code, component, bytes, responder } => {
-                let state = &mut log.log_memory_usage;
-                state.log.push(
-                    CobaltEvent::builder(metric_id)
-                        .with_event_code(event_code)
-                        .with_component(component)
-                        .as_memory_usage(bytes),
-                );
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogIntHistogram { metric_id, event_code, component, histogram, responder } => {
-                let state = &mut log.log_int_histogram;
-                state.log.push(
-                    CobaltEvent::builder(metric_id)
-                        .with_event_code(event_code)
-                        .with_component(component)
-                        .as_int_histogram(histogram),
-                );
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogCobaltEvent { event, responder } => {
-                let state = &mut log.log_cobalt_event;
-                state.log.push(event);
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            LogCobaltEvents { mut events, responder } => {
-                let state = &mut log.log_cobalt_events;
-                state.log.append(&mut events);
-                let _ = responder.send(cobalt::Status::Ok);
-                state
-            }
-            e => unimplemented!("Event {:?} is not supported by the mock cobalt server", e),
-        };
-
-        while let Some(hanging_get_state) = log_state.hanging.pop() {
-            let mut last_observed = hanging_get_state.last_observed.lock().await;
-            let events = (&mut log_state.log)
-                .iter()
-                .skip(*last_observed)
-                .take(MAX_QUERY_LENGTH)
-                .map(Clone::clone)
-                .collect();
-            *last_observed = log_state.log.len();
-            let _ = hanging_get_state.responder.send(&mut Ok((events, false)));
-        }
-    }
+            Ok(())
+        })
+        .await
 }
 
 /// Handles requests to query the state of the mock.
 async fn run_cobalt_query_service(
-    mut stream: cobalt_test::LoggerQuerierRequestStream,
+    stream: cobalt_test::LoggerQuerierRequestStream,
     loggers: LoggersHandle,
-) -> Result<(), Error> {
-    let mut client_state: HashMap<
-        u32,
-        HashMap<fidl_fuchsia_cobalt_test::LogMethod, Arc<Mutex<usize>>>,
-    > = HashMap::new();
+) -> Result<(), fidl::Error> {
     use cobalt_test::LogMethod::*;
 
-    while let Some(event) = stream.try_next().await? {
-        match event {
-            cobalt_test::LoggerQuerierRequest::WatchLogs { project_id, method, responder } => {
-                if let Some(state) = loggers.lock().await.get(&project_id) {
-                    let mut state = state.lock().await;
-                    let log_state = match method {
-                        LogEvent => &mut state.log_event,
-                        LogEventCount => &mut state.log_event_count,
-                        LogElapsedTime => &mut state.log_elapsed_time,
-                        LogFrameRate => &mut state.log_frame_rate,
-                        LogMemoryUsage => &mut state.log_memory_usage,
-                        LogIntHistogram => &mut state.log_int_histogram,
-                        LogCobaltEvent => &mut state.log_cobalt_event,
-                        LogCobaltEvents => &mut state.log_cobalt_events,
-                    };
-                    let last_observed = client_state
-                        .entry(project_id)
-                        .or_insert_with(Default::default)
-                        .entry(method)
-                        .or_insert_with(Default::default);
-                    let mut last_observed_len = last_observed.lock().await;
-                    let current_len = log_state.log.len();
-                    if current_len != *last_observed_len {
-                        let events = &mut log_state.log;
-                        let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
-                        let events = events
-                            .iter()
-                            .skip(*last_observed_len)
-                            .take(MAX_QUERY_LENGTH)
-                            .map(Clone::clone)
-                            .collect();
-                        *last_observed_len = current_len;
-                        responder.send(&mut Ok((events, more)))?;
-                    } else {
-                        log_state.hanging.push(HangingGetState {
-                            responder: responder,
-                            last_observed: last_observed.clone(),
-                        });
+    let _client_state: HashMap<_, _> = stream
+        .try_fold(
+            HashMap::new(),
+            |mut client_state: HashMap<
+                u32,
+                HashMap<fidl_fuchsia_cobalt_test::LogMethod, Arc<Mutex<usize>>>,
+            >,
+             event| async {
+                match event {
+                    cobalt_test::LoggerQuerierRequest::WatchLogs {
+                        project_id,
+                        method,
+                        responder,
+                    } => {
+                        if let Some(state) = loggers.lock().await.get(&project_id) {
+                            let mut state = state.lock().await;
+                            let log_state = match method {
+                                LogEvent => &mut state.log_event,
+                                LogEventCount => &mut state.log_event_count,
+                                LogElapsedTime => &mut state.log_elapsed_time,
+                                LogFrameRate => &mut state.log_frame_rate,
+                                LogMemoryUsage => &mut state.log_memory_usage,
+                                LogIntHistogram => &mut state.log_int_histogram,
+                                LogCobaltEvent => &mut state.log_cobalt_event,
+                                LogCobaltEvents => &mut state.log_cobalt_events,
+                            };
+                            let last_observed = client_state
+                                .entry(project_id)
+                                .or_insert_with(Default::default)
+                                .entry(method)
+                                .or_insert_with(Default::default);
+                            let mut last_observed_len = last_observed.lock().await;
+                            let current_len = log_state.log.len();
+                            if current_len != *last_observed_len {
+                                let events = &mut log_state.log;
+                                let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
+                                let events = events
+                                    .iter()
+                                    .skip(*last_observed_len)
+                                    .take(MAX_QUERY_LENGTH)
+                                    .map(Clone::clone)
+                                    .collect();
+                                *last_observed_len = current_len;
+                                let () = responder.send(&mut Ok((events, more)))?;
+                            } else {
+                                let () = log_state.hanging.push(HangingGetState {
+                                    responder,
+                                    last_observed: last_observed.clone(),
+                                });
+                            }
+                        } else {
+                            let () = responder
+                                .send(&mut Err(cobalt_test::QueryError::LoggerNotFound))?;
+                        }
                     }
-                } else {
-                    responder.send(&mut Err(cobalt_test::QueryError::LoggerNotFound))?;
-                }
-            }
-            cobalt_test::LoggerQuerierRequest::ResetLogger {
-                project_id,
-                method,
-                control_handle: _,
-            } => {
-                if let Some(log) = loggers.lock().await.get(&project_id) {
-                    let mut state = log.lock().await;
-                    match method {
-                        LogEvent => state.log_event.log.clear(),
-                        LogEventCount => state.log_event_count.log.clear(),
-                        LogElapsedTime => state.log_elapsed_time.log.clear(),
-                        LogFrameRate => state.log_frame_rate.log.clear(),
-                        LogMemoryUsage => state.log_memory_usage.log.clear(),
-                        LogIntHistogram => state.log_int_histogram.log.clear(),
-                        LogCobaltEvent => state.log_cobalt_event.log.clear(),
-                        LogCobaltEvents => state.log_cobalt_events.log.clear(),
+                    cobalt_test::LoggerQuerierRequest::ResetLogger {
+                        project_id,
+                        method,
+                        control_handle: _,
+                    } => {
+                        if let Some(log) = loggers.lock().await.get(&project_id) {
+                            let mut state = log.lock().await;
+                            match method {
+                                LogEvent => state.log_event.log.clear(),
+                                LogEventCount => state.log_event_count.log.clear(),
+                                LogElapsedTime => state.log_elapsed_time.log.clear(),
+                                LogFrameRate => state.log_frame_rate.log.clear(),
+                                LogMemoryUsage => state.log_memory_usage.log.clear(),
+                                LogIntHistogram => state.log_int_histogram.log.clear(),
+                                LogCobaltEvent => state.log_cobalt_event.log.clear(),
+                                LogCobaltEvents => state.log_cobalt_events.log.clear(),
+                            }
+                        }
                     }
                 }
-            }
-        }
-    }
+                Ok(client_state)
+            },
+        )
+        .await?;
     Ok(())
 }
 
+enum IncomingService {
+    Cobalt(fidl_fuchsia_cobalt::LoggerFactoryRequestStream),
+    Query(fidl_fuchsia_cobalt_test::LoggerQuerierRequestStream),
+}
+
 #[fasync::run_singlethreaded]
 async fn main() -> Result<(), Error> {
-    syslog::init_with_tags(&["mock-cobalt"]).expect("Can't init logger");
+    syslog::init_with_tags(&["mock-cobalt"])?;
     fx_log_info!("Starting mock cobalt service...");
 
     let loggers = LoggersHandle::default();
-    let loggers_copy = loggers.clone();
 
     let mut fs = fuchsia_component::server::ServiceFs::new_local();
     fs.dir("svc")
-        .add_fidl_service(move |stream| {
-            let loggers = loggers.clone();
-            fasync::spawn_local(async move {
-                run_cobalt_service(stream, loggers).await.expect("failed to run cobalt service");
-            })
-        })
-        .add_fidl_service(move |stream| {
-            let loggers = loggers_copy.clone();
-            fasync::spawn_local(async move {
-                run_cobalt_query_service(stream, loggers)
-                    .await
-                    .expect("failed to run cobalt query service");
-            })
-        });
+        .add_fidl_service(IncomingService::Cobalt)
+        .add_fidl_service(IncomingService::Query);
     fs.take_and_serve_directory_handle()?;
-    fs.collect::<()>().await;
+    fs.then(futures::future::ok)
+        .try_for_each_concurrent(None, |client_request| async {
+            let loggers = loggers.clone();
+            match client_request {
+                IncomingService::Cobalt(stream) => run_cobalt_service(stream, loggers).await,
+                IncomingService::Query(stream) => run_cobalt_query_service(stream, loggers).await,
+            }
+        })
+        .await?;
+
     Ok(())
 }