| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::events::types::*, |
| fuchsia_inspect::{self as inspect, NumericProperty}, |
| fuchsia_inspect_contrib::{inspect_log, nodes::BoundedListNode}, |
| futures::{channel::mpsc, StreamExt}, |
| }; |
| |
| pub struct EventStream { |
| sources: Vec<(String, Box<dyn EventSource>)>, |
| |
| // Inspect stats |
| node: inspect::Node, |
| components_started: inspect::UintProperty, |
| components_stopped: inspect::UintProperty, |
| components_seen_running: inspect::UintProperty, |
| diagnostics_directories_seen: inspect::UintProperty, |
| component_log_node: BoundedListNode, |
| } |
| |
| const RECENT_EVENT_LIMIT: usize = 200; |
| |
| impl EventStream { |
| /// Creates a new event listener. |
| pub fn new(node: inspect::Node) -> Self { |
| let components_started = node.create_uint("components_started", 0); |
| let components_seen_running = node.create_uint("components_seen_running", 0); |
| let components_stopped = node.create_uint("components_stopped", 0); |
| let diagnostics_directories_seen = node.create_uint("diagnostics_directories_seen", 0); |
| let component_log_node = |
| BoundedListNode::new(node.create_child("recent_events"), RECENT_EVENT_LIMIT); |
| Self { |
| sources: Vec::new(), |
| node, |
| components_started, |
| components_stopped, |
| components_seen_running, |
| diagnostics_directories_seen, |
| component_log_node, |
| } |
| } |
| |
| /// Adds an event source to listen from. |
| pub fn add_source(&mut self, name: impl Into<String>, source: Box<dyn EventSource>) { |
| self.sources.push((name.into(), source)); |
| } |
| |
| /// Subscribe to component lifecycle events. |
| /// |node| is the node where stats about events seen will be recorded. |
| pub async fn listen(mut self) -> ComponentEventStream { |
| let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY); |
| let sources_node = self.node.create_child("sources"); |
| for (name, source) in &self.sources { |
| let source_node = sources_node.create_child(name); |
| match source.listen(sender.clone()).await { |
| Ok(()) => {} |
| Err(err) => source_node.record_string("error", err.to_string()), |
| } |
| sources_node.record(source_node); |
| } |
| self.node.record(sources_node); |
| Box::pin(receiver.boxed().map(move |event| { |
| self.log_event(&event); |
| event |
| })) |
| } |
| |
| fn log_event(&mut self, event: &ComponentEvent) { |
| match event { |
| ComponentEvent::Start(start) => { |
| self.components_started.add(1); |
| self.log_inspect("START", &start.metadata.component_id); |
| } |
| ComponentEvent::Stop(stop) => { |
| self.components_stopped.add(1); |
| self.log_inspect("STOP", &stop.metadata.component_id); |
| } |
| ComponentEvent::Running(running) => { |
| self.components_seen_running.add(1); |
| self.log_inspect("RUNNING", &running.metadata.component_id); |
| } |
| ComponentEvent::DiagnosticsReady(diagnostics_ready) => { |
| self.diagnostics_directories_seen.add(1); |
| self.log_inspect("DIAGNOSTICS_DIR_READY", &diagnostics_ready.metadata.component_id); |
| } |
| } |
| } |
| |
| fn log_inspect(&mut self, event_name: &str, identifier: &ComponentIdentifier) { |
| inspect_log!(self.component_log_node, |
| event: event_name, |
| moniker: identifier.to_string(), |
| ); |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::*, |
| anyhow::{format_err, Error}, |
| async_trait::async_trait, |
| fuchsia_async as fasync, |
| fuchsia_inspect::{self as inspect, assert_inspect_tree}, |
| fuchsia_zircon as zx, |
| futures::SinkExt, |
| lazy_static::lazy_static, |
| }; |
| |
| struct FakeEventSource {} |
| struct FakeLegacyProvider {} |
| struct FakeFutureProvider {} |
| |
| lazy_static! { |
| static ref LEGACY_ID: ComponentIdentifier = ComponentIdentifier::Legacy(LegacyIdentifier { |
| component_name: "foo.cmx".to_string(), |
| instance_id: "12345".to_string(), |
| realm_path: RealmPath(vec!["a".to_string(), "b".to_string()]), |
| }); |
| static ref LEGACY_URL: String = "NO-OP URL".to_string(); |
| static ref MONIKER_ID: ComponentIdentifier = |
| ComponentIdentifier::Moniker("a:0/b:1".to_string()); |
| } |
| |
| #[async_trait] |
| impl EventSource for FakeEventSource { |
| async fn listen(&self, mut sender: mpsc::Sender<ComponentEvent>) -> Result<(), Error> { |
| let shared_data = EventMetadata { |
| component_id: MONIKER_ID.clone(), |
| component_url: LEGACY_URL.clone(), |
| timestamp: zx::Time::get_monotonic(), |
| }; |
| |
| sender |
| .send(ComponentEvent::Start(StartEvent { metadata: shared_data.clone() })) |
| .await |
| .expect("send start"); |
| sender |
| .send(ComponentEvent::DiagnosticsReady(DiagnosticsReadyEvent { |
| metadata: shared_data.clone(), |
| directory: None, |
| })) |
| .await |
| .expect("send diagnostics ready"); |
| sender |
| .send(ComponentEvent::Stop(StopEvent { metadata: shared_data.clone() })) |
| .await |
| .expect("send stop"); |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl EventSource for FakeLegacyProvider { |
| async fn listen(&self, mut sender: mpsc::Sender<ComponentEvent>) -> Result<(), Error> { |
| let shared_data = EventMetadata { |
| component_id: LEGACY_ID.clone(), |
| component_url: LEGACY_URL.clone(), |
| timestamp: zx::Time::get_monotonic(), |
| }; |
| |
| sender |
| .send(ComponentEvent::Start(StartEvent { metadata: shared_data.clone() })) |
| .await |
| .expect("send start"); |
| sender |
| .send(ComponentEvent::DiagnosticsReady(DiagnosticsReadyEvent { |
| metadata: shared_data.clone(), |
| directory: None, |
| })) |
| .await |
| .expect("send diagnostics ready"); |
| sender |
| .send(ComponentEvent::Stop(StopEvent { metadata: shared_data.clone() })) |
| .await |
| .expect("send stop"); |
| Ok(()) |
| } |
| } |
| |
| #[async_trait] |
| impl EventSource for FakeFutureProvider { |
| async fn listen(&self, _sender: mpsc::Sender<ComponentEvent>) -> Result<(), Error> { |
| Err(format_err!("not implemented yet")) |
| } |
| } |
| |
| async fn validate_events(stream: &mut ComponentEventStream, expected_id: &ComponentIdentifier) { |
| for i in 0..3 { |
| let event = stream.next().await.expect("received event"); |
| match (i, &event) { |
| (0, ComponentEvent::Start(StartEvent { metadata, .. })) |
| | (1, ComponentEvent::DiagnosticsReady(DiagnosticsReadyEvent { metadata, .. })) |
| | (2, ComponentEvent::Stop(StopEvent { metadata, .. })) => { |
| assert_eq!(metadata.component_id, *expected_id); |
| } |
| _ => panic!("unexpected event: {:?}", event), |
| } |
| } |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn joint_event_channel() { |
| let inspector = inspect::Inspector::new(); |
| let node = inspector.root().create_child("events"); |
| let mut stream = EventStream::new(node); |
| stream.add_source("v1", Box::new(FakeLegacyProvider {})); |
| stream.add_source("v2", Box::new(FakeEventSource {})); |
| stream.add_source("v3", Box::new(FakeFutureProvider {})); |
| let mut stream = stream.listen().await; |
| |
| validate_events(&mut stream, &LEGACY_ID).await; |
| validate_events(&mut stream, &MONIKER_ID).await; |
| |
| assert_inspect_tree!(inspector, root: { |
| events: { |
| sources: { |
| v1: { |
| }, |
| v2: { |
| }, |
| v3: { |
| error: "not implemented yet" |
| } |
| }, |
| components_started: 2u64, |
| components_stopped: 2u64, |
| components_seen_running: 0u64, |
| diagnostics_directories_seen: 2u64, |
| recent_events: { |
| "0": { |
| "@time": inspect::testing::AnyProperty, |
| event: "START", |
| moniker: "a/b/foo.cmx:12345" |
| }, |
| "1": { |
| "@time": inspect::testing::AnyProperty, |
| event: "DIAGNOSTICS_DIR_READY", |
| moniker: "a/b/foo.cmx:12345" |
| }, |
| "2": { |
| "@time": inspect::testing::AnyProperty, |
| event: "STOP", |
| moniker: "a/b/foo.cmx:12345" |
| }, |
| "3": { |
| "@time": inspect::testing::AnyProperty, |
| event: "START", |
| moniker: "a:0/b:1" |
| }, |
| "4": { |
| "@time": inspect::testing::AnyProperty, |
| event: "DIAGNOSTICS_DIR_READY", |
| moniker: "a:0/b:1" |
| }, |
| "5": { |
| "@time": inspect::testing::AnyProperty, |
| event: "STOP", |
| moniker: "a:0/b:1" |
| } |
| } |
| } |
| }); |
| } |
| } |