blob: 72a618bb0c9efd3785c81a734de24547d7463ec8 [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
constants::*,
test_topology::{self, expose_test_realm_protocol},
utils,
};
use component_events::{events::*, matcher::*};
use diagnostics_reader::{assert_data_tree, ArchiveReader, Data, Logs};
use fasync::Task;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_diagnostics::ArchiveAccessorMarker;
use fidl_fuchsia_io::DirectoryMarker;
use fidl_fuchsia_sys2::{ChildRef, EventSourceMarker, RealmMarker};
use fidl_fuchsia_sys_internal::{LogConnectorRequest, LogConnectorRequestStream};
use fuchsia_async as fasync;
use fuchsia_component::{client, server::ServiceFs};
use fuchsia_component_test::{
builder::{Capability, CapabilityRoute, ComponentSource, RouteEndpoint},
mock::{Mock, MockHandles},
};
use fuchsia_zircon as zx;
use futures::{channel::mpsc, lock::Mutex, SinkExt, StreamExt};
use std::sync::Arc;
async fn serve_mocks(
mock_handles: MockHandles,
before_response_recv: Arc<Mutex<mpsc::UnboundedReceiver<()>>>,
after_response_snd: mpsc::UnboundedSender<()>,
) -> Result<(), anyhow::Error> {
let mut fs = ServiceFs::new();
fs.dir("svc").add_fidl_service(move |mut stream: LogConnectorRequestStream| {
let recv_clone = before_response_recv.clone();
let mut send_clone = after_response_snd.clone();
Task::spawn(async move {
while let Some(Ok(LogConnectorRequest::TakeLogConnectionListener { responder })) =
stream.next().await
{
let (_client, server) = zx::Channel::create().unwrap();
recv_clone.lock().await.next().await;
responder.send(Some(ServerEnd::new(server))).unwrap();
send_clone.send(()).await.unwrap();
}
})
.detach()
});
fs.serve_connection(mock_handles.outgoing_dir.into_channel()).unwrap();
fs.collect::<()>().await;
Ok(())
}
#[fuchsia::test]
async fn test_logs_with_hanging_log_connector() {
let (mut before_response_snd, before_response_recv) = mpsc::unbounded();
let (after_response_snd, mut after_response_recv) = mpsc::unbounded();
let recv = Arc::new(Mutex::new(before_response_recv));
let mut builder = test_topology::create(test_topology::Options {
archivist_url:
"fuchsia-pkg://fuchsia.com/archivist-integration-tests-v2#meta/archivist_with_log_connector.cm",
})
.await
.expect("create base topology");
builder
.add_component(
"mocks-server",
ComponentSource::Mock(Mock::new(move |mock_handles| {
Box::pin(serve_mocks(mock_handles, recv.clone(), after_response_snd.clone()))
})),
)
.await
.unwrap()
.add_route(CapabilityRoute {
capability: Capability::protocol("fuchsia.sys.internal.LogConnector"),
source: RouteEndpoint::component("mocks-server"),
targets: vec![RouteEndpoint::component("test/archivist")],
})
.unwrap();
test_topology::add_component(&mut builder, "log_and_exit", LOG_AND_EXIT_COMPONENT_URL)
.await
.expect("add log_and_exit");
let mut realm = builder.build();
expose_test_realm_protocol(&mut realm).await;
let instance = realm.create().await.expect("create instance");
let accessor =
instance.root.connect_to_protocol_at_exposed_dir::<ArchiveAccessorMarker>().unwrap();
let mut reader = ArchiveReader::new();
reader
.with_archive(accessor)
.with_minimum_schema_count(0) // we want this to return even when no log messages
.retry_if_empty(false);
let (mut subscription, mut errors) =
reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let _log_errors = fasync::Task::spawn(async move {
if let Some(error) = errors.next().await {
panic!("{:#?}", error);
}
});
let moniker = format!(
"fuchsia_component_test_collection:{}/test/log_and_exit",
instance.root.child_name()
);
let mut child_ref = ChildRef { name: "log_and_exit".to_string(), collection: None };
reader.retry_if_empty(true);
let (_client_end, server_end) = fidl::endpoints::create_endpoints::<DirectoryMarker>().unwrap();
let realm = instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();
check_message(&moniker, subscription.next().await.unwrap());
// Trigger a response to the TakeLogConnectionListener request that is hanging for
// the purposes of the test and ensure the archivist received it so we don't see a PEER_CLOSED
// in the fuchshia_component_test intermediary component.
before_response_snd.send(()).await.unwrap();
after_response_recv.next().await.unwrap();
}
#[fuchsia::test]
async fn test_logs_lifecycle() {
let mut builder = test_topology::create(test_topology::Options::default())
.await
.expect("create base topology");
test_topology::add_component(&mut builder, "log_and_exit", LOG_AND_EXIT_COMPONENT_URL)
.await
.expect("add log_and_exit");
// Currently RealmBuilder doesn't support to expose a capability from framework, therefore we
// manually update the decl that the builder creates.
let mut realm = builder.build();
test_topology::expose_test_realm_protocol(&mut realm).await;
let instance = realm.create().await.expect("create instance");
let accessor =
instance.root.connect_to_protocol_at_exposed_dir::<ArchiveAccessorMarker>().unwrap();
let mut reader = ArchiveReader::new();
reader
.with_archive(accessor)
.with_minimum_schema_count(0) // we want this to return even when no log messages
.retry_if_empty(false);
let (mut subscription, mut errors) =
reader.snapshot_then_subscribe::<Logs>().unwrap().split_streams();
let _log_errors = fasync::Task::spawn(async move {
if let Some(error) = errors.next().await {
panic!("{:#?}", error);
}
});
let moniker = format!(
"fuchsia_component_test_collection:{}/test/log_and_exit",
instance.root.child_name()
);
let event_source =
EventSource::from_proxy(client::connect_to_protocol::<EventSourceMarker>().unwrap());
let mut event_stream = event_source
.subscribe(vec![EventSubscription::new(vec![Stopped::NAME], EventMode::Async)])
.await
.unwrap();
let mut child_ref = ChildRef { name: "log_and_exit".to_string(), collection: None };
reader.retry_if_empty(true);
for i in 1..100 {
// launch our child and wait for it to exit before asserting on its logs
let (_client_end, server_end) =
fidl::endpoints::create_endpoints::<DirectoryMarker>().unwrap();
let realm = instance.root.connect_to_protocol_at_exposed_dir::<RealmMarker>().unwrap();
realm.bind_child(&mut child_ref, server_end).await.unwrap().unwrap();
utils::wait_for_component_stopped_event(
&instance.root.child_name(),
"log_and_exit",
ExitStatusMatcher::Clean,
&mut event_stream,
)
.await;
check_message(&moniker, subscription.next().await.unwrap());
reader.with_minimum_schema_count(i);
let all_messages = reader.snapshot::<Logs>().await.unwrap();
for message in all_messages {
check_message(&moniker, message);
}
}
}
fn check_message(expected_moniker: &str, message: Data<Logs>) {
assert_eq!(message.moniker, expected_moniker,);
assert_eq!(message.metadata.component_url, Some(LOG_AND_EXIT_COMPONENT_URL.to_string()));
assert_data_tree!(message.payload.unwrap(), root: {
message: {
value: "Hello, world!",
}
});
}