blob: 8a93731a4e036e4a084592e8a15b0ee725de2439 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::model::collector::DataCollector,
crate::model::model::DataModel,
anyhow::Result,
log::{error, info},
serde::{Deserialize, Serialize},
std::collections::{HashMap, HashSet},
std::fmt,
std::sync::{mpsc, Arc, Condvar, Mutex},
std::thread,
thiserror::Error,
uuid::Uuid,
};
/// Errors that `CollectorScheduler::schedule` can report.
#[derive(Error, Debug)]
pub enum SchedulerError {
/// No remaining collector could be started because every one of them still
/// depends on a plugin instance id that has not finished collecting.
#[error("Scheduled tasks have unmet dependencies")]
UnmetDependencies,
}
/// Lifecycle state of a single collector worker.
#[derive(PartialEq, Eq, Debug, Copy, Clone, Deserialize, Serialize)]
pub enum CollectorState {
/// A collect request has been queued for the worker but the worker has not
/// picked it up yet.
Scheduled,
/// The worker is currently executing the collector.
Running,
/// The worker is waiting for a message. This is the initial state and the
/// state the worker returns to after every collection.
Idle,
/// The worker received a terminate message and has left its message loop.
Terminated,
}
impl fmt::Display for CollectorState {
    /// Writes the bare variant name (for example `Idle`) to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first so there is a single write site.
        let label = match self {
            CollectorState::Scheduled => "Scheduled",
            CollectorState::Running => "Running",
            CollectorState::Idle => "Idle",
            CollectorState::Terminated => "Terminated",
        };
        f.write_str(label)
    }
}
/// Messages sent from a `CollectorInstance` to its worker thread.
enum CollectorMessage {
/// Run the collector once against the supplied data model.
Collect(Arc<DataModel>),
/// Mark the worker as terminated and leave the message loop.
Terminate,
}
/// A CollectorInstance runs a single `DataCollector` on its own single thread.
/// The CollectorInstance manages all messaging to the worker thread which
/// includes non-preemptive thread termination when the instance is dropped.
struct CollectorInstance {
/// Id of the plugin instance this collector was registered under.
pub instance_id: Uuid,
/// Name of the collector, used in listings and log messages.
pub name: String,
/// Instance ids that must have finished before the scheduler runs this
/// collector.
pub dependencies: HashSet<Uuid>,
/// Current worker state, shared with the worker thread. The condition
/// variable is notified on every state change.
pub state: Arc<(Mutex<CollectorState>, Condvar)>,
/// Sending half of the channel the worker thread reads its messages from.
sender: mpsc::Sender<CollectorMessage>,
/// Join handle of the worker thread; taken and joined when the instance is
/// dropped.
worker: Option<thread::JoinHandle<()>>,
}
impl CollectorInstance {
pub fn new(
instance_id: Uuid,
name: String,
dependencies: HashSet<Uuid>,
collector: Arc<dyn DataCollector>,
) -> Self {
let (sender, recv) = mpsc::channel();
let state = Arc::new((Mutex::new(CollectorState::Idle), Condvar::new()));
// Clone these so that we can pass references to the worker thread.
let collector_state = Arc::clone(&state);
let data_collector = Arc::clone(&collector);
let worker = thread::spawn(move || loop {
let message = recv.recv().unwrap();
match message {
CollectorMessage::Collect(model) => {
{
let (state_lock, cvar) = &*collector_state;
let mut state = state_lock.lock().unwrap();
*state = CollectorState::Running;
cvar.notify_one();
}
if let Err(e) = data_collector.collect(Arc::clone(&model)) {
error!("Collector failed with error {}", e);
}
{
let (state_lock, cvar) = &*collector_state;
let mut state = state_lock.lock().unwrap();
*state = CollectorState::Idle;
cvar.notify_one();
}
}
CollectorMessage::Terminate => {
let (state_lock, cvar) = &*collector_state;
let mut state = state_lock.lock().unwrap();
*state = CollectorState::Terminated;
cvar.notify_one();
break;
}
}
});
Self { instance_id, name, dependencies, state, sender, worker: Some(worker) }
}
/// Sends a message to the collector instance worker to run this collector.
/// If the worker is currently busy this message will be queued.
pub fn run(&self, model: Arc<DataModel>) {
let (state_lock, cvar) = &*self.state;
let mut state = state_lock.lock().unwrap();
if *state != CollectorState::Terminated {
*state = CollectorState::Scheduled;
cvar.notify_one();
}
self.sender.send(CollectorMessage::Collect(model)).unwrap();
}
}
impl Drop for CollectorInstance {
fn drop(&mut self) {
self.sender.send(CollectorMessage::Terminate).unwrap();
if let Some(worker) = self.worker.take() {
worker.join().unwrap();
}
}
}
/// Identifier returned by `CollectorScheduler::add` for one registered
/// collector; used to look the collector up or remove it later.
type CollectorHandle = usize;
/// The `CollectorScheduler` contains all of the `DataCollectors` registered by `Plugins`.
/// It provides a simple way to collectively run the data collectors.
pub struct CollectorScheduler {
/// Data model handed to every collector when it is run.
model: Arc<DataModel>,
/// All registered collector instances, keyed by their handle.
collectors: Arc<Mutex<HashMap<CollectorHandle, CollectorInstance>>>,
/// Handle that will be assigned to the next collector that is added.
next_handle: usize,
}
impl CollectorScheduler {
pub fn new(model: Arc<DataModel>) -> Self {
Self { model: model, collectors: Arc::new(Mutex::new(HashMap::new())), next_handle: 1 }
}
/// Adds a collector associated with a particular `instance_id` to the collector
/// scheduler. An internal handle is returned that can be used to reference
/// the specific Collector instance in the future.
pub fn add(
&mut self,
instance_id: Uuid,
name: impl Into<String>,
dependencies: HashSet<Uuid>,
collector: Arc<dyn DataCollector>,
) -> CollectorHandle {
let mut collectors = self.collectors.lock().unwrap();
let handle = self.next_handle;
self.next_handle += 1;
collectors.insert(
handle,
CollectorInstance::new(instance_id, name.into(), dependencies, collector),
);
handle
}
/// Returns a list of all the scheduled collectors.
pub fn collectors_all(&self) -> Vec<(CollectorHandle, String)> {
let collectors = self.collectors.lock().unwrap();
let mut collector_list = Vec::new();
for (handle, instance) in collectors.iter() {
collector_list.push((handle.clone(), instance.name.clone()))
}
collector_list
}
/// Returns a list of all associated collector handles and names for each
/// collector. Collector names should be unique per instance_id but not
/// globally.
pub fn collectors(&self, instance_id: Uuid) -> Vec<(CollectorHandle, String)> {
let collectors = self.collectors.lock().unwrap();
let mut collector_list = Vec::new();
for (handle, instance) in collectors.iter() {
if instance.instance_id == instance_id {
collector_list.push((handle.clone(), instance.name.clone()))
}
}
collector_list
}
/// Removes all `CollectorInstance` objects with a matching instance-id.
/// This effectively unhooks all the plugins collectors.
pub fn remove_all(&mut self, instance_id: Uuid) {
let mut collectors = self.collectors.lock().unwrap();
collectors.retain(|_, v| v.instance_id != instance_id);
}
/// Removes all `CollectorInstance` objects with a matching instance-id.
/// This effectively unhooks all the plugins collectors.
pub fn remove(&mut self, handle: &CollectorHandle) {
let mut collectors = self.collectors.lock().unwrap();
collectors.remove(handle);
}
/// Runs all of the collector instances taking into account dependencies.
/// For example if Collector A depends on Collector B the scheduler will
/// first run Collector A wait until it has finished and then Schedule
/// Collector B. This schedule will terminate once all collectors
/// have been run.
pub fn schedule(&self) -> Result<()> {
let collectors = self.collectors.lock().unwrap();
info!("Collector Scheduler: Scheduling {} Tasks", collectors.len());
// The set of instances that have finished.
let mut collectors_to_schedule: HashSet<CollectorHandle> =
collectors.iter().map(|(handle, _)| handle.clone()).collect();
let mut collector_finished = HashSet::new();
while !collectors_to_schedule.is_empty() {
let mut collectors_to_run = HashSet::new();
// Select all collectors that have their dependencies met.
for handle in collectors_to_schedule.iter() {
let collector_deps = &collectors.get(handle).unwrap().dependencies;
if collector_deps.iter().all(|&id| collector_finished.contains(&id)) {
collectors_to_run.insert(handle.clone());
}
}
// Safe guard against infinite looping.
if collectors_to_run.is_empty() && !collectors_to_schedule.is_empty() {
error!(
"Collector Scheduler: Fatal error {} tasks have unmet dependencies.",
collectors_to_schedule.len()
);
for handle in collectors_to_schedule.iter() {
error!("Failed to schedule: {}", collectors.get(handle).unwrap().name);
}
error!("Collector Scheduler: Aborting collection tasks");
return Err(SchedulerError::UnmetDependencies.into());
}
// Execute the current batch of collectors that are unblocked.
info!("Collector Scheduler: Batching {} Independent Tasks", collectors_to_run.len());
for handle in collectors_to_run.iter() {
let collector = collectors.get(handle).unwrap();
info!(
"Running Collector {} from Plugin Instance {}",
collector.name, collector.instance_id
);
collector.run(Arc::clone(&self.model));
}
// Wait for the current set of handles to finish. A cvar is used
// to sleep this thread waking it up only on state changes from
// one of the collectors.
info!("Collector Scheduler: Batched Tasks Started");
for handle in collectors_to_run.iter() {
let collector = collectors.get(handle).unwrap();
let (state_lock, cvar) = &*collector.state;
let mut state = state_lock.lock().unwrap();
while *state != CollectorState::Idle {
state = cvar.wait(state).unwrap();
}
}
info!("Collector Scheduler: Batched Tasks Finished");
// Update the collector state sets.
for handle in collectors_to_run.iter() {
let instance_id = collectors.get(handle).unwrap().instance_id;
collector_finished.insert(instance_id);
collectors_to_schedule.remove(handle);
}
}
info!("Collector Scheduler: Finished {} Tasks", collectors.len());
Ok(())
}
/// Returns true if all collectors are currently idle. This function is not
/// thread safe.
pub fn all_idle(&self) -> bool {
let collectors = self.collectors.lock().unwrap();
for (_, instance) in collectors.iter() {
let (state_lock, _cvar) = &*instance.state;
if *state_lock.lock().unwrap() == CollectorState::Running {
return false;
}
}
return true;
}
/// Retrieve the collector state of a particular collector.
pub fn state(&self, handle: &CollectorHandle) -> Option<CollectorState> {
let collectors = self.collectors.lock().unwrap();
if let Some(collector) = collectors.get(handle) {
let (state_lock, _cvar) = &*collector.state;
let state = state_lock.lock().unwrap();
Some(*state)
} else {
None
}
}
}
#[cfg(test)]
mod tests {
    use {super::*, crate::model::model::ModelEnvironment, tempfile::tempdir};

    /// Collector used by the tests: every run appends its `id` to a sequence
    /// shared between collectors, which makes the execution order observable.
    struct MockCollector {
        id: u32,
        sequence: Arc<Mutex<Vec<u32>>>,
    }

    impl MockCollector {
        pub fn new(id: u32, sequence: Arc<Mutex<Vec<u32>>>) -> Self {
            Self { id, sequence }
        }
    }

    impl Default for MockCollector {
        /// The default collector does not block; it has id 0 and a private
        /// sequence.
        fn default() -> Self {
            MockCollector::new(0, Arc::new(Mutex::new(Vec::new())))
        }
    }

    impl DataCollector for MockCollector {
        fn collect(&self, _: Arc<DataModel>) -> Result<()> {
            // Push the id to the shared sequence.
            self.sequence.lock().unwrap().push(self.id);
            Ok(())
        }
    }

    /// Utility function to create a scheduler backed by a model stored in
    /// temporary directories.
    fn create_scheduler() -> CollectorScheduler {
        let store_dir = tempdir().unwrap();
        let build_tmp_dir = tempdir().unwrap();
        // NOTE(review): `into_path` persists the directories, so they are not
        // removed when the test ends - confirm this is intended.
        let uri = store_dir.into_path().into_os_string().into_string().unwrap();
        let build_path = build_tmp_dir.into_path();
        let model = Arc::new(DataModel::connect(ModelEnvironment { uri, build_path }).unwrap());
        CollectorScheduler::new(model)
    }

    /// A freshly added collector starts out idle.
    #[test]
    fn test_task_add_idle() {
        let mut scheduler = create_scheduler();
        let collector = Arc::new(MockCollector::default());
        let instance_id = Uuid::new_v4();
        let handle = scheduler.add(instance_id, "foo", HashSet::new(), collector);
        let state = scheduler.state(&handle).unwrap();
        assert_eq!(state, CollectorState::Idle);
    }

    /// With only never-run collectors registered, everything is idle.
    #[test]
    fn test_idle_all() {
        let mut scheduler = create_scheduler();
        let collector = Arc::new(MockCollector::default());
        let instance_id = Uuid::new_v4();
        scheduler.add(instance_id, "foo", HashSet::new(), collector);
        assert!(scheduler.all_idle());
    }

    /// Removing by handle makes the handle unknown to the scheduler.
    #[test]
    fn test_task_remove() {
        let mut scheduler = create_scheduler();
        let collector = Arc::new(MockCollector::default());
        let instance_id = Uuid::new_v4();
        let handle = scheduler.add(instance_id, "foo", HashSet::new(), collector);
        scheduler.remove(&handle);
        assert!(scheduler.state(&handle).is_none());
    }

    /// Removing by instance id also removes the collector behind the handle.
    #[test]
    fn test_task_remove_all() {
        let mut scheduler = create_scheduler();
        let collector = Arc::new(MockCollector::default());
        let instance_id = Uuid::new_v4();
        let handle = scheduler.add(instance_id, "foo", HashSet::new(), collector);
        scheduler.remove_all(instance_id);
        assert!(scheduler.state(&handle).is_none());
    }

    /// B depends on A, so A (id 1) must be recorded before B (id 2).
    #[test]
    fn test_plugin_dependency_ordering() {
        let sequence = Arc::new(Mutex::new(Vec::new()));
        let mut scheduler = create_scheduler();

        let instance_id_a = Uuid::new_v4();
        let collector_a = Arc::new(MockCollector::new(1, Arc::clone(&sequence)));
        let plugin_a_deps = HashSet::new();

        let instance_id_b = Uuid::new_v4();
        let collector_b = Arc::new(MockCollector::new(2, Arc::clone(&sequence)));
        let mut plugin_b_deps = HashSet::new();
        plugin_b_deps.insert(instance_id_a);

        scheduler.add(instance_id_a, "A", plugin_a_deps, collector_a);
        scheduler.add(instance_id_b, "B", plugin_b_deps, collector_b);
        scheduler.schedule().unwrap();
        assert_eq!(*sequence.lock().unwrap(), vec![1, 2]);
    }
}