[mock_cobalt] check errors and avoid spawn_local
...preferring returning futures explicitly so that they can be
scheduled by application code in main and in tests.
Test:
fx run-test netstack_integration_tests -t \
netstack_cobalt_integration_test
Change-Id: I5f287efe618406579f80ded4387ec7a899671ad8
diff --git a/src/cobalt/bin/testing/mock_cobalt/src/main.rs b/src/cobalt/bin/testing/mock_cobalt/src/main.rs
index 83430f1..bf42c02 100644
--- a/src/cobalt/bin/testing/mock_cobalt/src/main.rs
+++ b/src/cobalt/bin/testing/mock_cobalt/src/main.rs
@@ -62,234 +62,253 @@
/// Create a new Logger. Accepts all `project_id` values.
async fn run_cobalt_service(
- mut stream: cobalt::LoggerFactoryRequestStream,
+ stream: cobalt::LoggerFactoryRequestStream,
loggers: LoggersHandle,
-) -> Result<(), Error> {
- while let Some(event) = stream.try_next().await? {
- if let CreateLoggerFromProjectId { project_id, logger, responder } = event {
- let log =
- loggers.lock().await.entry(project_id).or_insert_with(Default::default).clone();
- fasync::spawn_local(handle_cobalt_logger(logger.into_stream()?, log));
- responder.send(cobalt::Status::Ok)?;
- } else {
- unimplemented!(
- "Logger factory request of type {:?} not supported by mock cobalt service",
- event
- );
- }
- }
- Ok(())
+) -> Result<(), fidl::Error> {
+ stream
+ .try_for_each_concurrent(None, |event| async {
+ if let CreateLoggerFromProjectId { project_id, logger, responder } = event {
+ let log =
+ loggers.lock().await.entry(project_id).or_insert_with(Default::default).clone();
+ let handler = handle_cobalt_logger(logger.into_stream()?, log);
+ let () = responder.send(cobalt::Status::Ok)?;
+ handler.await
+ } else {
+ unimplemented!(
+ "Logger factory request of type {:?} not supported by mock cobalt service",
+ event
+ );
+ }
+ })
+ .await
}
/// Accepts all incoming log requests and records them in an in-memory store
-async fn handle_cobalt_logger(mut stream: cobalt::LoggerRequestStream, log: EventsLogHandle) {
+async fn handle_cobalt_logger(
+ stream: cobalt::LoggerRequestStream,
+ log: EventsLogHandle,
+) -> Result<(), fidl::Error> {
use cobalt::LoggerRequest::*;
- while let Some(Ok(event)) = stream.next().await {
- let mut log = log.lock().await;
- let log_state = match event {
- LogEvent { metric_id, event_code, responder } => {
- let state = &mut log.log_event;
- state
- .log
- .push(CobaltEvent::builder(metric_id).with_event_code(event_code).as_event());
- let _ = responder.send(cobalt::Status::Ok);
- state
+ stream
+ .try_for_each_concurrent(None, |event| async {
+ let mut log = log.lock().await;
+ let log_state = match event {
+ LogEvent { metric_id, event_code, responder } => {
+ let state = &mut log.log_event;
+ state.log.push(
+ CobaltEvent::builder(metric_id).with_event_code(event_code).as_event(),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogEventCount {
+ metric_id,
+ event_code,
+ component,
+ period_duration_micros,
+ count,
+ responder,
+ } => {
+ let state = &mut log.log_event_count;
+ state.log.push(
+ CobaltEvent::builder(metric_id)
+ .with_event_code(event_code)
+ .with_component(component)
+ .as_count_event(period_duration_micros, count),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogElapsedTime { metric_id, event_code, component, elapsed_micros, responder } => {
+ let state = &mut log.log_elapsed_time;
+ state.log.push(
+ CobaltEvent::builder(metric_id)
+ .with_event_code(event_code)
+ .with_component(component)
+ .as_elapsed_time(elapsed_micros),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogFrameRate { metric_id, event_code, component, fps, responder } => {
+ let state = &mut log.log_frame_rate;
+ state.log.push(
+ CobaltEvent::builder(metric_id)
+ .with_event_code(event_code)
+ .with_component(component)
+ .as_frame_rate(fps),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogMemoryUsage { metric_id, event_code, component, bytes, responder } => {
+ let state = &mut log.log_memory_usage;
+ state.log.push(
+ CobaltEvent::builder(metric_id)
+ .with_event_code(event_code)
+ .with_component(component)
+ .as_memory_usage(bytes),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogIntHistogram { metric_id, event_code, component, histogram, responder } => {
+ let state = &mut log.log_int_histogram;
+ state.log.push(
+ CobaltEvent::builder(metric_id)
+ .with_event_code(event_code)
+ .with_component(component)
+ .as_int_histogram(histogram),
+ );
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogCobaltEvent { event, responder } => {
+ let state = &mut log.log_cobalt_event;
+ state.log.push(event);
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ LogCobaltEvents { mut events, responder } => {
+ let state = &mut log.log_cobalt_events;
+ state.log.append(&mut events);
+ let () = responder.send(cobalt::Status::Ok)?;
+ state
+ }
+ e => unimplemented!("Event {:?} is not supported by the mock cobalt server", e),
+ };
+ while let Some(hanging_get_state) = log_state.hanging.pop() {
+ let mut last_observed = hanging_get_state.last_observed.lock().await;
+ let events = (&mut log_state.log)
+ .iter()
+ .skip(*last_observed)
+ .take(MAX_QUERY_LENGTH)
+ .map(Clone::clone)
+ .collect();
+ *last_observed = log_state.log.len();
+ let () = hanging_get_state.responder.send(&mut Ok((events, false)))?;
}
- LogEventCount {
- metric_id,
- event_code,
- component,
- period_duration_micros,
- count,
- responder,
- } => {
- let state = &mut log.log_event_count;
- state.log.push(
- CobaltEvent::builder(metric_id)
- .with_event_code(event_code)
- .with_component(component)
- .as_count_event(period_duration_micros, count),
- );
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogElapsedTime { metric_id, event_code, component, elapsed_micros, responder } => {
- let state = &mut log.log_elapsed_time;
- state.log.push(
- CobaltEvent::builder(metric_id)
- .with_event_code(event_code)
- .with_component(component)
- .as_elapsed_time(elapsed_micros),
- );
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogFrameRate { metric_id, event_code, component, fps, responder } => {
- let state = &mut log.log_frame_rate;
- state.log.push(
- CobaltEvent::builder(metric_id)
- .with_event_code(event_code)
- .with_component(component)
- .as_frame_rate(fps),
- );
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogMemoryUsage { metric_id, event_code, component, bytes, responder } => {
- let state = &mut log.log_memory_usage;
- state.log.push(
- CobaltEvent::builder(metric_id)
- .with_event_code(event_code)
- .with_component(component)
- .as_memory_usage(bytes),
- );
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogIntHistogram { metric_id, event_code, component, histogram, responder } => {
- let state = &mut log.log_int_histogram;
- state.log.push(
- CobaltEvent::builder(metric_id)
- .with_event_code(event_code)
- .with_component(component)
- .as_int_histogram(histogram),
- );
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogCobaltEvent { event, responder } => {
- let state = &mut log.log_cobalt_event;
- state.log.push(event);
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- LogCobaltEvents { mut events, responder } => {
- let state = &mut log.log_cobalt_events;
- state.log.append(&mut events);
- let _ = responder.send(cobalt::Status::Ok);
- state
- }
- e => unimplemented!("Event {:?} is not supported by the mock cobalt server", e),
- };
-
- while let Some(hanging_get_state) = log_state.hanging.pop() {
- let mut last_observed = hanging_get_state.last_observed.lock().await;
- let events = (&mut log_state.log)
- .iter()
- .skip(*last_observed)
- .take(MAX_QUERY_LENGTH)
- .map(Clone::clone)
- .collect();
- *last_observed = log_state.log.len();
- let _ = hanging_get_state.responder.send(&mut Ok((events, false)));
- }
- }
+ Ok(())
+ })
+ .await
}
/// Handles requests to query the state of the mock.
async fn run_cobalt_query_service(
- mut stream: cobalt_test::LoggerQuerierRequestStream,
+ stream: cobalt_test::LoggerQuerierRequestStream,
loggers: LoggersHandle,
-) -> Result<(), Error> {
- let mut client_state: HashMap<
- u32,
- HashMap<fidl_fuchsia_cobalt_test::LogMethod, Arc<Mutex<usize>>>,
- > = HashMap::new();
+) -> Result<(), fidl::Error> {
use cobalt_test::LogMethod::*;
- while let Some(event) = stream.try_next().await? {
- match event {
- cobalt_test::LoggerQuerierRequest::WatchLogs { project_id, method, responder } => {
- if let Some(state) = loggers.lock().await.get(&project_id) {
- let mut state = state.lock().await;
- let log_state = match method {
- LogEvent => &mut state.log_event,
- LogEventCount => &mut state.log_event_count,
- LogElapsedTime => &mut state.log_elapsed_time,
- LogFrameRate => &mut state.log_frame_rate,
- LogMemoryUsage => &mut state.log_memory_usage,
- LogIntHistogram => &mut state.log_int_histogram,
- LogCobaltEvent => &mut state.log_cobalt_event,
- LogCobaltEvents => &mut state.log_cobalt_events,
- };
- let last_observed = client_state
- .entry(project_id)
- .or_insert_with(Default::default)
- .entry(method)
- .or_insert_with(Default::default);
- let mut last_observed_len = last_observed.lock().await;
- let current_len = log_state.log.len();
- if current_len != *last_observed_len {
- let events = &mut log_state.log;
- let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
- let events = events
- .iter()
- .skip(*last_observed_len)
- .take(MAX_QUERY_LENGTH)
- .map(Clone::clone)
- .collect();
- *last_observed_len = current_len;
- responder.send(&mut Ok((events, more)))?;
- } else {
- log_state.hanging.push(HangingGetState {
- responder: responder,
- last_observed: last_observed.clone(),
- });
+ let _client_state: HashMap<_, _> = stream
+ .try_fold(
+ HashMap::new(),
+ |mut client_state: HashMap<
+ u32,
+ HashMap<fidl_fuchsia_cobalt_test::LogMethod, Arc<Mutex<usize>>>,
+ >,
+ event| async {
+ match event {
+ cobalt_test::LoggerQuerierRequest::WatchLogs {
+ project_id,
+ method,
+ responder,
+ } => {
+ if let Some(state) = loggers.lock().await.get(&project_id) {
+ let mut state = state.lock().await;
+ let log_state = match method {
+ LogEvent => &mut state.log_event,
+ LogEventCount => &mut state.log_event_count,
+ LogElapsedTime => &mut state.log_elapsed_time,
+ LogFrameRate => &mut state.log_frame_rate,
+ LogMemoryUsage => &mut state.log_memory_usage,
+ LogIntHistogram => &mut state.log_int_histogram,
+ LogCobaltEvent => &mut state.log_cobalt_event,
+ LogCobaltEvents => &mut state.log_cobalt_events,
+ };
+ let last_observed = client_state
+ .entry(project_id)
+ .or_insert_with(Default::default)
+ .entry(method)
+ .or_insert_with(Default::default);
+ let mut last_observed_len = last_observed.lock().await;
+ let current_len = log_state.log.len();
+ if current_len != *last_observed_len {
+ let events = &mut log_state.log;
+ let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
+ let events = events
+ .iter()
+ .skip(*last_observed_len)
+ .take(MAX_QUERY_LENGTH)
+ .map(Clone::clone)
+ .collect();
+ *last_observed_len = current_len;
+ let () = responder.send(&mut Ok((events, more)))?;
+ } else {
+ let () = log_state.hanging.push(HangingGetState {
+ responder,
+ last_observed: last_observed.clone(),
+ });
+ }
+ } else {
+ let () = responder
+ .send(&mut Err(cobalt_test::QueryError::LoggerNotFound))?;
+ }
}
- } else {
- responder.send(&mut Err(cobalt_test::QueryError::LoggerNotFound))?;
- }
- }
- cobalt_test::LoggerQuerierRequest::ResetLogger {
- project_id,
- method,
- control_handle: _,
- } => {
- if let Some(log) = loggers.lock().await.get(&project_id) {
- let mut state = log.lock().await;
- match method {
- LogEvent => state.log_event.log.clear(),
- LogEventCount => state.log_event_count.log.clear(),
- LogElapsedTime => state.log_elapsed_time.log.clear(),
- LogFrameRate => state.log_frame_rate.log.clear(),
- LogMemoryUsage => state.log_memory_usage.log.clear(),
- LogIntHistogram => state.log_int_histogram.log.clear(),
- LogCobaltEvent => state.log_cobalt_event.log.clear(),
- LogCobaltEvents => state.log_cobalt_events.log.clear(),
+ cobalt_test::LoggerQuerierRequest::ResetLogger {
+ project_id,
+ method,
+ control_handle: _,
+ } => {
+ if let Some(log) = loggers.lock().await.get(&project_id) {
+ let mut state = log.lock().await;
+ match method {
+ LogEvent => state.log_event.log.clear(),
+ LogEventCount => state.log_event_count.log.clear(),
+ LogElapsedTime => state.log_elapsed_time.log.clear(),
+ LogFrameRate => state.log_frame_rate.log.clear(),
+ LogMemoryUsage => state.log_memory_usage.log.clear(),
+ LogIntHistogram => state.log_int_histogram.log.clear(),
+ LogCobaltEvent => state.log_cobalt_event.log.clear(),
+ LogCobaltEvents => state.log_cobalt_events.log.clear(),
+ }
+ }
}
}
- }
- }
- }
+ Ok(client_state)
+ },
+ )
+ .await?;
Ok(())
}
+enum IncomingService {
+ Cobalt(fidl_fuchsia_cobalt::LoggerFactoryRequestStream),
+ Query(fidl_fuchsia_cobalt_test::LoggerQuerierRequestStream),
+}
+
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
- syslog::init_with_tags(&["mock-cobalt"]).expect("Can't init logger");
+ syslog::init_with_tags(&["mock-cobalt"])?;
fx_log_info!("Starting mock cobalt service...");
let loggers = LoggersHandle::default();
- let loggers_copy = loggers.clone();
let mut fs = fuchsia_component::server::ServiceFs::new_local();
fs.dir("svc")
- .add_fidl_service(move |stream| {
- let loggers = loggers.clone();
- fasync::spawn_local(async move {
- run_cobalt_service(stream, loggers).await.expect("failed to run cobalt service");
- })
- })
- .add_fidl_service(move |stream| {
- let loggers = loggers_copy.clone();
- fasync::spawn_local(async move {
- run_cobalt_query_service(stream, loggers)
- .await
- .expect("failed to run cobalt query service");
- })
- });
+ .add_fidl_service(IncomingService::Cobalt)
+ .add_fidl_service(IncomingService::Query);
fs.take_and_serve_directory_handle()?;
- fs.collect::<()>().await;
+ fs.then(futures::future::ok)
+ .try_for_each_concurrent(None, |client_request| async {
+ let loggers = loggers.clone();
+ match client_request {
+ IncomingService::Cobalt(stream) => run_cobalt_service(stream, loggers).await,
+ IncomingService::Query(stream) => run_cobalt_query_service(stream, loggers).await,
+ }
+ })
+ .await?;
+
Ok(())
}