| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use crate::message::PublisherRequestMessage; |
| use anyhow::{anyhow, Error}; |
| use async_trait::async_trait; |
| use fidl::endpoints::{RequestStream, ServerEnd}; |
| use fidl_fuchsia_debugdata as fdebug; |
| use fidl_fuchsia_sys2 as fsys; |
| use fidl_fuchsia_test_internal as ftest_internal; |
| use fidl_fuchsia_test_manager as ftest_manager; |
| use fuchsia_inspect::types::Node; |
| use fuchsia_zircon as zx; |
| use futures::{channel::mpsc, lock::Mutex, SinkExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; |
| use log::{error, warn}; |
| use moniker::{ChildMoniker, RelativeMoniker, RelativeMonikerBase}; |
| use std::{ |
| collections::{HashMap, HashSet}, |
| sync::{Arc, Weak}, |
| }; |
| |
/// Consumer of the publish requests collected for a single debug data set.
///
/// The trait is declared with `?Send`, so the future returned by
/// `handle_publish_requests` is not required to be `Send`.
#[async_trait(?Send)]
pub trait PublishRequestHandler {
    /// Handle a stream of |DebugData| connection requests. The final processed data
    /// is reported over the |iterator| channel.
    ///
    /// * `publish_request_recv` - receiver that yields the requests as
    ///   |PublisherRequestMessage| values.
    /// * `iterator` - request stream of the |DebugDataIterator| connection over which
    ///   the results are reported.
    ///
    /// Returns an error if handling the requests fails.
    async fn handle_publish_requests(
        &self,
        publish_request_recv: mpsc::Receiver<PublisherRequestMessage>,
        iterator: ftest_manager::DebugDataIteratorRequestStream,
    ) -> Result<(), Error>;
}
| |
| /// Processes |DebugDataController| requests and component lifecycle events. |
| /// This collects the debug data sets that are defined over the |DebugDataController| |
| /// and |DebugDataSetController| processes, and demultiplixes the events into streams |
| /// per data set. |
| /// |
| /// The output is a terminated stream of |DebugData| connection requests and an |DebugDataIterator| |
| /// request for each set. This output is passed to |publish_request_handler| for processing. |
| /// |
| /// The stream is terminated by observing the passed lifecycle events and determining when all |
| /// realms in a set have stopped. To do this, a number of assumptions about ordering of events: |
| /// * A new realm is reported via DebugDataSetController::AddRealm before any events for that |
| /// realm are sent |
| /// * After a realm is stopped, no child components will start under it |
| /// If these assumptions are broken, events may be lost. |
| /// |
| /// |timeout_after_finish| is used to stop draining events after Finish() has been called on the |
| /// controller. This is a workaround for missing Destroy events. |
| /// TODO(fxbug.dev/94274): Remove this workaround once the missing Destroy events are fixed or use |
| /// Stopped instead. |
| pub async fn handle_debug_data_controller_and_events<CS, D>( |
| controller_requests: CS, |
| events: fsys::EventStreamRequestStream, |
| publish_request_handler: D, |
| timeout_after_finish: zx::Duration, |
| inspect_node: &Node, |
| ) where |
| CS: Stream<Item = Result<ftest_internal::DebugDataControllerRequest, fidl::Error>> |
| + std::marker::Unpin, |
| D: PublishRequestHandler, |
| { |
| let debug_data_sets: Mutex<Vec<Weak<Mutex<inner::DebugDataSet>>>> = Mutex::new(vec![]); |
| let debug_data_sets_ref = &debug_data_sets; |
| let publish_request_handler_ref = &publish_request_handler; |
| |
| let inspect_node_count = std::sync::atomic::AtomicU32::new(0); |
| let inspect_node_count_ref = &inspect_node_count; |
| |
| let controller_fut = controller_requests.for_each_concurrent(None, |request_result| { |
| async move { |
| let ftest_internal::DebugDataControllerRequest::NewSet { iter, controller, .. } = |
| request_result?; |
| let iter = iter.into_stream()?; |
| let controller = controller.into_stream()?; |
| let controller_handle = controller.control_handle(); |
| let (publish_request_send, publish_request_recv) = mpsc::channel(5); |
| let debug_data_set = |
| inner::DebugDataSet::new(publish_request_send, move |debug_data_requested| { |
| if debug_data_requested { |
| let _ = controller_handle.send_on_debug_data_produced(); |
| } |
| }) |
| .with_timeout_after_finish(timeout_after_finish) |
| .with_inspect( |
| inspect_node, |
| &format!( |
| "{:?}", |
| inspect_node_count_ref.fetch_add(1, std::sync::atomic::Ordering::Relaxed) |
| ), |
| ); |
| debug_data_sets_ref.lock().await.push(Arc::downgrade(&debug_data_set)); |
| futures::future::try_join( |
| serve_debug_data_set_controller(&*debug_data_set, controller), |
| publish_request_handler_ref.handle_publish_requests(publish_request_recv, iter), |
| ) |
| .await?; |
| Result::<(), Error>::Ok(()) |
| } |
| .unwrap_or_else(|e| warn!("Error serving debug data set: {:?}", e)) |
| }); |
| let event_fut = route_events(events, debug_data_sets_ref) |
| .unwrap_or_else(|e| error!("Error routing debug data events: {:?}", e)); |
| futures::future::join(controller_fut, event_fut).await; |
| } |
| |
| async fn route_events( |
| mut event_stream: fsys::EventStreamRequestStream, |
| sets: &Mutex<Vec<Weak<Mutex<inner::DebugDataSet>>>>, |
| ) -> Result<(), Error> { |
| while let Some(req_res) = event_stream.next().await { |
| let fsys::EventStreamRequest::OnEvent { event, .. } = match req_res { |
| Ok(req) => req, |
| Err(e) => { |
| warn!("Error getting event: {:?}", e); |
| continue; |
| } |
| }; |
| let header = event.header.as_ref().unwrap(); |
| let moniker = RelativeMoniker::parse(&header.moniker.as_ref().unwrap()).unwrap(); |
| let mut locked_sets = sets.lock().await; |
| let mut active_sets = vec![]; |
| locked_sets.retain(|weak_set| match Weak::upgrade(weak_set) { |
| Some(set) => { |
| active_sets.push(set); |
| true |
| } |
| None => false, |
| }); |
| drop(locked_sets); |
| let mut matched = false; |
| for active_set in active_sets { |
| let mut locked_set = active_set.lock().await; |
| if locked_set.includes_moniker(&moniker) { |
| locked_set |
| .handle_event(event) |
| .await |
| .unwrap_or_else(|e| warn!("Error handling event: {:?}", e)); |
| matched = true; |
| break; |
| } |
| } |
| if !matched { |
| match moniker.down_path().get(0) { |
| Some(child_moniker) if child_moniker.collection.is_some() => { |
| warn!("Unhandled event moniker {}", moniker) |
| } |
| // suppress warning if the moniker isn't in a collection (and thus isn't a test). |
| None | Some(_) => (), |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| async fn serve_debug_data_set_controller( |
| set: &Mutex<inner::DebugDataSet>, |
| mut controller: ftest_internal::DebugDataSetControllerRequestStream, |
| ) -> Result<(), Error> { |
| let mut finish_called = false; |
| let control_fut = async { |
| while let Some(request) = controller.try_next().await? { |
| match request { |
| ftest_internal::DebugDataSetControllerRequest::AddRealm { |
| realm_moniker, |
| url, |
| responder, |
| } => { |
| let mut result = match set.lock().await.add_realm(realm_moniker, url) { |
| Ok(()) => Ok(()), |
| Err(e) => { |
| warn!("Error while adding realm: {:?}", e); |
| Err(zx::Status::INVALID_ARGS.into_raw()) |
| } |
| }; |
| let _ = responder.send(&mut result)?; |
| } |
| ftest_internal::DebugDataSetControllerRequest::RemoveRealm { |
| realm_moniker, |
| .. |
| } => match set.lock().await.remove_realm(realm_moniker).await { |
| Ok(()) => (), |
| Err(e) => warn!("Error removing a realm: {:?}", e), |
| }, |
| ftest_internal::DebugDataSetControllerRequest::Finish { .. } => { |
| finish_called = true; |
| break; |
| } |
| } |
| } |
| Ok::<_, fidl::Error>(()) |
| }; |
| let result = control_fut.await; |
| set.lock().await.complete_set().await; |
| result?; |
| match finish_called { |
| true => Ok(()), |
| false => Err(anyhow!("Controller client did not call finish!")), |
| } |
| } |
| |
| mod inner { |
| use { |
| super::*, |
| fuchsia_async as fasync, |
| fuchsia_inspect::{ArrayProperty, LazyNode}, |
| futures::FutureExt, |
| moniker::ChildMonikerBase, |
| }; |
| |
| /// Callback invoked when the DataSet determines that there is or is not any debug data |
| /// produced. The parameter is true iff debug data was produced. |
| type Callback = Box<dyn 'static + Fn(bool) + Send + Sync>; |
| |
/// A container that tracks the current known state of realms in a debug data set.
pub(super) struct DebugDataSet {
    /// Realms added to this set, keyed by their child moniker and mapped to the URL that was
    /// supplied when the realm was added.
    realms: HashMap<ChildMoniker, String>,
    /// Components for which a Started event was handled and no Destroyed event has followed.
    running_components: HashSet<RelativeMoniker>,
    /// Components whose Destroyed event was handled while they were not recorded as running.
    /// An entry is removed again if the matching Started event arrives later.
    destroyed_before_start: HashSet<RelativeMoniker>,
    /// Realms under which a Started or Destroyed event has been recorded.
    seen_realms: HashSet<ChildMoniker>,
    /// Set once `complete_set` has been called; no further realms are expected afterwards.
    done_adding_realms: bool,
    /// Sink for the publish requests extracted from CapabilityRequested events. It is closed
    /// when the set is done or when the finish timeout fires.
    sender: mpsc::Sender<PublisherRequestMessage>,
    /// Callback reporting whether debug data was produced. It is taken out of the option
    /// before being invoked, so it runs at most once.
    on_capability_event: Arc<Mutex<Option<Callback>>>,
    /// Timeout task spawned by `complete_set`.
    finish_timeout_task: Option<fasync::Task<()>>,
    /// Timeout after Finish() is called to stop waiting for events and serve any
    /// debug data.
    // TODO(fxbug.dev/94274): This is in place due to rare missing destroy events that
    // lead to timeouts. We should either switch to using Stopped, or remove this timeout
    // once missing destroy event is fixed.
    timeout_after_finish: zx::Duration,
    /// Lazy inspect node exporting this set's state; populated by `with_inspect`.
    inspect_node: Option<LazyNode>,
}
| |
| impl DebugDataSet { |
| /// Create a new DebugDataSet. |
| pub fn new<F: 'static + Fn(bool) + Send + Sync>( |
| sender: mpsc::Sender<PublisherRequestMessage>, |
| on_capability_event: F, |
| ) -> Self { |
| Self { |
| realms: HashMap::new(), |
| running_components: HashSet::new(), |
| destroyed_before_start: HashSet::new(), |
| seen_realms: HashSet::new(), |
| done_adding_realms: false, |
| sender, |
| on_capability_event: Arc::new(Mutex::new(Some(Box::new(on_capability_event)))), |
| finish_timeout_task: None, |
| timeout_after_finish: zx::Duration::INFINITE, |
| inspect_node: None, |
| } |
| } |
| |
/// Upper bound on the number of entries written to each inspect string array. Larger
/// collections are truncated to this many entries when exported.
// TODO(fxbug.dev/93280): array creation panics if a slot size larger than
// 255 is specified. Remove this maximum once this is fixed in fuchsia_inspect.
const MAX_INSPECT_ARRAY_SIZE: usize = u8::MAX as usize;
| |
| pub fn with_timeout_after_finish(self, timeout: zx::Duration) -> Self { |
| Self { timeout_after_finish: timeout, ..self } |
| } |
| |
| /// Attach an inspect node to the DebugDataSet under |parent_node|. |
| pub fn with_inspect(self, parent_node: &Node, name: &str) -> Arc<Mutex<Self>> { |
| let arc_self = Arc::new(Mutex::new(self)); |
| let weak_self = Arc::downgrade(&arc_self); |
| let lazy_node_fn = move || { |
| let weak_clone = weak_self.clone(); |
| async move { |
| let inspector = fuchsia_inspect::Inspector::new(); |
| let root = inspector.root(); |
| if let Some(this_mutex) = weak_clone.upgrade() { |
| let this_lock = this_mutex.lock().await; |
| let this = &*this_lock; |
| root.record_child("realms", |realm_node| { |
| for (realm, url) in this.realms.iter() { |
| realm_node.record_string(realm.as_str(), &url); |
| } |
| }); |
| root.record_int( |
| "num_running_components", |
| this.running_components.len() as i64, |
| ); |
| let running_components = root.create_string_array( |
| "running_components", |
| std::cmp::min( |
| this.running_components.len(), |
| Self::MAX_INSPECT_ARRAY_SIZE, |
| ), |
| ); |
| for (idx, cmp) in this |
| .running_components |
| .iter() |
| .take(Self::MAX_INSPECT_ARRAY_SIZE) |
| .enumerate() |
| { |
| running_components.set(idx, format!("{}", cmp)) |
| } |
| root.record(running_components); |
| root.record_int( |
| "num_destroyed_before_start", |
| this.destroyed_before_start.len() as i64, |
| ); |
| let destroyed_before_start = root.create_string_array( |
| "destroyed_before_start", |
| std::cmp::min( |
| this.destroyed_before_start.len(), |
| Self::MAX_INSPECT_ARRAY_SIZE, |
| ), |
| ); |
| for (idx, cmp) in this |
| .destroyed_before_start |
| .iter() |
| .take(Self::MAX_INSPECT_ARRAY_SIZE) |
| .enumerate() |
| { |
| destroyed_before_start.set(idx, format!("{}", cmp)) |
| } |
| root.record(destroyed_before_start); |
| root.record_int("num_seen_realms", this.seen_realms.len() as i64); |
| let seen_realms = root.create_string_array( |
| "seen_realms", |
| std::cmp::min(this.seen_realms.len(), Self::MAX_INSPECT_ARRAY_SIZE), |
| ); |
| for (idx, cmp) in |
| this.seen_realms.iter().take(Self::MAX_INSPECT_ARRAY_SIZE).enumerate() |
| { |
| seen_realms.set(idx, format!("{}", cmp)) |
| } |
| root.record(seen_realms); |
| root.record_bool("done_adding_realms", this.done_adding_realms); |
| } |
| Ok(inspector) |
| } |
| .boxed() |
| }; |
| |
| // Lock can't fail since we created the mutex above and control all the handles. |
| arc_self |
| .try_lock() |
| .unwrap() |
| .inspect_node |
| .replace(parent_node.create_lazy_child(name, lazy_node_fn)); |
| arc_self |
| } |
| |
| pub fn includes_moniker(&self, moniker: &RelativeMoniker) -> bool { |
| // Assumes that the test realm is contained in a collection under test_manager. |
| if !moniker.up_path().is_empty() { |
| return false; |
| } |
| match moniker.down_path().iter().next() { |
| None => false, |
| Some(child_moniker) => self.realms.contains_key(child_moniker), |
| } |
| } |
| |
| pub fn add_realm(&mut self, moniker: String, url: String) -> Result<(), Error> { |
| let moniker_child = realm_moniker_child(moniker)?; |
| self.realms.insert(moniker_child, url); |
| Ok(()) |
| } |
| |
| pub async fn remove_realm(&mut self, moniker: String) -> Result<(), Error> { |
| let moniker_child = realm_moniker_child(moniker)?; |
| self.realms.remove(&moniker_child); |
| self.running_components |
| .retain(|component_moniker| component_moniker.down_path()[0] != moniker_child); |
| self.destroyed_before_start |
| .retain(|component_moniker| component_moniker.down_path()[0] != moniker_child); |
| self.seen_realms.remove(&moniker_child); |
| self.close_sink_if_done().await; |
| Ok(()) |
| } |
| |
| pub async fn complete_set(&mut self) { |
| self.done_adding_realms = true; |
| self.close_sink_if_done().await; |
| |
| let mut sender_clone = self.sender.clone(); |
| let on_capability_event_clone = self.on_capability_event.clone(); |
| let timeout = self.timeout_after_finish; |
| self.finish_timeout_task = Some(fasync::Task::spawn(async move { |
| fasync::Timer::new(timeout).await; |
| // This log is detected in triage. Update the config in |
| // src/diagnostics/config/triage/test_manager.triage when changing this log. |
| warn!("Debug data timeout invoked"); |
| sender_clone.close_channel(); |
| on_capability_event_clone.lock().await.take().map(|callback| callback(false)); |
| })); |
| } |
| |
| pub async fn handle_event(&mut self, event: fsys::Event) -> Result<(), Error> { |
| let header = event.header.as_ref().ok_or(anyhow!("Event contained no header"))?; |
| let unparsed_moniker = |
| header.moniker.as_ref().ok_or(anyhow!("Event contained no moniker"))?; |
| let moniker = RelativeMoniker::parse(unparsed_moniker)?; |
| |
| let realm_id = moniker |
| .down_path() |
| .iter() |
| .next() |
| .ok_or(anyhow!("Event moniker contains empty down path"))? |
| .clone(); |
| |
| match header.event_type.ok_or(anyhow!("Event contained no event type"))? { |
| fsys::EventType::CapabilityRequested => { |
| let test_url = self.realms.get(&realm_id).unwrap().clone(); |
| let request = publish_request_from_event(event); |
| if let Err(e) = |
| self.sender.send(PublisherRequestMessage { test_url, request }).await |
| { |
| warn!( |
| "Dropping debug data request from {} for test {}: {:?}", |
| moniker, |
| self.realms.get(&realm_id).unwrap(), |
| e |
| ); |
| } |
| self.on_capability_event.lock().await.take().map(|callback| callback(true)); |
| } |
| fsys::EventType::Started => { |
| if self.destroyed_before_start.remove(&moniker) { |
| warn!("Got a destroy event before start event for {}", moniker); |
| } else { |
| self.seen_realms.insert(realm_id); |
| self.running_components.insert(moniker); |
| } |
| } |
| // TODO(fxbug.dev/86503): Sometimes an instance may be destroyed before it is |
| // started or stopped. So we listen for destroyed instead of stopped, and record |
| // instances for which we destroyed but never got a start event for. |
| fsys::EventType::Destroyed => { |
| if !self.running_components.remove(&moniker) { |
| self.seen_realms.insert(realm_id); |
| self.destroyed_before_start.insert(moniker); |
| } |
| self.close_sink_if_done().await; |
| } |
| other => warn!("Got unhandled event type: {:?}", other), |
| } |
| Ok(()) |
| } |
| |
| async fn close_sink_if_done(&mut self) { |
| if self.done_adding_realms |
| && self.running_components.is_empty() |
| && self.seen_realms.len() == self.realms.len() |
| { |
| self.sender.close_channel(); |
| self.on_capability_event.lock().await.take().map(|callback| callback(false)); |
| } |
| } |
| } |
| |
| fn realm_moniker_child(realm_moniker: String) -> Result<ChildMoniker, Error> { |
| let moniker = RelativeMoniker::parse(&realm_moniker)?; |
| let moniker_is_valid = moniker.up_path().is_empty() && moniker.down_path().len() == 1; |
| match moniker_is_valid { |
| true => Ok(moniker.down_path()[0].clone()), |
| false => { |
| Err(anyhow!("Moniker {:?} invalidates assumptions about test topology", moniker)) |
| } |
| } |
| } |
| |
| fn publish_request_from_event(event: fsys::Event) -> ServerEnd<fdebug::PublisherMarker> { |
| let result = event.event_result.unwrap(); |
| match result { |
| fsys::EventResult::Payload(fsys::EventPayload::CapabilityRequested( |
| fsys::CapabilityRequestedPayload { name: _, capability, .. }, |
| )) => { |
| // TODO: Check name and other stuff. |
| ServerEnd::new(capability.unwrap()) |
| } |
| _ => panic!("unexpected payload"), |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use super::super::testing::*; |
| use super::*; |
| use fuchsia_inspect::assert_data_tree; |
| use maplit::hashmap; |
| use std::task::Poll; |
| |
| #[fuchsia::test] |
| fn includes_moniker() { |
| let added_realms = vec![ |
| ("./test:child-1", "test-url-1"), |
| ("./test:child-2", "test-url-2"), |
| ("./system-test:child-1", "test-url-3"), |
| ("./system-test:child-2", "test-url-4"), |
| ]; |
| let included_monikers = vec![ |
| "./test:child-1", |
| "./test:child-1/sub1", |
| "./test:child-1/sub1/sub2:child", |
| "./system-test:child-1", |
| "./system-test:child-2/sub1:child/sub2", |
| ]; |
| let excluded_monikers = vec![ |
| ".", |
| ".\\super/test:child-1", |
| "./test:child-3", |
| "./test:child-3/sub1", |
| "./realm", |
| "./realm/sub1", |
| "./system-test:child3", |
| ]; |
| |
| let (send, _) = mpsc::channel(1); |
| let mut set = DebugDataSet::new(send, |_| ()); |
| for (realm, url) in added_realms { |
| set.add_realm(realm.to_string(), url.to_string()).expect("add realm"); |
| } |
| for moniker in included_monikers { |
| let parsed = RelativeMoniker::parse(moniker).unwrap(); |
| assert!(set.includes_moniker(&parsed), "Expected {} to be in the set", moniker); |
| } |
| for moniker in excluded_monikers { |
| let parsed = RelativeMoniker::parse(moniker).unwrap(); |
| assert!( |
| !set.includes_moniker(&parsed), |
| "Expected {} to not be in the set", |
| moniker |
| ); |
| } |
| } |
| |
/// (realm moniker, test URL) pairs shared by several tests.
fn common_test_realms() -> Vec<(&'static str, &'static str)> {
    let mut realms = Vec::with_capacity(2);
    realms.push(("./test:child-1", "test-url-1"));
    realms.push(("./system-test:child-2", "test-url-2"));
    realms
}
| |
| fn common_test_events() -> Vec<fsys::Event> { |
| vec![ |
| start_event("./test:child-1"), |
| capability_event("./test:child-1"), |
| destroy_event("./test:child-1"), |
| start_event("./system-test:child-2"), |
| capability_event("./system-test:child-2"), |
| start_event("./system-test:child-2/sub1"), |
| capability_event("./system-test:child-2/sub1"), |
| destroy_event("./system-test:child-2/sub1"), |
| destroy_event("./system-test:child-2"), |
| ] |
| } |
| |
| /// Collect the requests sent on the receiver and count by test url. |
| async fn collect_requests_to_count( |
| recv: mpsc::Receiver<PublisherRequestMessage>, |
| ) -> HashMap<String, u32> { |
| let mut occurrences = HashMap::new(); |
| recv.for_each(|message| { |
| occurrences.entry(message.test_url).and_modify(|count| *count += 1).or_insert(1); |
| futures::future::ready(()) |
| }) |
| .await; |
| occurrences |
| } |
| |
| #[fuchsia::test] |
| async fn no_debug_data() { |
| let (send, recv) = mpsc::channel(10); |
| let mut set = DebugDataSet::new(send, |got_data| assert!(!got_data)); |
| for (realm, url) in common_test_realms() { |
| set.add_realm(realm.to_string(), url.to_string()).expect("add realm"); |
| set.handle_event(start_event(realm)).await.expect("handle event"); |
| set.handle_event(destroy_event(realm)).await.expect("handle event"); |
| } |
| set.complete_set().await; |
| assert_eq!(collect_requests_to_count(recv).await, hashmap! {}); |
| } |
| |
| #[fuchsia::test] |
| async fn complete_set_before_realm_events_complete() { |
| let (send, recv) = mpsc::channel(10); |
| let mut set = DebugDataSet::new(send, |got_data| assert!(got_data)); |
| for (realm, url) in common_test_realms() { |
| set.add_realm(realm.to_string(), url.to_string()).expect("add realm"); |
| } |
| // If the set is marked complete, then events finish, stream should terminate. |
| set.complete_set().await; |
| for event in common_test_events() { |
| set.handle_event(event).await.expect("handle event"); |
| } |
| |
| assert_eq!( |
| collect_requests_to_count(recv).await, |
| hashmap! { |
| "test-url-1".to_string() => 1, |
| "test-url-2".to_string() => 2 |
| } |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn complete_set_after_realm_events_complete() { |
| let (send, recv) = mpsc::channel(10); |
| let mut set = DebugDataSet::new(send, |got_data| assert!(got_data)); |
| for (realm, url) in common_test_realms() { |
| set.add_realm(realm.to_string(), url.to_string()).expect("add realm"); |
| } |
| // If events finish, then set is marked complete, the stream should terminate. |
| for event in common_test_events() { |
| set.handle_event(event).await.expect("handle event"); |
| } |
| set.complete_set().await; |
| |
| assert_eq!( |
| collect_requests_to_count(recv).await, |
| hashmap! { |
| "test-url-1".to_string() => 1, |
| "test-url-2".to_string() => 2 |
| } |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn add_realm_after_initial_realm_completes() { |
| let (send, recv) = mpsc::channel(10); |
| let mut set = DebugDataSet::new(send, |got_data| assert!(got_data)); |
| set.add_realm("./test:realm-1".to_string(), "test-url-1".to_string()) |
| .expect("add realm"); |
| set.handle_event(start_event("./test:realm-1")).await.expect("handle event"); |
| set.handle_event(capability_event("./test:realm-1")).await.expect("handle event"); |
| set.handle_event(destroy_event("./test:realm-1")).await.expect("handle event"); |
| // At this point, realm-1 has stopped, but the set should remain open as we may still |
| // add additional realms. |
| set.add_realm("./test:realm-2".to_string(), "test-url-2".to_string()) |
| .expect("add realm"); |
| set.handle_event(start_event("./test:realm-2")).await.expect("handle event"); |
| set.handle_event(capability_event("./test:realm-2")).await.expect("handle event"); |
| set.handle_event(destroy_event("./test:realm-2")).await.expect("handle event"); |
| set.complete_set().await; |
| // Requests for both realms should be present. |
| assert_eq!( |
| collect_requests_to_count(recv).await, |
| hashmap! { |
| "test-url-1".to_string() => 1, |
| "test-url-2".to_string() => 1 |
| } |
| ); |
| } |
| |
| #[fuchsia::test] |
| async fn remove_realm_before_it_produces_events() { |
| let (send, recv) = mpsc::channel(10); |
| let mut set = DebugDataSet::new(send, |got_data| assert!(got_data)); |
| set.add_realm("./test:realm-1".to_string(), "test-url-1".to_string()) |
| .expect("add realm"); |
| set.handle_event(start_event("./test:realm-1")).await.expect("handle event"); |
| set.handle_event(capability_event("./test:realm-1")).await.expect("handle event"); |
| set.handle_event(destroy_event("./test:realm-1")).await.expect("handle event"); |
| // At this point, realm-1 has stopped, but the set should remain open as we may still |
| // add additional realms. |
| set.add_realm("./test:realm-2".to_string(), "test-url-2".to_string()) |
| .expect("add realm"); |
| set.remove_realm("./test:realm-2".to_string()).await.expect("remove realm"); |
| set.complete_set().await; |
| // Requests for only the realm that wasn't removed should be present. |
| assert_eq!( |
| collect_requests_to_count(recv).await, |
| hashmap! { |
| "test-url-1".to_string() => 1, |
| } |
| ); |
| } |
| |
/// When a component never reports its destroy event, the request stream stays open after
/// the set is completed and is only terminated by the finish timeout.
#[fuchsia::test]
fn timeout_after_finish() {
    const TIMEOUT: zx::Duration = zx::Duration::from_seconds(10);

    // Fake time lets the test fire the timeout timer explicitly instead of waiting for it.
    let mut executor = fasync::TestExecutor::new_with_fake_time().expect("create executor");
    let (send, recv) = mpsc::channel(10);
    let mut set = DebugDataSet::new(send, |got_data| assert!(!got_data))
        .with_timeout_after_finish(TIMEOUT);

    let mut fut_1 = async {
        set.add_realm("./test:realm-1".to_string(), "test-url-1".to_string())
            .expect("add realm");
        set.handle_event(start_event("./test:realm-1")).await.expect("handle event");
        // add a start event without corresponding destroy event
        set.handle_event(start_event("./test:realm-1/test_wrapper/hermetic_resolver"))
            .await
            .expect("handle event");
        set.handle_event(destroy_event("./test:realm-1")).await.expect("handle event");
        set.complete_set().await;
    }
    .boxed();
    assert_eq!(executor.run_until_stalled(&mut fut_1), Poll::Ready(()));

    // after processing all events request collection should be stalled.
    let mut collect_requests_fut = collect_requests_to_count(recv).boxed();
    assert_eq!(executor.run_until_stalled(&mut collect_requests_fut), Poll::Pending);

    // after waking up the timer, the request collection should complete, ignoring the
    // missing event.
    let time_before = executor.now();
    assert_eq!(executor.wake_next_timer().unwrap(), time_before + TIMEOUT);
    let requests = match executor.run_until_stalled(&mut collect_requests_fut) {
        Poll::Ready(req) => req,
        Poll::Pending => panic!("Expected requests to be ready but was pending"),
    };
    // No capability was requested, so no requests are expected.
    assert_eq!(requests, hashmap! {});
}
| |
/// The inspect node attached by `with_inspect` reflects the state of the set after each
/// change and disappears once the set is dropped.
#[fuchsia::test]
async fn export_inspect() {
    let inspector = fuchsia_inspect::Inspector::new();
    let (send, recv) = mpsc::channel(10);
    let set = DebugDataSet::new(send, |got_data| assert!(!got_data))
        .with_inspect(inspector.root(), "set");
    // A freshly created set exports empty collections.
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {},
                running_components: Vec::<String>::new(),
                seen_realms: Vec::<String>::new(),
                done_adding_realms: false,
                destroyed_before_start: Vec::<String>::new()
            }
        }
    );

    // Adding a realm only changes the exported realms.
    set.lock()
        .await
        .add_realm("./test:realm-1".to_string(), "test-url-1".to_string())
        .expect("add realm");
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {
                    "test:realm-1": "test-url-1"
                },
                done_adding_realms: false,
                num_running_components: 0i64,
                running_components: Vec::<String>::new(),
                num_seen_realms: 0i64,
                seen_realms: Vec::<String>::new(),
            }
        }
    );

    // A start event marks the realm as seen and its component as running.
    set.lock()
        .await
        .handle_event(start_event("./test:realm-1"))
        .await
        .expect("handle event");
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {
                    "test:realm-1": "test-url-1"
                },
                done_adding_realms: false,
                num_running_components: 1i64,
                running_components: vec!["./test:realm-1"],
                num_seen_realms: 1i64,
                seen_realms: vec!["test:realm-1"]
            }
        }
    );

    // A destroy event clears the running component but the realm stays seen.
    set.lock()
        .await
        .handle_event(destroy_event("./test:realm-1"))
        .await
        .expect("handle event");
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {
                    "test:realm-1": "test-url-1"
                },
                done_adding_realms: false,
                running_components: Vec::<String>::new(),
                seen_realms: vec!["test:realm-1"]
            }
        }
    );

    // Completing the set flips done_adding_realms.
    set.lock().await.complete_set().await;
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {
                    "test:realm-1": "test-url-1"
                },
                done_adding_realms: true,
                running_components: Vec::<String>::new(),
                seen_realms: vec!["test:realm-1"]
            }
        }
    );
    // No capability was requested, so the terminated stream should contain no requests.
    assert!(collect_requests_to_count(recv).await.is_empty());

    // Dropping the set removes its inspect node.
    drop(set);
    assert_data_tree!(
        inspector,
        root: {}
    );
}
| |
/// Property assertion that accepts a string array holding exactly `expected_size` distinct
/// values, all of which are taken from `expected_superset`.
struct SubsetProperty {
    expected_superset: HashSet<String>,
    expected_size: usize,
}

impl SubsetProperty {
    /// Builds an assertion for a subset of |expected_superset| with |expected_size| elements.
    fn new(expected_superset: HashSet<String>, expected_size: usize) -> Self {
        SubsetProperty { expected_size, expected_superset }
    }
}
| |
| impl<K> fuchsia_inspect::testing::PropertyAssertion<K> for SubsetProperty { |
| fn run(&self, actual: &fuchsia_inspect::hierarchy::Property<K>) -> Result<(), Error> { |
| match actual { |
| fuchsia_inspect::hierarchy::Property::StringList(_, ref string_list) => { |
| let set: HashSet<String> = string_list.iter().cloned().collect(); |
| match set.is_subset(&self.expected_superset) { |
| true => match set.len() == self.expected_size { |
| true => Ok(()), |
| false => Err(anyhow!( |
| "Expected a set of size {:?} but got set {:?}", |
| self.expected_size, |
| set |
| )), |
| }, |
| false => Err(anyhow!( |
| "Expected a subset of {:?} but got {:?}", |
| self.expected_superset, |
| set |
| )), |
| } |
| } |
| _ => Err(anyhow!("Expected a string list")), |
| } |
| } |
| } |
| |
/// When a tracked collection holds more than MAX_INSPECT_ARRAY_SIZE entries, the exported
/// count is the full size while the exported array is truncated to the maximum.
#[fuchsia::test]
async fn export_inspect_truncate_on_overflow() {
    let inspector = fuchsia_inspect::Inspector::new();
    let (send, _) = mpsc::channel(10);
    let set = DebugDataSet::new(send, |got_data| assert!(!got_data))
        .with_inspect(inspector.root(), "set");
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                realms: {},
                running_components: Vec::<String>::new(),
                seen_realms: Vec::<String>::new(),
                done_adding_realms: false,
                destroyed_before_start: Vec::<String>::new()
            }
        }
    );

    // One entry more than fits into an inspect array.
    const OVERFLOW_COUNT: usize = DebugDataSet::MAX_INSPECT_ARRAY_SIZE + 1;
    for idx in 0..OVERFLOW_COUNT {
        let realm = format!("./test:{:?}", idx);
        let url = format!("url-{:?}", idx);
        set.lock().await.add_realm(realm, url).expect("add realm");
    }

    for idx in 0..OVERFLOW_COUNT {
        // add destroyed child realms first to test destroyed_before_start
        let realm = format!("./test:{:?}", idx);
        let moniker = format!("./test:{:?}/child", idx);
        set.lock().await.handle_event(destroy_event(&moniker)).await.expect("handle event");
        set.lock().await.handle_event(start_event(&realm)).await.expect("handle event");
    }

    // The sets are unordered, so only check that each array holds a maximum-sized subset of
    // the expected values.
    assert_data_tree!(
        inspector,
        root: {
            set: contains {
                num_destroyed_before_start: OVERFLOW_COUNT as i64,
                destroyed_before_start: SubsetProperty::new(
                    (0..OVERFLOW_COUNT).map(|idx| format!("./test:{:?}/child", idx)).collect(),
                    DebugDataSet::MAX_INSPECT_ARRAY_SIZE
                ),
                num_running_components: OVERFLOW_COUNT as i64,
                running_components: SubsetProperty::new(
                    (0..OVERFLOW_COUNT).map(|idx| format!("./test:{:?}", idx)).collect(),
                    DebugDataSet::MAX_INSPECT_ARRAY_SIZE
                ),
                num_seen_realms: OVERFLOW_COUNT as i64,
                seen_realms: SubsetProperty::new(
                    (0..OVERFLOW_COUNT).map(|idx| format!("test:{:?}", idx)).collect(),
                    DebugDataSet::MAX_INSPECT_ARRAY_SIZE
                ),
            }
        }
    );
}
| } |
| } |
| |
#[cfg(test)]
mod testing {
    use super::*;
    use fidl::endpoints::ProtocolMarker;

    /// Builds an |fsys::Event| whose header carries |event_type| and |moniker| and whose
    /// result is |payload| wrapped in |EventResult::Payload|. Every other table field is
    /// left at its |EMPTY| value.
    fn event_with_payload(
        event_type: fsys::EventType,
        moniker: &str,
        payload: fsys::EventPayload,
    ) -> fsys::Event {
        let header = fsys::EventHeader {
            event_type: Some(event_type),
            moniker: Some(moniker.to_string()),
            ..fsys::EventHeader::EMPTY
        };
        fsys::Event {
            header: Some(header),
            event_result: Some(fsys::EventResult::Payload(payload)),
            ..fsys::Event::EMPTY
        }
    }

    /// Returns a |Started| event for |moniker| with an empty payload.
    pub(super) fn start_event(moniker: &str) -> fsys::Event {
        let payload = fsys::EventPayload::Started(fsys::StartedPayload::EMPTY);
        event_with_payload(fsys::EventType::Started, moniker, payload)
    }

    /// Returns a |Destroyed| event for |moniker| with an empty payload.
    pub(super) fn destroy_event(moniker: &str) -> fsys::Event {
        let payload = fsys::EventPayload::Destroyed(fsys::DestroyedPayload::EMPTY);
        event_with_payload(fsys::EventType::Destroyed, moniker, payload)
    }

    /// Returns a |CapabilityRequested| event for |moniker| that requests the
    /// |fuchsia.debugdata.Publisher| protocol by name.
    pub(super) fn capability_event(moniker: &str) -> fsys::Event {
        // Only the server end travels with the event. The client end stays bound to a local
        // and is therefore dropped when this function returns.
        let (_client, server) = zx::Channel::create().unwrap();
        let request = fsys::CapabilityRequestedPayload {
            name: Some(fdebug::PublisherMarker::NAME.to_string()),
            capability: Some(server),
            ..fsys::CapabilityRequestedPayload::EMPTY
        };
        event_with_payload(
            fsys::EventType::CapabilityRequested,
            moniker,
            fsys::EventPayload::CapabilityRequested(request),
        )
    }
}
| |
| #[cfg(test)] |
| mod test { |
| use super::testing::*; |
| use super::*; |
| use assert_matches::assert_matches; |
| use fidl::endpoints::{create_proxy, create_proxy_and_stream}; |
| use futures::Future; |
| use maplit::hashmap; |
| use std::sync::atomic::{AtomicU32, Ordering}; |
| |
| /// A |PublishRequestHandler| implementation that counts the number of requests per state, and |
| /// sends them on completion. |
| struct TestPublishRequestHandler(AtomicU32, mpsc::Sender<DebugSetState>); |
| |
| #[derive(PartialEq, Debug)] |
| struct DebugSetState { |
| /// Number of requests received for each test URL. |
| requests_for_url: HashMap<String, u32>, |
| /// Index of the set (in order they are created.) |
| index: u32, |
| } |
| |
| impl TestPublishRequestHandler { |
| fn new() -> (Self, mpsc::Receiver<DebugSetState>) { |
| let (send, recv) = mpsc::channel(10); |
| (Self(AtomicU32::new(0), send), recv) |
| } |
| } |
| |
| #[async_trait(?Send)] |
| impl PublishRequestHandler for TestPublishRequestHandler { |
| async fn handle_publish_requests( |
| &self, |
| mut publish_request_recv: mpsc::Receiver<PublisherRequestMessage>, |
| _iterator: ftest_manager::DebugDataIteratorRequestStream, |
| ) -> Result<(), Error> { |
| let index = self.0.fetch_add(1, Ordering::Relaxed); |
| let mut requests_for_url = HashMap::new(); |
| |
| while let Some(debug_request) = publish_request_recv.next().await { |
| requests_for_url |
| .entry(debug_request.test_url) |
| .and_modify(|count| *count += 1) |
| .or_insert(1); |
| } |
| let _ = self.1.clone().send(DebugSetState { requests_for_url, index }).await; |
| Ok(()) |
| } |
| } |
| |
| async fn controller_and_event_test<F, Fut>(test_fn: F) |
| where |
| F: Fn( |
| ftest_internal::DebugDataControllerProxy, |
| fsys::EventStreamProxy, |
| mpsc::Receiver<DebugSetState>, |
| ) -> Fut, |
| Fut: Future<Output = ()>, |
| { |
| let (controller_request_proxy, controller_request_stream) = |
| create_proxy_and_stream::<ftest_internal::DebugDataControllerMarker>().unwrap(); |
| let (event_proxy, event_request_stream) = |
| create_proxy_and_stream::<fsys::EventStreamMarker>().unwrap(); |
| let (request_handler, request_recv) = TestPublishRequestHandler::new(); |
| let ((), ()) = futures::future::join( |
| handle_debug_data_controller_and_events( |
| controller_request_stream, |
| event_request_stream, |
| request_handler, |
| zx::Duration::INFINITE, |
| fuchsia_inspect::Inspector::new().root(), |
| ), |
| test_fn(controller_request_proxy, event_proxy, request_recv), |
| ) |
| .await; |
| } |
| |
| fn create_set_contoller_proxy() -> ( |
| ftest_internal::DebugDataSetControllerProxy, |
| ServerEnd<ftest_internal::DebugDataSetControllerMarker>, |
| ) { |
| create_proxy::<ftest_internal::DebugDataSetControllerMarker>().unwrap() |
| } |
| |
| fn create_iterator_proxy( |
| ) -> (ftest_manager::DebugDataIteratorProxy, ServerEnd<ftest_manager::DebugDataIteratorMarker>) |
| { |
| create_proxy::<ftest_manager::DebugDataIteratorMarker>().unwrap() |
| } |
| |
/// Moniker of the realm that the single-set tests add to a set and send events for.
const TEST_REALM: &str = "./test:child-1";
/// Test URL that the single-set tests associate with |TEST_REALM| when adding it to a set.
const TEST_URL: &str = "test-url-1";
| |
| #[fuchsia::test] |
| async fn route_single_set_no_debug_data() { |
| controller_and_event_test(|controller_proxy, event_proxy, mut request_recv| async move { |
| let (set_controller, set_server) = create_set_contoller_proxy(); |
| let (_iterator, iterator_server) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server, set_server).expect("create new set"); |
| |
| // Add a realm and send events showing it started and stopped. |
| set_controller |
| .add_realm(TEST_REALM, TEST_URL) |
| .await |
| .expect("add realm") |
| .expect("add realm returned error"); |
| set_controller.finish().expect("finish set"); |
| event_proxy.on_event(start_event(TEST_REALM)).expect("start event"); |
| event_proxy.on_event(destroy_event(TEST_REALM)).expect("start event"); |
| |
| // Since no debug data was produced, no OnDebugData event is produced. |
| assert!(set_controller.take_event_stream().next().await.is_none()); |
| let set_state = request_recv.next().await.unwrap(); |
| assert_eq!(set_state, DebugSetState { requests_for_url: hashmap! {}, index: 0 }); |
| }) |
| .await; |
| } |
| |
| #[fuchsia::test] |
| async fn route_single_set_with_debug_data() { |
| controller_and_event_test(|controller_proxy, event_proxy, mut request_recv| async move { |
| let (set_controller, set_server) = create_set_contoller_proxy(); |
| let (_iterator, iterator_server) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server, set_server).expect("create new set"); |
| |
| // Add a realm and send events showing it started and stopped. |
| set_controller |
| .add_realm(TEST_REALM, TEST_URL) |
| .await |
| .expect("add realm") |
| .expect("add realm returned error"); |
| set_controller.finish().expect("finish set"); |
| event_proxy.on_event(start_event(TEST_REALM)).expect("start event"); |
| event_proxy.on_event(capability_event(TEST_REALM)).expect("capability event"); |
| event_proxy |
| .on_event(capability_event(&format!("{}/child1", TEST_REALM))) |
| .expect("capability event"); |
| event_proxy.on_event(destroy_event(TEST_REALM)).expect("start event"); |
| |
| // OnDebugData event is produced. |
| assert_matches!( |
| set_controller.take_event_stream().next().await.unwrap().unwrap(), |
| ftest_internal::DebugDataSetControllerEvent::OnDebugDataProduced { .. } |
| ); |
| let set_state = request_recv.next().await.unwrap(); |
| assert_eq!( |
| set_state, |
| DebugSetState { |
| requests_for_url: hashmap! { |
| TEST_URL.to_string() => 2, |
| }, |
| index: 0, |
| } |
| ); |
| }) |
| .await; |
| } |
| |
| #[fuchsia::test] |
| async fn destroy_before_start_okay() { |
| // This test verifies that the stream won't hang if a Destroyed event comes before Started |
| controller_and_event_test(|controller_proxy, event_proxy, mut request_recv| async move { |
| let (set_controller, set_server) = create_set_contoller_proxy(); |
| let (_iterator, iterator_server) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server, set_server).expect("create new set"); |
| |
| // Add a realm and send events showing it started and stopped. |
| set_controller |
| .add_realm(TEST_REALM, TEST_URL) |
| .await |
| .expect("add realm") |
| .expect("add realm returned error"); |
| set_controller.finish().expect("finish set"); |
| event_proxy.on_event(destroy_event(TEST_REALM)).expect("destroy event"); |
| event_proxy.on_event(start_event(TEST_REALM)).expect("start event"); |
| |
| // Since no debug data was produced, no OnDebugData event is produced. |
| assert!(set_controller.take_event_stream().next().await.is_none()); |
| let set_state = request_recv.next().await.unwrap(); |
| assert_eq!(set_state, DebugSetState { requests_for_url: hashmap! {}, index: 0 }); |
| }) |
| .await; |
| } |
| |
| #[fuchsia::test] |
| async fn route_multiple_sets_with_debug_data() { |
| controller_and_event_test(|controller_proxy, event_proxy, mut request_recv| async move { |
| let set_1_realms = |
| vec![("./test:child-1", "test-url-1-1"), ("./system-test:child-1", "test-url-1-2")]; |
| let set_2_realms = |
| vec![("./test:child-2", "test-url-2-1"), ("./system-test:child-2", "test-url-2-2")]; |
| |
| let (set_controller_1, set_server_1) = create_set_contoller_proxy(); |
| let (_iterator_1, iterator_server_1) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server_1, set_server_1).expect("create new set"); |
| |
| let (set_controller_2, set_server_2) = create_set_contoller_proxy(); |
| let (_iterator_2, iterator_server_2) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server_2, set_server_2).expect("create new set"); |
| |
| for (realm, url) in set_1_realms.iter() { |
| set_controller_1 |
| .add_realm(realm, url) |
| .await |
| .expect("add realm") |
| .expect("add_realm returned error"); |
| event_proxy.on_event(start_event(realm)).expect("send event"); |
| event_proxy.on_event(capability_event(realm)).expect("send event"); |
| event_proxy.on_event(destroy_event(realm)).expect("send event"); |
| } |
| for (realm, url) in set_2_realms.iter() { |
| set_controller_2 |
| .add_realm(realm, url) |
| .await |
| .expect("add realm") |
| .expect("add_realm returned error"); |
| event_proxy.on_event(start_event(realm)).expect("send event"); |
| event_proxy.on_event(capability_event(realm)).expect("send event"); |
| event_proxy.on_event(destroy_event(realm)).expect("send event"); |
| } |
| set_controller_1.finish().expect("finish set 1"); |
| set_controller_2.finish().expect("finish set 2"); |
| |
| assert_matches!( |
| set_controller_1.take_event_stream().next().await.unwrap().unwrap(), |
| ftest_internal::DebugDataSetControllerEvent::OnDebugDataProduced { .. } |
| ); |
| assert_matches!( |
| set_controller_2.take_event_stream().next().await.unwrap().unwrap(), |
| ftest_internal::DebugDataSetControllerEvent::OnDebugDataProduced { .. } |
| ); |
| |
| let set_state_1 = request_recv.next().await.unwrap(); |
| assert_eq!( |
| set_state_1, |
| DebugSetState { |
| requests_for_url: hashmap! { |
| "test-url-1-1".to_string() => 1, |
| "test-url-1-2".to_string() => 1, |
| }, |
| index: 0, |
| } |
| ); |
| |
| let set_state_2 = request_recv.next().await.unwrap(); |
| assert_eq!( |
| set_state_2, |
| DebugSetState { |
| requests_for_url: hashmap! { |
| "test-url-2-1".to_string() => 1, |
| "test-url-2-2".to_string() => 1, |
| }, |
| index: 1, |
| } |
| ); |
| }) |
| .await; |
| } |
| |
| #[fuchsia::test] |
| async fn terminate_set_if_controller_terminates_without_finish() { |
| controller_and_event_test(|controller_proxy, event_proxy, mut request_recv| async move { |
| let (set_controller, set_server) = create_set_contoller_proxy(); |
| let (_iterator, iterator_server) = create_iterator_proxy(); |
| controller_proxy.new_set(iterator_server, set_server).expect("create new set"); |
| |
| set_controller |
| .add_realm(TEST_REALM, TEST_URL) |
| .await |
| .expect("add realm") |
| .expect("add realm returned error"); |
| drop(set_controller); |
| // drop event and controller proxy too so that the input streams terminate. |
| drop(controller_proxy); |
| drop(event_proxy); |
| assert!(request_recv.next().await.is_none()); |
| }) |
| .await; |
| } |
| } |