blob: d68a78edde19d4b895d6769ea427c43b16398ea5 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::Error,
fidl_fuchsia_cobalt::{
self as cobalt, CobaltEvent, LoggerFactoryRequest::CreateLoggerFromProjectId,
LoggerFactoryRequest::CreateLoggerFromProjectSpec,
},
fidl_fuchsia_cobalt_test as cobalt_test, fuchsia_async as fasync,
fuchsia_cobalt::CobaltEventExt,
fuchsia_syslog::{self as syslog, fx_log_info},
fuchsia_zircon_status as zx_status,
futures::{lock::Mutex, StreamExt, TryStreamExt},
std::{collections::HashMap, sync::Arc},
};
/// Maximum number of events returned by a single watch-logs response.
/// Mirrors `fuchsia.cobalt.test.MAX_QUERY_LENGTH`, cast once here so call sites
/// can use it directly as a `usize`.
const MAX_QUERY_LENGTH: usize = cobalt_test::MAX_QUERY_LENGTH as usize;
/// Per-log-method state: every event recorded so far plus any parked
/// hanging-get watchers waiting for new events.
struct LogState {
    // All events received for this log method, in arrival order.
    log: Vec<CobaltEvent>,
    // Watch requests that arrived when no novel events were available; they are
    // popped and resolved by `handle_cobalt_logger` when fresh events arrive.
    hanging: Vec<HangingGetState>,
}
/// Send a success response on a watch logs responder. Used to abstract over the WatchLogs and
/// WatchLogs2 fidl methods.
trait WatchLogsResponder {
    /// Respond with `events` and the `more` flag indicating whether additional
    /// events beyond this batch remain unobserved.
    fn send_ok(self: Box<Self>, events: Vec<CobaltEvent>, more: bool) -> Result<(), fidl::Error>;
}
impl WatchLogsResponder for cobalt_test::LoggerQuerierWatchLogsResponder {
    // WatchLogs responds with a Result, so wrap the payload in Ok.
    fn send_ok(self: Box<Self>, events: Vec<CobaltEvent>, more: bool) -> Result<(), fidl::Error> {
        self.send(&mut Ok((events, more)))
    }
}
impl WatchLogsResponder for cobalt_test::LoggerQuerierWatchLogs2Responder {
    // WatchLogs2 takes an iterator of mutable event references rather than a
    // Result payload, hence the differing send signature.
    fn send_ok(
        self: Box<Self>,
        mut events: Vec<CobaltEvent>,
        more: bool,
    ) -> Result<(), fidl::Error> {
        self.send(&mut events.iter_mut(), more)
    }
}
/// In-memory record of everything a mocked `fuchsia.cobalt.Logger` has been
/// asked to log, bucketed by log method.
///
/// Does not record StartTimer, EndTimer, and LogCustomEvent requests.
#[derive(Default)]
struct EventsLog {
    log_event: LogState,
    log_event_count: LogState,
    log_elapsed_time: LogState,
    log_frame_rate: LogState,
    log_memory_usage: LogState,
    log_int_histogram: LogState,
    log_cobalt_event: LogState,
    log_cobalt_events: LogState,
}
impl Default for LogState {
fn default() -> LogState {
LogState { log: vec![], hanging: vec![] }
}
}
/// A parked watch-logs request: the responder to resolve later, plus the shared
/// cursor recording how much of the log its client has already seen.
struct HangingGetState {
    // last_observed is concurrently mutated by calls to run_cobalt_query_service (one for each
    // client of fuchsia.cobalt.test.LoggerQuerier) and calls to handle_cobalt_logger (one for each
    // client of fuchsia.cobalt.Logger).
    last_observed: Arc<Mutex<usize>>,
    // Boxed so WatchLogs and WatchLogs2 responders can be stored uniformly.
    responder: Box<dyn WatchLogsResponder>,
}
// The LogState#log vectors in EventsLog are mutated by handle_cobalt_logger and
// concurrently observed by run_cobalt_query_service.
//
// The LogState#hanging vectors in EventsLog are concurrently mutated by run_cobalt_query_service
// (new values pushed) and handle_cobalt_logger (new values popped).
/// Shared, lock-protected handle to one project's recorded events.
type EventsLogHandle = Arc<Mutex<EventsLog>>;
// Entries in the HashMap are concurrently added by run_cobalt_service and
// looked up by run_cobalt_query_service.
/// Map from project id to that project's shared event log.
type LoggersHandle = Arc<Mutex<HashMap<u32, EventsLogHandle>>>;
/// Create a new Logger. Accepts all `project_id` values and `customer_id` values.
///
/// Loggers created for the same `project_id` share a single `EventsLog`, so
/// events from multiple logger connections are merged per project.
async fn run_cobalt_service(
    stream: cobalt::LoggerFactoryRequestStream,
    loggers: LoggersHandle,
) -> Result<(), fidl::Error> {
    stream
        .try_for_each_concurrent(None, |event| async {
            // Both supported factory methods are handled identically: find or create
            // the project's shared log, ack the request, then serve the new logger.
            match event {
                CreateLoggerFromProjectId { project_id, logger, responder } => {
                    let log = loggers.lock().await.entry(project_id).or_default().clone();
                    let handler = handle_cobalt_logger(logger.into_stream()?, log);
                    let () = responder.send(cobalt::Status::Ok)?;
                    handler.await
                }
                CreateLoggerFromProjectSpec { project_id, logger, responder, .. } => {
                    let log = loggers.lock().await.entry(project_id).or_default().clone();
                    let handler = handle_cobalt_logger(logger.into_stream()?, log);
                    let () = responder.send(cobalt::Status::Ok)?;
                    handler.await
                }
                event => unimplemented!(
                    "Logger factory request of type {:?} not supported by mock cobalt service",
                    event
                ),
            }
        })
        .await
}
/// Accepts all incoming log requests and records them in an in-memory store.
///
/// After each recorded event, any parked hanging-get watchers for the same log
/// method are resolved with the events their client has not yet observed.
async fn handle_cobalt_logger(
    stream: cobalt::LoggerRequestStream,
    log: EventsLogHandle,
) -> Result<(), fidl::Error> {
    use cobalt::LoggerRequest::*;
    let fut = stream.try_for_each_concurrent(None, |event| async {
        let mut log = log.lock().await;
        // Record the event into the bucket for its log method, ack the caller,
        // and hand back the bucket so hanging gets can be resolved below.
        let log_state = match event {
            LogEvent { metric_id, event_code, responder } => {
                let state = &mut log.log_event;
                state
                    .log
                    .push(CobaltEvent::builder(metric_id).with_event_code(event_code).as_event());
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogEventCount {
                metric_id,
                event_code,
                component,
                period_duration_micros,
                count,
                responder,
            } => {
                let state = &mut log.log_event_count;
                state.log.push(
                    CobaltEvent::builder(metric_id)
                        .with_event_code(event_code)
                        .with_component(component)
                        .as_count_event(period_duration_micros, count),
                );
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogElapsedTime { metric_id, event_code, component, elapsed_micros, responder } => {
                let state = &mut log.log_elapsed_time;
                state.log.push(
                    CobaltEvent::builder(metric_id)
                        .with_event_code(event_code)
                        .with_component(component)
                        .as_elapsed_time(elapsed_micros),
                );
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogFrameRate { metric_id, event_code, component, fps, responder } => {
                let state = &mut log.log_frame_rate;
                state.log.push(
                    CobaltEvent::builder(metric_id)
                        .with_event_code(event_code)
                        .with_component(component)
                        .as_frame_rate(fps),
                );
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogMemoryUsage { metric_id, event_code, component, bytes, responder } => {
                let state = &mut log.log_memory_usage;
                state.log.push(
                    CobaltEvent::builder(metric_id)
                        .with_event_code(event_code)
                        .with_component(component)
                        .as_memory_usage(bytes),
                );
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogIntHistogram { metric_id, event_code, component, histogram, responder } => {
                let state = &mut log.log_int_histogram;
                state.log.push(
                    CobaltEvent::builder(metric_id)
                        .with_event_code(event_code)
                        .with_component(component)
                        .as_int_histogram(histogram),
                );
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogCobaltEvent { event, responder } => {
                let state = &mut log.log_cobalt_event;
                state.log.push(event);
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            LogCobaltEvents { mut events, responder } => {
                let state = &mut log.log_cobalt_events;
                state.log.append(&mut events);
                let () = responder.send(cobalt::Status::Ok)?;
                state
            }
            e => unimplemented!("Event {:?} is not supported by the mock cobalt server", e),
        };
        // Resolve every parked watcher with the events it has not observed yet.
        while let Some(hanging_get_state) = log_state.hanging.pop() {
            let mut last_observed = hanging_get_state.last_observed.lock().await;
            let events = log_state
                .log
                .iter()
                .skip(*last_observed)
                .take(MAX_QUERY_LENGTH)
                .cloned()
                .collect();
            *last_observed = log_state.log.len();
            // NOTE(review): `more` is always reported as false here, even if more than
            // MAX_QUERY_LENGTH unobserved events exist — confirm this is intended.
            let () = hanging_get_state.responder.send_ok(events, false)?;
        }
        Ok(())
    });
    match fut.await {
        // Don't consider PEER_CLOSED to be an error; the client simply went away.
        Err(fidl::Error::ServerResponseWrite(zx_status::Status::PEER_CLOSED)) => Ok(()),
        other => other,
    }
}
/// Handles requests to query the state of the mock.
///
/// Tracks, per querier client, the index of the last event observed for each
/// (project id, log method) pair, so watch calls return only novel events and
/// otherwise park as hanging gets resolved later by `handle_cobalt_logger`.
async fn run_cobalt_query_service(
    stream: cobalt_test::LoggerQuerierRequestStream,
    loggers: LoggersHandle,
) -> Result<(), fidl::Error> {
    use cobalt_test::LogMethod::*;
    // client_state maps project_id -> (log method -> shared last-observed cursor).
    // try_fold threads it through this client's request stream.
    let _client_state: HashMap<_, _> = stream
        .try_fold(
            HashMap::new(),
            |mut client_state: HashMap<
                u32,
                HashMap<fidl_fuchsia_cobalt_test::LogMethod, Arc<Mutex<usize>>>,
            >,
             event| async {
                match event {
                    cobalt_test::LoggerQuerierRequest::WatchLogs2 {
                        project_id,
                        method,
                        responder,
                    } => {
                        // WatchLogs2 creates the project's log on demand, so it never
                        // fails with LoggerNotFound (unlike WatchLogs below).
                        let state = loggers
                            .lock()
                            .await
                            .entry(project_id)
                            .or_insert_with(Default::default)
                            .clone();
                        let mut state = state.lock().await;
                        let log_state = match method {
                            LogEvent => &mut state.log_event,
                            LogEventCount => &mut state.log_event_count,
                            LogElapsedTime => &mut state.log_elapsed_time,
                            LogFrameRate => &mut state.log_frame_rate,
                            LogMemoryUsage => &mut state.log_memory_usage,
                            LogIntHistogram => &mut state.log_int_histogram,
                            LogCobaltEvent => &mut state.log_cobalt_event,
                            LogCobaltEvents => &mut state.log_cobalt_events,
                        };
                        let last_observed = client_state
                            .entry(project_id)
                            .or_insert_with(Default::default)
                            .entry(method)
                            .or_insert_with(Default::default);
                        let mut last_observed_len = last_observed.lock().await;
                        let current_len = log_state.log.len();
                        if current_len != *last_observed_len {
                            // Novel events exist: answer immediately.
                            let events = &mut log_state.log;
                            // NOTE(review): `more` is computed from the total log length,
                            // not the count of unobserved events — confirm intended.
                            let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
                            let mut events: Vec<_> = events
                                .iter()
                                .skip(*last_observed_len)
                                .take(MAX_QUERY_LENGTH)
                                .cloned()
                                .collect();
                            *last_observed_len = current_len;
                            let () = responder.send(&mut events.iter_mut(), more)?;
                        } else {
                            // Nothing new: park the responder as a hanging get.
                            let () = log_state.hanging.push(HangingGetState {
                                responder: Box::new(responder),
                                last_observed: last_observed.clone(),
                            });
                        }
                    }
                    cobalt_test::LoggerQuerierRequest::WatchLogs {
                        project_id,
                        method,
                        responder,
                    } => {
                        if let Some(state) = loggers.lock().await.get(&project_id) {
                            let mut state = state.lock().await;
                            let log_state = match method {
                                LogEvent => &mut state.log_event,
                                LogEventCount => &mut state.log_event_count,
                                LogElapsedTime => &mut state.log_elapsed_time,
                                LogFrameRate => &mut state.log_frame_rate,
                                LogMemoryUsage => &mut state.log_memory_usage,
                                LogIntHistogram => &mut state.log_int_histogram,
                                LogCobaltEvent => &mut state.log_cobalt_event,
                                LogCobaltEvents => &mut state.log_cobalt_events,
                            };
                            let last_observed = client_state
                                .entry(project_id)
                                .or_insert_with(Default::default)
                                .entry(method)
                                .or_insert_with(Default::default);
                            let mut last_observed_len = last_observed.lock().await;
                            let current_len = log_state.log.len();
                            if current_len != *last_observed_len {
                                // Novel events exist: answer immediately.
                                let events = &mut log_state.log;
                                let more = events.len() > cobalt_test::MAX_QUERY_LENGTH as usize;
                                let events = events
                                    .iter()
                                    .skip(*last_observed_len)
                                    .take(MAX_QUERY_LENGTH)
                                    .cloned()
                                    .collect();
                                *last_observed_len = current_len;
                                let () = responder.send(&mut Ok((events, more)))?;
                            } else {
                                // Nothing new: park the responder as a hanging get.
                                let () = log_state.hanging.push(HangingGetState {
                                    responder: Box::new(responder),
                                    last_observed: last_observed.clone(),
                                });
                            }
                        } else {
                            // WatchLogs does not create loggers on demand.
                            let () = responder
                                .send(&mut Err(cobalt_test::QueryError::LoggerNotFound))?;
                        }
                    }
                    cobalt_test::LoggerQuerierRequest::ResetLogger {
                        project_id,
                        method,
                        control_handle: _,
                    } => {
                        // Clears recorded events only; parked hanging gets are kept.
                        if let Some(log) = loggers.lock().await.get(&project_id) {
                            let mut state = log.lock().await;
                            match method {
                                LogEvent => state.log_event.log.clear(),
                                LogEventCount => state.log_event_count.log.clear(),
                                LogElapsedTime => state.log_elapsed_time.log.clear(),
                                LogFrameRate => state.log_frame_rate.log.clear(),
                                LogMemoryUsage => state.log_memory_usage.log.clear(),
                                LogIntHistogram => state.log_int_histogram.log.clear(),
                                LogCobaltEvent => state.log_cobalt_event.log.clear(),
                                LogCobaltEvents => state.log_cobalt_events.log.clear(),
                            }
                        }
                    }
                }
                Ok(client_state)
            },
        )
        .await?;
    Ok(())
}
/// The set of FIDL protocols this component serves from its outgoing directory.
enum IncomingService {
    Cobalt(fidl_fuchsia_cobalt::LoggerFactoryRequestStream),
    Query(fidl_fuchsia_cobalt_test::LoggerQuerierRequestStream),
}
/// Entry point: serves both `fuchsia.cobalt.LoggerFactory` and
/// `fuchsia.cobalt.test.LoggerQuerier` out of the component's `svc` directory.
#[fasync::run_singlethreaded]
async fn main() -> Result<(), Error> {
    syslog::init_with_tags(&["mock-cobalt"])?;
    fx_log_info!("Starting mock cobalt service...");
    // Shared per-project event store used by both services.
    let loggers = LoggersHandle::default();
    let mut fs = fuchsia_component::server::ServiceFs::new_local();
    fs.dir("svc")
        .add_fidl_service(IncomingService::Cobalt)
        .add_fidl_service(IncomingService::Query);
    fs.take_and_serve_directory_handle()?;
    // Serve every incoming connection concurrently; each handler gets its own
    // clone of the shared store handle.
    fs.then(futures::future::ok)
        .try_for_each_concurrent(None, |client_request| async {
            let loggers = loggers.clone();
            match client_request {
                IncomingService::Cobalt(stream) => run_cobalt_service(stream, loggers).await,
                IncomingService::Query(stream) => run_cobalt_query_service(stream, loggers).await,
            }
        })
        .await?;
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use fidl::endpoints::{create_proxy, create_proxy_and_stream};
use fidl_fuchsia_cobalt::*;
use fidl_fuchsia_cobalt_test::{LogMethod, LoggerQuerierMarker, QueryError};
use fuchsia_async as fasync;
use futures::FutureExt;
// Verifies that the factory creates and stores logger state keyed by project id.
#[fasync::run_until_stalled(test)]
async fn mock_logger_factory() {
    let loggers = LoggersHandle::default();
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (_logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    // No loggers exist until the factory is asked to create one.
    assert!(loggers.lock().await.is_empty());
    factory_proxy
        .create_logger_from_project_id(1234, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // The factory stored the new logger's state under the requested project id.
    assert!(loggers.lock().await.get(&1234).is_some());
}
// Verifies that a single logged event is returned by WatchLogs with `more == false`.
#[fasync::run_until_stalled(test)]
async fn mock_logger_and_query_interface_single_event() {
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    factory_proxy
        .create_logger_from_project_id(123, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // Log a single event.
    logger_proxy.log_event(1, 2).await.expect("log_event fidl call to succeed");
    assert_eq!(
        Ok((vec![CobaltEvent::builder(1).with_event_code(2).as_event()], false)),
        querier_proxy
            .watch_logs(123, LogMethod::LogEvent)
            .await
            .expect("watch_logs fidl call to succeed")
    );
}
// Verifies that WatchLogs caps responses at MAX_QUERY_LENGTH and reports `more == true`.
#[fasync::run_until_stalled(test)]
async fn mock_logger_and_query_interface_multiple_events() {
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    factory_proxy
        .create_logger_from_project_id(12, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // Log 1 more than the maximum number of events that can be stored and assert that
    // `more` flag is true on logger query request.
    for i in 0..(MAX_QUERY_LENGTH as u32 + 1) {
        logger_proxy
            .log_event(i, i + 1)
            .await
            .expect("repeated log_event fidl call to succeed");
    }
    let (events, more) = querier_proxy
        .watch_logs(12, LogMethod::LogEvent)
        .await
        .expect("watch_logs fidl call to succeed")
        .expect("logger to exist and have recorded events");
    assert_eq!(CobaltEvent::builder(0).with_event_code(1).as_event(), events[0]);
    assert_eq!(MAX_QUERY_LENGTH, events.len());
    assert!(more);
}
// Verifies that WatchLogs for a project id with no logger fails with LoggerNotFound.
#[fasync::run_until_stalled(test)]
async fn mock_query_interface_no_logger_error() {
    let loggers = LoggersHandle::default();
    // Create channel.
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handler. Any failures in the service spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    // Assert on initial state.
    assert_eq!(
        Err(QueryError::LoggerNotFound),
        querier_proxy
            .watch_logs(1, LogMethod::LogEvent)
            .await
            .expect("watch_logs fidl call to succeed")
    );
}
// Same as mock_logger_and_query_interface_single_event, but via WatchLogs2.
#[fasync::run_until_stalled(test)]
async fn mock_logger_and_query_interface_single_event2() {
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    factory_proxy
        .create_logger_from_project_id(123, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // Log a single event.
    logger_proxy.log_event(1, 2).await.expect("log_event fidl call to succeed");
    assert_eq!(
        (vec![CobaltEvent::builder(1).with_event_code(2).as_event()], false),
        querier_proxy
            .watch_logs2(123, LogMethod::LogEvent)
            .await
            .expect("watch_logs2 fidl call to succeed")
    );
}
// Same as mock_logger_and_query_interface_multiple_events, but via WatchLogs2.
#[fasync::run_until_stalled(test)]
async fn mock_logger_and_query_interface_multiple_events2() {
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    factory_proxy
        .create_logger_from_project_id(12, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // Log 1 more than the maximum number of events that can be stored and assert that
    // `more` flag is true on logger query request.
    for i in 0..(MAX_QUERY_LENGTH as u32 + 1) {
        logger_proxy
            .log_event(i, i + 1)
            .await
            .expect("repeated log_event fidl call to succeed");
    }
    let (events, more) = querier_proxy
        .watch_logs2(12, LogMethod::LogEvent)
        .await
        .expect("watch_logs2 fidl call to succeed");
    assert_eq!(CobaltEvent::builder(0).with_event_code(1).as_event(), events[0]);
    assert_eq!(MAX_QUERY_LENGTH, events.len());
    assert!(more);
}
// Verifies WatchLogs2 hangs until both a logger exists and an event is logged.
#[test]
fn mock_query_interface_no_logger_never_completes() {
    let mut exec = fasync::Executor::new().unwrap();
    let loggers = LoggersHandle::default();
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers).map(|_| ())).detach();
    // watch_logs2 query does not complete without a logger for the requested project id.
    let watch_logs_fut = querier_proxy.watch_logs2(123, LogMethod::LogEvent);
    futures::pin_mut!(watch_logs_fut);
    assert!(exec.run_until_stalled(&mut watch_logs_fut).is_pending());
    // Create a new logger for the requested project id
    let create_logger_fut = factory_proxy.create_logger_from_project_id(123, server);
    futures::pin_mut!(create_logger_fut);
    exec.run_until_stalled(&mut create_logger_fut)
        .expect("logger creation future to complete")
        .expect("create_logger_from_project_id fidl call to succeed");
    // watch_logs2 query still does not complete without a LogEvent for the requested project
    // id.
    assert!(exec.run_until_stalled(&mut watch_logs_fut).is_pending());
    // Log a single event
    let log_event_fut = logger_proxy.log_event(1, 2);
    futures::pin_mut!(log_event_fut);
    exec.run_until_stalled(&mut log_event_fut)
        .expect("log event future to complete")
        .expect("log_event fidl call to succeed");
    // finally, now that a logger and log event have been created, watch_logs2 query will
    // succeed.
    let result = exec
        .run_until_stalled(&mut watch_logs_fut)
        .expect("watch_logs2 future to complete")
        .expect("watch_logs2 fidl call to succeed");
    assert_eq!((vec![CobaltEvent::builder(1).with_event_code(2).as_event()], false), result);
}
// Verifies WatchLogs2 stays pending when a logger exists but no events were logged.
#[test]
fn mock_query_interface_no_events_never_completes() {
    let mut exec = fasync::Executor::new().unwrap();
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    let (_logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    let test = async move {
        factory_proxy
            .create_logger_from_project_id(123, server)
            .await
            .expect("create_logger_from_project_id fidl call to succeed");
        assert_eq!(
            (vec![CobaltEvent::builder(1).with_event_code(2).as_event()], false),
            querier_proxy
                .watch_logs2(123, LogMethod::LogEvent)
                .await
                .expect("watch_logs2 fidl call to succeed")
        );
    };
    futures::pin_mut!(test);
    // With no events logged, the watch inside `test` never resolves.
    assert!(exec.run_until_stalled(&mut test).is_pending());
}
// Verifies each Logger method records into its own per-method bucket.
#[fasync::run_until_stalled(test)]
async fn mock_logger_logger_type_tracking() -> Result<(), fidl::Error> {
    let loggers = LoggersHandle::default();
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    let project_id = 1;
    factory_proxy
        .create_logger_from_project_id(project_id, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    let metric_id = 1;
    let event_code = 2;
    let component_name = "component";
    let period_duration_micros = 0;
    let count = 3;
    let frame_rate: f32 = 59.9;
    logger_proxy.log_event(metric_id, event_code).await?;
    logger_proxy
        .log_event_count(metric_id, event_code, component_name, period_duration_micros, count)
        .await?;
    logger_proxy
        .log_elapsed_time(metric_id, event_code, component_name, period_duration_micros)
        .await?;
    logger_proxy.log_memory_usage(metric_id, event_code, component_name, count).await?;
    logger_proxy.log_frame_rate(metric_id, event_code, component_name, frame_rate).await?;
    logger_proxy
        .log_int_histogram(metric_id, event_code, component_name, &mut vec![].into_iter())
        .await?;
    logger_proxy
        .log_cobalt_event(&mut cobalt::CobaltEvent {
            metric_id,
            event_codes: vec![event_code],
            component: Some(component_name.to_string()),
            payload: cobalt::EventPayload::Event(cobalt::Event {}),
        })
        .await?;
    logger_proxy.log_cobalt_events(&mut vec![].into_iter()).await?;
    let log = loggers.lock().await;
    let log = log.get(&project_id).expect("project should have been created");
    let state = log.lock().await;
    // Exactly one event must have landed in each method-specific bucket.
    assert_eq!(state.log_event.log.len(), 1);
    assert_eq!(state.log_event_count.log.len(), 1);
    assert_eq!(state.log_elapsed_time.log.len(), 1);
    assert_eq!(state.log_memory_usage.log.len(), 1);
    assert_eq!(state.log_frame_rate.log.len(), 1);
    assert_eq!(state.log_int_histogram.log.len(), 1);
    assert_eq!(state.log_cobalt_event.log.len(), 1);
    Ok(())
}
// Verifies ResetLogger clears recorded events for the given project and method.
#[fasync::run_until_stalled(test)]
async fn mock_query_interface_reset_state() {
    let loggers = LoggersHandle::default();
    // Create channels.
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, server) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    // Spawn service handlers. Any failures in the services spawned here will trigger panics
    // via expect method calls below.
    fasync::Task::local(run_cobalt_service(factory_stream, loggers.clone()).map(|_| ()))
        .detach();
    fasync::Task::local(run_cobalt_query_service(query_stream, loggers.clone()).map(|_| ()))
        .detach();
    factory_proxy
        .create_logger_from_project_id(987, server)
        .await
        .expect("create_logger_from_project_id fidl call to succeed");
    // Log a single event.
    logger_proxy.log_event(1, 2).await.expect("log_event fidl call to succeed");
    assert_eq!(
        (vec![CobaltEvent::builder(1).with_event_code(2).as_event()], false),
        querier_proxy
            .watch_logs2(987, LogMethod::LogEvent)
            .await
            .expect("watch_logs2 fidl call to succeed")
    );
    // Clear logger state.
    querier_proxy
        .reset_logger(987, LogMethod::LogEvent)
        .expect("reset_logger fidl call to succeed");
    assert_eq!(
        (vec![], false),
        querier_proxy
            .watch_logs2(987, LogMethod::LogEvent)
            .await
            .expect("watch_logs2 fidl call to succeed")
    );
}
// Verifies hanging-get semantics: a parked WatchLogs2 resolves when an event is
// logged, and repeated watches only ever observe novel events.
#[test]
fn mock_query_interface_hanging_get() {
    let mut executor = fuchsia_async::Executor::new().unwrap();
    let loggers = LoggersHandle::default();
    let (factory_proxy, factory_stream) = create_proxy_and_stream::<LoggerFactoryMarker>()
        .expect("create logger factory proxy and stream to succeed");
    let (logger_proxy, logger_proxy_server_end) =
        create_proxy::<LoggerMarker>().expect("create logger proxy and server end to succeed");
    let (querier_proxy, query_stream) = create_proxy_and_stream::<LoggerQuerierMarker>()
        .expect("create logger querier proxy and stream to succeed");
    let cobalt_service = run_cobalt_service(factory_stream, loggers.clone());
    let cobalt_query_service = run_cobalt_query_service(query_stream, loggers.clone());
    futures::pin_mut!(cobalt_service);
    futures::pin_mut!(cobalt_query_service);
    // Neither of these futures should ever return if there no errors, so the joined future
    // will never return.
    let services = futures::future::join(cobalt_service, cobalt_query_service);
    let project_id = 765;
    let mut create_logger = futures::future::select(
        services,
        factory_proxy.create_logger_from_project_id(project_id, logger_proxy_server_end),
    );
    let create_logger_poll = executor.run_until_stalled(&mut create_logger);
    assert!(create_logger_poll.is_ready());
    let mut continuation = match create_logger_poll {
        core::task::Poll::Pending => {
            unreachable!("we asserted that create_logger_poll was ready")
        }
        core::task::Poll::Ready(either) => match either {
            futures::future::Either::Left(_services_future_returned) => unreachable!(
                "unexpected services future return (cannot be formatted with default formatter)"
            ),
            futures::future::Either::Right((create_logger_status, services_continuation)) => {
                assert_eq!(
                    create_logger_status.expect("fidl call failed"),
                    fidl_fuchsia_cobalt::Status::Ok
                );
                services_continuation
            }
        },
    };
    // Resolve two hanging gets and ensure that only the novel data (the same data both times)
    // is returned.
    for _ in 0..2 {
        let watch_logs_hanging_get = querier_proxy.watch_logs2(project_id, LogMethod::LogEvent);
        let mut watch_logs_hanging_get =
            futures::future::select(continuation, watch_logs_hanging_get);
        // With no novel events, the watch parks as a hanging get.
        let watch_logs_poll = executor.run_until_stalled(&mut watch_logs_hanging_get);
        assert!(watch_logs_poll.is_pending());
        let event_metric_id = 1;
        let event_code = 2;
        // Logging an event must resolve the parked watch.
        let log_event = logger_proxy.log_event(event_metric_id, event_code);
        let mut resolved_hanging_get = futures::future::join(watch_logs_hanging_get, log_event);
        let resolved_hanging_get = executor.run_until_stalled(&mut resolved_hanging_get);
        assert!(resolved_hanging_get.is_ready());
        continuation = match resolved_hanging_get {
            core::task::Poll::Pending => {
                unreachable!("we asserted that resolved_hanging_get was ready")
            }
            core::task::Poll::Ready((watch_logs_result, log_event_result)) => {
                assert_eq!(
                    log_event_result.expect("expected log event to succeed"),
                    fidl_fuchsia_cobalt::Status::Ok
                );
                match watch_logs_result {
                    futures::future::Either::Left(_services_future_returned) => unreachable!(
                        "unexpected services future return (cannot be formatted with the \
                         default formatter)"
                    ),
                    futures::future::Either::Right((
                        cobalt_query_result,
                        services_continuation,
                    )) => {
                        let (mut logged_events, more) = cobalt_query_result
                            .expect("expect cobalt query FIDL call to succeed");
                        // Only the single novel event is returned each iteration.
                        assert_eq!(logged_events.len(), 1);
                        let mut logged_event = logged_events.pop().unwrap();
                        assert_eq!(logged_event.metric_id, event_metric_id);
                        assert_eq!(logged_event.event_codes.len(), 1);
                        assert_eq!(logged_event.event_codes.pop().unwrap(), event_code);
                        assert_eq!(more, false);
                        services_continuation
                    }
                }
            }
        };
        assert!(executor.run_until_stalled(&mut continuation).is_pending());
    }
}
}
}