blob: 195a98336b45f48040d2112aa5da7e375d66a729 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! This crate runs a test that implements the `fuchsia.test.Suite` protocol and collects its results.
use {
anyhow::{format_err, Context as _, Error},
fidl::handle::AsHandleRef,
fidl_fuchsia_test::{
CaseListenerRequest::Finished,
CaseListenerRequestStream, Invocation,
RunListenerRequest::{OnFinished, OnTestCaseStarted},
SuiteProxy,
},
fidl_fuchsia_test_manager::{
self as ftest_manager, HarnessProxy, LaunchOptions, LegacySuiteControllerProxy,
SuiteControllerProxy, SuiteEvent as FidlSuiteEvent,
SuiteEventPayload as FidlSuiteEventPayload,
},
fuchsia_async as fasync,
futures::{
channel::mpsc,
future::{join_all, try_join3},
io::{self, AsyncRead},
lock::Mutex,
prelude::*,
ready,
task::{Context, Poll},
},
glob,
linked_hash_map::LinkedHashMap,
log::*,
std::{
cell::RefCell, collections::HashMap, marker::Unpin, pin::Pin, sync::Arc, time::Duration,
},
};
pub use crate::diagnostics::{LogStream, LogStreamProtocol};
mod diagnostics;
/// How long stdout logs are held in the buffer before they are flushed.
const LOG_BUFFERING_DURATION: Duration = Duration::from_secs(5);
/// Upper bound on the stdout log buffer size.
const LOG_BUFFER_SIZE: usize = 4096;
/// Elapsed time after which an excessive-duration event is emitted.
const EXCESSIVE_DURATION: Duration = Duration::from_secs(60);
/// Settings that control how a test suite is executed.
///
/// The FIDL counterpart is [`fidl_fuchsia_test::RunOptions`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TestRunOptions {
    /// What to do with tests the developer marked as disabled/ignored.
    pub disabled_tests: DisabledTestHandling,
    /// How many test cases to run in parallel.
    pub parallel: Option<u16>,
    /// Arguments handed to the tests.
    pub arguments: Vec<String>,
}
/// Event delivered to the caller of `run_test_component`.
///
/// The variant order is significant because `Ord`/`PartialOrd` are derived.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TestEvent {
    /// A test case has started.
    TestCaseStarted { test_case_name: String },
    /// A test case has finished with the given result.
    TestCaseFinished { test_case_name: String, result: TestResult },
    /// A test case is still running after a long duration.
    ExcessiveDuration { test_case_name: String, duration: Duration },
    /// A test case produced a stdout message.
    StdoutMessage { test_case_name: String, msg: String },
    /// The test finished successfully.
    Finish,
}
impl TestEvent {
pub fn test_case_started(s: &str) -> TestEvent {
TestEvent::TestCaseStarted { test_case_name: s.to_string() }
}
pub fn stdout_message(name: &str, message: &str) -> TestEvent {
TestEvent::StdoutMessage { test_case_name: name.to_string(), msg: message.to_string() }
}
pub fn test_case_finished(name: &str, result: TestResult) -> TestEvent {
TestEvent::TestCaseFinished { test_case_name: name.to_string(), result: result }
}
pub fn excessive_duration(name: &str, duration: Duration) -> TestEvent {
TestEvent::ExcessiveDuration { test_case_name: name.to_string(), duration }
}
pub fn test_finished() -> TestEvent {
TestEvent::Finish
}
/// Returns the name of the test case to which the event belongs, if applicable.
pub fn test_case_name(&self) -> Option<&String> {
match self {
TestEvent::TestCaseStarted { test_case_name }
| TestEvent::TestCaseFinished { test_case_name, .. }
| TestEvent::StdoutMessage { test_case_name, .. }
| TestEvent::ExcessiveDuration { test_case_name, .. } => Some(test_case_name),
TestEvent::Finish => None,
// NOTE: If new global event types (not tied to a specific test case) are added,
// `GroupByTestCase` must also be updated so as to preserve correct event ordering.
// Otherwise, all such events will end up with a key of `None` in the map, and might
// therefore be spuriously bunched together.
}
}
/// Same as `test_case_name`, but returns an owned `Option<String>`.
pub fn owned_test_case_name(&self) -> Option<String> {
self.test_case_name().map(String::from)
}
}
/// Trait allowing iterators over `TestEvent` to be partitioned by test case name.
///
/// Note that the current implementation assumes that the only `TestEvent` type that occurs
/// _outside of test cases_ is `TestEvent::Finish`. If new global `TestEvent` types are added,
/// the implementation will have to be changed.
pub trait GroupByTestCase: Iterator<Item = TestEvent> + Sized {
    /// Groups the `TestEvent`s by test case name into a map that preserves insertion order.
    /// The overall order of test cases (by first event) and the orders of events within each test
    /// case are preserved, but events from different test cases are effectively de-interleaved.
    ///
    /// Events that do not belong to a test case are collected under the `None` key.
    ///
    /// Example (`get_events` is a placeholder for any source of events):
    /// ```ignore
    /// use test_executor::{TestEvent, GroupByTestCase as _};
    /// use linked_hash_map::LinkedHashMap;
    ///
    /// let events: Vec<TestEvent> = get_events();
    /// let grouped: LinkedHashMap<Option<String>, Vec<TestEvent>> =
    ///     events.into_iter().group_by_test_case_ordered();
    /// ```
    fn group_by_test_case_ordered(self) -> LinkedHashMap<Option<String>, Vec<TestEvent>> {
        let mut map = LinkedHashMap::new();
        for test_event in self {
            map.entry(test_event.owned_test_case_name())
                .or_insert_with(Vec::new)
                .push(test_event);
        }
        map
    }

    /// De-interleaves the `TestEvents` by test case. The overall order of test cases (by first
    /// event) and the orders of events within each test case are preserved.
    fn deinterleave(self) -> Box<dyn Iterator<Item = TestEvent>> {
        Box::new(
            self.group_by_test_case_ordered()
                .into_iter()
                .flat_map(|(_, events)| events.into_iter()),
        )
    }

    /// Groups the `TestEvent`s by test case name into an unordered map. The orders of events within
    /// each test case are preserved, but the test cases themselves are not in a defined order.
    fn group_by_test_case_unordered(self) -> HashMap<Option<String>, Vec<TestEvent>> {
        let mut map = HashMap::new();
        for test_event in self {
            map.entry(test_event.owned_test_case_name())
                .or_insert_with(Vec::new)
                .push(test_event);
        }
        map
    }
}

/// Every iterator over `TestEvent` gets the grouping helpers for free.
impl<T> GroupByTestCase for T where T: Iterator<Item = TestEvent> + Sized {}
/// Policy for tests that the developer marked as disabled/ignored.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DisabledTestHandling {
    /// Leave out tests that were marked disabled/ignored by the developer.
    Exclude,
    /// Run tests even though the developer marked them disabled/ignored.
    Include,
}

/// Disabled tests are excluded unless the caller asks otherwise.
impl Default for DisabledTestHandling {
    fn default() -> Self {
        Self::Exclude
    }
}
impl From<TestRunOptions> for fidl_fuchsia_test::RunOptions {
fn from(test_run_options: TestRunOptions) -> Self {
// Note: This will *not* break if new members are added to the FIDL table.
fidl_fuchsia_test::RunOptions {
parallel: test_run_options.parallel,
arguments: Some(test_run_options.arguments),
include_disabled_tests: Some(matches!(
test_run_options.disabled_tests,
DisabledTestHandling::Include
)),
..fidl_fuchsia_test::RunOptions::EMPTY
}
}
}
/// Outcome of running a single test case.
///
/// The variant order is significant because `Ord`/`PartialOrd` are derived.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TestResult {
    /// The test case passed.
    Passed,
    /// The test case failed.
    Failed,
    /// The test case was skipped.
    Skipped,
    /// The test case did not communicate a result.
    Error,
}
/// Stream that yields chunks of stdout text read from a socket.
#[must_use = "futures/streams"]
pub struct StdoutStream {
/// Async socket the stdout bytes are read from.
socket: fidl::AsyncSocket,
}
// `poll_next` reaches `socket` through `Pin<&mut Self>`, which relies on this type being `Unpin`.
impl Unpin for StdoutStream {}
// Per-thread 2048-byte scratch buffer; `StdoutStream::poll_next` reads socket data into it
// before converting the bytes to a `String`.
thread_local! {
pub static BUFFER:
RefCell<[u8; 2048]> = RefCell::new([0; 2048]);
}
impl StdoutStream {
    /// Wraps `socket` in an async socket and returns a `StdoutStream` reading from it.
    ///
    /// Fails with the context "Invalid zircon socket" when the socket cannot be converted.
    pub fn new(socket: fidl::Socket) -> Result<StdoutStream, anyhow::Error> {
        fidl::AsyncSocket::from_socket(socket)
            .context("Invalid zircon socket")
            .map(|socket| StdoutStream { socket })
    }
}
/// Converts raw stdout `bytes` into a `String`, substituting U+FFFD for invalid UTF-8 sequences.
fn process_stdout_bytes(bytes: &[u8]) -> String {
    // TODO(anmittal): Change this to consider break in logs and handle it.
    String::from_utf8_lossy(bytes).into_owned()
}
impl Stream for StdoutStream {
    type Item = io::Result<String>;

    /// Reads the next chunk from the socket into the thread-local `BUFFER` and yields it as text.
    ///
    /// A read of zero bytes ends the stream; a read error is yielded as an `Err` item.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        BUFFER.with(|scratch| {
            let mut scratch = scratch.borrow_mut();
            let read = ready!(Pin::new(&mut self.socket).poll_read(cx, &mut *scratch));
            match read {
                Err(e) => Poll::Ready(Some(Err(e))),
                Ok(0) => Poll::Ready(None),
                Ok(len) => Poll::Ready(Some(Ok(process_stdout_bytes(&scratch[..len])))),
            }
        })
    }
}
/// Sender for log events, targeting either a `TestEvent` or a `SuiteEvent` channel.
#[derive(Clone)]
enum LogEventSender {
    /// Events go out as `TestEvent`s.
    TestEvent(mpsc::Sender<TestEvent>),
    /// Events go out as `SuiteEvent`s.
    SuiteEvent(mpsc::Sender<SuiteEvent>),
}

impl From<mpsc::Sender<TestEvent>> for LogEventSender {
    fn from(sender: mpsc::Sender<TestEvent>) -> Self {
        Self::TestEvent(sender)
    }
}

impl From<mpsc::Sender<SuiteEvent>> for LogEventSender {
    fn from(sender: mpsc::Sender<SuiteEvent>) -> Self {
        Self::SuiteEvent(sender)
    }
}
/// Parameters for collecting stdout from a socket and forwarding it over a channel.
struct LogOpt {
/// Socket on which the test serves stdout.
stdout_socket: fidl::Socket,
/// Channel over which collected stdout text is sent.
sender: mpsc::Sender<String>,
/// Duration to buffer the logs before flushing them.
buffering_duration: std::time::Duration,
/// Max buffer size before logs are flushed.
buffer_size: usize,
}
/// Handle to the background processing of a single test case's stdout and completion event.
struct TestCaseProcessor {
/// Background task spawned by `new`; awaited by `wait_for_finish`.
f: fuchsia_async::Task<Result<(), anyhow::Error>>,
}
impl TestCaseProcessor {
/// Starts processing of stdout logs and events for `test_case_name` in the background. The
/// owner of this object should call `wait_for_finish` to make sure all the background work
/// completed.
///
/// * `listener`: stream on which the test case reports completion.
/// * `stdout_socket`: socket from which the test case's stdout is read.
/// * `sender`: channel on which `TestEvent`s for this test case are emitted.
pub fn new(
test_case_name: String,
listener: CaseListenerRequestStream,
stdout_socket: fidl::Socket,
mut sender: mpsc::Sender<TestEvent>,
) -> Self {
// Intermediate channel: raw stdout text is produced by `collect_and_send_stdout` and turned
// into `StdoutMessage` events by `stdout_collector` below.
let (s, mut stdout_recv) = mpsc::channel(1024);
let stdout_fut = collect_and_send_stdout(stdout_socket, s);
let mut s_clone = sender.clone();
let test_case_name_clone = test_case_name.clone();
let stdout_collector = async move {
while let Some(msg) = stdout_recv.next().await {
s_clone
.send(TestEvent::stdout_message(&test_case_name_clone, &msg))
.await
.context("Error sending stdout msg")?;
}
Ok(())
};
let test_complete_fut = Self::listen_for_completion(listener);
let fut = async move {
let mut sender_clone = sender.clone();
let test_case_name_clone = test_case_name.clone();
// Emits an `ExcessiveDuration` event if the test case is still being processed after
// `EXCESSIVE_DURATION`.
let excessive_time_task = fasync::Task::spawn(async move {
fasync::Timer::new(EXCESSIVE_DURATION).await;
sender_clone
.send(TestEvent::excessive_duration(&test_case_name_clone, EXCESSIVE_DURATION))
.await
.context("Failed to send excessive duration event")
});
// Wait for stdout collection, stdout forwarding and the completion event; the first error
// from any of them is returned.
let ((), (), result) =
try_join3(stdout_fut, stdout_collector, test_complete_fut).await?;
// NOTE(review): presumably `cancel()` yields the task's output only if it already
// finished; a missing output is treated as success here — confirm against fuchsia_async.
excessive_time_task.cancel().await.unwrap_or(Ok(()))?;
// `TestCaseFinished` is only sent when the listener actually reported a result.
if let Some(result) = result {
sender
.send(TestEvent::test_case_finished(&test_case_name, result))
.await
.context("Failed to send TestCaseFinished Event")
} else {
Ok(())
}
};
TestCaseProcessor { f: fuchsia_async::Task::spawn(fut) }
}
/// Waits for the completion event on `listener` and maps it to a `TestResult`.
///
/// Yields `None` when the channel closes before any completion event arrives.
async fn listen_for_completion(
    mut listener: CaseListenerRequestStream,
) -> Result<Option<TestResult>, anyhow::Error> {
    let request = match listener.try_next().await.context("waiting for listener")? {
        Some(request) => request,
        None => return Ok(None),
    };
    let Finished { result, control_handle: _ } = request;
    let outcome = match result.status {
        Some(fidl_fuchsia_test::Status::Passed) => TestResult::Passed,
        Some(fidl_fuchsia_test::Status::Failed) => TestResult::Failed,
        Some(fidl_fuchsia_test::Status::Skipped) => TestResult::Skipped,
        // A missing status means the test did not implement the protocol
        // properly and forgot to set the result.
        None => TestResult::Error,
    };
    Ok(Some(outcome))
}
/// Waits until the background task has collected all stdout logs and events.
pub async fn wait_for_finish(self) -> Result<(), anyhow::Error> {
    let outcome = self.f.await;
    outcome.context("Failure waiting for stdout and events")
}
}
/// Reads stdout from `socket` and forwards it over `sender`, using the default buffering
/// duration (`LOG_BUFFERING_DURATION`) and buffer size (`LOG_BUFFER_SIZE`).
pub async fn collect_and_send_stdout(
    socket: fidl::Socket,
    sender: mpsc::Sender<String>,
) -> Result<(), anyhow::Error> {
    let options = LogOpt {
        stdout_socket: socket,
        sender,
        buffering_duration: LOG_BUFFERING_DURATION,
        buffer_size: LOG_BUFFER_SIZE,
    };
    collect_and_send_stdout_internal(options).await
}
/// Internal method that put a listener on `stdout_socket`, process and send test stdout logs
/// asynchronously in the background. Returns immediately if the provided socket is an invalid
/// handle.
async fn collect_and_send_stdout_internal(log_opt: LogOpt) -> Result<(), anyhow::Error> {
if log_opt.stdout_socket.as_handle_ref().is_invalid() {
return Ok(());
}
let mut stream = match StdoutStream::new(log_opt.stdout_socket) {
Err(e) => {
error!("Stdout Logger: Failed to create fuchsia async socket: {:?}", e);
return Ok(());
}
Ok(stream) => stream,
};
let mut log_buffer =
StdoutBuffer::new(log_opt.buffering_duration, log_opt.sender, log_opt.buffer_size);
while let Some(log) = stream.try_next().await.context("Error reading stdout log msg")? {
log_buffer.send_log(&log).await?;
}
log_buffer.done().await
}
/// Buffers logs in memory for `duration` before sending them out.
/// This will not buffer any more logs after the timer expires and will flush all
/// subsequent logs instantly.
/// Clients may call done() to obtain any errors. If done() is not called, any logs still
/// buffered are sent on a best-effort basis when the inner buffer is dropped, and errors from
/// that send are only logged.
struct StdoutBuffer {
/// Shared buffer state; also accessed by the timer task through a weak reference.
inner: Arc<Mutex<StdoutBufferInner>>,
/// Task that stops buffering once the buffering duration elapses.
timer: fuchsia_async::Task<()>,
}
impl StdoutBuffer {
/// Creates a new StdoutBuffer and starts the timer on log buffering.
/// `duration`: Buffers log for this duration or till done() is called.
/// `sender`: Channel to send logs on.
/// `max_capacity`: Flush log if buffer size exceeds this value. This will not cancel the timer
/// and all the logs would be flushed once timer expires.
pub fn new(
duration: std::time::Duration,
sender: mpsc::Sender<String>,
max_capacity: usize,
) -> Self {
let inner = StdoutBufferInner::new(sender, max_capacity);
let timer = fuchsia_async::Timer::new(duration);
// The timer task only holds a weak reference, so it does not keep the buffer alive.
let log_buffer = Arc::downgrade(&inner);
let f = async move {
timer.await;
if let Some(log_buffer) = log_buffer.upgrade() {
let mut log_buffer = log_buffer.lock().await;
// A flush failure is stored on the buffer so that `done()` can report it later.
if let Err(e) = log_buffer.stop_buffering().await {
log_buffer.set_error(e);
}
}
};
let timer = fuchsia_async::Task::spawn(f);
Self { inner, timer }
}
/// Cancels the timer (if it has not fired yet) and flushes whatever is still buffered.
/// Call this before the object is dropped.
///
/// Returns the flush error if flushing fails, otherwise any error recorded earlier by the
/// timer task.
pub async fn done(self) -> Result<(), anyhow::Error> {
    // Cancel the timer so that it does not fire in the future.
    self.timer.cancel().await;
    let mut guard = self.inner.lock().await;
    guard.flush().await?;
    match guard.error.take() {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
/// Sends `message` over the channel right away if the timer has already fired; otherwise the
/// message is buffered.
///
/// Returns any error raised while flushing or sending the logs.
pub async fn send_log(&mut self, message: &str) -> Result<(), anyhow::Error> {
    let mut guard = self.inner.lock().await;
    let outcome = guard.send_log(message).await;
    outcome
}
}
/// State shared between `StdoutBuffer` and its timer task.
struct StdoutBufferInner {
/// Log text accumulated while buffering is enabled.
logs: String,
/// Channel on which log text is sent.
sender: mpsc::Sender<String>,
/// Whether to buffer logs or not.
buffer: bool,
/// Error recorded via `set_error`; taken and returned by `StdoutBuffer::done`.
error: Option<anyhow::Error>,
/// Once `logs` reaches this length (in bytes) the buffer is flushed.
max_capacity: usize,
}
impl StdoutBufferInner {
    /// Creates the shared buffer state with buffering enabled and room for `max_capacity` bytes.
    fn new(sender: mpsc::Sender<String>, max_capacity: usize) -> Arc<Mutex<Self>> {
        let state = StdoutBufferInner {
            logs: String::with_capacity(max_capacity),
            sender,
            buffer: true,
            error: None,
            max_capacity,
        };
        Arc::new(Mutex::new(state))
    }

    /// Turns buffering off and flushes whatever has been buffered so far.
    async fn stop_buffering(&mut self) -> Result<(), anyhow::Error> {
        self.buffer = false;
        self.flush().await
    }

    /// Records `e`, replacing any previously recorded error.
    fn set_error(&mut self, e: anyhow::Error) {
        self.error = Some(e);
    }

    /// Sends the buffered logs (if any) and empties the buffer, even when sending fails.
    async fn flush(&mut self) -> Result<(), anyhow::Error> {
        if self.logs.is_empty() {
            return Ok(());
        }
        let outcome = Self::send(&mut self.sender, &self.logs).await;
        self.logs.clear();
        outcome
    }

    /// Sends `message` on `sender` as an owned `String`.
    async fn send(sender: &mut mpsc::Sender<String>, message: &str) -> Result<(), anyhow::Error> {
        let owned: String = message.into();
        sender.send(owned).await.context("Error sending stdout msg")
    }

    /// Buffers `message` while buffering is enabled, flushing once the buffer reaches
    /// `max_capacity`; sends it directly when buffering is disabled.
    async fn send_log(&mut self, message: &str) -> Result<(), anyhow::Error> {
        if !self.buffer {
            return Self::send(&mut self.sender, message).await;
        }
        self.logs.push_str(message);
        if self.logs.len() >= self.max_capacity {
            self.flush().await
        } else {
            Ok(())
        }
    }
}
impl Drop for StdoutBufferInner {
fn drop(&mut self) {
if !self.logs.is_empty() {
let message = self.logs.clone();
let mut sender = self.sender.clone();
fuchsia_async::Task::spawn(async move {
if let Err(e) = StdoutBufferInner::send(&mut sender, &message).await {
warn!("Error sending logs for {}", e);
}
})
.detach();
}
}
}
/// Encapsulates a running suite instance.
pub struct SuiteInstance {
// Proxy used to enumerate and run the suite's test cases.
suite: SuiteProxy,
// For safekeeping so that running component remains alive.
controller: LegacySuiteControllerProxy,
// Log stream of the instance; handed out at most once via `take_log_stream`.
log_stream: Option<LogStream>,
}
/// Options with which a `SuiteInstance` can be created.
pub struct SuiteInstanceOpts<'a> {
/// The test harness connection.
pub harness: &'a HarnessProxy,
/// The URL of the test to run. `SuiteInstance::new` rejects URLs that do not end in `.cm`.
pub test_url: &'a str,
/// Whether or not to force a specific protocol streaming for fetching logs. When unset, it'll
/// default to `BatchIterator` when running on a Fuchsia system or `ArchiveIterator` when
/// running on a host system. When running on a host system, only `ArchiveIterator` can be
/// used, so this option will be ignored.
pub force_log_protocol: Option<LogStreamProtocol>,
}
impl SuiteInstance {
/// Launches the test described by `opts` and wraps the resulting connections.
///
/// Fails without launching anything when the test URL does not end in `.cm`.
pub async fn new(opts: SuiteInstanceOpts<'_>) -> Result<Self, anyhow::Error> {
    let has_cm_extension = opts.test_url.ends_with(".cm");
    if !has_cm_extension {
        return Err(anyhow::anyhow!(
            "Tried to run a component as a v2 test that doesn't have a .cm extension"
        ));
    }
    Self::launch_test_suite(opts).await.map(|(suite, controller, log_stream)| Self {
        suite,
        controller,
        log_stream: Some(log_stream),
    })
}
/// Kills the suite instance by sending `kill` through the suite controller.
///
/// Returns the FIDL error if the request cannot be sent.
pub fn kill(&self) -> Result<(), fidl::Error> {
self.controller.kill()
}
/// Takes the singleton log stream associated with the current instance.
///
/// Returns `None` once the stream has already been taken.
pub fn take_log_stream(&mut self) -> Option<LogStream> {
self.log_stream.take()
}
/// Asks the harness to launch the suite at `opts.test_url`.
///
/// Returns the suite proxy, the controller proxy and the log stream whose iterator server end
/// was passed to the harness. Fails if the proxies or log stream cannot be created, if the
/// launch call itself fails, or if the harness reports a launch error.
async fn launch_test_suite(
    opts: SuiteInstanceOpts<'_>,
) -> Result<(SuiteProxy, LegacySuiteControllerProxy, LogStream), anyhow::Error> {
    // Propagate proxy-creation failures instead of panicking; this function already returns
    // a `Result`.
    let (suite_proxy, suite_server_end) =
        fidl::endpoints::create_proxy().context("Cannot create suite proxy")?;
    let (controller_proxy, controller_server_end) =
        fidl::endpoints::create_proxy().context("Cannot create controller proxy")?;
    debug!("Launching test component `{}`", opts.test_url);
    let mut log_stream = diagnostics::LogStream::create(opts.force_log_protocol)?;
    let options = LaunchOptions {
        logs_iterator: log_stream.take_iterator_server_end(),
        ..LaunchOptions::EMPTY
    };
    opts.harness
        .launch_suite(opts.test_url, options, suite_server_end, controller_server_end)
        .await
        .context("launch_test call failed")?
        .map_err(|e: fidl_fuchsia_test_manager::LaunchError| {
            anyhow::anyhow!("error launching test: {:?}", e)
        })?;
    Ok((suite_proxy, controller_proxy, log_stream))
}
/// Enumerates the suite's test cases and returns an invocation for each one.
///
/// When `test_filter` is set it is parsed as a glob pattern and only cases whose name matches
/// are returned. Fails if the pattern is invalid, if the suite cannot be queried, or if a
/// case comes back without a name.
pub async fn enumerate_tests(
    &self,
    test_filter: &Option<&str>,
) -> Result<Vec<Invocation>, anyhow::Error> {
    debug!("enumerating tests");
    let (case_iterator, server_end) = fidl::endpoints::create_proxy()?;
    self.suite.get_tests(server_end).map_err(suite_error).context("getting test cases")?;
    let mut invocations = vec![];
    let pattern = test_filter
        .map(|filter| {
            glob::Pattern::new(filter)
                .map_err(|e| anyhow::anyhow!("Bad test filter pattern: {}", e))
        })
        .transpose()?;
    // The iterator signals the end of the enumeration with an empty batch.
    loop {
        let cases = case_iterator
            .get_next()
            .await
            .map_err(suite_error)
            .context("Error getting test cases")?;
        if cases.is_empty() {
            break;
        }
        for case in cases {
            // Build the error lazily so that nothing is constructed for well-formed cases.
            let case_name = case
                .name
                .ok_or_else(|| format_err!("invocation should contain a name."))?;
            if pattern.as_ref().map_or(true, |p| p.matches(&case_name)) {
                invocations.push(Invocation {
                    name: Some(case_name),
                    tag: None,
                    ..Invocation::EMPTY
                });
            }
        }
    }
    debug!("invocations: {:#?}", invocations);
    Ok(invocations)
}
/// Enumerates the test cases (optionally restricted by `test_filter`) and then runs all of
/// them, reporting events on `sender`.
pub async fn run_and_collect_results(
    &self,
    sender: mpsc::Sender<TestEvent>,
    test_filter: Option<&str>,
    run_options: TestRunOptions,
) -> Result<(), anyhow::Error> {
    let enumerated = self.enumerate_tests(&test_filter).await;
    let invocations = enumerated.context("Cannot enumerate tests")?;
    self.run_and_collect_results_for_invocations(sender, invocations, run_options).await
}
/// Runs the given `invocations` against the suite in chunks and reports stdout logs and
/// results on `sender`.
///
/// A `Finish` event is sent only when every chunk completed successfully (which is trivially
/// the case when there is nothing to run).
pub async fn run_and_collect_results_for_invocations(
    &self,
    mut sender: mpsc::Sender<TestEvent>,
    invocations: Vec<Invocation>,
    run_options: TestRunOptions,
) -> Result<(), anyhow::Error> {
    const INVOCATIONS_CHUNK: usize = 50;

    debug!("running tests");
    let run_options: fidl_fuchsia_test::RunOptions = run_options.into();
    let mut remaining = invocations.into_iter();
    // Stays true when there are no tests to run.
    let mut successful_completion = true;
    loop {
        let chunk: Vec<_> = remaining.by_ref().take(INVOCATIONS_CHUNK).collect();
        if chunk.is_empty() {
            break;
        }
        let chunk_succeeded = self
            .run_invocations(chunk, run_options.clone(), &mut sender)
            .await
            .context("Error running test cases")?;
        successful_completion &= chunk_succeeded;
    }
    if successful_completion {
        sender.send(TestEvent::test_finished()).await.context("sending TestFinished event")?;
    }
    Ok(())
}
/// Runs the test component using `suite` and collects stdout logs and results.
async fn run_invocations(
&self,
invocations: Vec<Invocation>,
run_options: fidl_fuchsia_test::RunOptions,
sender: &mut mpsc::Sender<TestEvent>,
) -> Result<bool, anyhow::Error> {
let (run_listener_client, mut run_listener) =
fidl::endpoints::create_request_stream::<fidl_fuchsia_test::RunListenerMarker>()
.context("creating request stream")?;
self.suite.run(
&mut invocations.into_iter().map(|i| i.into()),
run_options,
run_listener_client,
)?;
let mut test_case_processors = Vec::new();
let mut successful_completion = false;
while let Some(result_event) =
run_listener.try_next().await.context("waiting for listener")?
{
match result_event {
OnTestCaseStarted { invocation, primary_log, listener, control_handle: _ } => {
let name =
invocation.name.ok_or(anyhow::anyhow!("cannot find name in invocation"))?;
sender
.send(TestEvent::test_case_started(&name))
.await
.context("Failed to send TestCaseStart event")?;
let test_case_processor = TestCaseProcessor::new(
name,
listener.into_stream().context("Cannot convert listener to stream")?,
primary_log,
sender.clone(),
);
test_case_processors.push(test_case_processor);
}
OnFinished { .. } => {
successful_completion = true;
break;
}
}
}
// await for all invocations to complete for which test case never completed.
join_all(test_case_processors.into_iter().map(|i| i.wait_for_finish()))
.await
.into_iter()
.collect::<Result<Vec<()>, Error>>()
.context("Error waiting for all invocations to finish")?;
Ok(successful_completion)
}
/// Consumes this instance and returns the underlying suite and controller proxies.
pub fn into_proxies(self) -> (SuiteProxy, LegacySuiteControllerProxy) {
    (self.suite, self.controller)
}
}
fn suite_error(err: fidl::Error) -> anyhow::Error {
match err {
fidl::Error::ClientChannelClosed { .. } => anyhow::anyhow!(
"The test protocol was closed. This may mean `fuchsia.test.Suite` was not \
configured correctly. Refer to: \
https://fuchsia.dev/fuchsia-src/development/components/v2/troubleshooting#troubleshoot-test"
),
err => err.into(),
}
}
/// Builds and runs test suite(s).
pub struct TestBuilder {
/// Run-builder connection used to add suites and start the run.
proxy: ftest_manager::RunBuilderProxy,
}
impl TestBuilder {
/// Create new instance
pub fn new(proxy: ftest_manager::RunBuilderProxy) -> Self {
Self { proxy }
}
pub fn take_proxy(self) -> ftest_manager::RunBuilderProxy {
self.proxy
}
/// Add suite to run.
pub async fn add_suite(
&self,
test_url: &str,
run_options: ftest_manager::RunOptions,
) -> Result<SuiteRunInstance, Error> {
let (controller_proxy, controller) =
fidl::endpoints::create_proxy().context("Cannot create proxy")?;
self.proxy.add_suite(test_url, run_options, controller)?;
Ok(SuiteRunInstance { controller_proxy: controller_proxy.into() })
}
/// Runs all tests to completion.
pub async fn run(self) -> Result<(), Error> {
let (controller_proxy, controller) =
fidl::endpoints::create_proxy().context("Cannot create proxy")?;
self.proxy.build(controller).context("Error starting tests")?;
// wait for test to end
let v = controller_proxy.get_events().await.context("Cannot wait for tests to end")?;
if v.len() != 0 {
return Err(anyhow::format_err!("The vector should have been empty, something wrong with test manager. Please file bug."));
}
Ok(())
}
}
/// Events produced by test suite.
pub struct SuiteEvent {
/// Timestamp carried over from the originating event, if it had one.
pub timestamp: Option<i64>,
/// What happened.
pub payload: SuiteEventPayload,
}
impl SuiteEvent {
    /// Event announcing that the test case `name` was found.
    pub fn case_found(timestamp: Option<i64>, name: String) -> Self {
        let payload = SuiteEventPayload::RunEvent(RunEvent::case_found(name));
        Self { timestamp, payload }
    }

    /// Event announcing that the test case `name` started.
    pub fn case_started(timestamp: Option<i64>, name: String) -> Self {
        let payload = SuiteEventPayload::RunEvent(RunEvent::case_started(name));
        Self { timestamp, payload }
    }

    /// Event carrying a stdout message produced by the test case `name`.
    pub fn case_stdout<N, L>(timestamp: Option<i64>, name: N, stdout_message: L) -> Self
    where
        N: Into<String>,
        L: Into<String>,
    {
        let event = RunEvent::case_stdout(name.into(), stdout_message.into());
        Self { timestamp, payload: SuiteEventPayload::RunEvent(event) }
    }

    /// Event announcing that the test case `name` stopped with `status`.
    pub fn case_stopped(
        timestamp: Option<i64>,
        name: String,
        status: ftest_manager::CaseStatus,
    ) -> Self {
        let payload = SuiteEventPayload::RunEvent(RunEvent::case_stopped(name, status));
        Self { timestamp, payload }
    }

    /// Event announcing that the test case `name` finished.
    pub fn case_finished(timestamp: Option<i64>, name: String) -> Self {
        let payload = SuiteEventPayload::RunEvent(RunEvent::case_finished(name));
        Self { timestamp, payload }
    }

    /// Event announcing that the suite finished with `status`.
    pub fn suite_finished(timestamp: Option<i64>, status: ftest_manager::SuiteStatus) -> Self {
        let payload = SuiteEventPayload::RunEvent(RunEvent::suite_finished(status));
        Self { timestamp, payload }
    }

    /// Event carrying the log stream of the suite.
    pub fn suite_log(timestamp: Option<i64>, log_stream: LogStream) -> Self {
        let payload = SuiteEventPayload::SuiteLog { log_stream };
        Self { timestamp, payload }
    }

    /// Event carrying the log stream of the test case `name`.
    pub fn test_case_log(timestamp: Option<i64>, name: String, log_stream: LogStream) -> Self {
        let payload = SuiteEventPayload::TestCaseLog { name, log_stream };
        Self { timestamp, payload }
    }
}
/// Payload of a `SuiteEvent`.
pub enum SuiteEventPayload {
/// Logger for test suite
SuiteLog { log_stream: LogStream },
/// Logger for a test case in suite.
TestCaseLog { name: String, log_stream: LogStream },
/// Test events.
RunEvent(RunEvent),
}
/// Progress events of a suite run. All variants except `SuiteFinished` name the test case
/// they belong to.
#[derive(PartialEq, Debug, Eq, Hash, Ord, PartialOrd, Clone)]
pub enum RunEvent {
/// A test case was found.
CaseFound { name: String },
/// A test case started.
CaseStarted { name: String },
/// A test case produced a stdout message.
CaseStdout { name: String, stdout_message: String },
/// A test case stopped with the given status.
CaseStopped { name: String, status: ftest_manager::CaseStatus },
/// A test case finished.
CaseFinished { name: String },
/// The suite finished with the given status.
SuiteFinished { status: ftest_manager::SuiteStatus },
}
impl RunEvent {
    /// Builds a `CaseFound` event for the test case `name`.
    pub fn case_found<S: Into<String>>(name: S) -> Self {
        let name = name.into();
        RunEvent::CaseFound { name }
    }

    /// Builds a `CaseStarted` event for the test case `name`.
    pub fn case_started<S: Into<String>>(name: S) -> Self {
        let name = name.into();
        RunEvent::CaseStarted { name }
    }

    /// Builds a `CaseStdout` event carrying `stdout_message` for the test case `name`.
    pub fn case_stdout<S: Into<String>, L: Into<String>>(name: S, stdout_message: L) -> Self {
        let name = name.into();
        let stdout_message = stdout_message.into();
        RunEvent::CaseStdout { name, stdout_message }
    }

    /// Builds a `CaseStopped` event for the test case `name` with the given `status`.
    pub fn case_stopped<S: Into<String>>(name: S, status: ftest_manager::CaseStatus) -> Self {
        let name = name.into();
        RunEvent::CaseStopped { name, status }
    }

    /// Builds a `CaseFinished` event for the test case `name`.
    pub fn case_finished<S: Into<String>>(name: S) -> Self {
        let name = name.into();
        RunEvent::CaseFinished { name }
    }

    /// Builds a `SuiteFinished` event with the given `status`.
    pub fn suite_finished(status: ftest_manager::SuiteStatus) -> Self {
        RunEvent::SuiteFinished { status }
    }

    /// Returns the name of the test case this event belongs to, or `None` for suite-level
    /// events.
    pub fn test_case_name(&self) -> Option<&String> {
        match self {
            Self::SuiteFinished { .. } => None,
            Self::CaseFound { name }
            | Self::CaseStarted { name }
            | Self::CaseStdout { name, .. }
            | Self::CaseStopped { name, .. }
            | Self::CaseFinished { name } => Some(name),
        }
    }

    /// Like `test_case_name`, but hands back an owned `Option<String>`.
    pub fn owned_test_case_name(&self) -> Option<String> {
        self.test_case_name().cloned()
    }
}
/// Trait allowing iterators over `RunEvent` to be partitioned by test case name.
pub trait GroupRunEventByTestCase: Iterator<Item = RunEvent> + Sized {
    /// Groups the `RunEvent`s by test case name into a map that preserves insertion order.
    /// The overall order of test cases (by first event) and the orders of events within each test
    /// case are preserved, but events from different test cases are effectively de-interleaved.
    ///
    /// Events that do not belong to a test case are collected under the `None` key.
    ///
    /// Example (`get_events` is a placeholder for any source of events):
    /// ```ignore
    /// use test_executor::{RunEvent, GroupRunEventByTestCase as _};
    /// use linked_hash_map::LinkedHashMap;
    ///
    /// let events: Vec<RunEvent> = get_events();
    /// let grouped: LinkedHashMap<Option<String>, Vec<RunEvent>> =
    ///     events.into_iter().group_by_test_case_ordered();
    /// ```
    fn group_by_test_case_ordered(self) -> LinkedHashMap<Option<String>, Vec<RunEvent>> {
        let mut map = LinkedHashMap::new();
        for run_event in self {
            map.entry(run_event.owned_test_case_name())
                .or_insert_with(Vec::new)
                .push(run_event);
        }
        map
    }

    /// De-interleaves the `RunEvents` by test case. The overall order of test cases (by first
    /// event) and the orders of events within each test case are preserved.
    fn deinterleave(self) -> Box<dyn Iterator<Item = RunEvent>> {
        Box::new(
            self.group_by_test_case_ordered()
                .into_iter()
                .flat_map(|(_, events)| events.into_iter()),
        )
    }

    /// Groups the `RunEvent`s by test case name into an unordered map. The orders of events within
    /// each test case are preserved, but the test cases themselves are not in a defined order.
    fn group_by_test_case_unordered(self) -> HashMap<Option<String>, Vec<RunEvent>> {
        let mut map = HashMap::new();
        for run_event in self {
            map.entry(run_event.owned_test_case_name())
                .or_insert_with(Vec::new)
                .push(run_event);
        }
        map
    }
}

/// Every iterator over `RunEvent` gets the grouping helpers for free.
impl<T> GroupRunEventByTestCase for T where T: Iterator<Item = RunEvent> + Sized {}
/// Translates FIDL suite events into `SuiteEvent`s, tracking per-test-case state.
#[derive(Default)]
struct FidlSuiteEventProcessor {
/// Test case names keyed by the identifier announced in `CaseFound`.
case_map: HashMap<u32, String>,
/// Stdout collection tasks per test case identifier; awaited when the case stops.
stdout_map: HashMap<u32, Vec<fasync::Task<Result<(), Error>>>>,
}
impl FidlSuiteEventProcessor {
fn new() -> Self {
FidlSuiteEventProcessor::default()
}
/// Returns the name registered for the test case `identifier`.
///
/// # Panics
///
/// Panics if no `CaseFound` event registered this identifier.
fn get_test_case_name(&self, identifier: u32) -> String {
    self.case_map
        .get(&identifier)
        // Format the panic message only when the lookup actually fails.
        .unwrap_or_else(|| panic!("invalid test case identifier: {}", identifier))
        .clone()
}
async fn process(
&mut self,
event: FidlSuiteEvent,
mut sender: mpsc::Sender<SuiteEvent>,
) -> Result<(), Error> {
let timestamp = event.timestamp;
let e = match event.payload.expect("Payload cannot be null, please file bug.") {
FidlSuiteEventPayload::CaseFound(cf) => {
self.case_map.insert(cf.identifier, cf.test_case_name.clone());
SuiteEvent::case_found(timestamp, cf.test_case_name).into()
}
FidlSuiteEventPayload::CaseStarted(cs) => {
let test_case_name = self.get_test_case_name(cs.identifier);
SuiteEvent::case_started(timestamp, test_case_name).into()
}
FidlSuiteEventPayload::CaseStopped(cs) => {
let test_case_name = self.get_test_case_name(cs.identifier);
if let Some(stdouts) = self.stdout_map.remove(&cs.identifier) {
for s in stdouts {
s.await
.context(format!("error collecting stdout of {}", test_case_name))?;
}
}
SuiteEvent::case_stopped(timestamp, test_case_name, cs.status).into()
}
FidlSuiteEventPayload::CaseFinished(cf) => {
let test_case_name = self.get_test_case_name(cf.identifier);
SuiteEvent::case_finished(timestamp, test_case_name).into()
}
FidlSuiteEventPayload::CaseArtifact(ca) => {
let name = self.get_test_case_name(ca.identifier);
match ca.artifact {
ftest_manager::Artifact::Stdout(stdout) => {
let (s, mut r) = mpsc::channel(1024);
let stdout_task = fasync::Task::spawn(collect_and_send_stdout(stdout, s));
let mut sender_clone = sender.clone();
let send_stdout_task = fasync::Task::spawn(async move {
while let Some(msg) = r.next().await {
sender_clone
.send(SuiteEvent::case_stdout(None, &name, msg))
.await
.context(format!("cannot send logs for {}", name))?;
}
Ok(())
});
match self.stdout_map.get_mut(&ca.identifier) {
Some(v) => {
v.push(stdout_task);
v.push(send_stdout_task);
}
None => {
self.stdout_map
.insert(ca.identifier, vec![stdout_task, send_stdout_task]);
}
}
None
}
ftest_manager::Artifact::Stderr(_) => {
panic!("not supported")
}
ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
Ok(log_stream) => {
SuiteEvent::test_case_log(timestamp, name, log_stream).into()
}
Err(e) => {
warn!("Cannot collect logs for test suite: {:?}", e);
None
}
},
_ => {
panic!("not supported")
}
}
}
FidlSuiteEventPayload::SuiteArtifact(sa) => match sa.artifact {
ftest_manager::Artifact::Stdout(_) => {
panic!("not supported")
}
ftest_manager::Artifact::Stderr(_) => {
panic!("not supported")
}
ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
Ok(log_stream) => SuiteEvent::suite_log(timestamp, log_stream).into(),
Err(e) => {
warn!("Cannot collect logs for test suite: {:?}", e);
None
}
},
_ => {
panic!("not supported")
}
},
FidlSuiteEventPayload::SuiteFinished(sf) => SuiteEvent {
timestamp,
payload: SuiteEventPayload::RunEvent(RunEvent::SuiteFinished { status: sf.status }),
}
.into(),
};
if let Some(item) = e {
sender.send(item).await.context("Cannot send event")?;
}
Ok(())
}
}
#[derive(Debug, thiserror::Error, Eq, PartialEq, Copy, Clone)]
pub enum SuiteLaunchError {
#[error("Cannot enumerate tests")]
CaseEnumeration,
#[error("Cannot resolve test url")]
InstanceCannotResolve,
#[error("Invalid arguments passed")]
InvalidArgs,
#[error("Cannot connect to test suite")]
FailedToConnectToTestSuite,
#[error("resource unavailable")]
ResourceUnavailable,
#[error("Some internal error ocurred. Please file bug")]
InternalError,
}
/// Maps each FIDL launch error onto the [`SuiteLaunchError`] variant of the same name.
impl From<ftest_manager::LaunchError> for SuiteLaunchError {
    fn from(launch_error: ftest_manager::LaunchError) -> Self {
        // Arms follow the declaration order of `SuiteLaunchError`; the match has no
        // wildcard arm, so every listed FIDL variant is mapped explicitly.
        match launch_error {
            ftest_manager::LaunchError::CaseEnumeration => Self::CaseEnumeration,
            ftest_manager::LaunchError::InstanceCannotResolve => Self::InstanceCannotResolve,
            ftest_manager::LaunchError::InvalidArgs => Self::InvalidArgs,
            ftest_manager::LaunchError::FailedToConnectToTestSuite => {
                Self::FailedToConnectToTestSuite
            }
            ftest_manager::LaunchError::ResourceUnavailable => Self::ResourceUnavailable,
            ftest_manager::LaunchError::InternalError => Self::InternalError,
        }
    }
}
/// Instance to control a single test suite run.
pub struct SuiteRunInstance {
/// Shared proxy to the controller of this suite run; used to fetch the run's events
/// and to kill the run.
controller_proxy: Arc<SuiteControllerProxy>,
}
impl SuiteRunInstance {
/// Returns another shared handle to the controller proxy of this suite run.
pub fn controller(&self) -> Arc<SuiteControllerProxy> {
    Arc::clone(&self.controller_proxy)
}
/// Pulls events from the suite controller until it returns an empty batch, forwarding
/// each processed event on `sender`.
///
/// # Errors
///
/// Returns an error if fetching a batch fails or if the controller reports a launch
/// error. A failure while processing an individual event is only logged: the suite is
/// asked to be killed and `Ok(())` is returned.
pub async fn collect_events(&self, sender: mpsc::Sender<SuiteEvent>) -> Result<(), Error> {
    let proxy = &self.controller_proxy;
    let mut processor = FidlSuiteEventProcessor::new();
    loop {
        let batch = proxy.get_events().await?.map_err(SuiteLaunchError::from)?;
        // An empty batch marks the end of the event stream.
        if batch.is_empty() {
            return Ok(());
        }
        for event in batch {
            let outcome = processor.process(event, sender.clone()).await;
            if let Err(e) = outcome {
                warn!("error running test suite: {:?}", e);
                // Best effort: the result of the kill request is intentionally ignored.
                let _ = proxy.kill();
                return Ok(());
            }
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::HandleBased;
use futures::StreamExt;
use maplit::hashmap;
use pretty_assertions::assert_eq;
/// Checks that `collect_and_send_stdout_internal` forwards everything written to the
/// stdout socket, across several rounds of writes, and that the receiving channel ends
/// once the socket has been closed.
#[fuchsia_async::run_singlethreaded(test)]
async fn collect_test_stdout() {
let (sock_server, sock_client) =
fidl::Socket::create(fidl::SocketOpts::STREAM).expect("Failed while creating socket");
let (sender, mut recv) = mpsc::channel(1);
// Buffer for only 1ms here instead of LOG_BUFFERING_DURATION.
let fut = fuchsia_async::Task::spawn(collect_and_send_stdout_internal(LogOpt {
stdout_socket: sock_client,
sender: sender.into(),
buffering_duration: std::time::Duration::from_millis(1),
buffer_size: LOG_BUFFER_SIZE,
}));
// The three writes below are expected to arrive as one concatenated message.
sock_server.write(b"test message 1").expect("Can't write msg to socket");
sock_server.write(b"test message 2").expect("Can't write msg to socket");
sock_server.write(b"test message 3").expect("Can't write msg to socket");
let mut msg = recv.next().await;
assert_eq!(msg, Some("test message 1test message 2test message 3".into()));
// can receive messages multiple times
sock_server.write(b"test message 4").expect("Can't write msg to socket");
msg = recv.next().await;
assert_eq!(msg, Some("test message 4".into()));
// messages can be read after socket server is closed.
sock_server.write(b"test message 5").expect("Can't write msg to socket");
sock_server.into_handle(); // this will drop this handle and close it.
// The collection task is expected to finish successfully once the socket is closed.
fut.await.expect("log collection should not fail");
msg = recv.next().await;
assert_eq!(msg, Some("test message 5".into()));
// socket was closed, this should return None
msg = recv.next().await;
assert_eq!(msg, None);
}
/// Host side executor doesn't have a fake timer, so these tests only run on device for now.
#[cfg(target_os = "fuchsia")]
mod stdout {
use {
super::*,
fidl::endpoints::create_proxy_and_stream,
fidl_fuchsia_test::CaseListenerMarker,
fuchsia_async::{pin_mut, TestExecutor},
fuchsia_zircon::DurationNum,
matches::assert_matches,
pretty_assertions::assert_eq,
std::ops::Add,
};
/// Sends `msg` through `log_buffer` on `executor` and asserts that the send completes
/// without stalling.
fn send_msg(
    executor: &mut fuchsia_async::TestExecutor,
    log_buffer: &mut StdoutBuffer,
    msg: &str,
) {
    let send_fut = async {
        log_buffer.send_log(&msg).await.unwrap();
    };
    pin_mut!(send_fut);
    let outcome = executor.run_until_stalled(&mut send_fut);
    assert_eq!(outcome, Poll::Ready(()));
}
/// Polls `recv` for its next item on `executor` without blocking and returns the poll
/// result as is.
fn recv_msg<T>(
    executor: &mut fuchsia_async::TestExecutor,
    recv: &mut mpsc::Receiver<T>,
) -> Poll<Option<T>> {
    let next_item = recv.next();
    pin_mut!(next_item);
    let polled = executor.run_until_stalled(&mut next_item);
    polled
}
/// With neither the timer firing nor the capacity being reached, messages stay buffered
/// until `done()` flushes them as a single concatenated message.
#[test]
fn log_buffer_without_timeout() {
    let mut executor = TestExecutor::new_with_fake_time().unwrap();
    let (sender, mut recv) = mpsc::channel(1);
    let buffering = std::time::Duration::from_secs(5);
    let mut log_buffer = StdoutBuffer::new(buffering, sender.into(), 100);
    let first = String::from("message1");
    let second = String::from("message2");

    // Both messages are held back while buffering is active.
    send_msg(&mut executor, &mut log_buffer, &first);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);
    send_msg(&mut executor, &mut log_buffer, &second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);

    // Finishing the buffer flushes whatever is still pending.
    let finish = async {
        log_buffer.done().await.unwrap();
    };
    pin_mut!(finish);
    assert_eq!(executor.run_until_stalled(&mut finish), Poll::Ready(()));

    let expected = format!("{}{}", first, second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(expected)));
}
/// Once the buffering timer has fired, the buffered messages are flushed together and
/// every later message is delivered on its own.
#[test]
fn log_buffer_with_timeout() {
    let mut executor = TestExecutor::new_with_fake_time().unwrap();
    let (sender, mut recv) = mpsc::channel(1);
    let buffering = std::time::Duration::from_secs(5);
    let mut log_buffer = StdoutBuffer::new(buffering, sender.into(), 100);
    let first = String::from("message1");
    let second = String::from("message2");

    // While the timer has not fired, both messages are held back.
    send_msg(&mut executor, &mut log_buffer, &first);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);
    send_msg(&mut executor, &mut log_buffer, &second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);

    // Move fake time past the 5 second buffering duration and wake the timer.
    let past_deadline = executor.now().add(6.seconds());
    executor.set_fake_time(past_deadline);
    executor.wake_next_timer();

    let flushed = format!("{}{}", first, second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(flushed)));

    // timer fired, no more buffering should happen.
    send_msg(&mut executor, &mut log_buffer, &first);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(first)));
    send_msg(&mut executor, &mut log_buffer, &second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(second)));

    let finish = async {
        log_buffer.done().await.unwrap();
    };
    pin_mut!(finish);
    assert_eq!(executor.run_until_stalled(&mut finish), Poll::Ready(()));
}
/// Reaching the buffer capacity flushes the buffered messages, while later messages
/// are buffered again until `done()` is called.
#[test]
fn log_buffer_capacity_reached() {
    let mut executor = TestExecutor::new_with_fake_time().unwrap();
    let (sender, mut recv) = mpsc::channel(1);
    let buffering = std::time::Duration::from_secs(5);
    // A capacity of 10 is exceeded by the two 8-byte messages below.
    let mut log_buffer = StdoutBuffer::new(buffering, sender.into(), 10);
    let first = String::from("message1");
    let second = String::from("message2");

    send_msg(&mut executor, &mut log_buffer, &first);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);
    send_msg(&mut executor, &mut log_buffer, &second);
    let flushed = format!("{}{}", first, second);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(flushed)));

    // capacity was reached but buffering is still on, so next msg should buffer
    send_msg(&mut executor, &mut log_buffer, &first);
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Pending);

    let finish = async {
        log_buffer.done().await.unwrap();
    };
    pin_mut!(finish);
    assert_eq!(executor.run_until_stalled(&mut finish), Poll::Ready(()));
    assert_eq!(recv_msg(&mut executor, &mut recv), Poll::Ready(Some(first)));
}
/// Checks that output written before the socket is closed is still delivered when the
/// buffering timer never fires: fake time is not advanced and the buffering duration
/// is 10 seconds.
#[test]
fn collect_test_stdout_when_socket_closed() {
let mut executor = TestExecutor::new_with_fake_time().unwrap();
let (sock_server, sock_client) = fidl::Socket::create(fidl::SocketOpts::STREAM)
.expect("Failed while creating socket");
let (sender, mut recv) = mpsc::channel(1);
let mut fut = collect_and_send_stdout_internal(LogOpt {
stdout_socket: sock_client,
sender: sender.into(),
buffering_duration: std::time::Duration::from_secs(10),
buffer_size: LOG_BUFFER_SIZE,
})
.boxed();
sock_server.write(b"test message 1").expect("Can't write msg to socket");
sock_server.write(b"test message 2").expect("Can't write msg to socket");
sock_server.write(b"test message 3").expect("Can't write msg to socket");
sock_server.into_handle(); // this will drop this handle and close it.
// timer is never fired but we should still receive logs.
assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(())));
// All three writes are expected as a single concatenated message.
assert_eq!(
recv_msg(&mut executor, &mut recv),
Poll::Ready(Some("test message 1test message 2test message 3".into()))
);
}
/// Drives `TestCaseProcessor::wait_for_finish` with fake time and checks that an
/// excessive-duration event is emitted once `EXCESSIVE_DURATION` has elapsed, followed
/// by a finished event after the case listener reports `Passed`.
#[test]
fn emit_excessive_runtime_event() {
let mut executor = TestExecutor::new_with_fake_time().unwrap();
let (sock_server, sock_client) = fidl::Socket::create(fidl::SocketOpts::STREAM)
.expect("Failed while creating socket");
let (listener_proxy, listener_stream) =
create_proxy_and_stream::<CaseListenerMarker>().expect("Failed to create proxy");
let name = "test_name";
let (sender, mut recv) = mpsc::channel(1);
let mut processor_fut =
TestCaseProcessor::new(name.to_string(), listener_stream, sock_client, sender)
.wait_for_finish()
.boxed();
// Nothing has happened yet: the processor is still waiting and no event is available.
assert_matches!(executor.run_until_stalled(&mut processor_fut), Poll::Pending);
assert_matches!(executor.run_until_stalled(&mut recv.next()), Poll::Pending);
// after LOG_BUFFERING_DURATION seconds log dump timeout is invoked
let begin_time = executor.now();
let log_dump_timer = executor.wake_next_timer().expect("Failed to wake next timer");
executor.set_fake_time(log_dump_timer);
assert_matches!(executor.run_until_stalled(&mut processor_fut), Poll::Pending);
// after EXCESSIVE_DURATION the excessive duration event is triggered
let excessive_duration_timer =
executor.wake_next_timer().expect("Failed to wake next timer");
// The next timer must be due exactly EXCESSIVE_DURATION after the start time.
assert_eq!(
excessive_duration_timer,
begin_time + fuchsia_zircon::Duration::from(EXCESSIVE_DURATION)
);
executor.set_fake_time(excessive_duration_timer);
assert_matches!(executor.run_until_stalled(&mut processor_fut), Poll::Pending);
assert_eq!(
executor.run_until_stalled(&mut recv.next()),
Poll::Ready(Some(TestEvent::excessive_duration(name, EXCESSIVE_DURATION)))
);
// Signal test complete and verify finished event.
drop(sock_server);
assert!(listener_proxy
.finished(fidl_fuchsia_test::Result_ {
status: Some(fidl_fuchsia_test::Status::Passed),
..fidl_fuchsia_test::Result_::EMPTY
})
.is_ok());
assert_matches!(executor.run_until_stalled(&mut processor_fut), Poll::Ready(Ok(())));
// Collecting the rest of the channel must yield exactly the finished event.
assert_eq!(
executor.run_until_stalled(&mut recv.collect::<Vec<_>>()),
Poll::Ready(vec![TestEvent::test_case_finished(name, TestResult::Passed)])
);
}
}
/// Interleaved events are grouped per test case, the groups appear in the order in which
/// each case first occurred, and the event without a test case ends up under `None`.
#[test]
fn group_by_test_case_ordered() {
    let expected = vec![
        (
            Some(String::from("a::a")),
            vec![
                TestEvent::test_case_started("a::a"),
                TestEvent::stdout_message("a::a", "log"),
                TestEvent::test_case_finished("a::a", TestResult::Failed),
            ],
        ),
        (
            Some(String::from("b::b")),
            vec![
                TestEvent::test_case_started("b::b"),
                TestEvent::stdout_message("b::b", "log"),
                TestEvent::test_case_finished("b::b", TestResult::Passed),
            ],
        ),
        (
            Some(String::from("c::c")),
            vec![
                TestEvent::test_case_started("c::c"),
                TestEvent::test_case_finished("c::c", TestResult::Passed),
            ],
        ),
        (None, vec![TestEvent::test_finished()]),
    ];

    let interleaved = vec![
        TestEvent::test_case_started("a::a"),
        TestEvent::test_case_started("b::b"),
        TestEvent::stdout_message("a::a", "log"),
        TestEvent::test_case_started("c::c"),
        TestEvent::stdout_message("b::b", "log"),
        TestEvent::test_case_finished("c::c", TestResult::Passed),
        TestEvent::test_case_finished("a::a", TestResult::Failed),
        TestEvent::test_case_finished("b::b", TestResult::Passed),
        TestEvent::test_finished(),
    ];
    let grouped = interleaved
        .into_iter()
        .group_by_test_case_ordered()
        .into_iter()
        .collect::<Vec<(Option<String>, Vec<TestEvent>)>>();

    assert_eq!(grouped, expected);
}
/// Interleaved events come out as one contiguous run per test case, in order of each
/// case's first occurrence, with the event that has no test case last.
#[test]
fn deinterleave() {
    let expected = vec![
        TestEvent::test_case_started("a::a"),
        TestEvent::stdout_message("a::a", "log"),
        TestEvent::test_case_finished("a::a", TestResult::Failed),
        TestEvent::test_case_started("b::b"),
        TestEvent::stdout_message("b::b", "log"),
        TestEvent::test_case_finished("b::b", TestResult::Passed),
        TestEvent::test_case_started("c::c"),
        TestEvent::test_case_finished("c::c", TestResult::Passed),
        TestEvent::test_finished(),
    ];

    let interleaved = vec![
        TestEvent::test_case_started("a::a"),
        TestEvent::test_case_started("b::b"),
        TestEvent::stdout_message("a::a", "log"),
        TestEvent::test_case_started("c::c"),
        TestEvent::stdout_message("b::b", "log"),
        TestEvent::test_case_finished("c::c", TestResult::Passed),
        TestEvent::test_case_finished("a::a", TestResult::Failed),
        TestEvent::test_case_finished("b::b", TestResult::Passed),
        TestEvent::test_finished(),
    ];
    let separated = interleaved.into_iter().deinterleave().collect::<Vec<TestEvent>>();

    assert_eq!(separated, expected);
}
/// Interleaved events are grouped into a map keyed by test case name, with the event
/// that has no test case stored under `None`.
#[test]
fn group_by_test_case_unordered() {
    let expected = hashmap! {
        Some(String::from("a::a")) => vec![
            TestEvent::test_case_started("a::a"),
            TestEvent::stdout_message("a::a", "log"),
            TestEvent::test_case_finished("a::a", TestResult::Failed),
        ],
        Some(String::from("b::b")) => vec![
            TestEvent::test_case_started("b::b"),
            TestEvent::stdout_message("b::b", "log"),
            TestEvent::test_case_finished("b::b", TestResult::Passed),
        ],
        Some(String::from("c::c")) => vec![
            TestEvent::test_case_started("c::c"),
            TestEvent::test_case_finished("c::c", TestResult::Passed),
        ],
        None => vec![TestEvent::Finish],
    };

    let interleaved = vec![
        TestEvent::test_case_started("a::a"),
        TestEvent::test_case_started("b::b"),
        TestEvent::stdout_message("a::a", "log"),
        TestEvent::test_case_started("c::c"),
        TestEvent::stdout_message("b::b", "log"),
        TestEvent::test_case_finished("c::c", TestResult::Passed),
        TestEvent::test_case_finished("a::a", TestResult::Failed),
        TestEvent::test_case_finished("b::b", TestResult::Passed),
        TestEvent::test_finished(),
    ];
    let grouped = interleaved.into_iter().group_by_test_case_unordered();

    assert_eq!(grouped, expected);
}
}