blob: 725f1277b5380eab70d8e3e37f2709e5d386563f [file] [log] [blame] [edit]
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::{
actor::{ActorConfig, ActorError},
counter::CounterTx,
instance::InstanceUnderTest,
},
fuchsia_async::Task,
futures::lock::Mutex,
log::debug,
std::{sync::Arc, thread::sleep, time::Duration},
};
pub enum ActorRunnerResult {
/// The actor owned by this runner requires a new instance.
/// This happens when the actor returns ActorError::GetNewInstance.
GetNewInstance,
/// The actor was taken from the runner.
/// This can happen when another runner returned GetNewInstance,
/// causing ActorRunner::take() to be called on all runners.
ActorLost,
}
/// The thread that runs an actor against an instance until an error occurs.
#[derive(Clone)]
pub struct ActorRunner<I: InstanceUnderTest> {
config: Arc<Mutex<Option<ActorConfig<I>>>>,
}
impl<I: InstanceUnderTest> ActorRunner<I> {
pub fn new(
instance_id: u64,
config: ActorConfig<I>,
instance: I,
counter_tx: CounterTx,
) -> (Self, Task<ActorRunnerResult>) {
let runner = Self { config: Arc::new(Mutex::new(Some(config))) };
let task = runner.clone().run(instance_id, instance, counter_tx);
(runner, task)
}
/// Takes ownership of the actor away from this runner.
/// Blocks if the actor is currently performing.
/// Panics if the actor was already taken.
///
/// If any actor requires a new instance, all actors are taken from their runners.
/// A new instance is created and the actors are given new runners that operate on
/// the new instance.
///
/// Runners that no longer have an actor will terminate with ActorRunnerResult::ActorLost.
pub async fn take(self) -> ActorConfig<I> {
let mut lock = self.config.lock().await;
lock.take().expect("Actor was already taken!")
}
/// Gets the actor and makes it perform a single operation against a given instance.
///
/// Possible results:
/// * Some(Ok(())) -> Actor was present and operation succeeded
/// * Some(Err(ActorError)) -> Actor was present but operation failed
/// * None -> Actor was taken
async fn perform(&self, instance: &mut I) -> Option<Result<(), ActorError>> {
let mut lock = self.config.lock().await;
if let Some(config) = &mut *lock {
// At this point, the actor must perform.
// If any other thread is attempting to take the actor, they must
// wait until the actor is done performing.
Some(config.actor.perform(instance).await)
} else {
None
}
}
/// Run the actor against a given instance on a new thread indefinitely.
/// The runner will stop if the actor returns an error or if the actor is taken.
fn run(
self,
instance_id: u64,
mut instance: I,
counter_tx: CounterTx,
) -> Task<ActorRunnerResult> {
Task::blocking(async move {
let (name, delay) = {
let lock = self.config.lock().await;
let config = lock.as_ref().unwrap();
(config.name.clone(), config.delay)
};
let mut local_count: u64 = 0;
loop {
if delay > 0 {
debug!(
"[{}][{}][{}] Sleeping for {} seconds",
name, instance_id, local_count, delay
);
sleep(Duration::from_secs(delay));
}
debug!("[{}][{}][{}] Performing...", name, instance_id, local_count);
let result = self.perform(&mut instance).await;
match result {
Some(Ok(())) => {
// Count this iteration towards the global count
let _ = counter_tx.unbounded_send(name.clone());
debug!("[{}][{}][{}] Done!", name, instance_id, local_count);
}
Some(Err(ActorError::DoNotCount)) => {
// Do not count this iteration towards global count
}
Some(Err(ActorError::GetNewInstance)) => {
// Actor needs new instance. Stop the runner
debug!("[{}][{}][{}] Needs new instance!", name, local_count, instance_id);
return ActorRunnerResult::GetNewInstance;
}
None => {
// The actor was taken. Stop the runner
debug!("[{}][{}][{}] Actor lost!", name, instance_id, local_count);
return ActorRunnerResult::ActorLost;
}
}
local_count += 1;
}
})
}
}