// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
events::types::{ComponentIdentifier, InspectData},
formatter::{self, JsonInspectSchema},
},
anyhow::{format_err, Error},
fidl::endpoints::{DiscoverableService, RequestStream, ServerEnd},
fidl_fuchsia_diagnostics::{self, BatchIteratorMarker, BatchIteratorRequestStream, Selector},
fidl_fuchsia_inspect::TreeMarker,
fidl_fuchsia_inspect_deprecated::InspectMarker,
fidl_fuchsia_io::{DirectoryProxy, NodeInfo, CLONE_FLAG_SAME_RIGHTS},
fidl_fuchsia_mem, files_async,
fuchsia_async::{self as fasync, DurationExt, TimeoutExt},
fuchsia_inspect::reader::{
snapshot::{Snapshot, SnapshotTree},
PartialNodeHierarchy,
},
fuchsia_inspect_node_hierarchy::{
trie::{self, TrieIterableNode},
InspectHierarchyMatcher, NodeHierarchy,
},
fuchsia_zircon::{self as zx, DurationNum, HandleBased},
futures::future::{join_all, BoxFuture},
futures::stream::{FusedStream, StreamExt},
futures::{FutureExt, TryFutureExt, TryStreamExt},
inspect_fidl_load as deprecated_inspect, io_util,
log::error,
parking_lot::{Mutex, RwLock},
pin_utils::pin_mut,
selectors,
std::collections::HashMap,
std::convert::{TryFrom, TryInto},
std::path::{Path, PathBuf},
std::sync::Arc,
};
/// Keep only 64 hierarchy snapshots in memory at a time.
/// We limit to 64 because each snapshot is sent over a VMO and we can only have
/// 64 handles sent over a message.
// TODO(4601): Make this product-configurable.
// TODO(4601): Consider configuring batch sizes by bytes, not by hierarchy count.
static IN_MEMORY_SNAPSHOT_LIMIT: usize = 64;
// Number of seconds to wait before timing out various async operations:
// 1) Reading the diagnostics directory of a component, searching for inspect files.
// 2) Getting the description of a file proxy backing the inspect data.
// 3) Reading the bytes from a File.
// 4) Loading a hierarchy from the deprecated inspect fidl protocol.
// 5) Converting an unpopulated data map into a populated data map.
static INSPECT_ASYNC_TIMEOUT_SECONDS: i64 = 5;
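/// Trie mapping a component's unique key (as produced by
/// `ComponentIdentifier::unique_key`) to the container holding that
/// component's diagnostics-directory state.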
type InspectDataTrie = trie::Trie<String, UnpopulatedInspectDataContainer>;
enum ReadSnapshot {
Single(Snapshot),
Tree(SnapshotTree),
Finished(NodeHierarchy),
}
/// Mapping from a diagnostics filename to the underlying encoding of that
/// diagnostics data.
type DataMap = HashMap<String, InspectData>;
type Moniker = String;
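/// A `DataCollector` stages diagnostics data from a configured source and
/// hands that data off exactly once.
///
/// A minimal usage sketch, mirroring the unit tests below (assumes `path`
/// names a valid diagnostics directory in the executing namespace):
///
/// ```ignore
/// let collector = InspectDataCollector::new();
/// // Collect on a clone so the original can still take the staged data.
/// Box::new(collector.clone()).collect(path).await?;
/// let data_map = Box::new(collector).take_data().expect("data was staged");
/// ```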
pub trait DataCollector {
// Processes all previously collected data from the configured sources,
// provides the returned DataMap with ownership of that data, returns the
// map, and clears the collector state.
//
// If the data has already been taken by a previous take_data call, the
// return value will be None.
fn take_data(self: Box<Self>) -> Option<DataMap>;
// Triggers the process of collection, causing the collector to find and stage
// all data it is configured to collect for transfer of ownership by the next
// take_data call.
fn collect(self: Box<Self>, path: PathBuf) -> BoxFuture<'static, Result<(), Error>>;
}
/// InspectDataCollector holds the information needed to retrieve the Inspect
/// VMOs associated with a particular component.
#[derive(Clone, Debug)]
pub struct InspectDataCollector {
/// The inspect data associated with a particular event.
///
/// This is wrapped in an Arc Mutex so it can be shared between multiple data sources.
///
/// Note: The Arc is needed so that we can both add the data map to a data collector
/// and trigger async collection of the data in the same method. This can only
/// be done by allowing the async method to populate the same data that is being
/// passed into the component event.
inspect_data_map: Arc<Mutex<Option<DataMap>>>,
}
impl InspectDataCollector {
/// Construct a new InspectDataCollector whose data map is shared behind an Arc<Mutex>.
pub fn new() -> Self {
InspectDataCollector { inspect_data_map: Arc::new(Mutex::new(Some(DataMap::new()))) }
}
/// Convert a fully-qualified path to a directory-proxy in the executing namespace.
/// NOTE: Currently does a synchronous directory-open, since there are no available
/// async APIs.
pub async fn find_directory_proxy(path: &Path) -> Result<DirectoryProxy, Error> {
// TODO(36762): When available, use the async directory-open api.
io_util::open_directory_in_namespace(
&path.to_string_lossy(),
io_util::OPEN_RIGHT_READABLE | io_util::OPEN_RIGHT_WRITABLE,
)
}
/// Searches the directory specified by inspect_directory_proxy for
/// .inspect files and populates the `inspect_data_map` with the found VMOs.
pub async fn populate_data_map(&mut self, inspect_proxy: &DirectoryProxy) -> Result<(), Error> {
// TODO(36762): Use a streaming and bounded readdir API when available to
// avoid hanging.
let entries = files_async::readdir_recursive(
inspect_proxy,
Some(INSPECT_ASYNC_TIMEOUT_SECONDS.seconds()),
)
.filter_map(|result| {
async move {
// TODO(fxb/49157): decide how to show directories that we failed to read.
result.ok()
}
});
pin_mut!(entries);
while let Some(entry) = entries.next().await {
// We are only currently interested in inspect VMO files (root.inspect) and
// inspect services.
if let Some(proxy) = self.maybe_load_service::<TreeMarker>(inspect_proxy, &entry)? {
let maybe_vmo = proxy
.get_content()
.err_into::<anyhow::Error>()
.on_timeout(INSPECT_ASYNC_TIMEOUT_SECONDS.seconds().after_now(), || {
Err(format_err!("Timed out reading contents via Tree protocol."))
})
.await?
.buffer
.map(|b| b.vmo);
self.maybe_add(&entry.name, InspectData::Tree(proxy, maybe_vmo));
continue;
}
if let Some(proxy) = self.maybe_load_service::<InspectMarker>(inspect_proxy, &entry)? {
self.maybe_add(&entry.name, InspectData::DeprecatedFidl(proxy));
continue;
}
if !entry.name.ends_with(".inspect") || entry.kind != files_async::DirentKind::File {
continue;
}
let file_proxy = match io_util::open_file(
inspect_proxy,
Path::new(&entry.name),
io_util::OPEN_RIGHT_READABLE,
) {
Ok(proxy) => proxy,
Err(_) => {
continue;
}
};
// Obtain the vmo backing any VmoFiles.
match file_proxy
.describe()
.err_into::<anyhow::Error>()
.on_timeout(INSPECT_ASYNC_TIMEOUT_SECONDS.seconds().after_now(), || {
Err(format_err!(
"Timed out waiting for backing file description: {:?}",
file_proxy
))
})
.await
{
Ok(nodeinfo) => match nodeinfo {
NodeInfo::Vmofile(vmofile) => {
self.maybe_add(&entry.name, InspectData::Vmo(vmofile.vmo));
}
NodeInfo::File(_) => {
let contents = io_util::read_file_bytes(&file_proxy)
.on_timeout(INSPECT_ASYNC_TIMEOUT_SECONDS.seconds().after_now(), || {
Err(format_err!(
"Timed out reading contents of fuchsia File: {:?}",
file_proxy
))
})
.await?;
self.maybe_add(&entry.name, InspectData::File(contents));
}
ty => {
error!(
"found an inspect file '{}' of unexpected type {:?}",
&entry.name, ty
);
}
},
Err(_) => {}
}
}
Ok(())
}
/// Adds a key-value pair to the contained data map if the map hasn't been taken
/// yet. Otherwise, does nothing.
fn maybe_add(&mut self, key: impl Into<String>, value: InspectData) {
if let Some(map) = self.inspect_data_map.lock().as_mut() {
map.insert(key.into(), value);
};
}
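/// If `entry` names a discoverable service of type `S` (i.e. the entry name
/// ends with `S::SERVICE_NAME` and the entry is a service node), connects to
/// it and returns the resulting proxy. Returns `Ok(None)` otherwise.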
fn maybe_load_service<S: DiscoverableService>(
&self,
dir_proxy: &DirectoryProxy,
entry: &files_async::DirEntry,
) -> Result<Option<S::Proxy>, Error> {
if entry.name.ends_with(S::SERVICE_NAME) {
if entry.kind != files_async::DirentKind::Service {
return Ok(None);
}
let (proxy, server) = fidl::endpoints::create_proxy::<S>()?;
fdio::service_connect_at(dir_proxy.as_ref(), &entry.name, server.into_channel())?;
return Ok(Some(proxy));
}
Ok(None)
}
}
impl DataCollector for InspectDataCollector {
/// Takes the contained extra data. Additions following this have no effect.
fn take_data(self: Box<Self>) -> Option<DataMap> {
self.inspect_data_map.lock().take()
}
/// Collect extra data stored under the given path.
///
/// This currently only does a single pass over the directory to find information.
fn collect(mut self: Box<Self>, path: PathBuf) -> BoxFuture<'static, Result<(), Error>> {
async move {
let inspect_proxy = match InspectDataCollector::find_directory_proxy(&path)
.on_timeout(INSPECT_ASYNC_TIMEOUT_SECONDS.seconds().after_now(), || {
Err(format_err!("Timed out converting path into directory proxy: {:?}", path))
})
.await
{
Ok(proxy) => proxy,
Err(e) => {
return Err(format_err!("Failed to open out directory at {:?}: {}", path, e));
}
};
self.populate_data_map(&inspect_proxy).await
}
.boxed()
}
}
/// Packet containing a node hierarchy and all the metadata needed to
/// populate a diagnostics schema for that node hierarchy.
pub struct NodeHierarchyData {
// Name of the file that created this snapshot.
filename: String,
// Timestamp at which this snapshot resolved or failed.
timestamp: zx::Time,
// Errors encountered when processing this snapshot.
errors: Vec<formatter::Error>,
// Optional NodeHierarchy of the inspect hierarchy; None when reading failed
// and we only have errors to share with the client.
hierarchy: Option<NodeHierarchy>,
}
impl From<SnapshotData> for NodeHierarchyData {
fn from(data: SnapshotData) -> NodeHierarchyData {
match data.snapshot {
Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) {
Ok(node_hierarchy) => NodeHierarchyData {
filename: data.filename,
timestamp: data.timestamp,
errors: data.errors,
hierarchy: Some(node_hierarchy),
},
Err(e) => NodeHierarchyData {
filename: data.filename,
timestamp: data.timestamp,
errors: vec![formatter::Error { message: format!("{:?}", e) }],
hierarchy: None,
},
},
None => NodeHierarchyData {
filename: data.filename,
timestamp: data.timestamp,
errors: data.errors,
hierarchy: None,
},
}
}
}
/// Packet containing a snapshot and all the metadata needed to
/// populate a diagnostics schema for that snapshot.
pub struct SnapshotData {
// Name of the file that created this snapshot.
filename: String,
// Timestamp at which this snapshot resolved or failed.
timestamp: zx::Time,
// Errors encountered when processing this snapshot.
errors: Vec<formatter::Error>,
// Optional snapshot of the inspect hierarchy; None when reading failed
// and we only have errors to share with the client.
snapshot: Option<ReadSnapshot>,
}
impl SnapshotData {
// Constructs a packet that timestamps and packages an inspect snapshot for exfiltration.
fn successful(snapshot: ReadSnapshot, filename: String) -> SnapshotData {
SnapshotData {
filename,
timestamp: fasync::Time::now().into_zx(),
errors: Vec::new(),
snapshot: Some(snapshot),
}
}
// Constructs a packet that timestamps and packages an inspect snapshot failure for exfiltration.
fn failed(error: formatter::Error, filename: String) -> SnapshotData {
SnapshotData {
filename,
timestamp: fasync::Time::now().into_zx(),
errors: vec![error],
snapshot: None,
}
}
}
/// PopulatedInspectDataContainer is the container that
/// holds the actual Inspect data for a given component,
/// along with all information needed to transform that data
/// to be returned to the client.
pub struct PopulatedInspectDataContainer {
/// Relative moniker of the component that this populated
/// data packet has gathered data for.
relative_moniker: Vec<String>,
/// Vector of all the snapshots of inspect hierarchies under
/// the diagnostics directory of the component identified by
/// relative_moniker, along with the metadata needed to populate
/// this snapshot's diagnostics schema.
snapshots: Vec<SnapshotData>,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
inspect_matcher: Option<InspectHierarchyMatcher>,
}
impl PopulatedInspectDataContainer {
async fn try_from(
unpopulated: UnpopulatedInspectDataContainer,
) -> Result<PopulatedInspectDataContainer, Error> {
let mut collector = InspectDataCollector::new();
match collector.populate_data_map(&unpopulated.component_diagnostics_proxy).await {
Ok(_) => {
let mut snapshots_data_opt = None;
if let Some(data_map) = Box::new(collector).take_data() {
let mut acc: Vec<SnapshotData> = vec![];
for (filename, data) in data_map {
match data {
InspectData::Tree(tree, _) => match SnapshotTree::try_from(&tree).await {
Ok(snapshot_tree) => {
acc.push(SnapshotData::successful(ReadSnapshot::Tree(snapshot_tree), filename));
}
Err(e) => {
acc.push(SnapshotData::failed(formatter::Error{message: format!("{:?}", e)}, filename));
}
},
InspectData::DeprecatedFidl(inspect_proxy) => {
match deprecated_inspect::load_hierarchy(inspect_proxy)
.on_timeout(
INSPECT_ASYNC_TIMEOUT_SECONDS.seconds().after_now(),
|| {
Err(format_err!(
"Timed out reading via deprecated inspect protocol.",
))
},
)
.await
{
Ok(hierarchy) => {
acc.push(SnapshotData::successful(ReadSnapshot::Finished(hierarchy), filename));
}
Err(e) => {
acc.push(SnapshotData::failed(formatter::Error{message: format!("{:?}", e)}, filename));
}
}
}
InspectData::Vmo(vmo) => match Snapshot::try_from(&vmo) {
Ok(snapshot) => {
acc.push(SnapshotData::successful(ReadSnapshot::Single(snapshot), filename));
}
Err(e) => {
acc.push(SnapshotData::failed(formatter::Error{message: format!("{:?}", e)}, filename));
}
},
InspectData::File(contents) => match Snapshot::try_from(contents) {
Ok(snapshot) => {
acc.push(SnapshotData::successful(ReadSnapshot::Single(snapshot), filename));
}
Err(e) => {
acc.push(SnapshotData::failed(formatter::Error{message: format!("{:?}", e)}, filename));
}
},
InspectData::Empty => {}
}
}
snapshots_data_opt = Some(acc);
}
match snapshots_data_opt {
Some(snapshots) => Ok(PopulatedInspectDataContainer {
relative_moniker: unpopulated.relative_moniker,
snapshots,
inspect_matcher: unpopulated.inspect_matcher,
}),
None => Err(format_err!(
"Failed to parse snapshots for: {:?}.",
unpopulated.relative_moniker
)),
}
}
Err(e) => Err(e),
}
}
}
/// UnpopulatedInspectDataContainer is the container that holds
/// all information needed to retrieve Inspect data
/// for a given component, when requested.
pub struct UnpopulatedInspectDataContainer {
/// Relative moniker of the component that this data container
/// is representing.
relative_moniker: Vec<String>,
/// DirectoryProxy for the out directory that this
/// data packet is configured for.
component_diagnostics_proxy: DirectoryProxy,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
inspect_matcher: Option<InspectHierarchyMatcher>,
}
/// InspectDataRepository manages storage of all state needed in order
/// for the inspect reader to retrieve inspect data when a read is requested.
pub struct InspectDataRepository {
// TODO(lukenicholson): Wrap directory proxies in a trie of
// component names to make filtering by selectors work.
data_directories: InspectDataTrie,
/// Optional static selectors. For the all_access reader, there
/// are no provided selectors. For all other pipelines, a non-empty
/// vector is required.
static_selectors: Option<Vec<Arc<Selector>>>,
}
impl InspectDataRepository {
pub fn new(static_selectors: Option<Vec<Arc<Selector>>>) -> Self {
InspectDataRepository {
data_directories: InspectDataTrie::new(),
static_selectors,
}
}
pub fn remove(&mut self, component_id: &ComponentIdentifier) {
self.data_directories.remove(component_id.unique_key());
}
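/// Tracks the diagnostics directory of the component identified by
/// `identifier`, filtering its moniker against the static selectors when
/// they are present. Re-adding a component that is already tracked is a
/// no-op, since component events may be duplicated.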
pub fn add(
&mut self,
identifier: ComponentIdentifier,
directory_proxy: DirectoryProxy,
) -> Result<(), Error> {
let relative_moniker = identifier.relative_moniker_for_selectors();
let matched_selectors = match &self.static_selectors {
Some(selectors) => Some(selectors::match_component_moniker_against_selectors(
&relative_moniker,
&selectors,
)?),
None => None,
};
// The component events stream might contain duplicated events for out/diagnostics
// directories of components that already existed before the archivist started (or
// for the archivist itself), so make sure we don't track duplicated component
// diagnostics directories.
if self.contains(&identifier, &relative_moniker) {
return Ok(());
}
let key = identifier.unique_key();
match matched_selectors {
Some(selectors) => {
if !selectors.is_empty() {
self.data_directories.insert(
key,
UnpopulatedInspectDataContainer {
relative_moniker,
component_diagnostics_proxy: directory_proxy,
inspect_matcher: Some((&selectors).try_into()?),
},
);
}
Ok(())
}
None => {
self.data_directories.insert(
key,
UnpopulatedInspectDataContainer {
relative_moniker,
component_diagnostics_proxy: directory_proxy,
inspect_matcher: None,
},
);
Ok(())
}
}
}
/// Return all of the DirectoryProxies that contain Inspect hierarchies
/// which contain data that should be selected from.
pub fn fetch_data(&self) -> Vec<UnpopulatedInspectDataContainer> {
self.data_directories
.iter()
.filter_map(
|(_, unpopulated_data_container_opt)| match unpopulated_data_container_opt {
Some(unpopulated_data_container) => io_util::clone_directory(
&unpopulated_data_container.component_diagnostics_proxy,
CLONE_FLAG_SAME_RIGHTS,
)
.ok()
.map(|directory| UnpopulatedInspectDataContainer {
relative_moniker: unpopulated_data_container.relative_moniker.clone(),
component_diagnostics_proxy: directory,
inspect_matcher: unpopulated_data_container.inspect_matcher.clone(),
}),
None => None,
},
)
.collect()
}
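/// Returns true if a diagnostics directory is already tracked for this
/// component instance (matching unique key and relative moniker).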
fn contains(
&mut self,
component_id: &ComponentIdentifier,
relative_moniker: &[String],
) -> bool {
self.data_directories
.get(component_id.unique_key())
.map(|trie_node| {
trie_node
.get_values()
.iter()
.any(|container| container.relative_moniker == relative_moniker)
})
.unwrap_or(false)
}
}
/// ReaderServer holds the state and data needed to serve Inspect data
/// reading requests for a single client.
///
/// configured_selectors: the selectors provided by the client, which define
/// what inspect data is returned by read requests. A `None` value
/// implies that all available data should be returned.
///
/// inspect_repo: the InspectDataRepository which holds the access-points for all relevant
/// inspect data.
#[derive(Clone)]
pub struct ReaderServer {
pub inspect_repo: Arc<RwLock<InspectDataRepository>>,
pub configured_selectors: Option<Vec<Arc<fidl_fuchsia_diagnostics::Selector>>>,
}
fn convert_snapshot_to_node_hierarchy(snapshot: ReadSnapshot) -> Result<NodeHierarchy, Error> {
match snapshot {
ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()),
ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(),
ReadSnapshot::Finished(hierarchy) => Ok(hierarchy),
}
}
impl ReaderServer {
pub fn new(
inspect_repo: Arc<RwLock<InspectDataRepository>>,
configured_selectors: Option<Vec<fidl_fuchsia_diagnostics::Selector>>,
) -> Self {
ReaderServer {
inspect_repo,
configured_selectors: configured_selectors
.map(|selectors| selectors.into_iter().map(Arc::new).collect()),
}
}
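/// Applies up to two rounds of selector filtering to a single component's
/// snapshots: first the pipeline's static matcher (when one is configured),
/// then the client-provided matcher cached in `client_matcher_container`
/// under `sanitized_moniker` (when the client supplied selectors).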
fn filter_single_components_snapshots(
sanitized_moniker: String,
snapshots: Vec<SnapshotData>,
static_matcher: Option<InspectHierarchyMatcher>,
client_matcher_container: &HashMap<String, Option<InspectHierarchyMatcher>>,
) -> Vec<NodeHierarchyData> {
let statically_filtered_hierarchies: Vec<NodeHierarchyData> = match static_matcher {
Some(static_matcher) => snapshots
.into_iter()
.map(|snapshot_data| {
let node_hierarchy_data: NodeHierarchyData = snapshot_data.into();
match node_hierarchy_data.hierarchy {
Some(node_hierarchy) => {
match fuchsia_inspect_node_hierarchy::filter_node_hierarchy(
node_hierarchy,
&static_matcher,
) {
Ok(filtered_hierarchy_opt) => NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: node_hierarchy_data.errors,
hierarchy: filtered_hierarchy_opt,
},
Err(e) => {
error!("Archivist failed to filter a node hierarchy: {:?}", e);
NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: vec![formatter::Error {
message: format!("{:?}", e),
}],
hierarchy: None,
}
}
}
}
None => NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: node_hierarchy_data.errors,
hierarchy: None,
},
}
})
.collect(),
// The only way we have a None value for the PopulatedDataContainer is
// if there were no provided static selectors, which is only valid in
// the AllAccess pipeline. For all other pipelines, if no static selectors
// matched, the data wouldn't have ended up in the repository to begin
// with.
None => snapshots.into_iter().map(|snapshot_data| snapshot_data.into()).collect(),
};
match client_matcher_container.get(&sanitized_moniker) {
// If the moniker key was present, and there was an InspectHierarchyMatcher,
// then this means the client provided their own selectors, and a subset of
// them matched this component. So we need to filter each of the snapshots from
// this component with the dynamically provided selectors.
Some(Some(dynamic_matcher)) => statically_filtered_hierarchies
.into_iter()
.map(|node_hierarchy_data| match node_hierarchy_data.hierarchy {
Some(node_hierarchy) => {
match fuchsia_inspect_node_hierarchy::filter_node_hierarchy(
node_hierarchy,
&dynamic_matcher,
) {
Ok(filtered_hierarchy_opt) => NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: node_hierarchy_data.errors,
hierarchy: filtered_hierarchy_opt,
},
Err(e) => {
error!(
NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: vec![formatter::Error { message: format!("{:?}", e) }],
hierarchy: None,
}
}
}
}
None => NodeHierarchyData {
filename: node_hierarchy_data.filename,
timestamp: node_hierarchy_data.timestamp,
errors: node_hierarchy_data.errors,
hierarchy: None,
},
})
.collect(),
// If the moniker key was present, but the InspectHierarchyMatcher option was
// None, this means that the client provided their own selectors, and none of
// them matched this particular component, so no values are to be returned.
Some(None) => Vec::new(),
// If the moniker key was absent, then the entire client_matcher_container should
// be empty since the implication is that the client provided none of their own
// selectors. Either every moniker is present or none are. And, if no dynamically
// provided selectors exist, then the statically filtered snapshots are all that
// we need.
None => {
assert!(client_matcher_container.is_empty());
statically_filtered_hierarchies
}
}
}
/// Takes a batch of unpopulated inspect data containers, traverses their diagnostics
/// directories, takes snapshots of all the Inspect hierarchies in those directories,
/// and then transforms the data containers into `PopulatedInspectDataContainer` results.
///
/// An entry is only an Error if connecting to the directory fails. Within a component's
/// diagnostics directory, individual snapshots of hierarchies can fail and the transformation
/// to a PopulatedInspectDataContainer will still succeed.
async fn pump_inspect_data(
inspect_batch: Vec<UnpopulatedInspectDataContainer>,
) -> Vec<Result<PopulatedInspectDataContainer, Error>> {
join_all(inspect_batch.into_iter().map(move |inspect_data_packet| {
PopulatedInspectDataContainer::try_from(inspect_data_packet)
}))
.await
}
/// Takes a batch of PopulatedInspectDataContainer results, and for all the non-error
/// entries converts all snapshots into in-memory node hierarchies, filters those hierarchies
/// so that the only diagnostics properties they contain are those configured by the static
/// and client-provided selectors, and then packages the filtered hierarchies into
/// NodeHierarchyData structs.
///
// TODO(4601): Error entries should still be included, but with a custom hierarchy
// that makes it clear to clients that snapshotting failed.
pub fn filter_snapshots(
configured_selectors: &Option<Vec<Arc<Selector>>>,
pumped_inspect_data_results: Vec<Result<PopulatedInspectDataContainer, Error>>,
) -> Vec<(Moniker, NodeHierarchyData)> {
// In case we encounter multiple PopulatedDataContainers with the same moniker, we
// don't want to repeat the component selector filtering, so store the results in a map.
let mut client_selector_matches: HashMap<String, Option<InspectHierarchyMatcher>> =
HashMap::new();
// We iterate the vector of pumped inspect data packets, consuming each inspect vmo
// and filtering it using the provided selector regular expressions. Each filtered
// inspect hierarchy is then added to an accumulator as a HierarchyData to be converted
// into a JSON string and returned.
pumped_inspect_data_results.into_iter().fold(Vec::new(), |mut acc, pumped_data| {
match pumped_data {
Ok(PopulatedInspectDataContainer {
relative_moniker,
snapshots,
inspect_matcher,
}) => {
let sanitized_moniker = relative_moniker
.iter()
.map(|s| selectors::sanitize_string_for_selectors(s))
.collect::<Vec<String>>()
.join("/");
// We know that if configured_selectors is some, there is at least one entry
// since the server validates the stream parameters and an empty
// configured_selectors vector is an error.
if configured_selectors.is_some() {
let configured_matchers = client_selector_matches
.entry(sanitized_moniker.clone())
.or_insert_with(|| {
let matching_selectors =
selectors::match_component_moniker_against_selectors(
&relative_moniker,
// Safe unwrap since we verify it is Some above.
configured_selectors.as_ref().unwrap(),
)
.unwrap_or_else(|err| {
error!(
"Failed to evaluate client selectors for: {:?} Error: {:?}",
relative_moniker, err
);
Vec::new()
});
if matching_selectors.is_empty() {
None
} else {
match (&matching_selectors).try_into() {
Ok(hierarchy_matcher) => Some(hierarchy_matcher),
Err(e) => {
error!("Failed to create hierarchy matcher: {:?}", e);
None
}
}
}
});
// If there were configured matchers and none of them matched
// this component, then we should return early since there is no data to
// extract.
if configured_matchers.is_none() {
return acc;
}
};
let mut filtered_hierarchy_data_with_moniker: Vec<(String, NodeHierarchyData)> =
ReaderServer::filter_single_components_snapshots(
sanitized_moniker.clone(),
snapshots,
inspect_matcher,
&client_selector_matches,
)
.into_iter()
.map(|filtered_hierarchy_data| {
(sanitized_moniker.clone(), filtered_hierarchy_data)
})
.collect();
acc.append(&mut filtered_hierarchy_data_with_moniker);
acc
}
// TODO(36761): What does it mean for IO to fail on a
// subset of directory data collections?
Err(_) => acc,
}
})
}
/// Takes a vector of HierarchyData structs, and a `fidl_fuchsia_diagnostics/Format`
/// enum, and writes each diagnostics hierarchy into a READ_ONLY VMO according to
/// the provided format. This VMO is then packaged into a `fidl_fuchsia_mem/Buffer`
/// which is then packaged into a `fidl_fuchsia_diagnostics/FormattedContent`
/// xunion which specifies the format of the VMO for clients.
///
/// Errors in the returned vector correspond to IO failures in writing to a VMO. If
/// a node hierarchy fails to format, its VMO contains the empty string.
fn format_hierarchies(
format: &fidl_fuchsia_diagnostics::Format,
hierarchies_with_monikers: Vec<(Moniker, NodeHierarchyData)>,
) -> Vec<Result<fidl_fuchsia_diagnostics::FormattedContent, Error>> {
hierarchies_with_monikers
.into_iter()
.map(|(moniker, hierarchy_data)| {
let formatted_string_result = match format {
fidl_fuchsia_diagnostics::Format::Json => {
let inspect_schema = JsonInspectSchema::new(
moniker,
hierarchy_data.hierarchy,
hierarchy_data.timestamp,
hierarchy_data.filename,
hierarchy_data.errors,
);
Ok(serde_json::to_string_pretty(&inspect_schema)?)
}
fidl_fuchsia_diagnostics::Format::Text => {
Err(format_err!("Text formatting not supported for inspect."))
}
};
let content_string = match formatted_string_result {
Ok(formatted_string) => formatted_string,
Err(e) => {
// TODO(4601): Convert failed formattings into the
// canonical json schema, with a failure message in "data"
error!("parsing results from the inspect source failed: {:?}", e);
"".to_string()
}
};
let vmo_size: u64 = content_string.len() as u64;
let dump_vmo_result: Result<zx::Vmo, Error> = zx::Vmo::create(vmo_size)
.map_err(|s| format_err!("error creating buffer, zx status: {}", s));
dump_vmo_result.and_then(|dump_vmo| {
dump_vmo
.write(content_string.as_bytes(), 0)
.map_err(|s| format_err!("error writing buffer, zx status: {}", s))?;
let client_vmo =
dump_vmo.duplicate_handle(zx::Rights::READ | zx::Rights::BASIC)?;
let mem_buffer = fidl_fuchsia_mem::Buffer { vmo: client_vmo, size: vmo_size };
match format {
fidl_fuchsia_diagnostics::Format::Json => {
Ok(fidl_fuchsia_diagnostics::FormattedContent::Json(mem_buffer))
}
fidl_fuchsia_diagnostics::Format::Text => {
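// Text output is not yet supported (see the error above), so the
// resulting empty payload is still packaged as JSON content.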
Ok(fidl_fuchsia_diagnostics::FormattedContent::Json(mem_buffer))
}
}
})
})
.collect()
}
/// Takes a BatchIterator server channel and upon receiving a GetNext request, serves
/// an empty vector denoting that the iterator has reached its end and is terminating.
pub async fn serve_terminal_batch(
&self,
stream: &mut BatchIteratorRequestStream,
) -> Result<(), Error> {
if stream.is_terminated() {
return Ok(());
}
while let Some(req) = stream.try_next().await? {
match req {
fidl_fuchsia_diagnostics::BatchIteratorRequest::GetNext { responder } => {
responder.send(&mut Ok(Vec::new()))?;
}
}
break;
}
Ok(())
}
/// Takes a BatchIterator server channel and starts serving snapshotted
/// Inspect hierarchies to clients as vectors of FormattedContent. The hierarchies
/// are served in batches of `IN_MEMORY_SNAPSHOT_LIMIT` at a time, and snapshots of
/// diagnostics data aren't taken until a component is included in the upcoming batch.
///
/// NOTE: This API does not send the terminal empty-vector at the end of the snapshot.
pub async fn serve_inspect_snapshot(
&self,
stream: &mut BatchIteratorRequestStream,
format: &fidl_fuchsia_diagnostics::Format,
) -> Result<(), Error> {
if stream.is_terminated() {
return Ok(());
}
// We must fetch the repository data in its own statement so that the
// repository lock guard is dropped before we await.
let inspect_repo_data = self.inspect_repo.read().fetch_data();
let inspect_repo_length = inspect_repo_data.len();
let mut inspect_repo_iter = inspect_repo_data.into_iter();
let mut iter = 0;
// Ceiling division: the number of batches needed to serve every snapshot,
// e.g. 65 snapshots with a limit of 64 require 2 batches. (The previous
// expression divided 1 by the limit, yielding inspect_repo_length + 1.)
let max = (inspect_repo_length + IN_MEMORY_SNAPSHOT_LIMIT - 1) / IN_MEMORY_SNAPSHOT_LIMIT;
while let Some(req) = stream.try_next().await? {
match req {
fidl_fuchsia_diagnostics::BatchIteratorRequest::GetNext { responder } => {
let snapshot_batch: Vec<UnpopulatedInspectDataContainer> =
(&mut inspect_repo_iter).take(IN_MEMORY_SNAPSHOT_LIMIT).collect();
iter += 1;
// Asynchronously populate data containers with snapshots of relevant
// inspect hierarchies.
let pumped_inspect_data_results =
ReaderServer::pump_inspect_data(snapshot_batch).await;
// Apply selector filtering to all snapshot inspect hierarchies in the batch
let batch_hierarchy_data = ReaderServer::filter_snapshots(
&self.configured_selectors,
pumped_inspect_data_results,
);
let formatted_content: Vec<
Result<fidl_fuchsia_diagnostics::FormattedContent, Error>,
> = ReaderServer::format_hierarchies(format, batch_hierarchy_data);
let filtered_results: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
formatted_content.into_iter().filter_map(Result::ok).collect();
responder.send(&mut Ok(filtered_results))?;
}
}
// We've sent all the meaningful content available in snapshot mode.
// The terminal value must be handled separately.
if iter >= max {
break;
}
}
Ok(())
}
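/// Spawns a task that serves the provided `BatchIterator` server end:
/// snapshot batches first (for the Snapshot and SnapshotThenSubscribe
/// modes), followed by the terminal empty batch.
///
/// A minimal client-side sketch, mirroring `read_snapshot` in the tests
/// below:
///
/// ```ignore
/// let (consumer, batch_iterator) = create_proxy::<BatchIteratorMarker>()?;
/// reader_server.stream_inspect(
///     fidl_fuchsia_diagnostics::StreamMode::Snapshot,
///     fidl_fuchsia_diagnostics::Format::Json,
///     batch_iterator,
/// )?;
/// // Drain batches until the terminal empty vector arrives.
/// loop {
///     let batch = consumer.get_next().await?.expect("reader error");
///     if batch.is_empty() { break; }
///     // Each entry is a FormattedContent::Json VMO to decode.
/// }
/// ```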
pub fn stream_inspect(
self,
stream_mode: fidl_fuchsia_diagnostics::StreamMode,
format: fidl_fuchsia_diagnostics::Format,
result_stream: ServerEnd<BatchIteratorMarker>,
) -> Result<(), Error> {
let result_channel = fasync::Channel::from_channel(result_stream.into_channel())?;
fasync::spawn(
async move {
let mut iterator_req_stream =
fidl_fuchsia_diagnostics::BatchIteratorRequestStream::from_channel(
result_channel,
);
if stream_mode == fidl_fuchsia_diagnostics::StreamMode::Snapshot
|| stream_mode == fidl_fuchsia_diagnostics::StreamMode::SnapshotThenSubscribe
{
self.serve_inspect_snapshot(&mut iterator_req_stream, &format).await?;
}
if stream_mode == fidl_fuchsia_diagnostics::StreamMode::Subscribe
|| stream_mode == fidl_fuchsia_diagnostics::StreamMode::SnapshotThenSubscribe
{
error!("not yet supported");
}
self.serve_terminal_batch(&mut iterator_req_stream).await?;
Ok(())
}
.unwrap_or_else(|e: anyhow::Error| {
error!("Error encountered running inspect stream: {:?}", e);
}),
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::events::types::{LegacyIdentifier, RealmPath},
fdio,
fidl::endpoints::create_proxy,
fidl_fuchsia_io::DirectoryMarker,
fuchsia_async as fasync,
fuchsia_component::server::ServiceFs,
fuchsia_inspect::{assert_inspect_tree, reader, Inspector},
fuchsia_zircon as zx,
fuchsia_zircon::Peered,
futures::StreamExt,
serde_json::json,
};
fn get_vmo(text: &[u8]) -> zx::Vmo {
let vmo = zx::Vmo::create(4096).unwrap();
vmo.write(text, 0).unwrap();
vmo
}
#[fasync::run_singlethreaded(test)]
async fn inspect_data_collector() {
let path = PathBuf::from("/test-bindings");
// Make a ServiceFs containing four files: three inspect files (one at the
// top level and one in each of two subdirectories) and one non-inspect file.
let mut fs = ServiceFs::new();
let vmo = get_vmo(b"test1");
let vmo2 = get_vmo(b"test2");
let vmo3 = get_vmo(b"test3");
let vmo4 = get_vmo(b"test4");
fs.dir("diagnostics").add_vmo_file_at("root.inspect", vmo, 0, 4096);
fs.dir("diagnostics").add_vmo_file_at("root_not_inspect", vmo2, 0, 4096);
fs.dir("diagnostics").dir("a").add_vmo_file_at("root.inspect", vmo3, 0, 4096);
fs.dir("diagnostics").dir("b").add_vmo_file_at("root.inspect", vmo4, 0, 4096);
// Create a connection to the ServiceFs.
let (h0, h1) = zx::Channel::create().unwrap();
fs.serve_connection(h1).unwrap();
let ns = fdio::Namespace::installed().unwrap();
ns.bind(path.join("out").to_str().unwrap(), h0).unwrap();
fasync::spawn(fs.collect());
let (done0, done1) = zx::Channel::create().unwrap();
let thread_path = path.join("out/diagnostics");
// Run the actual test in a separate thread so that it does not block on FS operations.
// Use signalling on a zx::Channel to indicate that the test is done.
std::thread::spawn(move || {
let path = thread_path;
let done = done1;
let mut executor = fasync::Executor::new().unwrap();
executor.run_singlethreaded(async {
let collector = InspectDataCollector::new();
// Trigger collection on a clone of the inspect collector so
// we can use collector to take the collected data.
Box::new(collector.clone()).collect(path).await.unwrap();
let collector: Box<InspectDataCollector> = Box::new(collector);
let extra_data = collector.take_data().expect("collector missing data");
assert_eq!(3, extra_data.len());
let assert_extra_data = |path: &str, content: &[u8]| {
let extra = extra_data.get(path);
assert!(extra.is_some());
match extra.unwrap() {
InspectData::Vmo(vmo) => {
let mut buf = [0u8; 5];
vmo.read(&mut buf, 0).expect("reading vmo");
assert_eq!(content, &buf);
}
v => {
panic!("Expected Vmo, got {:?}", v);
}
}
};
assert_extra_data("root.inspect", b"test1");
assert_extra_data("a/root.inspect", b"test3");
assert_extra_data("b/root.inspect", b"test4");
done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
});
});
fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
ns.unbind(path.join("out").to_str().unwrap()).unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn inspect_data_collector_tree() {
let path = PathBuf::from("/test-bindings2");
// Make a ServiceFs serving an inspect tree.
let mut fs = ServiceFs::new();
let inspector = Inspector::new();
inspector.root().record_int("a", 1);
inspector.root().record_lazy_child("lazy", || {
async move {
let inspector = Inspector::new();
inspector.root().record_double("b", 3.14);
Ok(inspector)
}
.boxed()
});
inspector.serve(&mut fs).expect("failed to serve inspector");
// Create a connection to the ServiceFs.
let (h0, h1) = zx::Channel::create().unwrap();
fs.serve_connection(h1).unwrap();
let ns = fdio::Namespace::installed().unwrap();
ns.bind(path.join("out").to_str().unwrap(), h0).unwrap();
fasync::spawn(fs.collect());
let (done0, done1) = zx::Channel::create().unwrap();
let thread_path = path.join("out/diagnostics");
// Run the actual test in a separate thread so that it does not block on FS operations.
// Use signalling on a zx::Channel to indicate that the test is done.
std::thread::spawn(move || {
let path = thread_path;
let done = done1;
let mut executor = fasync::Executor::new().unwrap();
executor.run_singlethreaded(async {
let collector = InspectDataCollector::new();
// Trigger collection on a clone of the inspect collector so
// we can use collector to take the collected data.
Box::new(collector.clone()).collect(path).await.unwrap();
let collector: Box<InspectDataCollector> = Box::new(collector);
let extra_data = collector.take_data().expect("collector missing data");
assert_eq!(1, extra_data.len());
let extra = extra_data.get(TreeMarker::SERVICE_NAME);
assert!(extra.is_some());
match extra.unwrap() {
InspectData::Tree(tree, vmo) => {
// Assert we can read the tree proxy and get the data we expected.
let hierarchy = reader::read_from_tree(tree)
.await
.expect("failed to read hierarchy from tree");
assert_inspect_tree!(hierarchy, root: {
a: 1i64,
lazy: {
b: 3.14,
}
});
let partial_hierarchy: NodeHierarchy =
PartialNodeHierarchy::try_from(vmo.as_ref().unwrap())
.expect("failed to read hierarchy from vmo")
.into();
// Assert the vmo also points to that data (in this case since there's no
// lazy nodes).
assert_inspect_tree!(partial_hierarchy, root: {
a: 1i64,
});
}
v => {
panic!("Expected Tree, got {:?}", v);
}
}
done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
});
});
fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
ns.unbind(path.join("out").to_str().unwrap()).unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn reader_server_formatting() {
let path = PathBuf::from("/test-bindings3");
// Make a ServiceFs containing a single VMO-backed inspect file.
let mut fs = ServiceFs::new();
let vmo = zx::Vmo::create(4096).unwrap();
let inspector = inspector_for_reader_test();
let data = inspector.copy_vmo_data().unwrap();
vmo.write(&data, 0).unwrap();
fs.dir("diagnostics").add_vmo_file_at("test.inspect", vmo, 0, 4096);
// Create a connection to the ServiceFs.
let (h0, h1) = zx::Channel::create().unwrap();
fs.serve_connection(h1).unwrap();
let ns = fdio::Namespace::installed().unwrap();
ns.bind(path.join("out").to_str().unwrap(), h0).unwrap();
fasync::spawn(fs.collect());
let (done0, done1) = zx::Channel::create().unwrap();
let thread_path = path.join("out");
// Run the actual test in a separate thread so that it does not block on FS operations.
// Use signalling on a zx::Channel to indicate that the test is done.
std::thread::spawn(move || {
let path = thread_path;
let done = done1;
let mut executor = fasync::Executor::new().unwrap();
executor.run_singlethreaded(async {
verify_reader(path).await;
done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
});
});
fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
ns.unbind(path.join("out").to_str().unwrap()).unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn read_server_formatting_tree() {
let path = PathBuf::from("/test-bindings4");
// Make a ServiceFs serving an inspect tree.
let mut fs = ServiceFs::new();
let inspector = inspector_for_reader_test();
inspector.serve(&mut fs).expect("failed to serve inspector");
// Create a connection to the ServiceFs.
let (h0, h1) = zx::Channel::create().unwrap();
fs.serve_connection(h1).unwrap();
let ns = fdio::Namespace::installed().unwrap();
ns.bind(path.join("out").to_str().unwrap(), h0).unwrap();
fasync::spawn(fs.collect());
let (done0, done1) = zx::Channel::create().unwrap();
let thread_path = path.join("out");
// Run the actual test in a separate thread so that it does not block on FS operations.
// Use signalling on a zx::Channel to indicate that the test is done.
std::thread::spawn(move || {
let path = thread_path;
let done = done1;
let mut executor = fasync::Executor::new().unwrap();
executor.run_singlethreaded(async {
verify_reader(path).await;
done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
});
});
fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
ns.unbind(path.join("out").to_str().unwrap()).unwrap();
}
#[fasync::run_singlethreaded(test)]
async fn inspect_repo_disallows_duplicated_dirs() {
let mut inspect_repo = InspectDataRepository::new(None);
let realm_path = RealmPath(vec!["a".to_string(), "b".to_string()]);
let instance_id = "1234".to_string();
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id,
realm_path,
component_name: "foo.cmx".into(),
});
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
inspect_repo.add(component_id.clone(), proxy).expect("add to repo");
let (proxy, _) =
fidl::endpoints::create_proxy::<DirectoryMarker>().expect("create directory proxy");
inspect_repo.add(component_id.clone(), proxy).expect("add to repo");
let key = component_id.unique_key();
assert_eq!(inspect_repo.data_directories.get(key).unwrap().get_values().len(), 1);
}
fn inspector_for_reader_test() -> Inspector {
let inspector = Inspector::new();
let root = inspector.root();
let child_1 = root.create_child("child_1");
child_1.record_int("some-int", 2);
let child_1_1 = child_1.create_child("child_1_1");
child_1_1.record_int("some-int", 3);
child_1_1.record_int("not-wanted-int", 4);
root.record(child_1_1);
root.record(child_1);
let child_2 = root.create_child("child_2");
child_2.record_int("some-int", 2);
root.record(child_2);
inspector
}
async fn verify_reader(path: PathBuf) {
let child_1_1_selector = selectors::parse_selector(r#"*:root/child_1/*:some-int"#).unwrap();
let child_2_selector =
selectors::parse_selector(r#"test_component.cmx:root/child_2:*"#).unwrap();
let inspect_repo = Arc::new(RwLock::new(InspectDataRepository::new(Some(vec![
Arc::new(child_1_1_selector),
Arc::new(child_2_selector),
]))));
let out_dir_proxy = InspectDataCollector::find_directory_proxy(&path).await.unwrap();
// The absolute moniker here is made up since the selector is a glob
// selector, so any path would match.
let component_id = ComponentIdentifier::Legacy(LegacyIdentifier {
instance_id: "1234".into(),
realm_path: vec![].into(),
component_name: "test_component.cmx".into(),
});
inspect_repo.write().add(component_id.clone(), out_dir_proxy).unwrap();
let reader_server = ReaderServer::new(inspect_repo.clone(), None);
let result_json = read_snapshot(reader_server.clone()).await;
let result_array = result_json.as_array().expect("unit test json should be array.");
assert_eq!(result_array.len(), 1, "Expect only one schema to be returned.");
let result_map =
result_array[0].as_object().expect("entries in the schema array are json objects.");
let result_payload =
result_map.get("payload").expect("diagnostics schema requires payload entry.");
let expected_payload = json!({
"root": {
"child_1": {
"child_1_1": {
"some-int": 3
}
},
"child_2": {
"some-int": 2
}
}
});
assert_eq!(*result_payload, expected_payload);
inspect_repo.write().remove(&component_id);
let result_json = read_snapshot(reader_server.clone()).await;
let result_array = result_json.as_array().expect("unit test json should be array.");
assert_eq!(result_array.len(), 0, "Expect no schemas to be returned.");
}
async fn read_snapshot(reader_server: ReaderServer) -> serde_json::Value {
let (consumer, batch_iterator): (
_,
ServerEnd<fidl_fuchsia_diagnostics::BatchIteratorMarker>,
) = create_proxy().unwrap();
fasync::spawn(async move {
reader_server
.stream_inspect(
fidl_fuchsia_diagnostics::StreamMode::Snapshot,
fidl_fuchsia_diagnostics::Format::Json,
batch_iterator,
)
.unwrap();
});
let mut result_vec: Vec<String> = Vec::new();
loop {
let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
consumer.get_next().await.unwrap().unwrap();
if next_batch.is_empty() {
break;
}
for formatted_content in next_batch {
match formatted_content {
fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
let mut buf = vec![0; data.size as usize];
data.vmo.read(&mut buf, 0).expect("reading vmo");
let hierarchy_string = std::str::from_utf8(&buf).unwrap();
result_vec.push(hierarchy_string.to_string());
}
_ => panic!("test only produces json formatted data"),
}
}
}
let result_string = format!("[{}]", result_vec.join(","));
serde_json::from_str(&result_string)
.expect(&format!("unit tests shouldn't be creating malformed json: {}", result_string))
}
}