blob: 715d34257c46301e907b5961cc3b5fd92b05102f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod actor;
pub mod environment;
mod actor_runner;
mod counter;
use {
crate::{counter::start_counter, environment::Environment},
fuchsia_async::{Time, Timer},
futures::{
future::{select, Aborted, Either},
stream::FuturesUnordered,
StreamExt,
},
log::{error, info, set_logger, set_max_level, LevelFilter},
rand::{rngs::SmallRng, Rng, SeedableRng},
std::{
io::{stdout, Write},
time::Duration,
},
};
/// A simple logger that prints records to the process's standard streams.
///
/// As implemented by its `log::Log` impl in this file: `Info` records go to
/// stdout without a level prefix, `Error` records go to stderr prefixed with
/// their level, and every other level goes to stdout prefixed with its level.
pub struct StdoutLogger;
impl StdoutLogger {
    /// Installs a `StdoutLogger` as the global logger, then sets the global
    /// maximum log level to `filter`.
    ///
    /// # Panics
    ///
    /// Panics if `set_logger` returns an error; per the `log` crate docs this
    /// happens when a global logger has already been installed.
    pub fn init(filter: LevelFilter) {
        // The global logger must live for the rest of the program.
        static LOGGER: StdoutLogger = StdoutLogger;

        let installed = set_logger(&LOGGER);
        installed.expect("Failed to set StdoutLogger as global logger");

        // Only reached once the logger has been installed successfully.
        set_max_level(filter);
    }
}
impl log::Log for StdoutLogger {
fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
true
}
fn log(&self, record: &log::Record<'_>) {
if self.enabled(record.metadata()) {
match record.level() {
log::Level::Info => {
println!("{}", record.args());
}
log::Level::Error => {
eprintln!("{}: {}", record.level(), record.args());
}
_ => {
println!("{}: {}", record.level(), record.args());
}
}
}
}
fn flush(&self) {
stdout().flush().unwrap();
}
}
/// Returns a random 64-bit seed drawn from a freshly entropy-seeded `SmallRng`.
pub fn random_seed() -> u64 {
    SmallRng::from_entropy().gen::<u64>()
}
/// Runs the test loop for the given environment to completion.
///
/// Logs the `Debug` representation of `env`, installs a panic hook that logs it again before
/// delegating to the previously installed hook, and then starts every actor runner returned by
/// `env.actor_runners()`.
///
/// The loop ends when either the counter task or the timeout timer completes, or when there are
/// no runner tasks left to wait on. Whenever a runner task returns while the test is still
/// running, that runner is restarted; if it returned with the current generation, `env.reset()`
/// is awaited and the generation is incremented first.
///
/// # Panics
///
/// Panics if a runner task yields an `Err` while the test is still running.
pub async fn run_test<E: 'static + Environment>(mut env: E) {
// Captured once up front so the same text can be moved into the panic hook below.
let env_string = format!("{:#?}", env);
info!("--------------------- stressor is starting -----------------------");
info!("{}", env_string);
info!("------------------------------------------------------------------");
{
// Setup a panic handler that prints out details of this invocation on crash
// The previously installed hook is kept and invoked afterwards, so its output is preserved.
let default_panic_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
error!("");
error!("--------------------- stressor has crashed -----------------------");
error!("{}", env_string);
error!("------------------------------------------------------------------");
error!("");
default_panic_hook(panic_info);
}));
}
// Extract the data from the environment
// Defaults:
// - target_operations: u64::MAX (2^64 - 1)
// - timeout_secs: 24 hours
let target_operations = env.target_operations().unwrap_or(u64::MAX);
let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
// Start the counter thread
// The counter thread keeps track of the global operation count.
// Each actor will send a message to the counter thread when an operation is completed.
// When the target operation count is hit, the counter task exits.
let (counter_task, counter_tx) = start_counter(target_operations);
// Create a timeout task
let timeout = Timer::new(Time::after(timeout_secs.into()));
// Completes as soon as either the counter task or the timer completes.
let mut test_end = select(counter_task, timeout);
// A monotonically increasing counter representing the current generation.
// On every environment reset, the generation is incremented.
let mut generation: u64 = 0;
// Start all the runners
// Each `run` call yields a (task, abort handle) pair; the pairs are split into the set of
// running tasks and the list of handles used to abort them at the end of the test.
let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) =
env.actor_runners().into_iter().map(|r| r.run(counter_tx.clone(), generation)).unzip();
loop {
// Wait for one of the runners, counter task or timer to return
// `select` takes ownership of `test_end`; when a runner finishes first, the still-pending
// `test_end` future is handed back in the `Either::Right` arm and stored again.
let either = select(test_end, runner_tasks.next()).await;
match either {
Either::Left((test_end_either, _next)) => {
// Left of the inner select is the counter task, right is the timer.
let reason = match test_end_either {
Either::Left(..) => "operation count",
Either::Right(..) => "timeout",
};
// The counter/timer task returned.
// The target operation count was hit or the timer expired.
// The test has completed.
info!("Stress test has completed because of {}!", reason);
for abort in runner_abort {
abort.abort();
}
// We don't care if tasks finished or were aborted, but we want them not running
// anymore before we return.
//
// Runaway threads can cause problems if they're using objects from the main
// executor, and it's generally a good idea to clean up after ourselves here.
let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
break;
}
Either::Right((None, _counter_task)) => {
// The set of runner tasks yielded `None`: there is nothing left to wait on.
info!("No runners to operate");
break;
}
Either::Right((Some(result), task)) => {
// Aborts are only issued in the completion arm above, which breaks out of the loop,
// so an `Err` here is unexpected and treated as a bug.
let (runner, runner_generation) = result.expect("no tasks have been aborted");
// Normally, actor runners run indefinitely.
// However, one of the actor runners has returned.
// This is because an actor has requested an environment reset.
// Move the counter/timer back
test_end = task;
// Did the runner request a reset at the current generation?
// If the generations differ the environment is not reset again and the runner is
// simply restarted (presumably it was started before the latest reset -- confirm
// against the actor runner implementation).
if runner_generation == generation {
// Reset the environment
info!("Resetting environment");
env.reset().await;
// Advance the generation
generation += 1;
}
// Restart this runner with the current generation
let (task, abort) = runner.run(counter_tx.clone(), generation);
runner_tasks.push(task);
runner_abort.push(abort);
}
}
}
}