| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use { |
| crate::{ |
| constants, |
| container::{PopulatedInspectDataContainer, ReadSnapshot, SnapshotData}, |
| diagnostics::DiagnosticsServerStats, |
| repository::DiagnosticsDataRepository, |
| }, |
| anyhow::Error, |
| collector::Moniker, |
| diagnostics_data::{self as schema, Data, Inspect}, |
| fidl_fuchsia_diagnostics::{self, Selector}, |
| fuchsia_inspect::reader::PartialNodeHierarchy, |
| fuchsia_inspect_node_hierarchy::{InspectHierarchyMatcher, NodeHierarchy}, |
| fuchsia_zircon as zx, |
| futures::prelude::*, |
| log::error, |
| parking_lot::RwLock, |
| selectors, |
| std::{ |
| convert::{TryFrom, TryInto}, |
| sync::Arc, |
| }, |
| }; |
| |
| pub mod collector; |
| |
| /// Packet containing a node hierarchy and all the metadata needed to |
| /// populate a diagnostics schema for that node hierarchy. |
| pub struct NodeHierarchyData { |
| // Name of the file that created this snapshot. |
| filename: String, |
| // Timestamp at which this snapshot resolved or failed. |
| timestamp: zx::Time, |
| // Errors encountered when processing this snapshot. |
| errors: Vec<schema::Error>, |
| // Optional NodeHierarchy of the inspect hierarchy, in case reading fails |
| // and we have errors to share with client. |
| hierarchy: Option<NodeHierarchy>, |
| } |
| |
| impl Into<NodeHierarchyData> for SnapshotData { |
| fn into(self: SnapshotData) -> NodeHierarchyData { |
| match self.snapshot { |
| Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) { |
| Ok(node_hierarchy) => NodeHierarchyData { |
| filename: self.filename, |
| timestamp: self.timestamp, |
| errors: self.errors, |
| hierarchy: Some(node_hierarchy), |
| }, |
| Err(e) => NodeHierarchyData { |
| filename: self.filename, |
| timestamp: self.timestamp, |
| errors: vec![schema::Error { message: format!("{:?}", e) }], |
| hierarchy: None, |
| }, |
| }, |
| None => NodeHierarchyData { |
| filename: self.filename, |
| timestamp: self.timestamp, |
| errors: self.errors, |
| hierarchy: None, |
| }, |
| } |
| } |
| } |
| |
| /// ReaderServer holds the state and data needed to serve Inspect data |
| /// reading requests for a single client. |
| /// |
| /// configured_selectors: are the selectors provided by the client which define |
| /// what inspect data is returned by read requests. A none type |
| /// implies that all available data should be returned. |
| /// |
| /// inspect_repo: the DiagnosticsDataRepository which holds the access-points for all relevant |
| /// inspect data. |
| pub struct ReaderServer { |
| selectors: Option<Vec<Arc<Selector>>>, |
| } |
| |
| fn convert_snapshot_to_node_hierarchy(snapshot: ReadSnapshot) -> Result<NodeHierarchy, Error> { |
| match snapshot { |
| ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()), |
| ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(), |
| ReadSnapshot::Finished(hierarchy) => Ok(hierarchy), |
| } |
| } |
| |
| pub struct BatchResultItem { |
| /// Relative moniker of the component associated with this result. |
| pub moniker: Moniker, |
| /// The url with which the component associated with this result was launched. |
| pub component_url: String, |
| /// The resulting Node hierarchy plus some metadata. |
| pub hierarchy_data: NodeHierarchyData, |
| } |
| |
| impl ReaderServer { |
| /// Create a stream of filtered inspect data, ready to serve. |
| pub fn stream( |
| inspect_repo: Arc<RwLock<DiagnosticsDataRepository>>, |
| timeout: Option<i64>, |
| selectors: Option<Vec<Selector>>, |
| stats: Arc<DiagnosticsServerStats>, |
| ) -> impl Stream<Item = Data<Inspect>> + Send + 'static { |
| let selectors = selectors.map(|s| s.into_iter().map(Arc::new).collect()); |
| let repo_data = inspect_repo.read().fetch_inspect_data(&selectors).into_iter(); |
| |
| let server = Self { selectors }; |
| let timeout = timeout.unwrap_or(constants::PER_COMPONENT_ASYNC_TIMEOUT_SECONDS); |
| |
| futures::stream::iter(repo_data) |
| // make a stream of futures of populated Vec's |
| .map(move |unpopulated| { |
| let global_stats = stats.global_stats().clone(); |
| |
| // this returns a future, which means the closure capture must be 'static |
| async move { |
| let start_time = zx::Time::get_monotonic(); |
| let global_stats_2 = global_stats.clone(); |
| let result = |
| unpopulated.populate(timeout, move || global_stats.add_timeout()).await; |
| global_stats_2.record_component_duration( |
| &result.relative_moniker.join("/"), |
| zx::Time::get_monotonic() - start_time, |
| ); |
| result |
| } |
| }) |
| // buffer a small number in memory in case later components time out |
| .buffer_unordered(constants::MAXIMUM_SIMULTANEOUS_SNAPSHOTS_PER_READER) |
| // filter each component's inspect |
| .map(move |populated| server.filter_snapshot(populated)) |
| // turn each of the vecs of filtered snapshots into their own streams |
| .map(futures::stream::iter) |
| // and merge them all into a single stream |
| .flatten() |
| } |
| |
| fn filter_single_components_snapshots( |
| snapshots: Vec<SnapshotData>, |
| static_matcher: Option<InspectHierarchyMatcher>, |
| client_matcher: Option<InspectHierarchyMatcher>, |
| ) -> Vec<NodeHierarchyData> { |
| let statically_filtered_hierarchies: Vec<NodeHierarchyData> = match static_matcher { |
| Some(static_matcher) => snapshots |
| .into_iter() |
| .map(|snapshot_data| { |
| let node_hierarchy_data: NodeHierarchyData = snapshot_data.into(); |
| |
| match node_hierarchy_data.hierarchy { |
| Some(node_hierarchy) => { |
| match fuchsia_inspect_node_hierarchy::filter_node_hierarchy( |
| node_hierarchy, |
| &static_matcher, |
| ) { |
| Ok(filtered_hierarchy_opt) => NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: node_hierarchy_data.errors, |
| hierarchy: filtered_hierarchy_opt, |
| }, |
| Err(e) => { |
| error!("Archivist failed to filter a node hierarchy: {:?}", e); |
| NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: vec![schema::Error { message: format!("{:?}", e) }], |
| hierarchy: None, |
| } |
| } |
| } |
| } |
| None => NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: node_hierarchy_data.errors, |
| hierarchy: None, |
| }, |
| } |
| }) |
| .collect(), |
| |
| // The only way we have a None value for the PopulatedDataContainer is |
| // if there were no provided static selectors, which is only valid in |
| // the AllAccess pipeline. For all other pipelines, if no static selectors |
| // matched, the data wouldn't have ended up in the repository to begin |
| // with. |
| None => snapshots.into_iter().map(|snapshot_data| snapshot_data.into()).collect(), |
| }; |
| |
| match client_matcher { |
| // If matcher is present, and there was an InspectHierarchyMatcher, |
| // then this means the client provided their own selectors, and a subset of |
| // them matched this component. So we need to filter each of the snapshots from |
| // this component with the dynamically provided components. |
| Some(dynamic_matcher) => statically_filtered_hierarchies |
| .into_iter() |
| .map(|node_hierarchy_data| match node_hierarchy_data.hierarchy { |
| Some(node_hierarchy) => { |
| match fuchsia_inspect_node_hierarchy::filter_node_hierarchy( |
| node_hierarchy, |
| &dynamic_matcher, |
| ) { |
| Ok(filtered_hierarchy_opt) => NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: node_hierarchy_data.errors, |
| hierarchy: filtered_hierarchy_opt, |
| }, |
| Err(e) => NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: vec![schema::Error { message: format!("{:?}", e) }], |
| hierarchy: None, |
| }, |
| } |
| } |
| None => NodeHierarchyData { |
| filename: node_hierarchy_data.filename, |
| timestamp: node_hierarchy_data.timestamp, |
| errors: node_hierarchy_data.errors, |
| hierarchy: None, |
| }, |
| }) |
| .collect(), |
| None => statically_filtered_hierarchies, |
| } |
| } |
| |
| /// Takes a PopulatedInspectDataContainer and converts all non-error |
| /// results into in-memory node hierarchies. The hierarchies are filtered |
| /// such that the only diagnostics properties they contain are those |
| /// configured by the static and client-provided selectors. |
| /// |
| // TODO(fxbug.dev/4601): Error entries should still be included, but with a custom hierarchy |
| // that makes it clear to clients that snapshotting failed. |
| fn filter_snapshot( |
| &self, |
| pumped_inspect_data: PopulatedInspectDataContainer, |
| ) -> Vec<Data<Inspect>> { |
| // Since a single PopulatedInspectDataContainer shares a moniker for all pieces of data it |
| // contains, we can store the result of component selector filtering to avoid reapplying |
| // the selectors. |
| let mut client_selectors: Option<InspectHierarchyMatcher> = None; |
| |
| // We iterate the vector of pumped inspect data packets, consuming each inspect vmo |
| // and filtering it using the provided selector regular expressions. Each filtered |
| // inspect hierarchy is then added to an accumulator as a HierarchyData to be converted |
| // into a JSON string and returned. |
| let sanitized_moniker = pumped_inspect_data |
| .relative_moniker |
| .iter() |
| .map(|s| selectors::sanitize_string_for_selectors(s)) |
| .collect::<Vec<String>>() |
| .join("/"); |
| |
| if let Some(configured_selectors) = &self.selectors { |
| client_selectors = { |
| let matching_selectors = selectors::match_component_moniker_against_selectors( |
| &pumped_inspect_data.relative_moniker, |
| configured_selectors, |
| ) |
| .unwrap_or_else(|err| { |
| error!( |
| "Failed to evaluate client selectors for: {:?} Error: {:?}", |
| pumped_inspect_data.relative_moniker, err |
| ); |
| Vec::new() |
| }); |
| |
| if matching_selectors.is_empty() { |
| None |
| } else { |
| match (&matching_selectors).try_into() { |
| Ok(hierarchy_matcher) => Some(hierarchy_matcher), |
| Err(e) => { |
| error!("Failed to create hierarchy matcher: {:?}", e); |
| None |
| } |
| } |
| } |
| }; |
| |
| // If there were configured matchers and none of them matched |
| // this component, then we should return early since there is no data to |
| // extract. |
| if client_selectors.is_none() { |
| return vec![]; |
| } |
| } |
| |
| let component_url = pumped_inspect_data.component_url; |
| ReaderServer::filter_single_components_snapshots( |
| pumped_inspect_data.snapshots, |
| pumped_inspect_data.inspect_matcher, |
| client_selectors, |
| ) |
| .into_iter() |
| .map(|hierarchy_data| { |
| Data::for_inspect( |
| sanitized_moniker.clone(), |
| hierarchy_data.hierarchy, |
| hierarchy_data.timestamp.into_nanos(), |
| component_url.clone(), |
| hierarchy_data.filename, |
| hierarchy_data.errors, |
| ) |
| }) |
| .collect() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::collector::InspectDataCollector, |
| super::*, |
| crate::{ |
| diagnostics, |
| events::types::{ComponentIdentifier, InspectData, LegacyIdentifier, RealmPath}, |
| logs::LogManager, |
| server::AccessorServer, |
| }, |
| anyhow::format_err, |
| fdio, |
| fidl::endpoints::{create_proxy_and_stream, DiscoverableService}, |
| fidl_fuchsia_diagnostics::{BatchIteratorMarker, BatchIteratorProxy, StreamMode}, |
| fidl_fuchsia_inspect::TreeMarker, |
| fidl_fuchsia_io::DirectoryMarker, |
| fuchsia_async::{self as fasync, Task}, |
| fuchsia_component::server::ServiceFs, |
| fuchsia_inspect::{assert_inspect_tree, reader, testing::AnyProperty, Inspector}, |
| fuchsia_inspect_node_hierarchy::{trie::TrieIterableNode, NodeHierarchy}, |
| fuchsia_zircon as zx, |
| fuchsia_zircon::Peered, |
| futures::future::join_all, |
| futures::{FutureExt, StreamExt}, |
| serde_json::json, |
| std::path::PathBuf, |
| }; |
| |
| const TEST_URL: &'static str = "fuchsia-pkg://test"; |
| const BATCH_RETRIEVAL_TIMEOUT_SECONDS: Option<i64> = Some(300); |
| |
| fn get_vmo(text: &[u8]) -> zx::Vmo { |
| let vmo = zx::Vmo::create(4096).unwrap(); |
| vmo.write(text, 0).unwrap(); |
| vmo |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn inspect_data_collector() { |
| let path = PathBuf::from("/test-bindings"); |
| // Make a ServiceFs containing two files. |
| // One is an inspect file, and one is not. |
| let mut fs = ServiceFs::new(); |
| let vmo = get_vmo(b"test1"); |
| let vmo2 = get_vmo(b"test2"); |
| let vmo3 = get_vmo(b"test3"); |
| let vmo4 = get_vmo(b"test4"); |
| fs.dir("diagnostics").add_vmo_file_at("root.inspect", vmo, 0, 4096); |
| fs.dir("diagnostics").add_vmo_file_at("root_not_inspect", vmo2, 0, 4096); |
| fs.dir("diagnostics").dir("a").add_vmo_file_at("root.inspect", vmo3, 0, 4096); |
| fs.dir("diagnostics").dir("b").add_vmo_file_at("root.inspect", vmo4, 0, 4096); |
| // Create a connection to the ServiceFs. |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.join("out").to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| |
| let thread_path = path.join("out/diagnostics"); |
| |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let path = thread_path; |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| executor.run_singlethreaded(async { |
| let collector = InspectDataCollector::new(); |
| // Trigger collection on a clone of the inspect collector so |
| // we can use collector to take the collected data. |
| Box::new(collector.clone()).collect(path).await.unwrap(); |
| let collector: Box<InspectDataCollector> = Box::new(collector); |
| |
| let extra_data = collector.take_data().expect("collector missing data"); |
| assert_eq!(3, extra_data.len()); |
| |
| let assert_extra_data = |path: &str, content: &[u8]| { |
| let extra = extra_data.get(path); |
| assert!(extra.is_some()); |
| |
| match extra.unwrap() { |
| InspectData::Vmo(vmo) => { |
| let mut buf = [0u8; 5]; |
| vmo.read(&mut buf, 0).expect("reading vmo"); |
| assert_eq!(content, &buf); |
| } |
| v => { |
| panic!("Expected Vmo, got {:?}", v); |
| } |
| } |
| }; |
| |
| assert_extra_data("root.inspect", b"test1"); |
| assert_extra_data("a/root.inspect", b"test3"); |
| assert_extra_data("b/root.inspect", b"test4"); |
| |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.join("out").to_str().unwrap()).unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn inspect_data_collector_tree() { |
| let path = PathBuf::from("/test-bindings2"); |
| |
| // Make a ServiceFs serving an inspect tree. |
| let mut fs = ServiceFs::new(); |
| let inspector = Inspector::new(); |
| inspector.root().record_int("a", 1); |
| inspector.root().record_lazy_child("lazy", || { |
| async move { |
| let inspector = Inspector::new(); |
| inspector.root().record_double("b", 3.14); |
| Ok(inspector) |
| } |
| .boxed() |
| }); |
| inspector.serve(&mut fs).expect("failed to serve inspector"); |
| |
| // Create a connection to the ServiceFs. |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.join("out").to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| let thread_path = path.join("out/diagnostics"); |
| |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let path = thread_path; |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| executor.run_singlethreaded(async { |
| let collector = InspectDataCollector::new(); |
| |
| //// Trigger collection on a clone of the inspect collector so |
| //// we can use collector to take the collected data. |
| Box::new(collector.clone()).collect(path).await.unwrap(); |
| let collector: Box<InspectDataCollector> = Box::new(collector); |
| |
| let extra_data = collector.take_data().expect("collector missing data"); |
| assert_eq!(1, extra_data.len()); |
| |
| let extra = extra_data.get(TreeMarker::SERVICE_NAME); |
| assert!(extra.is_some()); |
| |
| match extra.unwrap() { |
| InspectData::Tree(tree, vmo) => { |
| // Assert we can read the tree proxy and get the data we expected. |
| let hierarchy = reader::read_from_tree(&tree) |
| .await |
| .expect("failed to read hierarchy from tree"); |
| assert_inspect_tree!(hierarchy, root: { |
| a: 1i64, |
| lazy: { |
| b: 3.14, |
| } |
| }); |
| let partial_hierarchy: NodeHierarchy = |
| PartialNodeHierarchy::try_from(vmo.as_ref().unwrap()) |
| .expect("failed to read hierarchy from vmo") |
| .into(); |
| // Assert the vmo also points to that data (in this case since there's no |
| // lazy nodes). |
| assert_inspect_tree!(partial_hierarchy, root: { |
| a: 1i64, |
| }); |
| } |
| v => { |
| panic!("Expected Tree, got {:?}", v); |
| } |
| } |
| |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.join("out").to_str().unwrap()).unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn reader_server_formatting() { |
| let path = PathBuf::from("/test-bindings3"); |
| |
| // Make a ServiceFs containing two files. |
| // One is an inspect file, and one is not. |
| let mut fs = ServiceFs::new(); |
| let vmo = zx::Vmo::create(4096).unwrap(); |
| let inspector = inspector_for_reader_test(); |
| |
| let data = inspector.copy_vmo_data().unwrap(); |
| vmo.write(&data, 0).unwrap(); |
| fs.dir("diagnostics").add_vmo_file_at("test.inspect", vmo, 0, 4096); |
| |
| // Create a connection to the ServiceFs. |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.join("out").to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| let thread_path = path.join("out"); |
| |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let path = thread_path; |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| executor.run_singlethreaded(async { |
| verify_reader(path).await; |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.join("out").to_str().unwrap()).unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn read_server_formatting_tree() { |
| let path = PathBuf::from("/test-bindings4"); |
| |
| // Make a ServiceFs containing two files. |
| // One is an inspect file, and one is not. |
| let mut fs = ServiceFs::new(); |
| let inspector = inspector_for_reader_test(); |
| inspector.serve(&mut fs).expect("failed to serve inspector"); |
| |
| // Create a connection to the ServiceFs. |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.join("out").to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| let thread_path = path.join("out"); |
| |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let path = thread_path; |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| executor.run_singlethreaded(async { |
| verify_reader(path).await; |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.join("out").to_str().unwrap()).unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn reader_server_reports_errors() { |
| let path = PathBuf::from("/test-bindings-errors-01"); |
| |
| // Make a ServiceFs containing something that looks like an inspect file but is not. |
| let mut fs = ServiceFs::new(); |
| let vmo = zx::Vmo::create(4096).unwrap(); |
| fs.dir("diagnostics").add_vmo_file_at("test.inspect", vmo, 0, 4096); |
| |
| // Create a connection to the ServiceFs. |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.join("out").to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| let thread_path = path.join("out"); |
| |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let path = thread_path; |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| executor.run_singlethreaded(async { |
| verify_reader_with_mode(path, VerifyMode::ExpectComponentFailure).await; |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.join("out").to_str().unwrap()).unwrap(); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn inspect_repo_disallows_duplicated_dirs() { |
| let mut inspect_repo = DiagnosticsDataRepository::new(LogManager::new(), None); |
| let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]); |
| let instance_id = "1234".to_string(); |
| |
| let component_id = ComponentIdentifier::Legacy(LegacyIdentifier { |
| instance_id, |
| realm_path, |
| component_name: "foo.cmx".into(), |
| }); |
| let (proxy, _) = |
| fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy"); |
| |
| inspect_repo |
| .add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0)) |
| .expect("add to repo"); |
| |
| let (proxy, _) = |
| fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy"); |
| |
| inspect_repo |
| .add_inspect_artifacts(component_id.clone(), TEST_URL, proxy, zx::Time::from_nanos(0)) |
| .expect("add to repo"); |
| |
| let key = component_id.unique_key(); |
| assert_eq!(inspect_repo.data_directories.get(key).unwrap().get_values().len(), 1); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn three_directories_two_batches() { |
| stress_test_diagnostics_repository(vec![33, 33, 33], vec![64, 35]).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn max_batch_intact_two_batches_merged() { |
| stress_test_diagnostics_repository(vec![64, 63, 1], vec![64, 64]).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn sixty_four_vmos_packed_into_one_batch() { |
| stress_test_diagnostics_repository([1usize; 64].to_vec(), vec![64]).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn component_with_more_than_max_batch_size_is_split_in_two() { |
| stress_test_diagnostics_repository(vec![65], vec![64, 1]).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn errorful_component_doesnt_halt_iteration() { |
| stress_test_diagnostics_repository(vec![64, 65, 64, 64], vec![64, 64, 64, 64, 1]).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn merge_errorful_component_into_next_batch() { |
| stress_test_diagnostics_repository(vec![63, 65], vec![64, 64]).await; |
| } |
| |
| async fn stress_test_diagnostics_repository( |
| directory_vmo_counts: Vec<usize>, |
| expected_batch_results: Vec<usize>, |
| ) { |
| let path = PathBuf::from("/stress_test_root_directory"); |
| |
| let dir_name_and_filecount: Vec<(String, usize)> = directory_vmo_counts |
| .into_iter() |
| .enumerate() |
| .map(|(index, filecount)| (format!("diagnostics_{}", index), filecount)) |
| .collect(); |
| |
| // Make a ServiceFs that will host inspect vmos under each |
| // of the new diagnostics directories. |
| let mut fs = ServiceFs::new(); |
| let log_manager = LogManager::new(); |
| |
| let inspector = inspector_for_reader_test(); |
| |
| for (directory_name, filecount) in dir_name_and_filecount.clone() { |
| for i in 0..filecount { |
| let vmo = inspector |
| .duplicate_vmo() |
| .ok_or(format_err!("Failed to duplicate VMO")) |
| .unwrap(); |
| |
| let size = vmo.get_size().unwrap(); |
| fs.dir(directory_name.clone()).add_vmo_file_at( |
| format!("root_{}.inspect", i), |
| vmo, |
| 0, /* vmo offset */ |
| size, |
| ); |
| } |
| } |
| let (h0, h1) = zx::Channel::create().unwrap(); |
| fs.serve_connection(h1).unwrap(); |
| |
| // We bind the root of the FS that hosts our 3 test dirs to |
| // stress_test_root_dir. Now each directory can be found at |
| // stress_test_root_dir/diagnostics_<i> |
| let ns = fdio::Namespace::installed().unwrap(); |
| ns.bind(path.to_str().unwrap(), h0).unwrap(); |
| |
| fasync::Task::spawn(fs.collect()).detach(); |
| |
| let (done0, done1) = zx::Channel::create().unwrap(); |
| |
| let cloned_path = path.clone(); |
| // Run the actual test in a separate thread so that it does not block on FS operations. |
| // Use signalling on a zx::Channel to indicate that the test is done. |
| std::thread::spawn(move || { |
| let done = done1; |
| let mut executor = fasync::Executor::new().unwrap(); |
| |
| executor.run_singlethreaded(async { |
| let id_and_directory_proxy = |
| join_all(dir_name_and_filecount.iter().map(|(dir, _)| { |
| let new_async_clone = cloned_path.clone(); |
| async move { |
| let full_path = new_async_clone.join(dir); |
| let proxy = InspectDataCollector::find_directory_proxy(&full_path) |
| .await |
| .unwrap(); |
| let unique_cid = ComponentIdentifier::Legacy(LegacyIdentifier { |
| instance_id: "1234".into(), |
| realm_path: vec![].into(), |
| component_name: format!("component_{}.cmx", dir).into(), |
| }); |
| (unique_cid, proxy) |
| } |
| })) |
| .await; |
| |
| let inspect_repo = Arc::new(RwLock::new(DiagnosticsDataRepository::new( |
| log_manager.clone(), |
| None, |
| ))); |
| |
| for (cid, proxy) in id_and_directory_proxy { |
| inspect_repo |
| .write() |
| .add_inspect_artifacts( |
| cid.clone(), |
| TEST_URL, |
| proxy, |
| zx::Time::from_nanos(0), |
| ) |
| .unwrap(); |
| } |
| |
| let inspector = Inspector::new(); |
| let root = inspector.root(); |
| let test_archive_accessor_node = root.create_child("test_archive_accessor_node"); |
| |
| let test_accessor_stats = |
| Arc::new(diagnostics::ArchiveAccessorStats::new(test_archive_accessor_node)); |
| let test_batch_iterator_stats1 = Arc::new( |
| diagnostics::DiagnosticsServerStats::for_inspect(test_accessor_stats.clone()), |
| ); |
| |
| let _result_json = read_snapshot_verify_batch_count_and_batch_size( |
| inspect_repo.clone(), |
| expected_batch_results, |
| test_batch_iterator_stats1, |
| ) |
| .await; |
| |
| done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer"); |
| }); |
| }); |
| |
| fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap(); |
| ns.unbind(path.to_str().unwrap()).unwrap(); |
| } |
| |
| fn inspector_for_reader_test() -> Inspector { |
| let inspector = Inspector::new(); |
| let root = inspector.root(); |
| let child_1 = root.create_child("child_1"); |
| child_1.record_int("some-int", 2); |
| let child_1_1 = child_1.create_child("child_1_1"); |
| child_1_1.record_int("some-int", 3); |
| child_1_1.record_int("not-wanted-int", 4); |
| root.record(child_1_1); |
| root.record(child_1); |
| let child_2 = root.create_child("child_2"); |
| child_2.record_int("some-int", 2); |
| root.record(child_2); |
| inspector |
| } |
| |
| enum VerifyMode { |
| ExpectSuccess, |
| ExpectComponentFailure, |
| } |
| |
| async fn verify_reader(path: PathBuf) { |
| verify_reader_with_mode(path, VerifyMode::ExpectSuccess).await; |
| } |
| |
| async fn verify_reader_with_mode(path: PathBuf, mode: VerifyMode) { |
| let child_1_1_selector = selectors::parse_selector(r#"*:root/child_1/*:some-int"#).unwrap(); |
| let child_2_selector = |
| selectors::parse_selector(r#"test_component.cmx:root/child_2:*"#).unwrap(); |
| let inspect_repo = Arc::new(RwLock::new(DiagnosticsDataRepository::new( |
| LogManager::new(), |
| Some(vec![Arc::new(child_1_1_selector), Arc::new(child_2_selector)]), |
| ))); |
| |
| let out_dir_proxy = InspectDataCollector::find_directory_proxy(&path).await.unwrap(); |
| |
| // The absolute moniker here is made up since the selector is a glob |
| // selector, so any path would match. |
| let component_id = ComponentIdentifier::Legacy(LegacyIdentifier { |
| instance_id: "1234".into(), |
| realm_path: vec![].into(), |
| component_name: "test_component.cmx".into(), |
| }); |
| |
| let inspector = Inspector::new(); |
| let root = inspector.root(); |
| let test_archive_accessor_node = root.create_child("test_archive_accessor_node"); |
| |
| assert_inspect_tree!(inspector, root: {test_archive_accessor_node: {}}); |
| |
| let test_accessor_stats = |
| Arc::new(diagnostics::ArchiveAccessorStats::new(test_archive_accessor_node)); |
| |
| let test_batch_iterator_stats1 = |
| Arc::new(diagnostics::DiagnosticsServerStats::for_inspect(test_accessor_stats.clone())); |
| |
| assert_inspect_tree!(inspector, root: { |
| test_archive_accessor_node: { |
| archive_accessor_connections_closed: 0u64, |
| archive_accessor_connections_opened: 0u64, |
| inspect_batch_iterator_connection0:{ |
| inspect_batch_iterator_terminal_responses: 0u64, |
| inspect_batch_iterator_get_next_responses: 0u64, |
| inspect_batch_iterator_get_next_requests: 0u64, |
| }, |
| inspect_batch_iterator_connections_closed: 0u64, |
| inspect_batch_iterator_connections_opened: 0u64, |
| inspect_batch_iterator_get_next_errors: 0u64, |
| inspect_batch_iterator_get_next_requests: 0u64, |
| inspect_batch_iterator_get_next_responses: 0u64, |
| inspect_batch_iterator_get_next_result_count: 0u64, |
| inspect_batch_iterator_get_next_result_errors: 0u64, |
| inspect_component_timeouts_count: 0u64, |
| inspect_reader_servers_constructed: 1u64, |
| inspect_reader_servers_destroyed: 0u64, |
| inspect_batch_iterator_get_next_time_usec: AnyProperty, |
| lifecycle_batch_iterator_connections_closed: 0u64, |
| lifecycle_batch_iterator_connections_opened: 0u64, |
| lifecycle_batch_iterator_get_next_errors: 0u64, |
| lifecycle_batch_iterator_get_next_requests: 0u64, |
| lifecycle_batch_iterator_get_next_responses: 0u64, |
| lifecycle_batch_iterator_get_next_result_count: 0u64, |
| lifecycle_batch_iterator_get_next_result_errors: 0u64, |
| lifecycle_component_timeouts_count: 0u64, |
| lifecycle_reader_servers_constructed: 0u64, |
| lifecycle_reader_servers_destroyed: 0u64, |
| lifecycle_batch_iterator_get_next_time_usec: AnyProperty, |
| logs_batch_iterator_connections_closed: 0u64, |
| logs_batch_iterator_connections_opened: 0u64, |
| logs_batch_iterator_get_next_errors: 0u64, |
| logs_batch_iterator_get_next_requests: 0u64, |
| logs_batch_iterator_get_next_responses: 0u64, |
| logs_batch_iterator_get_next_result_count: 0u64, |
| logs_batch_iterator_get_next_result_errors: 0u64, |
| logs_component_timeouts_count: 0u64, |
| logs_reader_servers_constructed: 0u64, |
| logs_reader_servers_destroyed: 0u64, |
| logs_batch_iterator_get_next_time_usec: AnyProperty, |
| stream_diagnostics_requests: 0u64, |
| } |
| }); |
| |
| let inspector_arc = Arc::new(inspector); |
| |
| inspect_repo |
| .write() |
| .add_inspect_artifacts( |
| component_id.clone(), |
| TEST_URL, |
| out_dir_proxy, |
| zx::Time::from_nanos(0), |
| ) |
| .unwrap(); |
| |
| let expected_get_next_result_errors = match mode { |
| VerifyMode::ExpectComponentFailure => 1u64, |
| _ => 0u64, |
| }; |
| |
| { |
| let result_json = read_snapshot( |
| inspect_repo.clone(), |
| inspector_arc.clone(), |
| test_batch_iterator_stats1, |
| ) |
| .await; |
| |
| let result_array = result_json.as_array().expect("unit test json should be array."); |
| assert_eq!(result_array.len(), 1, "Expect only one schema to be returned."); |
| |
| let result_map = |
| result_array[0].as_object().expect("entries in the schema array are json objects."); |
| |
| let result_payload = |
| result_map.get("payload").expect("diagnostics schema requires payload entry."); |
| |
| let expected_payload = match mode { |
| VerifyMode::ExpectSuccess => json!({ |
| "root": { |
| "child_1": { |
| "child_1_1": { |
| "some-int": 3 |
| } |
| }, |
| "child_2": { |
| "some-int": 2 |
| } |
| } |
| }), |
| VerifyMode::ExpectComponentFailure => json!(null), |
| }; |
| assert_eq!(*result_payload, expected_payload); |
| |
| // stream_diagnostics_requests is 0 since its tracked via archive_accessor server, |
| // which isnt running in this unit test. |
| assert_inspect_tree!(inspector_arc.clone(), root: { |
| test_archive_accessor_node: { |
| archive_accessor_connections_closed: 0u64, |
| archive_accessor_connections_opened: 0u64, |
| inspect_batch_iterator_connections_closed: 1u64, |
| inspect_batch_iterator_connections_opened: 1u64, |
| inspect_batch_iterator_get_next_errors: 0u64, |
| inspect_batch_iterator_get_next_requests: 2u64, |
| inspect_batch_iterator_get_next_responses: 2u64, |
| inspect_batch_iterator_get_next_result_count: 1u64, |
| inspect_batch_iterator_get_next_result_errors: expected_get_next_result_errors, |
| inspect_component_timeouts_count: 0u64, |
| inspect_reader_servers_constructed: 1u64, |
| inspect_reader_servers_destroyed: 1u64, |
| inspect_batch_iterator_get_next_time_usec: AnyProperty, |
| inspect_component_time_usec: AnyProperty, |
| inspect_longest_processing_times: contains { |
| "test_component.cmx": contains { |
| "@time": AnyProperty, |
| "duration_seconds": AnyProperty |
| } |
| }, |
| lifecycle_batch_iterator_connections_closed: 0u64, |
| lifecycle_batch_iterator_connections_opened: 0u64, |
| lifecycle_batch_iterator_get_next_errors: 0u64, |
| lifecycle_batch_iterator_get_next_requests: 0u64, |
| lifecycle_batch_iterator_get_next_responses: 0u64, |
| lifecycle_batch_iterator_get_next_result_count: 0u64, |
| lifecycle_batch_iterator_get_next_result_errors: 0u64, |
| lifecycle_component_timeouts_count: 0u64, |
| lifecycle_reader_servers_constructed: 0u64, |
| lifecycle_reader_servers_destroyed: 0u64, |
| lifecycle_batch_iterator_get_next_time_usec: AnyProperty, |
| logs_batch_iterator_connections_closed: 0u64, |
| logs_batch_iterator_connections_opened: 0u64, |
| logs_batch_iterator_get_next_errors: 0u64, |
| logs_batch_iterator_get_next_requests: 0u64, |
| logs_batch_iterator_get_next_responses: 0u64, |
| logs_batch_iterator_get_next_result_count: 0u64, |
| logs_batch_iterator_get_next_result_errors: 0u64, |
| logs_component_timeouts_count: 0u64, |
| logs_reader_servers_constructed: 0u64, |
| logs_reader_servers_destroyed: 0u64, |
| logs_batch_iterator_get_next_time_usec: AnyProperty, |
| stream_diagnostics_requests: 0u64, |
| } |
| }); |
| } |
| |
| let test_batch_iterator_stats2 = |
| Arc::new(diagnostics::DiagnosticsServerStats::for_inspect(test_accessor_stats.clone())); |
| |
| inspect_repo.write().remove(&component_id); |
| { |
| let result_json = read_snapshot( |
| inspect_repo.clone(), |
| inspector_arc.clone(), |
| test_batch_iterator_stats2, |
| ) |
| .await; |
| |
| let result_array = result_json.as_array().expect("unit test json should be array."); |
| assert_eq!(result_array.len(), 0, "Expect no schemas to be returned."); |
| |
| assert_inspect_tree!(inspector_arc.clone(), root: { |
| test_archive_accessor_node: { |
| archive_accessor_connections_closed: 0u64, |
| archive_accessor_connections_opened: 0u64, |
| inspect_batch_iterator_connections_closed: 2u64, |
| inspect_batch_iterator_connections_opened: 2u64, |
| inspect_batch_iterator_get_next_errors: 0u64, |
| inspect_batch_iterator_get_next_requests: 3u64, |
| inspect_batch_iterator_get_next_responses: 3u64, |
| inspect_batch_iterator_get_next_result_count: 1u64, |
| inspect_batch_iterator_get_next_result_errors: expected_get_next_result_errors, |
| inspect_component_timeouts_count: 0u64, |
| inspect_reader_servers_constructed: 2u64, |
| inspect_reader_servers_destroyed: 2u64, |
| inspect_batch_iterator_get_next_time_usec: AnyProperty, |
| inspect_component_time_usec: AnyProperty, |
| inspect_longest_processing_times: contains { |
| "test_component.cmx": contains { |
| "@time": AnyProperty, |
| "duration_seconds": AnyProperty, |
| } |
| }, |
| lifecycle_batch_iterator_connections_closed: 0u64, |
| lifecycle_batch_iterator_connections_opened: 0u64, |
| lifecycle_batch_iterator_get_next_errors: 0u64, |
| lifecycle_batch_iterator_get_next_requests: 0u64, |
| lifecycle_batch_iterator_get_next_responses: 0u64, |
| lifecycle_batch_iterator_get_next_result_count: 0u64, |
| lifecycle_batch_iterator_get_next_result_errors: 0u64, |
| lifecycle_component_timeouts_count: 0u64, |
| lifecycle_reader_servers_constructed: 0u64, |
| lifecycle_reader_servers_destroyed: 0u64, |
| lifecycle_batch_iterator_get_next_time_usec: AnyProperty, |
| logs_batch_iterator_connections_closed: 0u64, |
| logs_batch_iterator_connections_opened: 0u64, |
| logs_batch_iterator_get_next_errors: 0u64, |
| logs_batch_iterator_get_next_requests: 0u64, |
| logs_batch_iterator_get_next_responses: 0u64, |
| logs_batch_iterator_get_next_result_count: 0u64, |
| logs_batch_iterator_get_next_result_errors: 0u64, |
| logs_component_timeouts_count: 0u64, |
| logs_reader_servers_constructed: 0u64, |
| logs_reader_servers_destroyed: 0u64, |
| logs_batch_iterator_get_next_time_usec: AnyProperty, |
| stream_diagnostics_requests: 0u64, |
| } |
| }); |
| } |
| } |
| |
| fn start_snapshot( |
| inspect_repo: Arc<RwLock<DiagnosticsDataRepository>>, |
| stats: Arc<DiagnosticsServerStats>, |
| ) -> (BatchIteratorProxy, Task<()>) { |
| let reader_server = Box::pin(ReaderServer::stream( |
| inspect_repo, |
| BATCH_RETRIEVAL_TIMEOUT_SECONDS, |
| None, |
| stats.clone(), |
| )); |
| let (consumer, batch_iterator_requests) = |
| create_proxy_and_stream::<BatchIteratorMarker>().unwrap(); |
| ( |
| consumer, |
| Task::spawn(async { |
| AccessorServer::new( |
| reader_server, |
| batch_iterator_requests, |
| StreamMode::Snapshot, |
| stats, |
| ) |
| .unwrap() |
| .run() |
| .await |
| .unwrap() |
| }), |
| ) |
| } |
| |
| async fn read_snapshot( |
| inspect_repo: Arc<RwLock<DiagnosticsDataRepository>>, |
| _test_inspector: Arc<Inspector>, |
| stats: Arc<DiagnosticsServerStats>, |
| ) -> serde_json::Value { |
| let (consumer, server) = start_snapshot(inspect_repo, stats); |
| |
| let mut result_vec: Vec<String> = Vec::new(); |
| loop { |
| let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> = |
| consumer.get_next().await.unwrap().unwrap(); |
| |
| if next_batch.is_empty() { |
| break; |
| } |
| for formatted_content in next_batch { |
| match formatted_content { |
| fidl_fuchsia_diagnostics::FormattedContent::Json(data) => { |
| let mut buf = vec![0; data.size as usize]; |
| data.vmo.read(&mut buf, 0).expect("reading vmo"); |
| let hierarchy_string = std::str::from_utf8(&buf).unwrap(); |
| result_vec.push(hierarchy_string.to_string()); |
| } |
| _ => panic!("test only produces json formatted data"), |
| } |
| } |
| } |
| |
| // ensures connection is marked as closed, wait for stream to terminate |
| drop(consumer); |
| server.await; |
| |
| let result_string = format!("[{}]", result_vec.join(",")); |
| serde_json::from_str(&result_string) |
| .expect(&format!("unit tests shouldn't be creating malformed json: {}", result_string)) |
| } |
| |
| async fn read_snapshot_verify_batch_count_and_batch_size( |
| inspect_repo: Arc<RwLock<DiagnosticsDataRepository>>, |
| expected_batch_sizes: Vec<usize>, |
| stats: Arc<DiagnosticsServerStats>, |
| ) -> serde_json::Value { |
| let (consumer, server) = start_snapshot(inspect_repo, stats); |
| |
| let mut result_vec: Vec<String> = Vec::new(); |
| let mut batch_counts = Vec::new(); |
| loop { |
| let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> = |
| consumer.get_next().await.unwrap().unwrap(); |
| |
| if next_batch.is_empty() { |
| assert_eq!(expected_batch_sizes, batch_counts); |
| break; |
| } |
| |
| batch_counts.push(next_batch.len()); |
| |
| for formatted_content in next_batch { |
| match formatted_content { |
| fidl_fuchsia_diagnostics::FormattedContent::Json(data) => { |
| let mut buf = vec![0; data.size as usize]; |
| data.vmo.read(&mut buf, 0).expect("reading vmo"); |
| let hierarchy_string = std::str::from_utf8(&buf).unwrap(); |
| result_vec.push(hierarchy_string.to_string()); |
| } |
| _ => panic!("test only produces json formatted data"), |
| } |
| } |
| } |
| |
| // ensures connection is marked as closed, wait for stream to terminate |
| drop(consumer); |
| server.await; |
| |
| let result_string = format!("[{}]", result_vec.join(",")); |
| serde_json::from_str(&result_string) |
| .expect(&format!("unit tests shouldn't be creating malformed json: {}", result_string)) |
| } |
| } |