blob: eb64a69467848cd4c6ea27b8796e503c3b8683ba [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::constants;
use crate::diagnostics::GlobalConnectionStats;
use crate::identity::ComponentIdentity;
use crate::inspect::collector::{self as collector, InspectData};
use diagnostics_data::{self as schema, InspectHandleName};
use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
use fidl::endpoints::Proxy;
use fidl_fuchsia_inspect::TreeProxy;
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_inspect::reader::snapshot::{Snapshot, SnapshotTree};
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::channel::oneshot;
use futures::{FutureExt, Stream};
use lazy_static::lazy_static;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tracing::warn;
use {fidl_fuchsia_io as fio, fuchsia_trace as ftrace, inspect_fidl_load as deprecated_inspect};
#[derive(Debug, Clone)]
pub enum InspectHandle {
Tree(TreeProxy, Option<InspectHandleName>),
impl InspectHandle {
pub fn is_closed(&self) -> bool {
match self {
Self::Directory(proxy) => proxy.as_channel().is_closed(),
Self::Tree(proxy, _) => proxy.as_channel().is_closed(),
pub async fn on_closed(&self) -> Result<zx::Signals, zx::Status> {
match self {
Self::Tree(proxy, _) => proxy.on_closed().await,
Self::Directory(proxy) => proxy.on_closed().await,
pub fn koid(&self) -> zx::Koid {
match self {
Self::Directory(proxy) => {
proxy.as_channel().as_handle_ref().get_koid().expect("DirectoryProxy has koid")
Self::Tree(proxy, _) => {
proxy.as_channel().as_handle_ref().get_koid().expect("TreeProxy has koid")
pub fn from_named_tree_proxy(proxy: TreeProxy, name: Option<String>) -> Self {
pub fn is_tree(&self) -> bool {
matches!(self, Self::Tree(_, _))
impl From<fio::DirectoryProxy> for InspectHandle {
fn from(proxy: fio::DirectoryProxy) -> Self {
impl From<TreeProxy> for InspectHandle {
fn from(proxy: TreeProxy) -> Self {
InspectHandle::Tree(proxy, None)
pub struct InspectArtifactsContainer {
/// One or more proxies that this container is configured for.
inspect_handles: HashMap<zx::Koid, Arc<InspectHandle>>,
on_closed_tasks: Vec<fasync::Task<()>>,
impl InspectArtifactsContainer {
/// Create a new `InspectArtifactsContainer`.
/// Returns itself and a `Receiver` that resolves into the koid of the input `proxy`
/// when that proxy closes.
pub fn new(proxy: impl Into<InspectHandle>) -> (Self, oneshot::Receiver<zx::Koid>) {
let mut this = Self { inspect_handles: HashMap::new(), on_closed_tasks: vec![] };
// safe to unwrap as diagnostics_proxy is guaranteed to be empty
let rx = this.push_handle(proxy).unwrap();
(this, rx)
/// Remove a handle via its `koid` from the set of proxies managed by `self`.
pub fn remove_handle(&mut self, koid: zx::Koid) -> usize {
/// Push a new handle into the container.
/// Returns `None` if the handle is a DirectoryProxy and there is already one tracked,
/// as only single handles are supported in the DirectoryProxy case.
pub fn push_handle(
&mut self,
new_handle: impl Into<InspectHandle>,
) -> Option<oneshot::Receiver<zx::Koid>> {
let handle = new_handle.into();
if !self.inspect_handles.is_empty() && matches!(handle, InspectHandle::Directory(_)) {
return None;
let handle = Arc::new(handle);
let koid = handle.koid();
self.inspect_handles.insert(koid, Arc::clone(&handle));
let (task, rcv) = Self::on_closed_task(handle, koid);
/// Generate an `UnpopulatedInspectDataContainer` from the proxies managed by `self`.
/// Returns `None` if there are no valid proxies.
pub fn create_unpopulated(
identity: &Arc<ComponentIdentity>,
matcher: Option<Arc<HierarchyMatcher>>,
) -> Option<UnpopulatedInspectDataContainer> {
if self.inspect_handles.is_empty() {
return None;
// we know there is at least 1
let handle = self.inspect_handles.values().next().unwrap();
match handle.as_ref() {
// there can only be one Directory, semantically
InspectHandle::Directory(dir) if self.inspect_handles.len() == 1 => {
fuchsia_fs::directory::clone_no_describe(dir, None).ok().map(|directory| {
UnpopulatedInspectDataContainer {
identity: Arc::clone(identity),
inspect_handles: vec![directory.into()],
inspect_matcher: matcher,
InspectHandle::Tree(_, _) => Some(UnpopulatedInspectDataContainer {
identity: Arc::clone(identity),
inspect_matcher: matcher,
inspect_handles: self
.filter(|handle| handle.as_ref().is_tree())
.map(|handle| handle.as_ref().clone())
_ => None,
fn on_closed_task(
proxy: Arc<InspectHandle>,
koid: zx::Koid,
) -> (fasync::Task<()>, oneshot::Receiver<zx::Koid>) {
let (snd, rcv) = oneshot::channel();
fasync::Task::spawn(async move {
if !proxy.is_closed() {
let _ = proxy.on_closed().await;
let _ = snd.send(koid);
impl InspectArtifactsContainer {
pub(crate) fn handles(&self) -> impl ExactSizeIterator<Item = &Arc<InspectHandle>> {
lazy_static! {
static ref TIMEOUT_MESSAGE: &'static str =
"Exceeded per-component time limit for fetching diagnostics data";
pub enum ReadSnapshot {
/// Packet containing a snapshot and all the metadata needed to
/// populate a diagnostics schema for that snapshot.
pub struct SnapshotData {
/// Optional name of the file or InspectSink proxy that created this snapshot.
pub name: Option<InspectHandleName>,
/// Timestamp at which this snapshot resolved or failed.
pub timestamp: zx::Time,
/// Errors encountered when processing this snapshot.
pub errors: Vec<schema::InspectError>,
/// Optional snapshot of the inspect hierarchy, in case reading fails
/// and we have errors to share with client.
pub snapshot: Option<ReadSnapshot>,
impl SnapshotData {
async fn new(
name: Option<InspectHandleName>,
data: InspectData,
lazy_child_timeout: zx::Duration,
identity: &ComponentIdentity,
parent_trace_id: ftrace::Id,
) -> SnapshotData {
let trace_id = ftrace::Id::random();
let _trace_guard = ftrace::async_enter!(
// An async duration cannot have multiple concurrent child async durations
// so we include the nonce as metadata to manually determine relationship.
"parent_trace_id" => u64::from(parent_trace_id),
"trace_id" => u64::from(trace_id),
"moniker" => identity.to_string().as_ref(),
"filename" => name
"name" => name
match data {
InspectData::Tree(tree) => {
let lazy_child_timeout =
Duration::from_nanos(lazy_child_timeout.into_nanos() as u64);
match SnapshotTree::try_from_with_timeout(&tree, lazy_child_timeout).await {
Ok(snapshot_tree) => {
SnapshotData::successful(ReadSnapshot::Tree(snapshot_tree), name)
Err(e) => SnapshotData::failed(
schema::InspectError { message: format!("{e:?}") },
InspectData::DeprecatedFidl(inspect_proxy) => {
match deprecated_inspect::load_hierarchy(inspect_proxy).await {
Ok(hierarchy) => {
SnapshotData::successful(ReadSnapshot::Finished(hierarchy), name)
Err(e) => SnapshotData::failed(
schema::InspectError { message: format!("{e:?}") },
InspectData::Vmo(vmo) => match Snapshot::try_from(&vmo) {
Ok(snapshot) => SnapshotData::successful(ReadSnapshot::Single(snapshot), name),
Err(e) => {
SnapshotData::failed(schema::InspectError { message: format!("{e:?}") }, name)
InspectData::File(contents) => match Snapshot::try_from(contents) {
Ok(snapshot) => SnapshotData::successful(ReadSnapshot::Single(snapshot), name),
Err(e) => {
SnapshotData::failed(schema::InspectError { message: format!("{e:?}") }, name)
// Constructs packet that timestamps and packages inspect snapshot for exfiltration.
fn successful(snapshot: ReadSnapshot, name: Option<InspectHandleName>) -> SnapshotData {
SnapshotData {
timestamp: fasync::Time::now().into_zx(),
errors: Vec::new(),
snapshot: Some(snapshot),
// Constructs packet that timestamps and packages inspect snapshot failure for exfiltration.
fn failed(error: schema::InspectError, name: Option<InspectHandleName>) -> SnapshotData {
SnapshotData {
timestamp: fasync::Time::now().into_zx(),
errors: vec![error],
snapshot: None,
/// PopulatedInspectDataContainer is the container that
/// holds the actual Inspect data for a given component,
/// along with all information needed to transform that data
/// to be returned to the client.
pub struct PopulatedInspectDataContainer {
pub identity: Arc<ComponentIdentity>,
/// Vector of all the snapshots of inspect hierarchies under
/// the diagnostics directory of the component identified by
/// moniker, along with the metadata needed to populate
/// this snapshot's diagnostics schema.
pub snapshot: SnapshotData,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
pub inspect_matcher: Option<Arc<HierarchyMatcher>>,
enum Status {
Pending(VecDeque<(Option<InspectHandleName>, InspectData)>),
struct State {
status: Status,
unpopulated: Arc<UnpopulatedInspectDataContainer>,
batch_timeout: zx::Duration,
elapsed_time: zx::Duration,
global_stats: Arc<GlobalConnectionStats>,
trace_guard: Arc<Option<ftrace::AsyncScope>>,
trace_id: ftrace::Id,
impl State {
fn into_pending(
pending: VecDeque<(Option<InspectHandleName>, InspectData)>,
start_time: zx::Time,
) -> Self {
Self {
unpopulated: self.unpopulated,
status: Status::Pending(pending),
batch_timeout: self.batch_timeout,
global_stats: self.global_stats,
elapsed_time: self.elapsed_time + (zx::Time::get_monotonic() - start_time),
trace_guard: self.trace_guard,
trace_id: self.trace_id,
fn add_elapsed_time(&mut self, start_time: zx::Time) {
self.elapsed_time += zx::Time::get_monotonic() - start_time
async fn iterate(
mut self,
start_time: zx::Time,
) -> Option<(PopulatedInspectDataContainer, State)> {
loop {
match &mut self.status {
Status::Begin => {
let data_map =
self = self
.into_pending(data_map.into_iter().collect::<VecDeque<_>>(), start_time);
Status::Pending(ref mut pending) => match pending.pop_front() {
None => {
self.elapsed_time + (zx::Time::get_monotonic() - start_time),
return None;
Some((name, data)) => {
let snapshot = SnapshotData::new(
self.batch_timeout / constants::LAZY_NODE_TIMEOUT_PROPORTION,
let result = PopulatedInspectDataContainer {
identity: Arc::clone(&self.unpopulated.identity),
inspect_matcher: self.unpopulated.inspect_matcher.clone(),
return Some((result, self));
/// UnpopulatedInspectDataContainer is the container that holds
/// all information needed to retrieve Inspect data
/// for a given component, when requested.
pub struct UnpopulatedInspectDataContainer {
pub identity: Arc<ComponentIdentity>,
/// Proxies configured for container. It is an invariant that if any value is an
/// InspectHandle::Directory, then there is exactly one value.
pub inspect_handles: Vec<InspectHandle>,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
pub inspect_matcher: Option<Arc<HierarchyMatcher>>,
impl UnpopulatedInspectDataContainer {
/// Populates this data container with a timeout. On the timeout firing returns a
/// container suitable to return to clients, but with timeout error information recorded.
pub fn populate(
timeout: i64,
global_stats: Arc<GlobalConnectionStats>,
parent_trace_id: ftrace::Id,
) -> impl Stream<Item = PopulatedInspectDataContainer> {
let trace_id = ftrace::Id::random();
let trace_guard = ftrace::async_enter!(
// An async duration cannot have multiple concurrent child async durations
// so we include the nonce as metadata to manually determine relationship.
"parent_trace_id" => u64::from(parent_trace_id),
"trace_id" => u64::from(trace_id),
"moniker" => self.identity.to_string().as_ref()
let this = Arc::new(self);
let state = State {
status: Status::Begin,
unpopulated: this,
batch_timeout: zx::Duration::from_seconds(timeout),
elapsed_time: zx::Duration::from_nanos(0),
trace_guard: Arc::new(trace_guard),
futures::stream::unfold(state, |state| {
let unpopulated_for_timeout = Arc::clone(&state.unpopulated);
let timeout = state.batch_timeout;
let elapsed_time = state.elapsed_time;
let global_stats = Arc::clone(&state.global_stats);
let start_time = zx::Time::get_monotonic();
let trace_guard = Arc::clone(&state.trace_guard);
let trace_id = state.trace_id;
.on_timeout((timeout - elapsed_time).after_now(), move || {
warn!(identity = ?unpopulated_for_timeout.identity.moniker,
let result = PopulatedInspectDataContainer {
identity: Arc::clone(&unpopulated_for_timeout.identity),
inspect_matcher: unpopulated_for_timeout.inspect_matcher.clone(),
snapshot: SnapshotData::failed(
schema::InspectError { message: TIMEOUT_MESSAGE.to_string() },
State {
status: Status::Pending(VecDeque::new()),
unpopulated: unpopulated_for_timeout,
batch_timeout: timeout,
elapsed_time: elapsed_time + (zx::Time::get_monotonic() - start_time),
mod test {
use super::*;
use fuchsia_inspect::Node;
use fuchsia_zircon::DurationNum;
use futures::StreamExt;
lazy_static! {
static ref EMPTY_IDENTITY: Arc<ComponentIdentity> = Arc::new(ComponentIdentity::unknown());
async fn population_times_out() {
// Simulate a directory that hangs indefinitely in any request so that we consistently
// trigger the 0 timeout.
let (directory, mut stream) =
fasync::Task::spawn(async move {
while {
let container = UnpopulatedInspectDataContainer {
identity: EMPTY_IDENTITY.clone(),
inspect_handles: vec![directory.into()],
inspect_matcher: None,
let mut stream = container.populate(
let res =;
assert_eq!(, None);
vec![schema::InspectError { message: TIMEOUT_MESSAGE.to_string() }]
async fn no_inspect_files_do_not_give_an_error_response() {
let directory =
fuchsia_fs::directory::open_in_namespace("/tmp", fuchsia_fs::OpenFlags::RIGHT_READABLE)
let container = UnpopulatedInspectDataContainer {
identity: EMPTY_IDENTITY.clone(),
inspect_handles: vec![directory.into()],
inspect_matcher: None,
let mut stream = container.populate(
fn only_one_directory_proxy_is_populated() {
let _executor = fuchsia_async::LocalExecutor::new();
let (directory, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let (mut container, _rx) = InspectArtifactsContainer::new(directory);
let (directory2, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();