blob: 6279456dbe3b17d3d4a0467643c15daeef435700 [file] [log] [blame]
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::inspect::container::InspectHandle;
use crate::inspect::repository::InspectRepository;
use anyhow::Error;
use fidl::endpoints::ControlHandle;
use fuchsia_sync::{Mutex, RwLock};
use futures::channel::mpsc;
use futures::StreamExt;
use std::sync::Arc;
use tracing::{debug, warn};
use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};
pub struct InspectSinkServer {
/// Shared repository holding the Inspect handles.
repo: Arc<InspectRepository>,
/// Sender end of drain_listeners_task.
task_sender: RwLock<mpsc::UnboundedSender<fasync::Task<()>>>,
/// Task that makes sure every handler closes down before closing down InspectSinkSErver.
drain_listeners_task: Mutex<Option<fasync::Task<()>>>,
}
impl InspectSinkServer {
/// Construct a server.
pub fn new(repo: Arc<InspectRepository>) -> Self {
let (sender, receiver) = mpsc::unbounded();
Self {
repo,
task_sender: RwLock::new(sender),
drain_listeners_task: Mutex::new(Some(fasync::Task::spawn(async move {
receiver
.for_each_concurrent(None, |rx| async move {
rx.await;
})
.await;
}))),
}
}
/// Handle incoming events. Mainly for use in EventConsumer impl.
fn spawn(&self, component: Arc<ComponentIdentity>, stream: finspect::InspectSinkRequestStream) {
let repo = Arc::clone(&self.repo);
if let Err(e) = self.task_sender.read().unbounded_send(fasync::Task::spawn(async move {
if let Err(e) = Self::handle_requests(repo, component, stream).await {
warn!("error handling InspectSink requests: {e}");
}
})) {
warn!("couldn't queue listener task: {e:?}");
}
}
async fn handle_requests(
repo: Arc<InspectRepository>,
component: Arc<ComponentIdentity>,
mut stream: finspect::InspectSinkRequestStream,
) -> Result<(), Error> {
while let Some(Ok(request)) = stream.next().await {
match request {
finspect::InspectSinkRequest::Publish {
payload: finspect::InspectSinkPublishRequest { tree: Some(tree), name, .. },
..
} => repo.add_inspect_handle(
Arc::clone(&component),
InspectHandle::from_named_tree_proxy(tree.into_proxy()?, name),
),
finspect::InspectSinkRequest::Publish {
payload: finspect::InspectSinkPublishRequest { tree: None, name, .. },
..
} => {
debug!(name, %component, "InspectSink/Publish without a tree");
}
finspect::InspectSinkRequest::_UnknownMethod {
ordinal,
control_handle,
method_type,
..
} => {
warn!(ordinal, ?method_type, "Received unknown request for InspectSink");
// Close the connection if we receive an unknown interaction.
control_handle.shutdown();
}
}
}
Ok(())
}
/// Instructs the server to finish handling all connections.
pub async fn wait_for_servers_to_complete(&self) {
let task = self
.drain_listeners_task
.lock()
.take()
.expect("the accessor server task is only awaited once");
task.await;
}
/// Instructs the server to stop accepting new connections.
pub fn stop(&self) {
self.task_sender.write().disconnect();
}
}
impl EventConsumer for InspectSinkServer {
fn handle(self: Arc<Self>, event: Event) {
match event.payload {
EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
component,
request_stream,
}) => {
self.spawn(component, request_stream);
}
_ => unreachable!("InspectSinkServer is only subscribed to InspectSinkRequested"),
}
}
}
#[cfg(test)]
mod tests {
use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::inspect::container::InspectHandle;
use crate::inspect::repository::InspectRepository;
use crate::inspect::servers::InspectSinkServer;
use assert_matches::assert_matches;
use diagnostics_assertions::assert_json_diff;
use fidl::endpoints::{create_proxy_and_stream, create_request_stream, ClientEnd};
use fidl_fuchsia_inspect::{
InspectSinkMarker, InspectSinkProxy, InspectSinkPublishRequest, TreeMarker,
};
use fuchsia_async::Task;
use fuchsia_inspect::reader::read;
use fuchsia_inspect::Inspector;
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::Future;
use inspect_runtime::service::spawn_tree_server_with_stream;
use inspect_runtime::TreeServerSendPreference;
use selectors::VerboseError;
use std::sync::Arc;
struct TestHarness {
/// Associates a faux component via ComponentIdentity with an InspectSinkProxy
proxy_pairs: Vec<(Arc<ComponentIdentity>, Option<InspectSinkProxy>)>,
/// The underlying repository.
repo: Arc<InspectRepository>,
/// The server that would be held by the Archivist.
server: Arc<InspectSinkServer>,
/// The koids of the published TreeProxies in the order they were published.
koids: Vec<zx::Koid>,
/// The servers for each component's Tree protocol
tree_pairs: Vec<(Arc<ComponentIdentity>, Option<Task<()>>)>,
}
impl TestHarness {
/// Construct an InspectSinkServer with a ComponentIdentity/InspectSinkProxy pair
/// for each input ComponentIdentity.
fn new(identity: Vec<Arc<ComponentIdentity>>) -> Self {
let mut proxy_pairs = vec![];
let repo = Arc::new(InspectRepository::default());
let server = Arc::new(InspectSinkServer::new(Arc::clone(&repo)));
for id in identity.into_iter() {
let (proxy, request_stream) =
create_proxy_and_stream::<InspectSinkMarker>().unwrap();
Arc::clone(&server).handle(Event {
timestamp: zx::Time::get_monotonic(),
payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
component: Arc::clone(&id),
request_stream,
}),
});
proxy_pairs.push((id, Some(proxy)));
}
Self { proxy_pairs, repo, server, koids: vec![], tree_pairs: vec![] }
}
/// Publish `tree` via the proxy associated with `component`.
fn publish(&mut self, component: &Arc<ComponentIdentity>, tree: ClientEnd<TreeMarker>) {
for (id, proxy) in &self.proxy_pairs {
if id != component {
continue;
}
if let Some(proxy) = &proxy {
self.koids.push(tree.as_handle_ref().get_koid().unwrap());
proxy
.publish(InspectSinkPublishRequest {
tree: Some(tree),
..Default::default()
})
.unwrap();
return;
} else {
panic!("cannot publish on stopped server/proxy pair");
}
}
}
/// Start a TreeProxy server and return the proxy.
fn serve(
&mut self,
component: Arc<ComponentIdentity>,
inspector: Inspector,
settings: TreeServerSendPreference,
) -> ClientEnd<TreeMarker> {
let (tree, request_stream) = create_request_stream::<TreeMarker>().unwrap();
let server = spawn_tree_server_with_stream(inspector, settings, request_stream);
self.tree_pairs.push((component, Some(server)));
tree
}
/// Drop the server(s) associated with `component`, as initialized by `serve`.
fn drop_tree_servers(&mut self, component: &Arc<ComponentIdentity>) {
for (id, ref mut server) in &mut self.tree_pairs {
if id != component {
continue;
}
if server.is_none() {
continue;
}
server.take();
}
}
/// The published koids, with 0 referring to the first published tree.
fn published_koids(&self) -> &[zx::Koid] {
&self.koids
}
/// Execute closure `assertions` on the `InspectArtifactsContainer` associated with
/// `identity`.
///
/// This function will wait for data to be available in `self.repo`, and therefore
/// might hang indefinitely if the data never appears. This is not a problem since
/// it is a unit test and `fx test` has timeouts available.
async fn assert<const N: usize, F, Fut>(
&self,
identity: &Arc<ComponentIdentity>,
koids: [zx::Koid; N],
assertions: F,
) where
F: FnOnce([InspectHandle; N]) -> Fut,
Fut: Future<Output = ()>,
{
self.repo.wait_for_artifact(identity).await;
let containers = self.repo.fetch_inspect_data(
&Some(vec![selectors::parse_selector::<VerboseError>(&format!("{identity}:root"))
.expect("parse selector")]),
None,
);
assert_eq!(containers.len(), 1);
assertions(
koids
.iter()
.map(|koid| {
containers[0]
.inspect_handles
.iter()
.find(|handle| handle.koid() == *koid)
.unwrap()
})
.cloned()
.collect::<Vec<_>>()
.try_into()
.unwrap(),
)
.await;
}
/// Drops all published proxies, stops the server, and waits for it to complete.
async fn stop_all(&mut self) {
for (_, ref mut proxy) in &mut self.proxy_pairs {
proxy.take();
}
self.server.stop();
self.server.wait_for_servers_to_complete().await;
}
async fn wait_until_gone(&self, component: &Arc<ComponentIdentity>) {
self.repo.wait_until_gone(component).await;
}
}
#[fuchsia::test]
async fn connect() {
let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
let insp = Inspector::default();
insp.root().record_int("int", 0);
let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
test.publish(&identity, tree);
let koid = test.published_koids()[0];
test.assert(&identity, [koid], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
int: 0i64,
});
});
})
.await;
}
#[fuchsia::test]
async fn publish_multiple_times_on_the_same_connection() {
let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
let insp = Inspector::default();
insp.root().record_int("int", 0);
let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
let other_insp = Inspector::default();
other_insp.root().record_double("double", 1.24);
let other_tree =
test.serve(Arc::clone(&identity), other_insp, TreeServerSendPreference::default());
test.publish(&identity, tree);
test.publish(&identity, other_tree);
let koid0 = test.published_koids()[0];
let koid1 = test.published_koids()[1];
test.assert(&identity, [koid0, koid1], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
int: 0i64,
});
});
assert_matches!(
&handles[1],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
double: 1.24,
});
});
})
.await;
}
#[fuchsia::test]
async fn tree_remains_after_inspect_sink_disconnects() {
let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
let insp = Inspector::default();
insp.root().record_int("int", 0);
let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
test.publish(&identity, tree);
let koid = test.published_koids()[0];
test.assert(&identity, [koid], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
int: 0i64,
});
});
})
.await;
test.stop_all().await;
// the data must remain present as long as the tree server started above is alive
test.assert(&identity, [koid], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
int: 0i64,
});
});
})
.await;
}
#[fuchsia::test]
async fn connect_with_multiple_proxies() {
let identities: Vec<Arc<ComponentIdentity>> = vec![
Arc::new(vec!["a", "b", "foo.cm"].into()),
Arc::new(vec!["a", "b", "foo2.cm"].into()),
];
let mut test = TestHarness::new(identities.clone());
let insp = Inspector::default();
insp.root().record_int("int", 0);
let tree =
test.serve(Arc::clone(&identities[0]), insp, TreeServerSendPreference::default());
let insp2 = Inspector::default();
insp2.root().record_bool("is_insp2", true);
let tree2 =
test.serve(Arc::clone(&identities[1]), insp2, TreeServerSendPreference::default());
test.publish(&identities[0], tree);
test.publish(&identities[1], tree2);
let koid_component_0 = test.published_koids()[0];
let koid_component_1 = test.published_koids()[1];
test.assert(&identities[0], [koid_component_0], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
int: 0i64,
});
});
})
.await;
test.assert(&identities[1], [koid_component_1], |handles| async move {
assert_matches!(
&handles[0],
InspectHandle::Tree(tree, _) => {
let hierarchy = read(tree).await.unwrap();
assert_json_diff!(hierarchy, root: {
is_insp2: true,
});
});
})
.await;
}
#[fuchsia::test]
async fn dropping_tree_removes_component_identity_from_repo() {
let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
let tree = test.serve(
Arc::clone(&identity),
Inspector::default(),
TreeServerSendPreference::default(),
);
test.publish(&identity, tree);
test.stop_all().await;
// this executing to completion means the identity was present
test.assert(&identity, [test.published_koids()[0]], |handles: [_; 1]| {
assert_eq!(handles.len(), 1);
async {}
})
.await;
test.drop_tree_servers(&identity);
// this executing to completion means the identity is not there anymore; we know
// it previously was present
test.wait_until_gone(&identity).await;
}
}