// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
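//! Containers used by the Inspect reader: per-component bookkeeping of Inspect
//! handles (`fuchsia.inspect.Tree` connections and `diagnostics` directories) and
//! the state machine that turns them into snapshots for clients.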
use crate::{
constants,
diagnostics::GlobalConnectionStats,
identity::ComponentIdentity,
inspect::collector::{self as collector, InspectData},
};
use diagnostics_data::{self as schema, InspectHandleName};
use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
use fidl::endpoints::Proxy;
use fidl_fuchsia_inspect::TreeProxy;
use fidl_fuchsia_io as fio;
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_inspect::reader::snapshot::{Snapshot, SnapshotTree};
use fuchsia_trace as ftrace;
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::{channel::oneshot, FutureExt, Stream};
use inspect_fidl_load as deprecated_inspect;
use lazy_static::lazy_static;
use std::time::Duration;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use tracing::warn;
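/// A single source of Inspect data for a component: either a
/// `fuchsia.inspect.Tree` connection (optionally named) or a `diagnostics`
/// directory that is crawled for Inspect files.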
#[derive(Debug, Clone)]
pub enum InspectHandle {
Tree(TreeProxy, Option<InspectHandleName>),
Directory(fio::DirectoryProxy),
}
impl InspectHandle {
pub fn is_closed(&self) -> bool {
match self {
Self::Directory(proxy) => proxy.as_channel().is_closed(),
Self::Tree(proxy, _) => proxy.as_channel().is_closed(),
}
}
pub async fn on_closed(&self) -> Result<zx::Signals, zx::Status> {
match self {
Self::Tree(proxy, _) => proxy.on_closed().await,
Self::Directory(proxy) => proxy.on_closed().await,
}
}
pub fn koid(&self) -> zx::Koid {
match self {
Self::Directory(proxy) => {
proxy.as_channel().as_handle_ref().get_koid().expect("DirectoryProxy has koid")
}
Self::Tree(proxy, _) => {
proxy.as_channel().as_handle_ref().get_koid().expect("TreeProxy has koid")
}
}
}
pub fn from_named_tree_proxy(proxy: TreeProxy, name: Option<String>) -> Self {
InspectHandle::Tree(proxy, name.map(InspectHandleName::name))
}
pub fn is_tree(&self) -> bool {
matches!(self, Self::Tree(_, _))
}
}
impl From<fio::DirectoryProxy> for InspectHandle {
fn from(proxy: fio::DirectoryProxy) -> Self {
InspectHandle::Directory(proxy)
}
}
impl From<TreeProxy> for InspectHandle {
fn from(proxy: TreeProxy) -> Self {
InspectHandle::Tree(proxy, None)
}
}
#[derive(Debug)]
pub struct InspectArtifactsContainer {
/// One or more proxies that this container is configured for.
inspect_handles: HashMap<zx::Koid, Arc<InspectHandle>>,
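/// Tasks that wait for each handle's channel to close and then report its koid
/// on the corresponding `oneshot::Receiver`.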
on_closed_tasks: Vec<fasync::Task<()>>,
}
impl InspectArtifactsContainer {
/// Create a new `InspectArtifactsContainer`.
///
/// Returns itself and a `Receiver` that resolves into the koid of the input `proxy`
/// when that proxy closes.
pub fn new(proxy: impl Into<InspectHandle>) -> (Self, oneshot::Receiver<zx::Koid>) {
let mut this = Self { inspect_handles: HashMap::new(), on_closed_tasks: vec![] };
// Safe to unwrap: `inspect_handles` is empty, so `push_handle` always returns `Some`.
let rx = this.push_handle(proxy).unwrap();
(this, rx)
}
/// Remove a handle via its `koid` from the set of proxies managed by `self`.
pub fn remove_handle(&mut self, koid: zx::Koid) -> usize {
self.inspect_handles.remove(&koid);
self.inspect_handles.len()
}
/// Push a new handle into the container.
///
/// Returns `None` if the new handle is a `DirectoryProxy` and any handle is already
/// tracked, since only a single handle is supported in the `DirectoryProxy` case.
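///
/// # Example
///
/// A sketch mirroring the behavior checked in the tests below: a second
/// `DirectoryProxy` is rejected once any handle is tracked.
///
/// ```ignore
/// let (directory, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
/// let (mut container, _on_closed) = InspectArtifactsContainer::new(directory);
/// let (directory2, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
/// assert!(container.push_handle(directory2).is_none());
/// ```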
pub fn push_handle(
&mut self,
new_handle: impl Into<InspectHandle>,
) -> Option<oneshot::Receiver<zx::Koid>> {
let handle = new_handle.into();
if !self.inspect_handles.is_empty() && matches!(handle, InspectHandle::Directory(_)) {
return None;
}
let handle = Arc::new(handle);
let koid = handle.koid();
self.inspect_handles.insert(koid, Arc::clone(&handle));
let (task, rcv) = Self::on_closed_task(handle, koid);
self.on_closed_tasks.push(task);
Some(rcv)
}
/// Generate an `UnpopulatedInspectDataContainer` from the proxies managed by `self`.
///
/// Returns `None` if there are no valid proxies.
pub fn create_unpopulated(
&self,
identity: &Arc<ComponentIdentity>,
matcher: Option<Arc<HierarchyMatcher>>,
) -> Option<UnpopulatedInspectDataContainer> {
if self.inspect_handles.is_empty() {
return None;
}
// we know there is at least 1
let handle = self.inspect_handles.values().next().unwrap();
match handle.as_ref() {
// there can only be one Directory, semantically
InspectHandle::Directory(dir) if self.inspect_handles.len() == 1 => {
fuchsia_fs::directory::clone_no_describe(dir, None).ok().map(|directory| {
UnpopulatedInspectDataContainer {
identity: Arc::clone(identity),
inspect_handles: vec![directory.into()],
inspect_matcher: matcher,
}
})
}
InspectHandle::Tree(_, _) => Some(UnpopulatedInspectDataContainer {
identity: Arc::clone(identity),
inspect_matcher: matcher,
inspect_handles: self
.inspect_handles
.values()
.filter(|handle| handle.as_ref().is_tree())
.map(|handle| handle.as_ref().clone())
.collect::<Vec<InspectHandle>>(),
}),
_ => None,
}
}
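/// Spawns a task that waits for `proxy`'s channel to close and then sends `koid`
/// on the returned receiver.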
fn on_closed_task(
proxy: Arc<InspectHandle>,
koid: zx::Koid,
) -> (fasync::Task<()>, oneshot::Receiver<zx::Koid>) {
let (snd, rcv) = oneshot::channel();
(
fasync::Task::spawn(async move {
if !proxy.is_closed() {
let _ = proxy.on_closed().await;
}
let _ = snd.send(koid);
}),
rcv,
)
}
}
#[cfg(test)]
impl InspectArtifactsContainer {
pub(crate) fn handles(&self) -> impl ExactSizeIterator<Item = &Arc<InspectHandle>> {
self.inspect_handles.values()
}
}
lazy_static! {
static ref TIMEOUT_MESSAGE: &'static str =
"Exceeded per-component time limit for fetching diagnostics data";
}
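/// The resolved form of a single Inspect snapshot: a VMO-backed snapshot, a
/// lazily-loaded `fuchsia.inspect.Tree` snapshot, or an already-parsed hierarchy
/// (used by the deprecated FIDL path).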
#[derive(Debug)]
pub enum ReadSnapshot {
Single(Snapshot),
Tree(SnapshotTree),
Finished(DiagnosticsHierarchy),
}
/// Packet containing a snapshot and all the metadata needed to
/// populate a diagnostics schema for that snapshot.
#[derive(Debug)]
pub struct SnapshotData {
/// Optional name of the file or InspectSink proxy that created this snapshot.
pub name: Option<InspectHandleName>,
/// Timestamp at which this snapshot resolved or failed.
pub timestamp: zx::Time,
/// Errors encountered when processing this snapshot.
pub errors: Vec<schema::InspectError>,
/// Snapshot of the inspect hierarchy; `None` if reading failed, in which case
/// only the errors are available to share with the client.
pub snapshot: Option<ReadSnapshot>,
}
impl SnapshotData {
async fn new(
name: Option<InspectHandleName>,
data: InspectData,
lazy_child_timeout: zx::Duration,
identity: &ComponentIdentity,
parent_trace_id: ftrace::Id,
) -> SnapshotData {
let trace_id = ftrace::Id::random();
let _trace_guard = ftrace::async_enter!(
trace_id,
c"app",
c"SnapshotData::new",
// An async duration cannot have multiple concurrent child async durations,
// so we record the parent and child trace ids as metadata and correlate
// them manually.
"parent_trace_id" => u64::from(parent_trace_id),
"trace_id" => u64::from(trace_id),
"moniker" => identity.to_string().as_ref(),
"filename" => name
.as_ref()
.and_then(InspectHandleName::as_filename)
.unwrap_or(""),
"name" => name
.as_ref()
.and_then(InspectHandleName::as_name)
.unwrap_or("")
);
match data {
InspectData::Tree(tree) => {
let lazy_child_timeout =
Duration::from_nanos(lazy_child_timeout.into_nanos() as u64);
match SnapshotTree::try_from_with_timeout(&tree, lazy_child_timeout).await {
Ok(snapshot_tree) => {
SnapshotData::successful(ReadSnapshot::Tree(snapshot_tree), name)
}
Err(e) => SnapshotData::failed(
schema::InspectError { message: format!("{e:?}") },
name,
),
}
}
InspectData::DeprecatedFidl(inspect_proxy) => {
match deprecated_inspect::load_hierarchy(inspect_proxy).await {
Ok(hierarchy) => {
SnapshotData::successful(ReadSnapshot::Finished(hierarchy), name)
}
Err(e) => SnapshotData::failed(
schema::InspectError { message: format!("{e:?}") },
name,
),
}
}
InspectData::Vmo(vmo) => match Snapshot::try_from(&vmo) {
Ok(snapshot) => SnapshotData::successful(ReadSnapshot::Single(snapshot), name),
Err(e) => {
SnapshotData::failed(schema::InspectError { message: format!("{e:?}") }, name)
}
},
InspectData::File(contents) => match Snapshot::try_from(contents) {
Ok(snapshot) => SnapshotData::successful(ReadSnapshot::Single(snapshot), name),
Err(e) => {
SnapshotData::failed(schema::InspectError { message: format!("{e:?}") }, name)
}
},
}
}
// Constructs a packet that timestamps and packages an inspect snapshot for exfiltration.
fn successful(snapshot: ReadSnapshot, name: Option<InspectHandleName>) -> SnapshotData {
SnapshotData {
name,
timestamp: fasync::Time::now().into_zx(),
errors: Vec::new(),
snapshot: Some(snapshot),
}
}
// Constructs a packet that timestamps and packages an inspect snapshot failure for exfiltration.
fn failed(error: schema::InspectError, name: Option<InspectHandleName>) -> SnapshotData {
SnapshotData {
name,
timestamp: fasync::Time::now().into_zx(),
errors: vec![error],
snapshot: None,
}
}
}
/// PopulatedInspectDataContainer is the container that
/// holds the actual Inspect data for a given component,
/// along with all information needed to transform that data
/// to be returned to the client.
pub struct PopulatedInspectDataContainer {
pub identity: Arc<ComponentIdentity>,
/// Snapshot of an inspect hierarchy exposed by the component identified by
/// `identity`, along with the metadata needed to populate this snapshot's
/// diagnostics schema.
pub snapshot: SnapshotData,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
pub inspect_matcher: Option<Arc<HierarchyMatcher>>,
}
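/// Internal state of the population stream: `Begin` before any data has been
/// collected, `Pending` while draining the collected (name, data) pairs.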
enum Status {
Begin,
Pending(VecDeque<(Option<InspectHandleName>, InspectData)>),
}
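/// Per-stream bookkeeping carried across iterations of the unfold in
/// `UnpopulatedInspectDataContainer::populate`.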
struct State {
status: Status,
unpopulated: Arc<UnpopulatedInspectDataContainer>,
batch_timeout: zx::Duration,
elapsed_time: zx::Duration,
global_stats: Arc<GlobalConnectionStats>,
trace_guard: Arc<Option<ftrace::AsyncScope>>,
trace_id: ftrace::Id,
}
impl State {
fn into_pending(
self,
pending: VecDeque<(Option<InspectHandleName>, InspectData)>,
start_time: zx::Time,
) -> Self {
Self {
unpopulated: self.unpopulated,
status: Status::Pending(pending),
batch_timeout: self.batch_timeout,
global_stats: self.global_stats,
elapsed_time: self.elapsed_time + (zx::Time::get_monotonic() - start_time),
trace_guard: self.trace_guard,
trace_id: self.trace_id,
}
}
fn add_elapsed_time(&mut self, start_time: zx::Time) {
self.elapsed_time += zx::Time::get_monotonic() - start_time
}
async fn iterate(
mut self,
start_time: zx::Time,
) -> Option<(PopulatedInspectDataContainer, State)> {
loop {
match &mut self.status {
Status::Begin => {
let data_map =
collector::populate_data_map(&self.unpopulated.inspect_handles).await;
self = self
.into_pending(data_map.into_iter().collect::<VecDeque<_>>(), start_time);
}
Status::Pending(ref mut pending) => match pending.pop_front() {
None => {
self.global_stats.record_component_duration(
self.unpopulated.identity.moniker.to_string(),
self.elapsed_time + (zx::Time::get_monotonic() - start_time),
);
return None;
}
Some((name, data)) => {
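// Lazy children are snapshotted with `batch_timeout / LAZY_NODE_TIMEOUT_PROPORTION`
// as their own timeout.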
let snapshot = SnapshotData::new(
name,
data,
self.batch_timeout / constants::LAZY_NODE_TIMEOUT_PROPORTION,
&self.unpopulated.identity,
self.trace_id,
)
.await;
let result = PopulatedInspectDataContainer {
identity: Arc::clone(&self.unpopulated.identity),
snapshot,
inspect_matcher: self.unpopulated.inspect_matcher.clone(),
};
self.add_elapsed_time(start_time);
return Some((result, self));
}
},
}
}
}
}
/// UnpopulatedInspectDataContainer is the container that holds
/// all information needed to retrieve Inspect data
/// for a given component, when requested.
#[derive(Debug)]
pub struct UnpopulatedInspectDataContainer {
pub identity: Arc<ComponentIdentity>,
/// Handles configured for this container. It is an invariant that if any value is an
/// `InspectHandle::Directory`, then it is the only value.
pub inspect_handles: Vec<InspectHandle>,
/// Optional hierarchy matcher. If unset, the reader is running
/// in all-access mode, meaning no matching or filtering is required.
pub inspect_matcher: Option<Arc<HierarchyMatcher>>,
}
impl UnpopulatedInspectDataContainer {
/// Populates this data container, subject to a per-component timeout. If the timeout
/// fires, the stream yields a container suitable to return to clients, with the
/// timeout recorded as an error.
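///
/// # Example
///
/// A sketch based on the tests below; `directory` is assumed to be a
/// `fio::DirectoryProxy` serving a component's `diagnostics` directory, and
/// `futures::StreamExt` is in scope for `next()`.
///
/// ```ignore
/// let container = UnpopulatedInspectDataContainer {
///     identity: Arc::new(ComponentIdentity::unknown()),
///     inspect_handles: vec![directory.into()],
///     inspect_matcher: None,
/// };
/// let mut stream = container.populate(
///     /* timeout (seconds) */ 60,
///     Arc::new(GlobalConnectionStats::new(Node::default())),
///     ftrace::Id::random(),
/// );
/// while let Some(populated) = stream.next().await {
///     // Convert `populated.snapshot` into a diagnostics schema for the client.
/// }
/// ```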
pub fn populate(
self,
timeout: i64,
global_stats: Arc<GlobalConnectionStats>,
parent_trace_id: ftrace::Id,
) -> impl Stream<Item = PopulatedInspectDataContainer> {
let trace_id = ftrace::Id::random();
let trace_guard = ftrace::async_enter!(
trace_id,
c"app",
c"ReaderServer::stream.populate",
// An async duration cannot have multiple concurrent child async durations,
// so we record the parent and child trace ids as metadata and correlate
// them manually.
"parent_trace_id" => u64::from(parent_trace_id),
"trace_id" => u64::from(trace_id),
"moniker" => self.identity.to_string().as_ref()
);
let this = Arc::new(self);
let state = State {
status: Status::Begin,
unpopulated: this,
batch_timeout: zx::Duration::from_seconds(timeout),
global_stats,
elapsed_time: zx::Duration::from_nanos(0),
trace_guard: Arc::new(trace_guard),
trace_id,
};
futures::stream::unfold(state, |state| {
let unpopulated_for_timeout = Arc::clone(&state.unpopulated);
let timeout = state.batch_timeout;
let elapsed_time = state.elapsed_time;
let global_stats = Arc::clone(&state.global_stats);
let start_time = zx::Time::get_monotonic();
let trace_guard = Arc::clone(&state.trace_guard);
let trace_id = state.trace_id;
state
.iterate(start_time)
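// The per-component budget is shared across batches: only wait for whatever
// time remains after previous iterations.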
.on_timeout((timeout - elapsed_time).after_now(), move || {
warn!(identity = ?unpopulated_for_timeout.identity.moniker,
"{}", &*TIMEOUT_MESSAGE);
global_stats.add_timeout();
let result = PopulatedInspectDataContainer {
identity: Arc::clone(&unpopulated_for_timeout.identity),
inspect_matcher: unpopulated_for_timeout.inspect_matcher.clone(),
snapshot: SnapshotData::failed(
schema::InspectError { message: TIMEOUT_MESSAGE.to_string() },
None,
),
};
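// Yield the timeout error along with an empty pending state so that the
// next iteration terminates the stream.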
Some((
result,
State {
status: Status::Pending(VecDeque::new()),
unpopulated: unpopulated_for_timeout,
batch_timeout: timeout,
global_stats,
elapsed_time: elapsed_time + (zx::Time::get_monotonic() - start_time),
trace_guard,
trace_id,
},
))
})
.boxed()
})
}
}
#[cfg(test)]
mod test {
use super::*;
use fuchsia_inspect::Node;
use fuchsia_zircon::DurationNum;
use futures::StreamExt;
lazy_static! {
static ref EMPTY_IDENTITY: Arc<ComponentIdentity> = Arc::new(ComponentIdentity::unknown());
}
#[fuchsia::test]
async fn population_times_out() {
// Simulate a directory that hangs indefinitely on any request so that we consistently
// trigger the 0-second timeout.
let (directory, mut stream) =
fidl::endpoints::create_proxy_and_stream::<fio::DirectoryMarker>().unwrap();
fasync::Task::spawn(async move {
while stream.next().await.is_some() {
fasync::Timer::new(fasync::Time::after(100000.seconds())).await;
}
})
.detach();
let container = UnpopulatedInspectDataContainer {
identity: EMPTY_IDENTITY.clone(),
inspect_handles: vec![directory.into()],
inspect_matcher: None,
};
let mut stream = container.populate(
0,
Arc::new(GlobalConnectionStats::new(Node::default())),
ftrace::Id::random(),
);
let res = stream.next().await.unwrap();
assert_eq!(res.snapshot.name, None);
assert_eq!(
res.snapshot.errors,
vec![schema::InspectError { message: TIMEOUT_MESSAGE.to_string() }]
);
}
#[fuchsia::test]
async fn no_inspect_files_do_not_give_an_error_response() {
let directory =
fuchsia_fs::directory::open_in_namespace("/tmp", fuchsia_fs::OpenFlags::RIGHT_READABLE)
.unwrap();
let container = UnpopulatedInspectDataContainer {
identity: EMPTY_IDENTITY.clone(),
inspect_handles: vec![directory.into()],
inspect_matcher: None,
};
let mut stream = container.populate(
1000000,
Arc::new(GlobalConnectionStats::new(Node::default())),
ftrace::Id::random(),
);
assert!(stream.next().await.is_none());
}
#[fuchsia::test]
fn only_one_directory_proxy_is_populated() {
let _executor = fuchsia_async::LocalExecutor::new();
let (directory, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
let (mut container, _rx) = InspectArtifactsContainer::new(directory);
let (directory2, _) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>().unwrap();
assert!(container.push_handle(directory2).is_none());
}
}