blob: fdd5711712285081c42b6ebfa29c97e4cd9ae58c [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::pipeline::Pipeline,
diagnostics_data::{Data, LifecycleData},
futures::prelude::*,
parking_lot::RwLock,
selectors,
std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
},
};
pub mod container;
use container::LifecycleDataContainer;
/// LifecycleServer holds the state and data needed to serve Lifecycle data
/// reading requests for a single client.
///
/// diagnostics_pipeline: the Pipeline which holds the access-points for
/// all relevant lifecycle data.
pub struct LifecycleServer {
    // Snapshot of lifecycle event containers taken at construction time;
    // consumed one element per `next_event` call.
    data: std::vec::IntoIter<LifecycleDataContainer>,
}
impl LifecycleServer {
    /// Constructs a server whose stream is a point-in-time snapshot of the
    /// pipeline's lifecycle event data; events arriving later are not seen.
    pub fn new(diagnostics_pipeline: Arc<RwLock<Pipeline>>) -> Self {
        let events = diagnostics_pipeline.read().fetch_lifecycle_event_data();
        Self { data: events.into_iter() }
    }

    /// Pops the next container from the snapshot and converts it into the
    /// `LifecycleData` schema, or returns `None` when the snapshot is drained.
    fn next_event(&mut self) -> Option<LifecycleData> {
        let container = self.data.next()?;
        // Each moniker segment is sanitized so the joined moniker is safe to
        // match against selectors.
        let segments: Vec<String> = container
            .identity
            .relative_moniker
            .iter()
            .map(|segment| selectors::sanitize_string_for_selectors(segment))
            .collect();
        Some(Data::for_lifecycle_event(
            segments.join("/"),
            container.lifecycle_type,
            container.payload,
            container.identity.url.clone(),
            container.event_timestamp.into_nanos(),
            Vec::new(),
        ))
    }
}
impl Stream for LifecycleServer {
    type Item = LifecycleData;

    /// Always immediately ready: the data was snapshotted at construction, so
    /// no waker registration is needed. Yields `None` once the snapshot is
    /// exhausted, ending the stream.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `LifecycleServer` is `Unpin` (it only holds a `vec::IntoIter`), so
        // we can safely take a plain mutable reference out of the pin.
        let this = self.get_mut();
        Poll::Ready(this.next_event())
    }
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            accessor::BatchIterator,
            container::ComponentIdentity,
            diagnostics::{self, ConnectionStats},
            events::types::ComponentIdentifier,
            inspect::collector::InspectDataCollector,
            repository::DataRepo,
        },
        fdio,
        fidl::endpoints::create_proxy_and_stream,
        fidl_fuchsia_diagnostics::{BatchIteratorMarker, StreamMode},
        fuchsia_async::{self as fasync, Task},
        fuchsia_component::server::ServiceFs,
        fuchsia_inspect::Inspector,
        fuchsia_zircon as zx,
        fuchsia_zircon::Peered,
        futures::StreamExt,
        std::path::PathBuf,
    };

    // `'static` is implied on a const reference (clippy::redundant_static_lifetimes).
    const TEST_URL: &str = "fuchsia-pkg://test";

    #[fuchsia::test]
    async fn reader_server_formatting() {
        let path = PathBuf::from("/test-bindings3");

        // Make a ServiceFs containing two files.
        // One is an inspect file, and one is not.
        let mut fs = ServiceFs::new();
        let vmo = zx::Vmo::create(4096).unwrap();
        let inspector = inspector_for_reader_test();
        let data = inspector.copy_vmo_data().unwrap();
        vmo.write(&data, 0).unwrap();
        fs.dir("diagnostics").add_vmo_file_at("test.inspect", vmo, 0, 4096);

        // Create a connection to the ServiceFs.
        let (h0, h1) = zx::Channel::create().unwrap();
        fs.serve_connection(h1).unwrap();

        let ns = fdio::Namespace::installed().unwrap();
        ns.bind(path.join("out").to_str().unwrap(), h0).unwrap();

        fasync::Task::spawn(fs.collect()).detach();

        let (done0, done1) = zx::Channel::create().unwrap();
        let thread_path = path.join("out");

        // Run the actual test in a separate thread so that it does not block on FS operations.
        // Use signalling on a zx::Channel to indicate that the test is done.
        std::thread::spawn(move || {
            let path = thread_path;
            let done = done1;
            let mut executor = fasync::Executor::new().unwrap();
            executor.run_singlethreaded(async {
                verify_reader(path).await;
                done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
            });
        });

        fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
        ns.unbind(path.join("out").to_str().unwrap()).unwrap();
    }

    /// Builds an Inspector with a small fixed hierarchy used by the reader test.
    fn inspector_for_reader_test() -> Inspector {
        let inspector = Inspector::new();
        let root = inspector.root();

        let child_1 = root.create_child("child_1");
        child_1.record_int("some-int", 2);

        let child_1_1 = child_1.create_child("child_1_1");
        child_1_1.record_int("some-int", 3);
        child_1_1.record_int("not-wanted-int", 4);
        root.record(child_1_1);
        root.record(child_1);

        let child_2 = root.create_child("child_2");
        child_2.record_int("some-int", 2);
        root.record(child_2);

        inspector
    }

    /// Registers a component with inspect artifacts, then checks that a
    /// lifecycle snapshot contains the expected number of schemas before and
    /// after the component is stopped and removed from the pipeline.
    async fn verify_reader(path: PathBuf) {
        let diagnostics_repo = DataRepo::default();
        let pipeline_wrapper =
            Arc::new(RwLock::new(Pipeline::for_test(None, diagnostics_repo.clone())));
        let out_dir_proxy = InspectDataCollector::find_directory_proxy(&path).await.unwrap();

        // The absolute moniker here is made up since the selector is a glob
        // selector, so any path would match.
        let component_id = ComponentIdentifier::Legacy {
            instance_id: "1234".into(),
            moniker: vec!["test_component.cmx"].into(),
        };
        let identity = ComponentIdentity::from_identifier_and_url(&component_id, TEST_URL);

        diagnostics_repo
            .write()
            .add_new_component(identity.clone(), zx::Time::from_nanos(0), None)
            .unwrap();

        diagnostics_repo
            .write()
            .add_inspect_artifacts(identity.clone(), out_dir_proxy, zx::Time::from_nanos(0))
            .unwrap();

        pipeline_wrapper.write().add_inspect_artifacts(&identity.relative_moniker).unwrap();

        let inspector = Inspector::new();
        let root = inspector.root();
        let test_archive_accessor_node = root.create_child("test_archive_accessor_node");

        let test_accessor_stats =
            Arc::new(diagnostics::AccessorStats::new(test_archive_accessor_node));
        let test_batch_iterator_stats1 =
            Arc::new(diagnostics::ConnectionStats::for_lifecycle(test_accessor_stats.clone()));

        {
            let reader_server = LifecycleServer::new(pipeline_wrapper.clone());
            let result_json = read_snapshot(reader_server, test_batch_iterator_stats1).await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 2, "Expect only two schemas to be returned.");
        }

        diagnostics_repo.write().mark_stopped(&identity.unique_key);
        pipeline_wrapper.write().remove(&identity.relative_moniker);

        let test_batch_iterator_stats2 =
            Arc::new(diagnostics::ConnectionStats::for_lifecycle(test_accessor_stats.clone()));

        {
            let reader_server = LifecycleServer::new(pipeline_wrapper.clone());
            let result_json = read_snapshot(reader_server, test_batch_iterator_stats2).await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 0, "Expect no schema to be returned.");
        }
    }

    /// Drains a `LifecycleServer` through a `BatchIterator` in snapshot mode
    /// and returns the accumulated JSON results as a single array value.
    async fn read_snapshot(
        reader_server: LifecycleServer,
        stats: Arc<ConnectionStats>,
    ) -> serde_json::Value {
        let (consumer, batch_iterator_requests) =
            create_proxy_and_stream::<BatchIteratorMarker>().unwrap();
        let _server = Task::spawn(async move {
            BatchIterator::new(
                reader_server,
                batch_iterator_requests,
                StreamMode::Snapshot,
                stats,
                None,
            )
            .unwrap()
            .run()
            .await
            .unwrap()
        });

        let mut result_vec: Vec<String> = Vec::new();
        loop {
            let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
                consumer.get_next().await.unwrap().unwrap();

            // An empty batch signals that the snapshot is complete.
            if next_batch.is_empty() {
                break;
            }
            for formatted_content in next_batch {
                match formatted_content {
                    fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
                        let mut buf = vec![0; data.size as usize];
                        data.vmo.read(&mut buf, 0).expect("reading vmo");
                        let hierarchy_string = std::str::from_utf8(&buf).unwrap();
                        result_vec.push(hierarchy_string.to_string());
                    }
                    _ => panic!("test only produces json formatted data"),
                }
            }
        }
        let result_string = format!("[{}]", result_vec.join(","));
        // `unwrap_or_else` defers building the panic message until failure;
        // the previous `expect(&format!(..))` allocated the message on every
        // call, even on success (clippy::expect_fun_call). Including the parse
        // error also makes failures easier to diagnose.
        serde_json::from_str(&result_string).unwrap_or_else(|e| {
            panic!("unit tests shouldn't be creating malformed json: {}: {:?}", result_string, e)
        })
    }
}