blob: e133ca86ebd8817688107c94a34675839cfaaa4c [file] [log] [blame]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::error::ComponentError,
crate::eval::EvaluationContext,
crate::spec::{Accessor, ProgramSpec},
diagnostics_reader::{ArchiveReader, Inspect, RetryConfig},
fidl::endpoints::ServerEnd,
fidl_fuchsia_component_runner as fcrunner, fidl_fuchsia_diagnostics as fdiagnostics,
fidl_fuchsia_io as fio, fidl_fuchsia_test as ftest, fuchsia_async as fasync,
fuchsia_component::client,
fuchsia_component::server::ServiceFs,
fuchsia_zircon::{Duration, Status, Time},
futures::{
channel::oneshot,
future::{abortable, select, Either},
pin_mut, select, stream, FutureExt, StreamExt, TryStreamExt,
},
std::sync::Arc,
tracing::warn,
};
const NANOS_IN_SECONDS: f64 = 1_000_000_000.0;
const DEFAULT_PARALLEL: u16 = 1;
/// Output a log for the test. Automatically prepends the current monotonic time.
macro_rules! test_stdout {
($logger:ident, $format:literal) => {
let formatted_with_time = format!(
"[{:05.3}] {}\n",
(Time::get_monotonic().into_nanos() as f64 / NANOS_IN_SECONDS),
$format
);
$logger.write(formatted_with_time.as_bytes()).ok()
};
($logger:ident, $format:literal, $($content:expr),*) => {
let formatted = format!($format, $($content, )*);
let formatted_with_time = format!(
"[{:05.3}] {}\n",
(Time::get_monotonic().into_nanos() as f64 / NANOS_IN_SECONDS),
formatted
);
$logger.write(formatted_with_time.as_bytes()).ok()
};
}
/// Implements `fuchsia.test.Suite` and runs provided test.
pub struct TestServer {
spec: ProgramSpec,
controller: ServerEnd<fcrunner::ComponentControllerMarker>,
out_channel: ServerEnd<fio::DirectoryMarker>,
}
impl TestServer {
/// Creates new test server.
pub fn new(
start_info: fcrunner::ComponentStartInfo,
controller: ServerEnd<fcrunner::ComponentControllerMarker>,
) -> Result<Self, ComponentError> {
match ProgramSpec::try_from(
start_info.program.ok_or(ComponentError::MissingRequiredKey("program"))?,
) {
Ok(spec) => Ok(Self {
spec,
controller,
out_channel: start_info
.outgoing_dir
.ok_or(ComponentError::MissingOutgoingChannel)?,
}),
Err(e) => {
warn!("Error loading spec: {}", e);
controller.close_with_epitaph(Status::INVALID_ARGS).unwrap_or_default();
Err(e)
}
}
}
/// Run the individual named test case from the given ProgramSpec.
///
/// Output logs are written to the given socket.
///
/// Returns true on pass and false on failure.
async fn run_case(spec: &ProgramSpec, case: &str, logs: fuchsia_zircon::Socket) -> bool {
let case = match spec.cases.get(case) {
Some(case) => case,
None => {
test_stdout!(logs, "Failed to find test case");
return false;
}
};
let svc = match spec.accessor {
Accessor::All => "/svc/fuchsia.diagnostics.RealArchiveAccessor",
Accessor::Feedback => "/svc/fuchsia.diagnostics.RealFeedbackArchiveAccessor",
Accessor::Legacy => "/svc/fuchsia.diagnostics.RealLegacyMetricsArchiveAccessor",
};
test_stdout!(logs, "Reading `{}` from `{}`", case.key, svc);
let context = match EvaluationContext::try_from(case) {
Ok(c) => c,
Err(e) => {
test_stdout!(logs, "Failed to set up evaluation: {:?}\n", e);
return false;
}
};
let end_time = Time::get_monotonic() + Duration::from_seconds(spec.timeout_seconds);
while end_time > Time::get_monotonic() {
let start_time = Time::get_monotonic();
let proxy = match client::connect_to_protocol_at_path::<
fdiagnostics::ArchiveAccessorMarker,
>(&svc)
{
Ok(p) => p,
Err(e) => {
test_stdout!(logs, "Failed to connect to accessor: {:?}", e);
return false;
}
};
test_stdout!(logs, "Attempting read");
match ArchiveReader::new()
.retry(RetryConfig::never())
.with_archive(proxy)
.with_timeout(end_time - start_time)
.add_selector(case.selector.as_str())
.snapshot_raw::<Inspect, serde_json::Value>()
.await
{
Ok(json) => {
match context.run(&serde_json::to_string_pretty(&json).unwrap_or_default()) {
Ok(_) => {
test_stdout!(logs, "Test case passed");
return true;
}
Err(e) => {
test_stdout!(logs, "Test case attempt failed: {}", e);
}
}
}
Err(e) => {
test_stdout!(logs, "Failed to obtain data: {}", e);
}
}
let sleep_time = Duration::from_seconds(1);
if end_time - Time::get_monotonic() >= Duration::from_seconds(0) {
test_stdout!(
logs,
"Retrying after {}s, timeout after {}s",
sleep_time.into_seconds(),
(end_time - Time::get_monotonic()).into_seconds()
);
fasync::Timer::new(Time::after(sleep_time)).await;
}
}
false
}
pub async fn execute(self) {
let spec = Arc::new(self.spec);
let controller = self.controller;
let mut fs = ServiceFs::new_local();
let (done_sender, done_fut) = oneshot::channel::<()>();
let done_fut = done_fut.shared();
fs.dir("svc").add_fidl_service(move |mut stream: ftest::SuiteRequestStream| {
let spec = spec.clone();
let mut done_fut = done_fut.clone();
fasync::Task::spawn(async move {
// Listen either for the next value form the stream, or the done signal.
while let Ok(Some(req)) = select! {
next = stream.try_next() => next,
_ = done_fut => Ok(None) }
{
match req {
ftest::SuiteRequest::GetTests { iterator, .. } => {
let mut names = spec.test_names().into_iter().map(|n| ftest::Case {
name: Some(n),
enabled: Some(true),
..Default::default()
});
let mut done_fut = done_fut.clone();
fasync::Task::spawn(async move {
if let Ok(mut stream) = iterator.into_stream() {
while let Ok(Some(req)) = select! {
next = stream.try_next() => next,
_ = done_fut => Ok(None)}
{
match req {
ftest::CaseIteratorRequest::GetNext {
responder,
..
} => {
// Continually drain the |names| iterator on each
// call.
responder
.send(&names.by_ref().collect::<Vec<_>>())
.unwrap_or_default();
}
}
}
}
})
.detach();
}
ftest::SuiteRequest::Run { tests, options, listener, .. } => {
let proxy = listener
.into_proxy()
.expect("Can't convert listener channel to proxy");
let mut tasks = vec![];
let parallel = options.parallel.unwrap_or(DEFAULT_PARALLEL);
for test in tests.into_iter() {
let spec = spec.clone();
let proxy = proxy.clone();
tasks.push(async move {
let (stdout_end, stdout) =
fuchsia_zircon::Socket::create_stream();
let name = test.name.clone().unwrap_or_default();
let (case_listener_proxy, case_listener) =
fidl::endpoints::create_proxy::<ftest::CaseListenerMarker>(
)
.expect("cannot create proxy");
proxy
.on_test_case_started(
&test,
ftest::StdHandles {
out: Some(stdout_end),
..Default::default()
},
case_listener,
)
.expect("on_test_case_started failed");
let status =
match TestServer::run_case(&spec, &name, stdout).await {
true => ftest::Status::Passed,
false => ftest::Status::Failed,
};
let result = ftest::Result_ {
status: Some(status),
..Default::default()
};
case_listener_proxy
.finished(&result)
.expect("on_test_case_finished failed");
});
}
let done_fut = done_fut.clone();
fasync::Task::spawn(async move {
let proxy = proxy.clone();
let chunked_tasks_fut = stream::iter(tasks.into_iter())
.buffered(parallel.into())
.collect::<()>();
// If all tasks finished before abort, report OnFinished to the
// listener.
match select(done_fut, chunked_tasks_fut).await {
Either::Right(_) => {
proxy.on_finished().ok();
}
_ => {}
}
})
.detach();
}
}
}
})
.detach();
});
if let Err(e) = fs.serve_connection(self.out_channel) {
warn!("Failed to serve connection for child component {:?}", e);
}
let (fut, abort_handle) = abortable(fs.collect::<()>());
let controller_fut = async move {
if let Ok(mut stream) = controller.into_stream() {
let mut done_sender = Some(done_sender);
while let Ok(Some(request)) = stream.try_next().await {
match request {
fcrunner::ComponentControllerRequest::Stop { .. }
| fcrunner::ComponentControllerRequest::Kill { .. } => {
if let Some(done_sender) = done_sender.take() {
done_sender.send(()).ok();
}
abort_handle.abort();
}
}
}
}
};
pin_mut!(fut);
pin_mut!(controller_fut);
select(fut, controller_fut).await;
}
}