| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::events::types::{ComponentEvent, ComponentEventChannel, EventSource}, |
| anyhow::{format_err, Context, Error}, |
| async_trait::async_trait, |
| fidl_fuchsia_sys2 as fsys, fuchsia_async as fasync, |
| futures::{channel::mpsc, SinkExt, TryStreamExt}, |
| std::convert::TryInto, |
| tracing::error, |
| }; |
| |
| #[async_trait] |
| impl EventSource for fsys::EventSourceProxy { |
| /// Subscribe to component lifecycle events. |
| /// |node| is the node where stats about events seen will be recorded. |
| async fn listen(&self, sender: mpsc::Sender<ComponentEvent>) -> Result<(), Error> { |
| let (client_end, request_stream) = |
| fidl::endpoints::create_request_stream::<fsys::EventStreamMarker>()?; |
| let mut event_names = |
| vec!["running", "started", "stopped", "diagnostics_ready"].into_iter(); |
| let subscription = self.subscribe(&mut event_names, client_end); |
| subscription.await?.map_err(|error| format_err!("Error: {:?}", error))?; |
| EventStreamServer::new(sender).spawn(request_stream); |
| Ok(()) |
| } |
| } |
| |
| struct EventStreamServer { |
| sender: ComponentEventChannel, |
| } |
| |
| impl EventStreamServer { |
| fn new(sender: ComponentEventChannel) -> Self { |
| Self { sender } |
| } |
| } |
| |
| impl EventStreamServer { |
| fn spawn(self, stream: fsys::EventStreamRequestStream) { |
| fasync::Task::spawn(async move { |
| self.handle_request_stream(stream) |
| .await |
| .unwrap_or_else(|e: Error| error!(?e, "failed to run event stream server")); |
| }) |
| .detach(); |
| } |
| |
| async fn handle_request_stream( |
| mut self, |
| mut stream: fsys::EventStreamRequestStream, |
| ) -> Result<(), Error> { |
| while let Some(request) = |
| stream.try_next().await.context("Error running event stream server")? |
| { |
| match request { |
| fsys::EventStreamRequest::OnEvent { event, .. } => { |
| if let Ok(event) = event.try_into() { |
| self.send(event).await; |
| } |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| async fn send(&mut self, event: ComponentEvent) { |
| // Ignore Err(SendError) result. If we fail to send it means that the archivist has been |
| // stopped and therefore the receving end of this channel is closed. A send operation can |
| // only fail if this is the case. |
| let _ = self.sender.send(event).await; |
| } |
| } |
| |
| #[cfg(test)] |
| pub mod tests { |
| use { |
| super::*, |
| crate::events::types::*, |
| fidl::endpoints::ClientEnd, |
| fidl_fuchsia_io::NodeMarker, |
| fuchsia_zircon as zx, |
| futures::{future::RemoteHandle, FutureExt, StreamExt}, |
| }; |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn event_stream() { |
| let (source_proxy, stream_receiver) = spawn_fake_event_source(); |
| let (sender, mut event_stream) = mpsc::channel(CHANNEL_CAPACITY); |
| source_proxy.listen(sender).await.expect("failed to listen"); |
| let stream_server = stream_receiver.await.into_proxy().expect("get stream proxy"); |
| |
| // Send a `Started` event. |
| stream_server |
| .on_event(fsys::Event { |
| header: Some(fsys::EventHeader { |
| event_type: Some(fsys::EventType::Started), |
| moniker: Some("./foo:0/bar:0".to_string()), |
| component_url: Some("fuchsia-pkg://fuchsia.com/foo#meta/bar.cmx".to_string()), |
| timestamp: Some(zx::Time::get_monotonic().into_nanos()), |
| ..fsys::EventHeader::EMPTY |
| }), |
| |
| ..fsys::Event::EMPTY |
| }) |
| .expect("send started event ok"); |
| |
| // Send a `Running` event. |
| stream_server |
| .on_event(fsys::Event { |
| header: Some(fsys::EventHeader { |
| event_type: Some(fsys::EventType::Running), |
| moniker: Some("./foo:0/bar:0".to_string()), |
| component_url: Some("fuchsia-pkg://fuchsia.com/foo#meta/bar.cmx".to_string()), |
| timestamp: Some(zx::Time::get_monotonic().into_nanos()), |
| ..fsys::EventHeader::EMPTY |
| }), |
| event_result: Some(fsys::EventResult::Payload(fsys::EventPayload::Running( |
| fsys::RunningPayload { |
| started_timestamp: Some(0), |
| ..fsys::RunningPayload::EMPTY |
| }, |
| ))), |
| ..fsys::Event::EMPTY |
| }) |
| .expect("send running event ok"); |
| |
| // Send a `CapabilityReady` event for diagnostics. |
| let (node, _) = fidl::endpoints::create_request_stream::<NodeMarker>().unwrap(); |
| stream_server |
| .on_event(fsys::Event { |
| header: Some(fsys::EventHeader { |
| event_type: Some(fsys::EventType::CapabilityReady), |
| moniker: Some("./foo:0/bar:0".to_string()), |
| component_url: Some("fuchsia-pkg://fuchsia.com/foo#meta/bar.cmx".to_string()), |
| timestamp: Some(zx::Time::get_monotonic().into_nanos()), |
| ..fsys::EventHeader::EMPTY |
| }), |
| event_result: Some(fsys::EventResult::Payload( |
| fsys::EventPayload::CapabilityReady(fsys::CapabilityReadyPayload { |
| name: Some("diagnostics".to_string()), |
| node: Some(node), |
| ..fsys::CapabilityReadyPayload::EMPTY |
| }), |
| )), |
| ..fsys::Event::EMPTY |
| }) |
| .expect("send diagnostics ready event ok"); |
| |
| // Send a Stopped event. |
| stream_server |
| .on_event(fsys::Event { |
| header: Some(fsys::EventHeader { |
| event_type: Some(fsys::EventType::Stopped), |
| moniker: Some("./foo:0/bar:0".to_string()), |
| component_url: Some("fuchsia-pkg://fuchsia.com/foo#meta/bar.cmx".to_string()), |
| timestamp: Some(zx::Time::get_monotonic().into_nanos()), |
| ..fsys::EventHeader::EMPTY |
| }), |
| |
| ..fsys::Event::EMPTY |
| }) |
| .expect("send stopped event ok"); |
| |
| let expected_component_id = ComponentIdentifier::Moniker("./foo:0/bar:0".to_string()); |
| |
| let shared_data = EventMetadata { |
| component_id: expected_component_id.clone(), |
| component_url: "fuchsia-pkg://fuchsia.com/foo#meta/bar.cmx".to_string(), |
| timestamp: zx::Time::get_monotonic(), |
| }; |
| // Assert the first received event was a Start event. |
| let event = event_stream.next().await.unwrap(); |
| compare_events_ignore_timestamp_and_payload( |
| &event, |
| &ComponentEvent::Start(StartEvent { metadata: shared_data.clone() }), |
| ); |
| |
| // Assert the second received event was a Running event. |
| let event = event_stream.next().await.unwrap(); |
| compare_events_ignore_timestamp_and_payload( |
| &event, |
| &ComponentEvent::Running(RunningEvent { |
| metadata: shared_data.clone(), |
| component_start_time: zx::Time::from_nanos(0), |
| }), |
| ); |
| |
| // Assert the third received event was a CapabilityReady event for diagnostics. |
| let event = event_stream.next().await.unwrap(); |
| match event { |
| ComponentEvent::DiagnosticsReady(DiagnosticsReadyEvent { |
| metadata, |
| directory: Some(_), |
| }) => assert_eq!(metadata.component_id, expected_component_id), |
| _ => assert!(false), |
| } |
| |
| // Assert the last received event was a Stop event. |
| let event = event_stream.next().await.unwrap(); |
| compare_events_ignore_timestamp_and_payload( |
| &event, |
| &ComponentEvent::Stop(StopEvent { metadata: shared_data.clone() }), |
| ); |
| } |
| |
| pub fn compare_events_ignore_timestamp_and_payload( |
| event1: &ComponentEvent, |
| event2: &ComponentEvent, |
| ) { |
| let metadata_comparator = |x: &EventMetadata, y: &EventMetadata| { |
| assert_eq!(x.component_id, y.component_id); |
| assert_eq!(x.component_url, y.component_url); |
| }; |
| |
| // Need to explicitly check every case despite the logic being the same since rust |
| // requires multi-case match arms to have variable bindings be the same type in every |
| // case. This isn't doable in our polymorphic event enums. |
| match (event1, event2) { |
| (ComponentEvent::Start(x), ComponentEvent::Start(y)) => { |
| metadata_comparator(&x.metadata, &y.metadata) |
| } |
| (ComponentEvent::Stop(x), ComponentEvent::Stop(y)) => { |
| metadata_comparator(&x.metadata, &y.metadata) |
| } |
| (ComponentEvent::Running(x), ComponentEvent::Running(y)) => { |
| metadata_comparator(&x.metadata, &y.metadata) |
| } |
| (ComponentEvent::DiagnosticsReady(x), ComponentEvent::DiagnosticsReady(y)) => { |
| metadata_comparator(&x.metadata, &y.metadata) |
| } |
| _ => panic!( |
| "Events are not equal, they are different enumerations: {:?}, {:?}", |
| event1, event2 |
| ), |
| } |
| } |
| |
| fn spawn_fake_event_source( |
| ) -> (fsys::EventSourceProxy, RemoteHandle<ClientEnd<fsys::EventStreamMarker>>) { |
| let (source, mut request_stream) = |
| fidl::endpoints::create_proxy_and_stream::<fsys::EventSourceMarker>().unwrap(); |
| let (f, stream_client_end_fut) = async move { |
| if let Some(request) = |
| request_stream.try_next().await.expect("error running fake event source") |
| { |
| match request { |
| fsys::EventSourceRequest::Subscribe { events, stream, responder } => { |
| assert_eq!( |
| events, |
| vec!["running", "started", "stopped", "diagnostics_ready",] |
| ); |
| responder.send(&mut Ok(())).expect("responder send ok"); |
| return stream; |
| } |
| } |
| } |
| unreachable!("This shouldn't be exercised"); |
| } |
| .remote_handle(); |
| fasync::Task::spawn(f).detach(); |
| (source, stream_client_end_fut) |
| } |
| } |