// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{cancel::OrCancel, trace::duration},
anyhow::Error,
diagnostics_data::{LogsData, Severity},
fidl_fuchsia_test_manager::LogsIteratorOption,
fuchsia_async as fasync,
futures::{Future, FutureExt, Stream, TryStreamExt},
log::warn,
std::{io::Write, time::Duration},
};
// TODO(fxbug.dev/54198, fxbug.dev/70581): deprecate this when implementing metadata selectors for
// logs or when we support OnRegisterInterest that can be sent to *all* test components.
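/// Options used to filter and classify collected logs: logs below |min_severity|
/// are omitted from the output artifact, and logs above |max_severity| are
/// reported as restricted logs.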
#[derive(Clone, Default)]
pub(crate) struct LogCollectionOptions {
pub min_severity: Option<Severity>,
pub max_severity: Option<Severity>,
}
/// Options for timing out log collection.
/// Log collection times out once |timeout_fut| has resolved and
/// |time_between_logs| has elapsed since the later of |timeout_fut| resolving
/// and the last log being processed.
/// TODO(fxbug.dev/98223): remove timeouts once logs no longer hang.
pub(crate) struct LogTimeoutOptions<F: Future> {
pub timeout_fut: F,
pub time_between_logs: Duration,
}
impl LogCollectionOptions {
fn is_restricted_log(&self, log: &LogsData) -> bool {
let severity = log.metadata.severity;
matches!(self.max_severity, Some(max) if severity > max)
}
fn should_display(&self, log: &LogsData) -> bool {
let severity = log.metadata.severity;
matches!(self.min_severity, None)
|| matches!(self.min_severity, Some(min) if severity >= min)
}
}
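/// Outcome of log collection: |Passed| when no restricted logs were observed,
/// otherwise |Error| carrying the formatted restricted log lines.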
#[derive(Debug, PartialEq, Eq)]
pub enum LogCollectionOutcome {
Error { restricted_logs: Vec<String> },
Passed,
}
impl From<Vec<String>> for LogCollectionOutcome {
fn from(restricted_logs: Vec<String>) -> Self {
if restricted_logs.is_empty() {
LogCollectionOutcome::Passed
} else {
LogCollectionOutcome::Error { restricted_logs }
}
}
}
/// Collects logs from |stream|, filters out low-severity logs, and writes the results
/// to |log_artifact|. Returns any high-severity restricted logs that are encountered.
/// Collection stops early once the timeout described by |timeout_options| elapses.
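///
/// A minimal usage sketch (not compiled as a doctest because this item is
/// crate-private; assumes `logs` is a `Stream<Item = Result<LogsData, Error>>`):
///
/// ```ignore
/// let mut artifact = vec![];
/// let outcome = collect_logs(
///     logs,
///     &mut artifact,
///     LogCollectionOptions { min_severity: Some(Severity::Warn), max_severity: None },
///     LogTimeoutOptions {
///         timeout_fut: futures::future::pending::<()>(),
///         time_between_logs: Duration::from_secs(5),
///     },
/// )
/// .await?;
/// ```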
pub(crate) async fn collect_logs<S, W, F>(
mut stream: S,
mut log_artifact: W,
options: LogCollectionOptions,
timeout_options: LogTimeoutOptions<F>,
) -> Result<LogCollectionOutcome, Error>
where
S: Stream<Item = Result<LogsData, Error>> + Unpin,
W: Write,
F: Future,
{
duration!("collect_logs");
let timeout_options = LogTimeoutOptions {
timeout_fut: timeout_options.timeout_fut.map(|_| ()).shared(),
time_between_logs: timeout_options.time_between_logs,
};
let mut restricted_logs = vec![];
while let Some(mut log) = next_log_or_timeout(&mut stream, &timeout_options).await? {
duration!("process_single_log");
let is_restricted = options.is_restricted_log(&log);
let should_display = options.should_display(&log);
if !should_display && !is_restricted {
continue;
}
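// Simplify monikers so that the test root is displayed as "<root>" and its
// children are displayed relative to it.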
if log.moniker == "test_root" {
log.moniker = "<root>".to_string();
} else if log.moniker.starts_with("test_root/") {
log.moniker = log.moniker.replace("test_root/", "");
}
let log_repr = format!("{}", log);
if should_display {
writeln!(log_artifact, "{}", log_repr)?;
}
if is_restricted {
restricted_logs.push(log_repr);
}
}
Ok(restricted_logs.into())
}
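/// Returns the next log from |stream|, or Ok(None) if the stream ends or the
/// timeout described by |timeout_options| fires.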
async fn next_log_or_timeout<S, F>(
stream: &mut S,
timeout_options: &LogTimeoutOptions<futures::future::Shared<F>>,
) -> Result<Option<LogsData>, Error>
where
S: Stream<Item = Result<LogsData, Error>> + Unpin,
F: Future,
<F as Future>::Output: std::clone::Clone,
{
let timeout = async {
timeout_options.timeout_fut.clone().await;
fasync::Timer::new(timeout_options.time_between_logs).await;
warn!("Log timeout invoked")
};
stream.try_next().or_cancelled(timeout).await.unwrap_or(Ok(None))
}
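/// Returns the type of log iterator to request when running on a Fuchsia device.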
#[cfg(target_os = "fuchsia")]
pub fn get_type() -> LogsIteratorOption {
LogsIteratorOption::BatchIterator
}
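/// Returns the type of log iterator to request when running on the host.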
#[cfg(not(target_os = "fuchsia"))]
pub fn get_type() -> LogsIteratorOption {
LogsIteratorOption::ArchiveIterator
}
#[cfg(test)]
mod test {
use {
super::*,
diagnostics_data::{BuilderArgs, LogsDataBuilder},
};
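/// Timeout options that never fire, used by tests that don't exercise the timeout path.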
fn infinite_log_timeout() -> LogTimeoutOptions<futures::future::Pending<()>> {
LogTimeoutOptions {
timeout_fut: futures::future::pending::<()>(),
time_between_logs: Duration::ZERO,
}
}
#[fuchsia::test]
async fn simplify_log_moniker() {
let unaltered_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("my info log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root/child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Warn,
})
.set_message("my warn log")
.build(),
];
let altered_moniker_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("my info log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Warn,
})
.set_message("my warn log")
.build(),
];
let mut log_artifact = vec![];
assert_eq!(
collect_logs(
futures::stream::iter(unaltered_logs.into_iter().map(Ok)),
&mut log_artifact,
LogCollectionOptions { min_severity: None, max_severity: None },
infinite_log_timeout(),
)
.await
.unwrap(),
LogCollectionOutcome::Passed
);
assert_eq!(
String::from_utf8(log_artifact).unwrap(),
altered_moniker_logs
.iter()
.map(|log| format!("{}\n", log))
.collect::<Vec<_>>()
.concat()
);
}
#[fuchsia::test]
async fn filter_low_severity() {
let input_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("my info log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root/child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Warn,
})
.set_message("my info log")
.build(),
];
let displayed_logs = vec![LogsDataBuilder::new(BuilderArgs {
moniker: "child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Warn,
})
.set_message("my info log")
.build()];
let mut log_artifact = vec![];
assert_eq!(
collect_logs(
futures::stream::iter(input_logs.into_iter().map(Ok)),
&mut log_artifact,
LogCollectionOptions { min_severity: Severity::Warn.into(), max_severity: None },
infinite_log_timeout(),
)
.await
.unwrap(),
LogCollectionOutcome::Passed
);
assert_eq!(
String::from_utf8(log_artifact).unwrap(),
displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
);
}
#[fuchsia::test]
async fn display_restricted_logs() {
let input_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("my info log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "test_root/child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Error,
})
.set_message("my error log")
.build(),
];
let displayed_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("my info log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "child".into(),
timestamp_nanos: 1000i64.into(),
component_url: "test-child-url".to_string().into(),
severity: Severity::Error,
})
.set_message("my error log")
.build(),
];
let mut log_artifact = vec![];
assert_eq!(
collect_logs(
futures::stream::iter(input_logs.into_iter().map(Ok)),
&mut log_artifact,
LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
infinite_log_timeout(),
)
.await
.unwrap(),
LogCollectionOutcome::Error { restricted_logs: vec![format!("{}", displayed_logs[1])] }
);
assert_eq!(
String::from_utf8(log_artifact).unwrap(),
displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
);
}
// Fuchsia only, as TestExecutor is not available on host.
#[cfg(target_os = "fuchsia")]
mod fuchsia_tests {
use super::*;
use fuchsia_zircon as zx;
use futures::channel::mpsc;
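// Verifies that, once the timeout signal fires, log collection terminates after
// TIMEOUT_BETWEEN_LOGS passes with no new logs, and that logs received before
// then are still recorded.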
#[fuchsia::test]
fn terminate_on_timeout() {
const TIMEOUT_BETWEEN_LOGS: zx::Duration = zx::Duration::from_seconds(5);
let mut executor = fasync::TestExecutor::new_with_fake_time().expect("create executor");
let (mut log_sender, log_recv) = mpsc::channel(5);
let timeout_signal = async_utils::event::Event::new();
let mut log_artifact = vec![];
let mut collect_logs_fut = collect_logs(
log_recv,
&mut log_artifact,
LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
LogTimeoutOptions {
timeout_fut: timeout_signal.wait(),
time_between_logs: Duration::from_secs(
TIMEOUT_BETWEEN_LOGS.into_seconds() as u64
),
},
)
.boxed();
// send first log
log_sender
.try_send(Ok(LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("first log")
.build()))
.expect("send log");
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// advancing time past the time between logs shouldn't time out log collection
// if the timeout signal hasn't been triggered yet
executor.set_fake_time(
executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
);
assert!(!executor.wake_expired_timers());
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// second log should be sent
log_sender
.try_send(Ok(LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("second log")
.build()))
.expect("send log");
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// after triggering timeout_signal, logs are still received so long as
// time_between_logs hasn't elapsed
timeout_signal.signal();
log_sender
.try_send(Ok(LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("third log")
.build()))
.expect("send log");
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
executor.set_fake_time(
executor.now() + TIMEOUT_BETWEEN_LOGS - zx::Duration::from_seconds(1),
);
assert!(!executor.wake_expired_timers());
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// the timeout period hasn't elapsed, so logs are still accepted
log_sender
.try_send(Ok(LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("fourth log")
.build()))
.expect("send log");
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// after the timeout period elapses, log collection stops even if the stream is still open
executor.set_fake_time(
executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
);
assert!(executor.wake_expired_timers());
match executor.run_until_stalled(&mut collect_logs_fut) {
std::task::Poll::Ready(Ok(LogCollectionOutcome::Passed)) => (),
_ => panic!("Expected future to complete successfully"),
}
drop(collect_logs_fut);
let displayed_logs = vec![
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("first log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("second log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("third log")
.build(),
LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("fourth log")
.build(),
];
assert_eq!(
String::from_utf8(log_artifact).unwrap(),
displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
);
}
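// Verifies that elapsing TIMEOUT_BETWEEN_LOGS alone does not stop collection;
// the timeout only starts counting once the timeout signal is given.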
#[fuchsia::test]
fn timeout_not_triggered_until_timeout_signal_given() {
const TIMEOUT_BETWEEN_LOGS: zx::Duration = zx::Duration::from_seconds(5);
let mut executor = fasync::TestExecutor::new_with_fake_time().expect("create executor");
let (mut log_sender, log_recv) = mpsc::channel(5);
let timeout_signal = async_utils::event::Event::new();
let mut log_artifact = vec![];
let mut collect_logs_fut = collect_logs(
log_recv,
&mut log_artifact,
LogCollectionOptions { min_severity: None, max_severity: Severity::Warn.into() },
LogTimeoutOptions {
timeout_fut: timeout_signal.wait(),
time_between_logs: Duration::from_secs(
TIMEOUT_BETWEEN_LOGS.into_seconds() as u64
),
},
)
.boxed();
// send first log
log_sender
.try_send(Ok(LogsDataBuilder::new(BuilderArgs {
moniker: "test_root".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("first log")
.build()))
.expect("send log");
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// advancing time past the time between logs shouldn't time out log collection
// if the timeout signal hasn't been triggered yet
executor.set_fake_time(
executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
);
assert!(!executor.wake_expired_timers());
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
// when the timeout signal is triggered, logs should still be polled for an
// additional TIMEOUT_BETWEEN_LOGS
timeout_signal.signal();
assert!(executor.run_until_stalled(&mut collect_logs_fut).is_pending());
assert!(!executor.wake_expired_timers());
// After an additional TIMEOUT_BETWEEN_LOGS elapses, log collection should stop
executor.set_fake_time(
executor.now() + TIMEOUT_BETWEEN_LOGS + zx::Duration::from_seconds(1),
);
assert!(executor.wake_expired_timers());
match executor.run_until_stalled(&mut collect_logs_fut) {
std::task::Poll::Ready(Ok(LogCollectionOutcome::Passed)) => (),
_ => panic!("Expected future to complete successfully"),
}
drop(collect_logs_fut);
let displayed_logs = vec![LogsDataBuilder::new(BuilderArgs {
moniker: "<root>".into(),
timestamp_nanos: 0i64.into(),
component_url: "test-root-url".to_string().into(),
severity: Severity::Info,
})
.set_message("first log")
.build()];
assert_eq!(
String::from_utf8(log_artifact).unwrap(),
displayed_logs.iter().map(|log| format!("{}\n", log)).collect::<Vec<_>>().concat()
);
}
}
}