blob: e58b1a181632d116cc91fea7c2f873879393dafe [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
above_root_capabilities::AboveRootCapabilitiesForTest, debug_data_server, error::*, facet,
run_events::RunEvent, running_suite, self_diagnostics,
},
anyhow::{Context, Error},
fidl::endpoints::ClientEnd,
fidl::prelude::*,
fidl_fuchsia_component_resolution::ResolverProxy,
fidl_fuchsia_test_internal as ftest_internal, fidl_fuchsia_test_manager as ftest_manager,
ftest_manager::{
LaunchError, RunControllerRequest, RunControllerRequestStream, SuiteControllerRequest,
SuiteControllerRequestStream, SuiteEvent as FidlSuiteEvent,
},
fuchsia_async::{self as fasync},
fuchsia_zircon as zx,
futures::{
channel::{mpsc, oneshot},
future::join_all,
prelude::*,
StreamExt,
},
std::sync::Arc,
tracing::{error, warn},
};
pub(crate) struct Suite {
pub test_url: String,
pub options: ftest_manager::RunOptions,
pub controller: SuiteControllerRequestStream,
pub resolver: Arc<ResolverProxy>,
pub above_root_capabilities_for_test: Arc<AboveRootCapabilitiesForTest>,
}
pub(crate) struct TestRunBuilder {
pub suites: Vec<Suite>,
}
impl TestRunBuilder {
/// Serve a RunControllerRequestStream. Returns Err if the client stops the test
/// prematurely or there is an error serving he stream.
async fn run_controller(
controller: RunControllerRequestStream,
run_task: futures::future::RemoteHandle<()>,
stop_sender: oneshot::Sender<()>,
event_recv: mpsc::Receiver<RunEvent>,
inspect_node: &self_diagnostics::RunInspectNode,
) -> Result<(), ()> {
let mut task = Some(run_task);
let mut stop_sender = Some(stop_sender);
let mut event_recv = event_recv.fuse();
let (serve_inner, terminated) = controller.into_inner();
let serve_inner_clone = serve_inner.clone();
let channel_closed_fut =
fasync::OnSignals::new(serve_inner_clone.channel(), zx::Signals::CHANNEL_PEER_CLOSED)
.shared();
let mut controller = RunControllerRequestStream::from_inner(serve_inner, terminated);
let mut stop_or_kill_called = false;
let mut events_drained = false;
let mut events_sent_successfully = true;
// no need to check controller error.
let serve_controller_fut = async {
loop {
inspect_node
.set_controller_state(self_diagnostics::RunControllerState::AwaitingRequest);
let request = match controller.try_next().await {
Ok(Some(request)) => request,
_ => break,
};
match request {
RunControllerRequest::Stop { .. } => {
stop_or_kill_called = true;
if let Some(stop_sender) = stop_sender.take() {
// no need to check error.
let _ = stop_sender.send(());
// after this all `senders` go away and subsequent GetEvent call will
// return rest of events and eventually a empty array and will close the
// connection after that.
}
}
RunControllerRequest::Kill { .. } => {
stop_or_kill_called = true;
// dropping the remote handle cancels it.
drop(task.take());
// after this all `senders` go away and subsequent GetEvent call will
// return rest of events and eventually a empty array and will close the
// connection after that.
}
RunControllerRequest::GetEvents { responder } => {
let mut events = vec![];
// TODO(fxbug.dev/91553): This can block handling Stop and Kill requests if no
// events are available.
inspect_node.set_controller_state(
self_diagnostics::RunControllerState::AwaitingEvents,
);
if let Some(event) = event_recv.next().await {
events.push(event);
while events.len() < EVENTS_THRESHOLD {
if let Some(Some(event)) = event_recv.next().now_or_never() {
events.push(event);
} else {
break;
}
}
}
let no_events_left = events.is_empty();
let response_err =
responder.send(&mut events.into_iter().map(RunEvent::into)).is_err();
// Order setting these variables matters. Expected is for the client to receive at
// least one event response. Client might send more, which is okay but we suppress
// response errors after the first empty vec.
if !events_drained && response_err {
events_sent_successfully = false;
}
events_drained = no_events_left;
}
}
}
}
.boxed();
let _ = futures::future::select(channel_closed_fut.clone(), serve_controller_fut).await;
inspect_node.set_controller_state(self_diagnostics::RunControllerState::Done {
stopped_or_killed: stop_or_kill_called,
events_drained,
events_sent_successfully,
});
// Workaround to prevent zx_peer_closed error
// TODO(fxbug.dev/87976) once fxbug.dev/87890 is fixed, the controller should be dropped
// as soon as all events are drained.
if let Err(e) = channel_closed_fut.await {
warn!("Error waiting for the RunController channel to close: {:?}", e);
}
match stop_or_kill_called || !events_drained || !events_sent_successfully {
true => Err(()),
false => Ok(()),
}
}
pub(crate) async fn run(
self,
controller: RunControllerRequestStream,
debug_controller: ftest_internal::DebugDataSetControllerProxy,
debug_iterator: ClientEnd<ftest_manager::DebugDataIteratorMarker>,
inspect_node: self_diagnostics::RunInspectNode,
) {
let (stop_sender, mut stop_recv) = oneshot::channel::<()>();
let (event_sender, event_recv) = mpsc::channel::<RunEvent>(16);
let debug_event_fut = debug_data_server::send_debug_data_if_produced(
event_sender.clone(),
debug_controller.take_event_stream(),
debug_iterator,
&inspect_node,
);
let inspect_node_ref = &inspect_node;
// Generate a random number in an attempt to prevent realm name collisions between runs.
let run_id: u32 = rand::random();
let run_suites_fut = async move {
inspect_node_ref.set_execution_state(self_diagnostics::RunExecutionState::Executing);
// run test suites serially for now
for (suite_idx, suite) in self.suites.into_iter().enumerate() {
// only check before running the test. We should complete the test run for
// running tests, if stop is called.
if let Ok(Some(())) = stop_recv.try_recv() {
break;
}
let instance_name = format!("{:?}-{:?}", run_id, suite_idx);
let suite_inspect = inspect_node_ref.new_suite(&instance_name, &suite.test_url);
run_single_suite(suite, &debug_controller, &instance_name, suite_inspect).await;
}
debug_controller
.finish()
.unwrap_or_else(|e| warn!("Error finishing debug data set: {:?}", e));
// Collect run artifacts
let mut kernel_debug_tasks = vec![];
kernel_debug_tasks.push(debug_data_server::send_kernel_debug_data(event_sender));
join_all(kernel_debug_tasks).await;
inspect_node_ref.set_execution_state(self_diagnostics::RunExecutionState::Complete);
};
let (remote, remote_handle) = futures::future::join(debug_event_fut, run_suites_fut)
.map(|((), ())| ())
.remote_handle();
let ((), controller_res) = futures::future::join(
remote,
Self::run_controller(
controller,
remote_handle,
stop_sender,
event_recv,
inspect_node_ref,
),
)
.await;
if let Err(()) = controller_res {
warn!("Controller terminated early. Last known state: {:#?}", &inspect_node);
inspect_node.persist();
}
}
}
// max events to send so that we don't cross fidl limits.
// TODO(fxbug.dev/100462): Use tape measure to calculate limit.
const EVENTS_THRESHOLD: usize = 50;
impl Suite {
async fn run_controller(
controller: &mut SuiteControllerRequestStream,
stop_sender: oneshot::Sender<()>,
run_suite_remote_handle: futures::future::RemoteHandle<()>,
event_recv: mpsc::Receiver<Result<FidlSuiteEvent, LaunchError>>,
) -> Result<(), Error> {
let mut event_recv = event_recv.into_stream().fuse();
let mut stop_sender = Some(stop_sender);
let mut run_suite_remote_handle = Some(run_suite_remote_handle);
'controller_loop: while let Some(event) =
controller.try_next().await.context("error running controller")?
{
match event {
SuiteControllerRequest::Stop { .. } => {
// no need to handle error as task might already have finished.
if let Some(stop) = stop_sender.take() {
let _ = stop.send(());
// after this all `senders` go away and subsequent GetEvent call will
// return rest of event. Eventually an empty array and will close the
// connection after that.
}
}
SuiteControllerRequest::GetEvents { responder } => {
let mut events = vec![];
// wait for first event
let mut e = event_recv.next().await;
while let Some(event) = e {
match event {
Ok(event) => {
events.push(event);
}
Err(err) => {
responder
.send(&mut Err(err))
.map_err(TestManagerError::Response)?;
break 'controller_loop;
}
}
if events.len() >= EVENTS_THRESHOLD {
responder.send(&mut Ok(events)).map_err(TestManagerError::Response)?;
continue 'controller_loop;
}
e = match event_recv.next().now_or_never() {
Some(e) => e,
None => break,
}
}
let len = events.len();
responder.send(&mut Ok(events)).map_err(TestManagerError::Response)?;
if len == 0 {
break;
}
}
SuiteControllerRequest::Kill { .. } => {
// Dropping the remote handle for the suite execution task cancels it.
drop(run_suite_remote_handle.take());
// after this all `senders` go away and subsequent GetEvent call will
// return rest of event. Eventually an empty array and will close the
// connection after that.
}
}
}
Ok(())
}
}
async fn run_single_suite(
suite: Suite,
debug_controller: &ftest_internal::DebugDataSetControllerProxy,
instance_name: &str,
inspect_node: Arc<self_diagnostics::SuiteInspectNode>,
) {
let (mut sender, recv) = mpsc::channel(1024);
let (stop_sender, stop_recv) = oneshot::channel::<()>();
let mut maybe_instance = None;
let mut realm_moniker = None;
let Suite { test_url, options, mut controller, resolver, above_root_capabilities_for_test } =
suite;
let run_test_fut = async {
inspect_node.set_execution_state(self_diagnostics::ExecutionState::GetFacets);
let facets = match facet::get_suite_facets(&test_url, &resolver).await {
Ok(facets) => facets,
Err(e) => {
sender.send(Err(e.into())).await.unwrap();
return;
}
};
let realm_moniker_ref =
realm_moniker.insert(format!("./{}:{}", facets.collection, instance_name));
if let Err(e) = debug_controller.add_realm(realm_moniker_ref.as_str(), &test_url).await {
warn!("Failed to add realm {} to debug data: {:?}", realm_moniker_ref.as_str(), e);
}
inspect_node.set_execution_state(self_diagnostics::ExecutionState::Launch);
match running_suite::RunningSuite::launch(
&test_url,
facets,
Some(instance_name),
resolver,
above_root_capabilities_for_test,
)
.await
{
Ok(instance) => {
inspect_node.set_execution_state(self_diagnostics::ExecutionState::RunTests);
let instance_ref = maybe_instance.insert(instance);
instance_ref.run_tests(&test_url, options, sender, stop_recv).await;
inspect_node.set_execution_state(self_diagnostics::ExecutionState::TestsDone);
}
Err(e) => {
let _ = debug_controller.remove_realm(realm_moniker_ref.as_str());
sender.send(Err(e.into())).await.unwrap();
}
}
};
let (run_test_remote, run_test_handle) = run_test_fut.remote_handle();
let controller_fut = Suite::run_controller(&mut controller, stop_sender, run_test_handle, recv);
let ((), controller_ret) = futures::future::join(run_test_remote, controller_fut).await;
if let Err(e) = controller_ret {
warn!("Ended test {}: {:?}", test_url, e);
}
if let Some(instance) = maybe_instance.take() {
inspect_node.set_execution_state(self_diagnostics::ExecutionState::TearDown);
if let Err(err) = instance.destroy().await {
// Failure to destroy an instance could mean that some component events fail to send.
// Missing events could cause debug data collection to hang as it relies on events to
// understand when a realm has stopped.
error!(?err, "Failed to destroy instance for {}. Debug data may be lost.", test_url);
if let Some(moniker) = realm_moniker {
let _ = debug_controller.remove_realm(&moniker);
}
}
}
inspect_node.set_execution_state(self_diagnostics::ExecutionState::Complete);
// Workaround to prevent zx_peer_closed error
// TODO(fxbug.dev/87976) once fxbug.dev/87890 is fixed, the controller should be dropped as soon as all
// events are drained.
let (inner, _) = controller.into_inner();
if let Err(e) = fasync::OnSignals::new(inner.channel(), zx::Signals::CHANNEL_PEER_CLOSED).await
{
warn!("Error waiting for SuiteController channel to close: {:?}", e);
}
}
#[cfg(test)]
mod tests {
use {
super::*, crate::run_events::SuiteEvents, fidl::endpoints::create_proxy_and_stream,
self_diagnostics::RootInspectNode,
};
fn new_run_inspect_node() -> self_diagnostics::RunInspectNode {
RootInspectNode::new(&fuchsia_inspect::types::Node::default()).new_run("test-run")
}
#[fuchsia_async::run_singlethreaded(test)]
async fn run_controller_stop_test() {
let (sender, recv) = mpsc::channel(1024);
let (stop_sender, stop_recv) = oneshot::channel::<()>();
let (task, remote_handle) = async move {
stop_recv.await.unwrap();
// drop event sender so that fake test can end.
drop(sender);
}
.remote_handle();
let _task = fasync::Task::spawn(task);
let (proxy, controller) =
create_proxy_and_stream::<ftest_manager::RunControllerMarker>().unwrap();
let run_controller = fasync::Task::spawn(async move {
TestRunBuilder::run_controller(
controller,
remote_handle,
stop_sender,
recv,
&new_run_inspect_node(),
)
.await
});
proxy.stop().unwrap();
assert_eq!(proxy.get_events().await.unwrap(), vec![]);
drop(proxy);
run_controller.await.unwrap_err();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn run_controller_abort_when_channel_closed() {
let (_sender, recv) = mpsc::channel(1024);
let (stop_sender, _stop_recv) = oneshot::channel::<()>();
// Create a future that normally never resolves.
let (task, remote_handle) = futures::future::pending().remote_handle();
let pending_task = fasync::Task::spawn(task);
let (proxy, controller) =
create_proxy_and_stream::<ftest_manager::RunControllerMarker>().unwrap();
let run_controller = fasync::Task::spawn(async move {
TestRunBuilder::run_controller(
controller,
remote_handle,
stop_sender,
recv,
&new_run_inspect_node(),
)
.await
});
drop(proxy);
// After controller is dropped, both the controller future and the task it was
// controlling should terminate.
pending_task.await;
run_controller.await.unwrap_err();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn suite_controller_stop_test() {
let (sender, recv) = mpsc::channel(1024);
let (stop_sender, stop_recv) = oneshot::channel::<()>();
let (task, remote_handle) = async move {
stop_recv.await.unwrap();
// drop event sender so that fake test can end.
drop(sender);
}
.remote_handle();
let _task = fasync::Task::spawn(task);
let (proxy, mut controller) =
create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>().unwrap();
let run_controller = fasync::Task::spawn(async move {
Suite::run_controller(&mut controller, stop_sender, remote_handle, recv).await
});
proxy.stop().unwrap();
assert_eq!(proxy.get_events().await.unwrap(), Ok(vec![]));
// this should end
run_controller.await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn suite_controller_abort_remote_when_controller_closed() {
let (_sender, recv) = mpsc::channel(1024);
let (stop_sender, _stop_recv) = oneshot::channel::<()>();
// Create a future that normally never resolves.
let (task, remote_handle) = futures::future::pending().remote_handle();
let pending_task = fasync::Task::spawn(task);
let (proxy, mut controller) =
create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>().unwrap();
let run_controller = fasync::Task::spawn(async move {
Suite::run_controller(&mut controller, stop_sender, remote_handle, recv).await
});
drop(proxy);
// After controller is dropped, both the controller future and the task it was
// controlling should terminate.
pending_task.await;
run_controller.await.unwrap();
}
#[fuchsia_async::run_singlethreaded(test)]
async fn suite_controller_get_events() {
let (mut sender, recv) = mpsc::channel(1024);
let (stop_sender, stop_recv) = oneshot::channel::<()>();
let (task, remote_handle) = async {}.remote_handle();
let _task = fasync::Task::spawn(task);
let (proxy, mut controller) =
create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>().unwrap();
let run_controller = fasync::Task::spawn(async move {
Suite::run_controller(&mut controller, stop_sender, remote_handle, recv).await
});
sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
let events = proxy.get_events().await.unwrap().unwrap();
assert_eq!(events.len(), 2);
assert_eq!(
events[0].payload,
SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
);
assert_eq!(
events[1].payload,
SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
);
sender.send(Ok(SuiteEvents::case_started(2).into())).await.unwrap();
proxy.stop().unwrap();
// test that controller collects event after stop is called.
sender.send(Ok(SuiteEvents::case_started(1).into())).await.unwrap();
sender.send(Ok(SuiteEvents::case_found(3, "case3".to_string()).into())).await.unwrap();
stop_recv.await.unwrap();
// drop event sender so that fake test can end.
drop(sender);
let events = proxy.get_events().await.unwrap().unwrap();
assert_eq!(events.len(), 3);
assert_eq!(events[0].payload, SuiteEvents::case_started(2).into_suite_run_event().payload,);
assert_eq!(events[1].payload, SuiteEvents::case_started(1).into_suite_run_event().payload,);
assert_eq!(
events[2].payload,
SuiteEvents::case_found(3, "case3".to_string()).into_suite_run_event().payload,
);
let events = proxy.get_events().await.unwrap().unwrap();
assert_eq!(events, vec![]);
// this should end
run_controller.await.unwrap();
}
#[test]
fn suite_controller_hanging_get_events() {
let mut executor = fasync::TestExecutor::new().unwrap();
let (mut sender, recv) = mpsc::channel(1024);
let (stop_sender, _stop_recv) = oneshot::channel::<()>();
let (task, remote_handle) = async {}.remote_handle();
let _task = fasync::Task::spawn(task);
let (proxy, mut controller) =
create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>().unwrap();
let _run_controller = fasync::Task::spawn(async move {
Suite::run_controller(&mut controller, stop_sender, remote_handle, recv).await
});
// send get event call which would hang as there are no events.
let mut get_events =
fasync::Task::spawn(async move { proxy.get_events().await.unwrap().unwrap() });
assert_eq!(executor.run_until_stalled(&mut get_events), std::task::Poll::Pending);
executor.run_singlethreaded(async {
sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
});
let events = executor.run_singlethreaded(get_events);
assert_eq!(events.len(), 2);
assert_eq!(
events[0].payload,
SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
);
assert_eq!(
events[1].payload,
SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
);
}
}