// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use async_stream::stream;
use bitflags::bitflags;
use diagnostics_data::{DiagnosticsData, Metadata, MetadataError};
use fidl_fuchsia_diagnostics::{
ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
Selector, SelectorArgument, StreamMode, StreamParameters,
};
use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
use fuchsia_component::client;
use fuchsia_sync::Mutex;
use fuchsia_zircon::{self as zx, Duration, DurationNum};
use futures::channel::mpsc;
use futures::prelude::*;
use futures::sink::SinkExt;
use futures::stream::FusedStream;
use pin_project::pin_project;
use serde::Deserialize;
use std::future::ready;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use thiserror::Error;
pub use diagnostics_data::{Data, Inspect, Logs, Severity};
pub use diagnostics_hierarchy::{hierarchy, DiagnosticsHierarchy, Property};
const RETRY_DELAY_MS: i64 = 300;
/// Errors that this library can return
#[derive(Debug, Error)]
pub enum Error {
#[error("Failed to connect to the archive accessor")]
ConnectToArchive(#[source] anyhow::Error),
#[error("Failed to create the BatchIterator channel ends")]
CreateIteratorProxy(#[source] fidl::Error),
#[error("Failed to stream diagnostics from the accessor")]
StreamDiagnostics(#[source] fidl::Error),
#[error("Failed to call iterator server")]
GetNextCall(#[source] fidl::Error),
#[error("Received error from the GetNext response: {0:?}")]
GetNextReaderError(ReaderError),
#[error("Failed to read json received")]
ReadJson(#[source] serde_json::Error),
#[error("Failed to read cbor received")]
ReadCbor(#[source] serde_cbor::Error),
#[error("Failed to parse the diagnostics data from the json received")]
ParseDiagnosticsData(#[source] serde_json::Error),
#[error("Failed to read vmo from the response")]
ReadVmo(#[source] zx::Status),
}
/// An inspect tree selector for a component.
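///
/// # Example
///
/// A minimal sketch; the moniker segments and tree selector are illustrative,
/// and the crate is assumed to be importable as `diagnostics_reader`:
///
/// ```no_run
/// use diagnostics_reader::ComponentSelector; // assumed crate path
///
/// let selector = ComponentSelector::new(vec!["core".to_string(), "my_component".to_string()])
///     .with_tree_selector("root/status:state");
/// ```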
pub struct ComponentSelector {
moniker: Vec<String>,
tree_selectors: Vec<String>,
}
impl ComponentSelector {
/// Creates a new component selector.
/// By default it will select the whole tree unless tree selectors are provided.
/// `moniker` is the realm path relative to the realm of the running component plus the
/// component name. For example: [a, b, component].
pub fn new(moniker: Vec<String>) -> Self {
Self { moniker, tree_selectors: Vec::new() }
}
/// Select a section of the inspect tree.
pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
self.tree_selectors.push(tree_selector.into());
self
}
fn moniker_str(&self) -> String {
self.moniker.join("/")
}
}
pub trait ToSelectorArguments {
fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
}
impl ToSelectorArguments for String {
fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
Box::new([SelectorArgument::RawSelector(self)].into_iter())
}
}
impl ToSelectorArguments for &str {
fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
}
}
impl ToSelectorArguments for ComponentSelector {
fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
let moniker = self.moniker_str();
// If no tree selectors were provided, select the full tree.
if self.tree_selectors.is_empty() {
Box::new([SelectorArgument::RawSelector(format!("{}:root", moniker))].into_iter())
} else {
Box::new(
self.tree_selectors
.into_iter()
.map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
)
}
}
}
impl ToSelectorArguments for Selector {
fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
}
}
// Before unsealing this, consider whether your code belongs in this file.
pub trait SerializableValue: private::Sealed {
const FORMAT_OF_VALUE: Format;
}
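/// Checks properties of a decoded response entry; used by the retry logic to
/// decide whether a snapshot should be requested again.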
pub trait CheckResponse: private::Sealed {
fn has_payload(&self) -> bool;
fn was_fully_filtered(&self) -> bool;
}
// The "sealed trait" pattern.
//
// https://rust-lang.github.io/api-guidelines/future-proofing.html
mod private {
pub trait Sealed {}
}
impl private::Sealed for serde_json::Value {}
impl private::Sealed for serde_cbor::Value {}
impl<D: DiagnosticsData> private::Sealed for Data<D> {}
impl<D: DiagnosticsData> CheckResponse for Data<D> {
fn has_payload(&self) -> bool {
self.payload.is_some()
}
fn was_fully_filtered(&self) -> bool {
self.metadata
.errors()
.map(|errors| {
errors
.iter()
.filter_map(|error| error.message())
.any(|msg| msg.contains(FULLY_FILTERED_MSG))
})
.unwrap_or(false)
}
}
impl SerializableValue for serde_json::Value {
const FORMAT_OF_VALUE: Format = Format::Json;
}
const FULLY_FILTERED_MSG: &str = "Inspect hierarchy was fully filtered";
impl CheckResponse for serde_json::Value {
fn has_payload(&self) -> bool {
match self {
serde_json::Value::Object(obj) => {
obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
}
_ => false,
}
}
fn was_fully_filtered(&self) -> bool {
self.as_object()
.and_then(|obj| obj.get("metadata"))
.and_then(|metadata| metadata.as_object())
.and_then(|metadata| metadata.get("errors"))
.and_then(|errors| errors.as_array())
.map(|errors| {
errors
.iter()
.filter_map(|error| error.as_object())
.filter_map(|error| error.get("message"))
.filter_map(|msg| msg.as_str())
.any(|message| message.starts_with(FULLY_FILTERED_MSG))
})
.unwrap_or(false)
}
}
impl SerializableValue for serde_cbor::Value {
const FORMAT_OF_VALUE: Format = Format::Cbor;
}
impl CheckResponse for serde_cbor::Value {
fn has_payload(&self) -> bool {
match self {
serde_cbor::Value::Map(m) => m
.get(&serde_cbor::Value::Text("payload".into()))
.map(|p| !matches!(p, serde_cbor::Value::Null))
.unwrap_or(false),
_ => false,
}
}
fn was_fully_filtered(&self) -> bool {
let this = match self {
serde_cbor::Value::Map(m) => m,
_ => unreachable!("Only objects are expected"),
};
let metadata = match this.get(&serde_cbor::Value::Text("metadata".into())) {
Some(serde_cbor::Value::Map(m)) => m,
_ => unreachable!("All objects have a metadata field"),
};
let errors = match metadata.get(&serde_cbor::Value::Text("errors".into())) {
None => return false, // if no errors, then we can't possibly be fully filtered.
Some(serde_cbor::Value::Array(a)) => a,
_ => unreachable!("We either get an array or we get nothing"),
};
errors
.iter()
.filter_map(|error| match error {
serde_cbor::Value::Map(m) => Some(m),
_ => None,
})
.filter_map(|error| error.get(&serde_cbor::Value::Text("message".into())))
.filter_map(|msg| match msg {
serde_cbor::Value::Text(s) => Some(s),
_ => None,
})
.any(|message| message.starts_with(FULLY_FILTERED_MSG))
}
}
bitflags! {
/// Retry configuration for ArchiveReader.
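///
/// A sketch of combining flags with the standard `bitflags` operators (crate
/// path assumed to be `diagnostics_reader`):
///
/// ```no_run
/// use diagnostics_reader::RetryConfig; // assumed crate path
///
/// let config = RetryConfig::EMPTY | RetryConfig::FULLY_FILTERED;
/// assert_eq!(config, RetryConfig::always());
/// ```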
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RetryConfig: u8 {
/// ArchiveReader will retry on empty responses.
const EMPTY = 0b00000001;
/// ArchiveReader will retry when the returned hierarchy has been fully filtered.
const FULLY_FILTERED = 0b00000010;
}
}
impl RetryConfig {
pub fn always() -> Self {
Self::all()
}
pub fn never() -> Self {
Self::empty()
}
fn on_empty(&self) -> bool {
self.intersects(Self::EMPTY)
}
fn on_fully_filtered(&self) -> bool {
self.intersects(Self::FULLY_FILTERED)
}
}
/// Utility for reading inspect data of a running component using the injected Archive
/// Reader service.
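///
/// # Example
///
/// A minimal sketch of taking an inspect snapshot; the selector string is
/// illustrative and the crate path is an assumption:
///
/// ```no_run
/// # async fn example() -> Result<(), diagnostics_reader::Error> {
/// use diagnostics_reader::{ArchiveReader, Inspect}; // assumed crate path
///
/// let data = ArchiveReader::new()
///     .add_selector("core/my_component:root")
///     .snapshot::<Inspect>()
///     .await?;
/// for entry in &data {
///     println!("{}", entry.moniker);
/// }
/// # Ok(())
/// # }
/// ```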
#[derive(Clone)]
pub struct ArchiveReader {
archive: Arc<Mutex<Option<ArchiveAccessorProxy>>>,
selectors: Vec<SelectorArgument>,
retry_config: RetryConfig,
minimum_schema_count: usize,
timeout: Option<Duration>,
batch_retrieval_timeout_seconds: Option<i64>,
max_aggregated_content_size_bytes: Option<u64>,
}
impl ArchiveReader {
/// Creates a new data fetcher with default configuration:
/// - Retries: always, both on empty responses and on fully filtered hierarchies
///   (see `RetryConfig::always()`).
/// - Timeout: never. Use with_timeout() to set a timeout.
pub fn new() -> Self {
Self {
timeout: None,
selectors: vec![],
retry_config: RetryConfig::always(),
archive: Arc::new(Mutex::new(None)),
minimum_schema_count: 1,
batch_retrieval_timeout_seconds: None,
max_aggregated_content_size_bytes: None,
}
}
pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
{
let mut arc = self.archive.lock();
*arc = Some(archive);
}
self
}
/// Requests a single component tree (or sub-tree).
pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
self.selectors.extend(selector.to_selector_arguments());
self
}
/// Requests all data for the component identified by the given moniker.
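///
/// A sketch; the moniker is illustrative. Colons in the moniker are escaped by
/// `selectors::sanitize_moniker_for_selectors` before the selector is built:
///
/// ```no_run
/// # fn example(reader: &mut diagnostics_reader::ArchiveReader) {
/// reader.select_all_for_moniker("realm_builder:abc/test_component");
/// # }
/// ```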
pub fn select_all_for_moniker(&mut self, moniker: &str) -> &mut Self {
let selector = format!("{}:root", selectors::sanitize_moniker_for_selectors(moniker));
self.add_selector(selector)
}
/// Sets a custom retry configuration. By default we always retry.
pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
self.retry_config = config;
self
}
// TODO(b/308979621): soft-migrate and remove.
#[doc(hidden)]
pub fn retry_if_empty(&mut self, enable: bool) -> &mut Self {
if enable {
self.retry_config = self.retry_config | RetryConfig::EMPTY;
} else {
self.retry_config = self.retry_config - RetryConfig::EMPTY;
}
self
}
pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
where
T: Iterator<Item = S>,
S: ToSelectorArguments,
{
for selector in selectors {
self.add_selector(selector);
}
self
}
/// Sets the maximum time to wait for a response from the Archive.
/// Do not use in tests unless timeout is the expected behavior.
pub fn with_timeout(&mut self, duration: Duration) -> &mut Self {
self.timeout = Some(duration);
self
}
pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
self.max_aggregated_content_size_bytes = Some(limit_bytes);
self
}
/// Sets the maximum time to wait for a single component
/// to have its diagnostics data "pumped".
pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
self.batch_retrieval_timeout_seconds = Some(timeout);
self
}
/// Sets the minimum number of schemas expected in a result in order for the
/// result to be considered a success.
pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
self.minimum_schema_count = minimum_schema_count;
self
}
/// Connects to the ArchiveAccessor and returns data matching provided selectors.
pub async fn snapshot<D>(&self) -> Result<Vec<Data<D>>, Error>
where
D: DiagnosticsData,
{
let data_future = self.snapshot_inner::<D, Data<D>>(Format::Cbor);
let data = match self.timeout {
Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
None => data_future.await?,
};
Ok(data)
}
/// Connects to the ArchiveAccessor and returns a stream of data containing a snapshot of the
/// current buffer in the Archivist as well as new data that arrives.
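///
/// A usage sketch that splits results from errors (assumes `futures::StreamExt`
/// is available):
///
/// ```no_run
/// # async fn example(reader: &diagnostics_reader::ArchiveReader) {
/// use diagnostics_reader::Logs;
/// use futures::StreamExt;
///
/// let (mut logs, _errors) = reader
///     .snapshot_then_subscribe::<Logs>()
///     .expect("subscribed to logs")
///     .split_streams();
/// while let Some(log) = logs.next().await {
///     println!("{:?}", log);
/// }
/// # }
/// ```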
pub fn snapshot_then_subscribe<D>(&self) -> Result<Subscription<Data<D>>, Error>
where
D: DiagnosticsData + 'static,
{
let iterator = self.batch_iterator::<D>(StreamMode::SnapshotThenSubscribe, Format::Cbor)?;
Ok(Subscription::new(iterator))
}
/// Connects to the ArchiveAccessor and returns inspect data matching provided selectors.
/// Returns the raw value (JSON or CBOR, per `T::FORMAT_OF_VALUE`) for each hierarchy fetched.
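///
/// A sketch requesting raw JSON (the result is a `serde_json::Value` array of schemas):
///
/// ```no_run
/// # async fn example(reader: &diagnostics_reader::ArchiveReader) {
/// use diagnostics_reader::Inspect;
///
/// let json = reader
///     .snapshot_raw::<Inspect, serde_json::Value>()
///     .await
///     .expect("snapshot");
/// assert!(json.is_array());
/// # }
/// ```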
pub async fn snapshot_raw<D, T>(&self) -> Result<T, Error>
where
D: DiagnosticsData,
T: for<'a> Deserialize<'a> + SerializableValue + From<Vec<T>> + CheckResponse,
{
let data_future = self.snapshot_inner::<D, T>(T::FORMAT_OF_VALUE);
let data = match self.timeout {
Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
None => data_future.await?,
};
Ok(T::from(data))
}
/// Connects to the ArchiveAccessor and returns a stream of data containing a snapshot of the
/// current buffer in the Archivist as well as new data that arrives.
pub fn snapshot_then_subscribe_raw<D, T: SerializableValue + 'static>(
&self,
) -> Result<Subscription<T>, Error>
where
D: DiagnosticsData + 'static,
T: for<'a> Deserialize<'a> + Send,
{
let iterator =
self.batch_iterator::<D>(StreamMode::SnapshotThenSubscribe, T::FORMAT_OF_VALUE)?;
Ok(Subscription::new(iterator))
}
async fn snapshot_inner<D, T>(&self, format: Format) -> Result<Vec<T>, Error>
where
D: DiagnosticsData,
T: for<'a> Deserialize<'a> + CheckResponse,
{
loop {
let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
let result = drain_batch_iterator::<T>(iterator)
.filter_map(|value| ready(value.ok()))
.collect::<Vec<_>>()
.await;
if (self.retry_config.on_empty() && result.len() < self.minimum_schema_count)
|| (self.retry_config.on_fully_filtered()
&& result
.iter()
.any(|entry| !entry.has_payload() && entry.was_fully_filtered()))
{
fasync::Timer::new(fasync::Time::after(RETRY_DELAY_MS.millis())).await;
} else {
return Ok(result);
}
}
}
fn batch_iterator<D>(
&self,
mode: StreamMode,
format: Format,
) -> Result<BatchIteratorProxy, Error>
where
D: DiagnosticsData,
{
// TODO(https://fxbug.dev/42135966) this should be done in an ArchiveReaderBuilder -> Reader init
let mut archive = self.archive.lock();
if archive.is_none() {
*archive = Some(
client::connect_to_protocol::<ArchiveAccessorMarker>()
.map_err(Error::ConnectToArchive)?,
)
}
let archive = archive.as_ref().unwrap();
let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>()
.map_err(Error::CreateIteratorProxy)?;
let mut stream_parameters = StreamParameters::default();
stream_parameters.stream_mode = Some(mode);
stream_parameters.data_type = Some(D::DATA_TYPE);
stream_parameters.format = Some(format);
stream_parameters.client_selector_configuration = if self.selectors.is_empty() {
Some(ClientSelectorConfiguration::SelectAll(true))
} else {
Some(ClientSelectorConfiguration::Selectors(self.selectors.iter().cloned().collect()))
};
stream_parameters.performance_configuration = Some(PerformanceConfiguration {
max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
..Default::default()
});
archive
.stream_diagnostics(&stream_parameters, server_end)
.map_err(Error::StreamDiagnostics)?;
Ok(iterator)
}
}
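/// A single `FormattedContent` VMO may contain either one schema object or an
/// array of them; this untagged enum lets serde accept both shapes.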
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
Many(Vec<T>),
One(T),
}
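/// Drains the given `BatchIterator` into a stream of deserialized values,
/// terminating when the server responds with an empty batch.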
fn drain_batch_iterator<T>(iterator: BatchIteratorProxy) -> impl Stream<Item = Result<T, Error>>
where
T: for<'a> Deserialize<'a>,
{
stream! {
loop {
let next_batch = iterator
.get_next()
.await
.map_err(Error::GetNextCall)?
.map_err(Error::GetNextReaderError)?;
if next_batch.is_empty() {
// End of stream
return;
}
for formatted_content in next_batch {
let output: OneOrMany<T> = match formatted_content {
FormattedContent::Json(data) => {
let mut buf = vec![0; data.size as usize];
data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
serde_json::from_slice(&buf).map_err(Error::ReadJson)?
}
FormattedContent::Cbor(vmo) => {
let mut buf =
vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
serde_cbor::from_slice(&buf).map_err(Error::ReadCbor)?
}
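// Unknown or unsupported format: yield nothing for this entry.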
_ => OneOrMany::Many(vec![]),
};
match output {
OneOrMany::One(data) => yield Ok(data),
OneOrMany::Many(datas) => {
for data in datas {
yield Ok(data);
}
}
}
}
}
}
}
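/// A stream of diagnostics data produced by a `snapshot_then_subscribe` request.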
#[pin_project]
pub struct Subscription<T> {
#[pin]
recv: Pin<Box<dyn FusedStream<Item = Result<T, Error>> + Send>>,
}
const DATA_CHANNEL_SIZE: usize = 32;
const ERROR_CHANNEL_SIZE: usize = 2;
impl<T> Subscription<T>
where
T: for<'a> Deserialize<'a> + Send + 'static,
{
/// Creates a new subscription stream to a batch iterator.
/// The stream will return diagnostics data structures.
pub fn new(iterator: BatchIteratorProxy) -> Self {
Subscription { recv: Box::pin(drain_batch_iterator::<T>(iterator).fuse()) }
}
/// Splits the subscription into two separate streams: results and errors.
pub fn split_streams(mut self) -> (SubscriptionResultsStream<T>, mpsc::Receiver<Error>) {
let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
let _drain_task = fasync::Task::spawn(async move {
while let Some(result) = self.next().await {
match result {
Ok(value) => results_sender.send(value).await.ok(),
Err(e) => errors_sender.send(e).await.ok(),
};
}
});
(SubscriptionResultsStream { recv, _drain_task }, errors)
}
}
impl<T> Stream for Subscription<T>
where
T: for<'a> Deserialize<'a>,
{
type Item = Result<T, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.recv.poll_next(cx)
}
}
impl<T> FusedStream for Subscription<T>
where
T: for<'a> Deserialize<'a>,
{
fn is_terminated(&self) -> bool {
self.recv.is_terminated()
}
}
#[pin_project]
pub struct SubscriptionResultsStream<T> {
#[pin]
recv: mpsc::Receiver<T>,
_drain_task: fasync::Task<()>,
}
impl<T> Stream for SubscriptionResultsStream<T>
where
T: for<'a> Deserialize<'a>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.recv.poll_next(cx)
}
}
impl<T> FusedStream for SubscriptionResultsStream<T>
where
T: for<'a> Deserialize<'a>,
{
fn is_terminated(&self) -> bool {
self.recv.is_terminated()
}
}
#[cfg(test)]
mod tests {
use super::*;
use diagnostics_assertions::assert_data_tree;
use diagnostics_log::{Publisher, PublisherOptions};
use fuchsia_component_test::{
Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
};
use futures::TryStreamExt;
use tracing::{error, info};
use {
fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_logger as flogger,
fuchsia_zircon as zx,
};
const TEST_COMPONENT_URL: &str =
"fuchsia-pkg://fuchsia.com/diagnostics-reader-tests#meta/inspect_test_component.cm";
async fn start_component() -> Result<RealmInstance, anyhow::Error> {
let builder = RealmBuilder::new().await?;
let test_component = builder
.add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
.await?;
builder
.add_route(
Route::new()
.capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
.from(Ref::parent())
.to(&test_component),
)
.await?;
let instance = builder.build().await?;
Ok(instance)
}
#[fuchsia::test]
async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
let instance = start_component().await?;
let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
let results = ArchiveReader::new()
.add_selector(format!("{component_selector}:root"))
.snapshot::<Inspect>()
.await?;
assert_eq!(results.len(), 1);
assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
int: 3u64,
"lazy-node": {
a: "test",
child: {
double: 3.25,
},
}
});
// add_selector can take either a String or a Selector.
let lazy_property_selector = Selector {
component_selector: Some(fdiagnostics::ComponentSelector {
moniker_segments: Some(vec![
fdiagnostics::StringSelector::ExactMatch(format!(
"realm_builder:{}",
instance.root.child_name()
)),
fdiagnostics::StringSelector::ExactMatch("test_component".into()),
]),
..Default::default()
}),
tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
fdiagnostics::PropertySelector {
node_path: vec![
fdiagnostics::StringSelector::ExactMatch("root".into()),
fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
],
target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
},
)),
..Default::default()
};
let int_property_selector = format!("{component_selector}:root:int");
let mut reader = ArchiveReader::new();
reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
let response = reader.snapshot::<Inspect>().await?;
assert_eq!(response.len(), 1);
assert_eq!(response[0].metadata.component_url, Some(TEST_COMPONENT_URL.to_string()));
assert_eq!(response[0].moniker, moniker);
assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
int: 3u64,
"lazy-node": {
a: "test"
}
});
Ok(())
}
#[fuchsia::test]
async fn select_all_for_moniker() {
let instance = start_component().await.expect("started component");
let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
let results = ArchiveReader::new()
.select_all_for_moniker(&moniker)
.snapshot::<Inspect>()
.await
.expect("snapshotted");
assert_eq!(results.len(), 1);
assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
int: 3u64,
"lazy-node": {
a: "test",
child: {
double: 3.25,
},
}
});
}
#[fuchsia::test]
async fn timeout() -> Result<(), anyhow::Error> {
let instance = start_component().await?;
let mut reader = ArchiveReader::new();
reader
.add_selector(format!(
"realm_builder\\:{}/test_component:root",
instance.root.child_name()
))
.with_timeout(0.nanos());
let result = reader.snapshot::<Inspect>().await;
assert!(result.unwrap().is_empty());
Ok(())
}
#[fuchsia::test]
async fn component_selector() {
let selector = ComponentSelector::new(vec!["a".to_string()]);
assert_eq!(selector.moniker_str(), "a");
let arguments: Vec<_> = selector.to_selector_arguments().collect();
assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
let selector =
ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
assert_eq!(selector.moniker_str(), "b/c/a");
let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
let arguments: Vec<_> = selector.to_selector_arguments().collect();
assert_eq!(
arguments,
vec![
SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
]
);
}
#[fuchsia::test]
async fn custom_archive() {
let proxy = spawn_fake_archive(serde_json::json!({
"moniker": "moniker",
"version": 1,
"data_source": "Inspect",
"metadata": {
"component_url": "component-url",
"timestamp": 0,
"filename": "filename",
},
"payload": {
"root": {
"x": 1,
}
}
}));
let result = ArchiveReader::new()
.with_archive(proxy)
.snapshot::<Inspect>()
.await
.expect("got result");
assert_eq!(result.len(), 1);
assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
}
#[fuchsia::test]
async fn handles_lists_correctly_on_snapshot_raw() {
let value = serde_json::json!({
"moniker": "moniker",
"version": 1,
"data_source": "Inspect",
"metadata": {
"component_url": "component-url",
"timestamp": 0,
"filename": "filename",
},
"payload": {
"root": {
"x": 1,
}
}
});
let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
let mut reader = ArchiveReader::new();
reader.with_archive(proxy);
let json_result =
reader.snapshot_raw::<Inspect, serde_json::Value>().await.expect("got result");
match json_result {
serde_json::Value::Array(values) => {
assert_eq!(values.len(), 1);
assert_eq!(values[0], value);
}
result => panic!("unexpected result: {:?}", result),
}
let cbor_result =
reader.snapshot_raw::<Inspect, serde_cbor::Value>().await.expect("got result");
match cbor_result {
serde_cbor::Value::Array(values) => {
assert_eq!(values.len(), 1);
let json_result =
serde_cbor::value::from_value::<serde_json::Value>(values[0].to_owned())
.expect("Should convert cleanly to JSON");
assert_eq!(json_result, value);
}
result => panic!("unexpected result: {:?}", result),
}
}
#[fuchsia::test]
async fn snapshot_then_subscribe() {
let (_instance, publisher, reader) = init_isolated_logging().await;
let (mut stream, _errors) =
reader.snapshot_then_subscribe::<Logs>().expect("subscribed to logs").split_streams();
tracing::subscriber::with_default(publisher, || {
info!("hello from test");
error!("error from test");
});
let log = stream.next().await.unwrap();
assert_eq!(log.msg().unwrap(), "hello from test");
let log = stream.next().await.unwrap();
assert_eq!(log.msg().unwrap(), "error from test");
}
#[fuchsia::test]
async fn snapshot_then_subscribe_raw() {
let (_instance, publisher, reader) = init_isolated_logging().await;
let (mut stream, _errors) = reader
.snapshot_then_subscribe_raw::<Logs, serde_json::Value>()
.expect("subscribed to logs")
.split_streams();
tracing::subscriber::with_default(publisher, || {
info!("hello from test");
error!("error from test");
});
let log = stream.next().await.unwrap();
assert_eq!(log["payload"]["root"]["message"]["value"], "hello from test");
let log = stream.next().await.unwrap();
assert_eq!(log["payload"]["root"]["message"]["value"], "error from test");
}
fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
let (proxy, mut stream) =
fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>()
.expect("create proxy");
fasync::Task::spawn(async move {
while let Some(request) = stream.try_next().await.expect("stream request") {
match request {
fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
result_stream,
..
} => {
let data = data_to_send.clone();
fasync::Task::spawn(async move {
let mut called = false;
let mut stream = result_stream.into_stream().expect("into stream");
while let Some(req) = stream.try_next().await.expect("stream request") {
match req {
fdiagnostics::BatchIteratorRequest::WaitForReady {
responder,
} => {
let _ = responder.send();
}
fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
if called {
responder.send(Ok(Vec::new())).expect("send response");
continue;
}
called = true;
let content = serde_json::to_string_pretty(&data)
.expect("json pretty");
let vmo_size = content.len() as u64;
let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
vmo.write(content.as_bytes(), 0).expect("write vmo");
let buffer =
fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
responder
.send(Ok(vec![fdiagnostics::FormattedContent::Json(
buffer,
)]))
.expect("send response");
}
fdiagnostics::BatchIteratorRequest::_UnknownMethod {
..
} => {
unreachable!("Unexpected method call");
}
}
}
})
.detach();
}
fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
unreachable!("Unexpected method call");
}
}
}
})
.detach();
proxy
}
async fn create_realm() -> RealmBuilder {
let builder = RealmBuilder::new().await.expect("create realm builder");
let archivist = builder
.add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
.await
.expect("add child archivist");
builder
.add_route(
Route::new()
.capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
.capability(
Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
.optional(),
)
.capability(Capability::event_stream("stopped"))
.capability(Capability::event_stream("capability_requested"))
.from(Ref::parent())
.to(&archivist),
)
.await
.expect("added routes from parent to archivist");
builder
.add_route(
Route::new()
.capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
.capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
.from(&archivist)
.to(Ref::parent()),
)
.await
.expect("added routes from archivist to parent");
builder
}
async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader) {
let instance = create_realm().await.build().await.unwrap();
let log_sink_proxy =
instance.root.connect_to_protocol_at_exposed_dir::<flogger::LogSinkMarker>().unwrap();
let accessor_proxy = instance
.root
.connect_to_protocol_at_exposed_dir::<fdiagnostics::ArchiveAccessorMarker>()
.unwrap();
let mut reader = ArchiveReader::new();
reader.with_archive(accessor_proxy);
let options = PublisherOptions::default()
.wait_for_initial_interest(false)
.use_log_sink(log_sink_proxy);
let publisher = Publisher::new(options).unwrap();
(instance, publisher, reader)
}
}