[component_manager] Code organization and additional unit test for cpu stats
Bug: 56570
Tested: existing tests + new tests
Change-Id: I3325a1f8c7cbf9e133af2bf56572285d542a1217
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/507701
Commit-Queue: Miguel Flores <miguelfrde@google.com>
Fuchsia-Auto-Submit: Miguel Flores <miguelfrde@google.com>
Reviewed-by: Gary Bressler <geb@google.com>
diff --git a/src/sys/component_manager/BUILD.gn b/src/sys/component_manager/BUILD.gn
index 7a8b467..0b71664 100644
--- a/src/sys/component_manager/BUILD.gn
+++ b/src/sys/component_manager/BUILD.gn
@@ -112,9 +112,16 @@
"src/capability.rs",
"src/capability_ready_notifier.rs",
"src/channel.rs",
- "src/component_tree_stats.rs",
"src/config.rs",
"src/constants.rs",
+ "src/diagnostics/component_stats.rs",
+ "src/diagnostics/component_tree_stats.rs",
+ "src/diagnostics/constants.rs",
+ "src/diagnostics/measurement.rs",
+ "src/diagnostics/mod.rs",
+ "src/diagnostics/runtime_stats_source.rs",
+ "src/diagnostics/task_info.rs",
+ "src/diagnostics/testing.rs",
"src/elf_runner/component.rs",
"src/elf_runner/config.rs",
"src/elf_runner/error.rs",
diff --git a/src/sys/component_manager/src/builtin_environment.rs b/src/sys/component_manager/src/builtin_environment.rs
index 3fb0f78..95c92cb 100644
--- a/src/sys/component_manager/src/builtin_environment.rs
+++ b/src/sys/component_manager/src/builtin_environment.rs
@@ -25,8 +25,8 @@
vmex_resource::VmexResource,
},
capability_ready_notifier::CapabilityReadyNotifier,
- component_tree_stats::ComponentTreeStats,
config::RuntimeConfig,
+ diagnostics::ComponentTreeStats,
elf_runner::ElfRunner,
framework::RealmCapabilityHost,
fuchsia_boot_resolver, fuchsia_pkg_resolver,
diff --git a/src/sys/component_manager/src/diagnostics/component_stats.rs b/src/sys/component_manager/src/diagnostics/component_stats.rs
new file mode 100644
index 0000000..a6597c4
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/component_stats.rs
@@ -0,0 +1,96 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::diagnostics::{
+ measurement::Measurement, runtime_stats_source::RuntimeStatsSource, task_info::TaskInfo,
+ },
+ fidl_fuchsia_diagnostics_types::{ComponentTasks, Task},
+ fuchsia_async as fasync, fuchsia_inspect as inspect,
+ futures::future::join_all,
+};
+
+pub struct ComponentStats<T: RuntimeStatsSource> {
+ tasks: Vec<TaskInfo<T>>,
+ _task: Option<fasync::Task<()>>,
+ is_measuring: bool,
+}
+
+impl<T: RuntimeStatsSource> ComponentStats<T> {
+ /// Creates a new `ComponentStats` awaiting the component tasks not ready to take measurements
+ /// yet.
+ pub fn pending(task: fasync::Task<()>) -> Self {
+ Self { tasks: Vec::new(), _task: Some(task), is_measuring: false }
+ }
+
+ /// Creates a new `ComponentStats` and starts taking measurements.
+ pub fn ready(task: TaskInfo<T>) -> Self {
+ Self { tasks: vec![task], is_measuring: true, _task: None }
+ }
+
+ /// Takes a runtime info measurement and records it. Drops old ones if the maximum amount is
+ /// exceeded.
+ pub async fn measure(&mut self) -> Measurement {
+ let measurements = join_all(self.tasks.iter_mut().map(|task| task.measure())).await;
+ let mut result = Measurement::default();
+ for measurement in measurements {
+ if let Some(m) = measurement {
+ result += m;
+ }
+ }
+ result
+ }
+
+ /// A `ComponentStats` is alive when:
+ /// - It has not started measuring yet: this means we are still waiting for the diagnostics
+ /// data to arrive from the runner, or
+ /// - Any of its tasks are alive.
+ pub fn is_alive(&self) -> bool {
+ !self.is_measuring || self.tasks.iter().any(|task| task.is_alive())
+ }
+
+ /// Removes all tasks that are not alive.
+ pub fn clean_stale(&mut self) {
+ self.tasks.retain(|task| task.is_alive());
+ }
+
+ /// Writes the stats to inspect under the given node. Returns the number of tasks that were
+ /// written.
+ pub fn record_to_node(&self, node: &inspect::Node) -> u64 {
+ let mut task_count = 0;
+ for task in &self.tasks {
+ task.record_to_node(&node);
+ task_count += 1;
+ }
+ task_count
+ }
+
+ pub fn is_measuring(&self) -> bool {
+ self.is_measuring
+ }
+
+ #[cfg(test)]
+ pub fn tasks_mut(&mut self) -> &mut [TaskInfo<T>] {
+ &mut self.tasks
+ }
+
+ #[cfg(test)]
+ pub fn total_measurements(&self) -> usize {
+ self.tasks.iter().map(|task| task.total_measurements()).sum()
+ }
+}
+
+impl ComponentStats<Task> {
+ /// Start reading CPU stats.
+ pub async fn start_measuring(&mut self, tasks: ComponentTasks) {
+ if let Some(task) = tasks.component_task.and_then(|task| TaskInfo::try_from(task).ok()) {
+ self.tasks.push(task);
+ self.measure().await;
+ }
+ // Still mark is_measuring as true, if we failed to convert to a TaskInfo it means the
+ // component already died since we couldn't query its basic info so we should make this
+ // true so at least we clean up.
+ self.is_measuring = true;
+ }
+}
diff --git a/src/sys/component_manager/src/component_tree_stats.rs b/src/sys/component_manager/src/diagnostics/component_tree_stats.rs
similarity index 67%
rename from src/sys/component_manager/src/component_tree_stats.rs
rename to src/sys/component_manager/src/diagnostics/component_tree_stats.rs
index 105cff6..176e5a1 100644
--- a/src/sys/component_manager/src/component_tree_stats.rs
+++ b/src/sys/component_manager/src/diagnostics/component_tree_stats.rs
@@ -4,34 +4,30 @@
use {
crate::{
+ diagnostics::{
+ component_stats::ComponentStats, constants::*, measurement::Measurement,
+ runtime_stats_source::RuntimeStatsSource, task_info::TaskInfo,
+ },
model::error::ModelError,
model::hooks::{
Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration, RuntimeInfo,
},
},
async_trait::async_trait,
- fidl_fuchsia_diagnostics_types::{
- ComponentDiagnostics, ComponentTasks, Task as DiagnosticsTask, TaskUnknown,
- },
+ fidl_fuchsia_diagnostics_types::{ComponentDiagnostics, Task as DiagnosticsTask},
fuchsia_async as fasync,
fuchsia_inspect::{self as inspect, HistogramProperty},
fuchsia_inspect_contrib::nodes::BoundedListNode,
- fuchsia_zircon::{self as zx, AsHandleRef, HandleBased, Task},
- fuchsia_zircon_sys as zx_sys,
+ fuchsia_zircon::{self as zx, HandleBased},
futures::{lock::Mutex, FutureExt},
log::warn,
moniker::{AbsoluteMoniker, ExtendedMoniker},
std::{
- collections::{BTreeMap, VecDeque},
- ops::{AddAssign, Deref, DerefMut},
+ collections::BTreeMap,
sync::{Arc, Weak},
- time::Duration,
},
};
-const CPU_SAMPLE_PERIOD_SECONDS: Duration = Duration::from_secs(60);
-const COMPONENT_CPU_MAX_SAMPLES: usize = 60;
-
/// Provides stats for all components running in the system.
pub struct ComponentTreeStats<T: RuntimeStatsSource> {
/// Map from a moniker of a component running in the system to its stats.
@@ -50,7 +46,7 @@
totals: Mutex<AggregatedStats>,
}
-impl<T: 'static + RuntimeStatsSource + Send> ComponentTreeStats<T> {
+impl<T: 'static + RuntimeStatsSource + Send + Sync> ComponentTreeStats<T> {
pub async fn new(node: inspect::Node) -> Arc<Self> {
let processing_times = node.create_int_exponential_histogram(
"processing_times_ns",
@@ -107,7 +103,7 @@
async fn track_ready(&self, moniker: ExtendedMoniker, source: T) {
if let Ok(task_info) = TaskInfo::try_from(source) {
let mut stats = ComponentStats::ready(task_info);
- stats.measure();
+ stats.measure().await;
self.tree.lock().await.insert(moniker.clone(), Arc::new(Mutex::new(stats)));
}
}
@@ -136,7 +132,7 @@
let mut task_count = 0;
for (moniker, stats) in self.tree.lock().await.iter() {
let stats_guard = stats.lock().await;
- if stats_guard.is_measuring {
+ if stats_guard.is_measuring() {
// TODO(fxbug.dev/73169): unify diagnostics and component manager monikers.
let key = match moniker {
ExtendedMoniker::ComponentManager => moniker.to_string(),
@@ -149,7 +145,7 @@
}
};
let child = node.create_child(key);
- task_count += stats_guard.write_inspect_to(&child);
+ task_count += stats_guard.record_to_node(&child);
node.record(child);
}
}
@@ -166,7 +162,7 @@
let mut to_remove = Vec::new();
for (moniker, stat) in stats.into_iter() {
let mut stat_guard = stat.lock().await;
- aggregated += &stat_guard.measure();
+ aggregated += &stat_guard.measure().await;
stat_guard.clean_stale();
if !stat_guard.is_alive() {
to_remove.push(moniker);
@@ -247,7 +243,7 @@
if let Some(ComponentDiagnostics { tasks: Some(tasks), .. }) = receiver.await.ok() {
if let Some(this) = weak_self.upgrade() {
if let Some(stats) = this.tree.lock().await.get_mut(&moniker_for_fut) {
- stats.lock().await.start_measuring(tasks);
+ stats.lock().await.start_measuring(tasks).await;
}
}
}
@@ -299,278 +295,23 @@
// automatically a timestamp for the entry.
let mut node_writer = self.node.create_entry();
node_writer
- .create_int("timestamp", measurement.timestamp.into_nanos())
- .create_int("cpu_time", measurement.cpu_time.into_nanos())
- .create_int("queue_time", measurement.queue_time.into_nanos());
+ .create_int("timestamp", measurement.timestamp().into_nanos())
+ .create_int("cpu_time", measurement.cpu_time().into_nanos())
+ .create_int("queue_time", measurement.queue_time().into_nanos());
self.previous_measurement = self.recent_measurement.take();
self.recent_measurement = Some(measurement);
}
fn write_recents_to(&self, node: &inspect::Node) {
if let Some(measurement) = &self.previous_measurement {
- node.record_int("previous_cpu_time", measurement.cpu_time.into_nanos());
- node.record_int("previous_queue_time", measurement.queue_time.into_nanos());
- node.record_int("previous_timestamp", measurement.timestamp.into_nanos());
+ node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
+ node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
+ node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
}
if let Some(measurement) = &self.recent_measurement {
- node.record_int("recent_cpu_time", measurement.cpu_time.into_nanos());
- node.record_int("recent_queue_time", measurement.queue_time.into_nanos());
- node.record_int("recent_timestamp", measurement.timestamp.into_nanos());
- }
- }
-}
-
-struct ComponentStats<T: RuntimeStatsSource> {
- tasks: Vec<TaskInfo<T>>,
- _task: Option<fasync::Task<()>>,
- is_measuring: bool,
-}
-
-impl<T: RuntimeStatsSource> ComponentStats<T> {
- /// Creates a new `ComponentStats` awaiting the component tasks not ready to take measurements
- /// yet.
- pub fn pending(task: fasync::Task<()>) -> Self {
- Self { tasks: Vec::new(), _task: Some(task), is_measuring: false }
- }
-
- /// Creates a new `ComponentStats` and starts taking measurements.
- pub fn ready(task: TaskInfo<T>) -> Self {
- Self { tasks: vec![task], is_measuring: true, _task: None }
- }
-
- /// Takes a runtime info measurement and records it. Drops old ones if the maximum amount is
- /// exceeded.
- pub fn measure(&mut self) -> Measurement {
- let mut result = Measurement::default();
- for task in self.tasks.iter_mut() {
- task.measure().map(|measurement| {
- result += measurement;
- });
- }
-
- result
- }
-
- /// A `ComponentStats` is alive when:
- /// - It has not started measuring yet: this means we are still waiting for the diagnostics
- /// data to arrive from the runner, or
- /// - Any of its tasks are alive.
- pub fn is_alive(&self) -> bool {
- !self.is_measuring || self.tasks.iter().any(|task| task.is_alive())
- }
-
- /// Removes all tasks that are not alive.
- pub fn clean_stale(&mut self) {
- self.tasks.retain(|task| task.is_alive());
- }
-
- /// Writes the stats to inspect under the given node. Returns the number of tasks that were
- /// written.
- pub fn write_inspect_to(&self, node: &inspect::Node) -> u64 {
- let mut task_count = 0;
- for task in &self.tasks {
- task.write_inspect_to(&node);
- task_count += 1;
- }
- task_count
- }
-
- #[cfg(test)]
- fn total_measurements(&self) -> usize {
- self.tasks.iter().map(|task| task.measurements.len()).sum()
- }
-}
-
-impl ComponentStats<DiagnosticsTask> {
- /// Start reading CPU stats.
- pub fn start_measuring(&mut self, tasks: ComponentTasks) {
- if let Some(task) = tasks.component_task.and_then(|task| TaskInfo::try_from(task).ok()) {
- self.tasks.push(task);
- self.measure();
- }
- // Still mark is_measuring as true, if we failed to convert to a TaskInfo it means the
- // component already died since we couldn't query its basic info so we should make this
- // true so at least we clean up.
- self.is_measuring = true;
- }
-}
-
-#[derive(Default)]
-struct Measurement {
- timestamp: zx::Time,
- cpu_time: zx::Duration,
- queue_time: zx::Duration,
-}
-
-impl Measurement {
- fn empty(timestamp: zx::Time) -> Self {
- Self {
- timestamp,
- cpu_time: zx::Duration::from_nanos(0),
- queue_time: zx::Duration::from_nanos(0),
- }
- }
-}
-
-impl AddAssign<&Measurement> for Measurement {
- fn add_assign(&mut self, other: &Measurement) {
- *self = Self {
- timestamp: self.timestamp,
- cpu_time: self.cpu_time + other.cpu_time,
- queue_time: self.queue_time + other.queue_time,
- };
- }
-}
-
-impl From<zx::TaskRuntimeInfo> for Measurement {
- fn from(info: zx::TaskRuntimeInfo) -> Self {
- Self {
- timestamp: zx::Time::get_monotonic(),
- cpu_time: zx::Duration::from_nanos(info.cpu_time),
- queue_time: zx::Duration::from_nanos(info.queue_time),
- }
- }
-}
-
-struct TaskInfo<T: RuntimeStatsSource> {
- koid: zx_sys::zx_koid_t,
- task: T,
- measurements: MeasurementsQueue,
- should_drop_old_measurements: bool,
- post_invalidation_measurements: usize,
-}
-
-impl<T: RuntimeStatsSource> TaskInfo<T> {
- /// Creates a new `TaskInfo` from the given cpu stats provider.
- // Due to https://github.com/rust-lang/rust/issues/50133 we cannot just derive TryFrom on a
- // generic type given a collision with the blanket implementation.
- pub fn try_from(task: T) -> Result<Self, zx::Status> {
- Ok(Self {
- koid: task.koid()?,
- task,
- measurements: MeasurementsQueue::new(),
- should_drop_old_measurements: false,
- post_invalidation_measurements: 0,
- })
- }
-
- /// Take a new measurement. If the handle of this task is invalid, then it keeps track of how
- /// many measurements would have been done. When the maximum amount allowed is hit, then it
- /// drops the oldest measurement.
- pub fn measure(&mut self) -> Option<&Measurement> {
- if self.task.handle_is_invalid() {
- if self.should_drop_old_measurements {
- self.measurements.pop_front();
- } else {
- self.post_invalidation_measurements += 1;
- self.should_drop_old_measurements = self.post_invalidation_measurements
- + self.measurements.len()
- >= COMPONENT_CPU_MAX_SAMPLES;
- }
- return None;
- }
- if let Ok(runtime_info) = self.task.get_runtime_info() {
- let measurement = runtime_info.into();
- self.measurements.insert(measurement);
- return self.measurements.back();
- }
- None
- }
-
- /// A task is alive when:
- /// - Its handle is valid, or
- /// - There's at least one measurement saved.
- pub fn is_alive(&self) -> bool {
- return !self.task.handle_is_invalid() || !self.measurements.is_empty();
- }
-
- /// Writes the task measurements under the given inspect node `parent`.
- pub fn write_inspect_to(&self, parent: &inspect::Node) {
- let node = parent.create_child(self.koid.to_string());
- let samples = node.create_child("@samples");
- for (i, measurement) in self.measurements.iter().enumerate() {
- let child = samples.create_child(i.to_string());
- child.record_int("timestamp", measurement.timestamp.into_nanos());
- child.record_int("cpu_time", measurement.cpu_time.into_nanos());
- child.record_int("queue_time", measurement.queue_time.into_nanos());
- samples.record(child);
- }
- node.record(samples);
- parent.record(node);
- }
-}
-
-struct MeasurementsQueue {
- values: VecDeque<Measurement>,
-}
-
-impl Deref for MeasurementsQueue {
- type Target = VecDeque<Measurement>;
- fn deref(&self) -> &Self::Target {
- &self.values
- }
-}
-
-impl DerefMut for MeasurementsQueue {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.values
- }
-}
-
-impl MeasurementsQueue {
- pub fn new() -> Self {
- Self { values: VecDeque::new() }
- }
-
- pub fn insert(&mut self, measurement: Measurement) {
- self.values.push_back(measurement);
- while self.values.len() > COMPONENT_CPU_MAX_SAMPLES {
- self.values.pop_front();
- }
- }
-}
-
-pub trait RuntimeStatsSource {
- /// The koid of the Cpu stats source.
- fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status>;
- /// Whether the handle backing up this source is invalid.
- fn handle_is_invalid(&self) -> bool;
- /// Provides the runtime info containing the stats.
- fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status>;
-}
-
-impl RuntimeStatsSource for DiagnosticsTask {
- fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
- let info = match &self {
- DiagnosticsTask::Job(job) => job.basic_info(),
- DiagnosticsTask::Process(process) => process.basic_info(),
- DiagnosticsTask::Thread(thread) => thread.basic_info(),
- TaskUnknown!() => {
- unreachable!("only jobs, threads and processes are tasks");
- }
- }?;
- Ok(info.koid.raw_koid())
- }
-
- fn handle_is_invalid(&self) -> bool {
- match &self {
- DiagnosticsTask::Job(job) => job.as_handle_ref().is_invalid(),
- DiagnosticsTask::Process(process) => process.as_handle_ref().is_invalid(),
- DiagnosticsTask::Thread(thread) => thread.as_handle_ref().is_invalid(),
- TaskUnknown!() => {
- unreachable!("only jobs, threads and processes are tasks");
- }
- }
- }
-
- fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
- match &self {
- DiagnosticsTask::Job(job) => job.get_runtime_info(),
- DiagnosticsTask::Process(process) => process.get_runtime_info(),
- DiagnosticsTask::Thread(thread) => thread.get_runtime_info(),
- TaskUnknown!() => {
- unreachable!("only jobs, threads and processes are tasks");
- }
+ node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
+ node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
+ node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
}
}
}
@@ -579,85 +320,22 @@
mod tests {
use {
super::*,
- crate::model::{
- actions::{ActionSet, StopAction},
- testing::{
- routing_test_helpers::RoutingTest,
- test_helpers::{component_decl_with_test_runner, ComponentDeclBuilder},
+ crate::{
+ diagnostics::testing::FakeTask,
+ model::{
+ actions::{ActionSet, StopAction},
+ testing::{
+ routing_test_helpers::RoutingTest,
+ test_helpers::{component_decl_with_test_runner, ComponentDeclBuilder},
+ },
},
},
diagnostics_hierarchy::DiagnosticsHierarchy,
fuchsia_inspect::testing::{assert_inspect_tree, AnyProperty},
+ fuchsia_zircon::AsHandleRef,
moniker::AbsoluteMoniker,
};
- #[derive(Default)]
- struct FakeTask {
- values: Arc<std::sync::Mutex<VecDeque<zx::TaskRuntimeInfo>>>,
- koid: zx_sys::zx_koid_t,
- invalid_handle: bool,
- }
-
- impl FakeTask {
- fn new(koid: zx_sys::zx_koid_t, values: Vec<zx::TaskRuntimeInfo>) -> Self {
- Self {
- koid,
- invalid_handle: false,
- values: Arc::new(std::sync::Mutex::new(values.into())),
- }
- }
- }
-
- impl RuntimeStatsSource for FakeTask {
- fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
- Ok(self.koid.clone())
- }
- fn handle_is_invalid(&self) -> bool {
- self.invalid_handle
- }
- fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
- Ok(self.values.lock().unwrap().pop_front().unwrap_or(zx::TaskRuntimeInfo::default()))
- }
- }
-
- #[fuchsia::test]
- async fn rotates_measurements_per_task() {
- // Set up test
- let mut task: TaskInfo<FakeTask> = TaskInfo::try_from(FakeTask::default()).unwrap();
- assert!(task.is_alive());
-
- // Take three measurements.
- task.measure();
- assert_eq!(task.measurements.len(), 1);
- task.measure();
- assert_eq!(task.measurements.len(), 2);
- task.measure();
- assert!(task.is_alive());
- assert_eq!(task.measurements.len(), 3);
-
- // Invalidate the handle
- task.task.invalid_handle = true;
-
- // Allow MAX-N (N=3 here) measurements to be taken until we start dropping.
- for i in 3..COMPONENT_CPU_MAX_SAMPLES {
- task.measure();
- assert_eq!(task.measurements.len(), 3);
- assert_eq!(task.post_invalidation_measurements, i - 2);
- }
-
- task.measure(); // 1 dropped, 2 left
- assert!(task.is_alive());
- assert_eq!(task.measurements.len(), 2);
- task.measure(); // 2 dropped, 1 left
- assert!(task.is_alive());
- assert_eq!(task.measurements.len(), 1);
-
- // Take one last measure.
- task.measure(); // 3 dropped, 0 left
- assert!(!task.is_alive());
- assert_eq!(task.measurements.len(), 0);
- }
-
#[fuchsia::test]
async fn components_are_deleted_when_all_tasks_are_gone() {
let inspector = inspect::Inspector::new();
@@ -680,8 +358,10 @@
);
// Invalidate the handle, to simulate that the component stopped.
- for task in stats.tree.lock().await.get(&moniker).unwrap().lock().await.tasks.iter_mut() {
- task.task.invalid_handle = true;
+ for task in
+ stats.tree.lock().await.get(&moniker).unwrap().lock().await.tasks_mut().iter_mut()
+ {
+ task.task_mut().invalid_handle = true;
}
for i in 0..COMPONENT_CPU_MAX_SAMPLES {
@@ -955,11 +635,19 @@
// Verify that after invalidating and an mmediate restart the data stays there if
// there was no measurement in between.
- for task in
- stats.tree.lock().await.get(&extended_moniker).unwrap().lock().await.tasks.iter_mut()
+ for task in stats
+ .tree
+ .lock()
+ .await
+ .get(&extended_moniker)
+ .unwrap()
+ .lock()
+ .await
+ .tasks_mut()
+ .iter_mut()
{
let job = zx::Job::from(zx::Handle::invalid());
- task.task = DiagnosticsTask::Job(job);
+ task.set_task(DiagnosticsTask::Job(job));
}
assert_inspect_tree!(test.builtin_environment.inspector, root: contains {
cpu_stats: contains {
diff --git a/src/sys/component_manager/src/diagnostics/constants.rs b/src/sys/component_manager/src/diagnostics/constants.rs
new file mode 100644
index 0000000..0004687
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/constants.rs
@@ -0,0 +1,8 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::time::Duration;
+
+pub const CPU_SAMPLE_PERIOD_SECONDS: Duration = Duration::from_secs(60);
+pub const COMPONENT_CPU_MAX_SAMPLES: usize = 60;
diff --git a/src/sys/component_manager/src/diagnostics/measurement.rs b/src/sys/component_manager/src/diagnostics/measurement.rs
new file mode 100644
index 0000000..2b1d024
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/measurement.rs
@@ -0,0 +1,102 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::diagnostics::constants::COMPONENT_CPU_MAX_SAMPLES,
+ fuchsia_inspect as inspect, fuchsia_zircon as zx,
+ std::{
+ collections::VecDeque,
+ ops::{AddAssign, Deref, DerefMut},
+ },
+};
+
+#[derive(Default)]
+pub struct Measurement {
+ timestamp: zx::Time,
+ cpu_time: zx::Duration,
+ queue_time: zx::Duration,
+}
+
+impl Measurement {
+ /// An empty measurement (zeros) at the given `timestamp`.
+ pub fn empty(timestamp: zx::Time) -> Self {
+ Self {
+ timestamp,
+ cpu_time: zx::Duration::from_nanos(0),
+ queue_time: zx::Duration::from_nanos(0),
+ }
+ }
+
+ /// Records the measurement data to the given inspect `node`.
+ pub fn record_to_node(&self, node: &inspect::Node) {
+ node.record_int("timestamp", self.timestamp.into_nanos());
+ node.record_int("cpu_time", self.cpu_time.into_nanos());
+ node.record_int("queue_time", self.queue_time.into_nanos());
+ }
+
+ /// The measured cpu time.
+ pub fn cpu_time(&self) -> &zx::Duration {
+ &self.cpu_time
+ }
+
+ /// The measured queue time.
+ pub fn queue_time(&self) -> &zx::Duration {
+ &self.queue_time
+ }
+
+ /// Time when the measurement was taken.
+ pub fn timestamp(&self) -> &zx::Time {
+ &self.timestamp
+ }
+}
+
+impl AddAssign<&Measurement> for Measurement {
+ fn add_assign(&mut self, other: &Measurement) {
+ *self = Self {
+ timestamp: self.timestamp,
+ cpu_time: self.cpu_time + other.cpu_time,
+ queue_time: self.queue_time + other.queue_time,
+ };
+ }
+}
+
+impl From<zx::TaskRuntimeInfo> for Measurement {
+ fn from(info: zx::TaskRuntimeInfo) -> Self {
+ Self {
+ timestamp: zx::Time::get_monotonic(),
+ cpu_time: zx::Duration::from_nanos(info.cpu_time),
+ queue_time: zx::Duration::from_nanos(info.queue_time),
+ }
+ }
+}
+
+pub struct MeasurementsQueue {
+ values: VecDeque<Measurement>,
+}
+
+impl Deref for MeasurementsQueue {
+ type Target = VecDeque<Measurement>;
+ fn deref(&self) -> &Self::Target {
+ &self.values
+ }
+}
+
+impl DerefMut for MeasurementsQueue {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.values
+ }
+}
+
+impl MeasurementsQueue {
+ pub fn new() -> Self {
+ Self { values: VecDeque::new() }
+ }
+
+ pub fn insert(&mut self, measurement: Measurement) {
+ self.values.push_back(measurement);
+ while self.values.len() > COMPONENT_CPU_MAX_SAMPLES {
+ self.values.pop_front();
+ }
+ }
+}
diff --git a/src/sys/component_manager/src/diagnostics/mod.rs b/src/sys/component_manager/src/diagnostics/mod.rs
new file mode 100644
index 0000000..2d83a81
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/mod.rs
@@ -0,0 +1,13 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub use crate::diagnostics::component_tree_stats::ComponentTreeStats;
+
+mod component_stats;
+mod component_tree_stats;
+mod constants;
+mod measurement;
+pub mod runtime_stats_source;
+mod task_info;
+mod testing;
diff --git a/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs b/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs
new file mode 100644
index 0000000..43ec3a3
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs
@@ -0,0 +1,57 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ async_trait::async_trait,
+ fidl_fuchsia_diagnostics_types::{Task as DiagnosticsTask, TaskUnknown},
+ fuchsia_zircon::{self as zx, AsHandleRef, Task},
+ fuchsia_zircon_sys as zx_sys,
+};
+
+#[async_trait]
+pub trait RuntimeStatsSource {
+ /// The koid of the Cpu stats source.
+ fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status>;
+ /// Whether the handle backing up this source is invalid.
+ fn handle_is_invalid(&self) -> bool;
+ /// Provides the runtime info containing the stats.
+ async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status>;
+}
+
+#[async_trait]
+impl RuntimeStatsSource for DiagnosticsTask {
+ fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
+ let info = match &self {
+ DiagnosticsTask::Job(job) => job.basic_info(),
+ DiagnosticsTask::Process(process) => process.basic_info(),
+ DiagnosticsTask::Thread(thread) => thread.basic_info(),
+ TaskUnknown!() => {
+ unreachable!("only jobs, threads and processes are tasks");
+ }
+ }?;
+ Ok(info.koid.raw_koid())
+ }
+
+ fn handle_is_invalid(&self) -> bool {
+ match &self {
+ DiagnosticsTask::Job(job) => job.as_handle_ref().is_invalid(),
+ DiagnosticsTask::Process(process) => process.as_handle_ref().is_invalid(),
+ DiagnosticsTask::Thread(thread) => thread.as_handle_ref().is_invalid(),
+ TaskUnknown!() => {
+ unreachable!("only jobs, threads and processes are tasks");
+ }
+ }
+ }
+
+ async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
+ match &self {
+ DiagnosticsTask::Job(job) => job.get_runtime_info(),
+ DiagnosticsTask::Process(process) => process.get_runtime_info(),
+ DiagnosticsTask::Thread(thread) => thread.get_runtime_info(),
+ TaskUnknown!() => {
+ unreachable!("only jobs, threads and processes are tasks");
+ }
+ }
+ }
+}
diff --git a/src/sys/component_manager/src/diagnostics/task_info.rs b/src/sys/component_manager/src/diagnostics/task_info.rs
new file mode 100644
index 0000000..6175b52
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/task_info.rs
@@ -0,0 +1,172 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+ crate::diagnostics::{
+ constants::COMPONENT_CPU_MAX_SAMPLES,
+ measurement::{Measurement, MeasurementsQueue},
+ runtime_stats_source::RuntimeStatsSource,
+ },
+ fuchsia_inspect as inspect, fuchsia_zircon as zx, fuchsia_zircon_sys as zx_sys,
+};
+
+pub struct TaskInfo<T: RuntimeStatsSource> {
+ koid: zx_sys::zx_koid_t,
+ task: T,
+ measurements: MeasurementsQueue,
+ should_drop_old_measurements: bool,
+ post_invalidation_measurements: usize,
+}
+
+impl<T: RuntimeStatsSource> TaskInfo<T> {
+ /// Creates a new `TaskInfo` from the given cpu stats provider.
+ // Due to https://github.com/rust-lang/rust/issues/50133 we cannot just derive TryFrom on a
+ // generic type given a collision with the blanket implementation.
+ pub fn try_from(task: T) -> Result<Self, zx::Status> {
+ Ok(Self {
+ koid: task.koid()?,
+ task,
+ measurements: MeasurementsQueue::new(),
+ should_drop_old_measurements: false,
+ post_invalidation_measurements: 0,
+ })
+ }
+
+ /// Take a new measurement. If the handle of this task is invalid, then it keeps track of how
+ /// many measurements would have been done. When the maximum amount allowed is hit, then it
+ /// drops the oldest measurement.
+ pub async fn measure(&mut self) -> Option<&Measurement> {
+ if self.task.handle_is_invalid() {
+ if self.should_drop_old_measurements {
+ self.measurements.pop_front();
+ } else {
+ self.post_invalidation_measurements += 1;
+ self.should_drop_old_measurements = self.post_invalidation_measurements
+ + self.measurements.len()
+ >= COMPONENT_CPU_MAX_SAMPLES;
+ }
+ return None;
+ }
+ if let Ok(runtime_info) = self.task.get_runtime_info().await {
+ let measurement = runtime_info.into();
+ self.measurements.insert(measurement);
+ return self.measurements.back();
+ }
+ None
+ }
+
+ /// A task is alive when:
+ /// - Its handle is valid, or
+ /// - There's at least one measurement saved.
+ pub fn is_alive(&self) -> bool {
+ return !self.task.handle_is_invalid() || !self.measurements.is_empty();
+ }
+
+ /// Writes the task measurements under the given inspect node `parent`.
+ pub fn record_to_node(&self, parent: &inspect::Node) {
+ let node = parent.create_child(self.koid.to_string());
+ let samples = node.create_child("@samples");
+ for (i, measurement) in self.measurements.iter().enumerate() {
+ let child = samples.create_child(i.to_string());
+ measurement.record_to_node(&child);
+ samples.record(child);
+ }
+ node.record(samples);
+ parent.record(node);
+ }
+
+ #[cfg(test)]
+ pub fn total_measurements(&self) -> usize {
+ self.measurements.len()
+ }
+
+ #[cfg(test)]
+ pub fn set_task(&mut self, task: T) {
+ self.task = task;
+ }
+
+ #[cfg(test)]
+ pub fn task_mut(&mut self) -> &mut T {
+ &mut self.task
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::diagnostics::testing::FakeTask;
+ use fuchsia_inspect::testing::{assert_inspect_tree, AnyProperty};
+
+ #[fuchsia::test]
+ async fn rotates_measurements_per_task() {
+ // Set up test
+ let mut task: TaskInfo<FakeTask> = TaskInfo::try_from(FakeTask::default()).unwrap();
+ assert!(task.is_alive());
+
+ // Take three measurements.
+ task.measure().await;
+ assert_eq!(task.measurements.len(), 1);
+ task.measure().await;
+ assert_eq!(task.measurements.len(), 2);
+ task.measure().await;
+ assert!(task.is_alive());
+ assert_eq!(task.measurements.len(), 3);
+
+ // Invalidate the handle
+ task.task.invalid_handle = true;
+
+ // Allow MAX-N (N=3 here) measurements to be taken until we start dropping.
+ for i in 3..COMPONENT_CPU_MAX_SAMPLES {
+ task.measure().await;
+ assert_eq!(task.measurements.len(), 3);
+ assert_eq!(task.post_invalidation_measurements, i - 2);
+ }
+
+ task.measure().await; // 1 dropped, 2 left
+ assert!(task.is_alive());
+ assert_eq!(task.measurements.len(), 2);
+ task.measure().await; // 2 dropped, 1 left
+ assert!(task.is_alive());
+ assert_eq!(task.measurements.len(), 1);
+
+ // Take one last measure.
+ task.measure().await; // 3 dropped, 0 left
+ assert!(!task.is_alive());
+ assert_eq!(task.measurements.len(), 0);
+ }
+
+ #[fuchsia::test]
+ async fn write_inspect() {
+ let mut task = TaskInfo::try_from(FakeTask::new(
+ 1,
+ vec![
+ zx::TaskRuntimeInfo { cpu_time: 2, queue_time: 4 },
+ zx::TaskRuntimeInfo { cpu_time: 6, queue_time: 8 },
+ ],
+ ))
+ .unwrap();
+
+ task.measure().await;
+ task.measure().await;
+
+ let inspector = inspect::Inspector::new();
+ task.record_to_node(inspector.root());
+ assert_inspect_tree!(inspector, root: {
+ "1": {
+ "@samples": {
+ "0": {
+ cpu_time: 2i64,
+ queue_time: 4i64,
+ timestamp: AnyProperty,
+ },
+ "1": {
+ cpu_time: 6i64,
+ queue_time: 8i64,
+ timestamp: AnyProperty,
+ }
+ }
+ }
+ });
+ }
+}
diff --git a/src/sys/component_manager/src/diagnostics/testing.rs b/src/sys/component_manager/src/diagnostics/testing.rs
new file mode 100644
index 0000000..80d50571
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/testing.rs
@@ -0,0 +1,39 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#![cfg(test)]
+
+use {
+ crate::diagnostics::runtime_stats_source::RuntimeStatsSource,
+ async_trait::async_trait,
+ fuchsia_zircon as zx, fuchsia_zircon_sys as zx_sys,
+ futures::lock::Mutex,
+ std::{collections::VecDeque, sync::Arc},
+};
+
+#[derive(Default)]
+pub struct FakeTask {
+ values: Arc<Mutex<VecDeque<zx::TaskRuntimeInfo>>>,
+ koid: zx_sys::zx_koid_t,
+ pub invalid_handle: bool,
+}
+
+impl FakeTask {
+ pub fn new(koid: zx_sys::zx_koid_t, values: Vec<zx::TaskRuntimeInfo>) -> Self {
+ Self { koid, invalid_handle: false, values: Arc::new(Mutex::new(values.into())) }
+ }
+}
+
+#[async_trait]
+impl RuntimeStatsSource for FakeTask {
+ fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
+ Ok(self.koid.clone())
+ }
+ fn handle_is_invalid(&self) -> bool {
+ self.invalid_handle
+ }
+ async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
+ Ok(self.values.lock().await.pop_front().unwrap_or(zx::TaskRuntimeInfo::default()))
+ }
+}
diff --git a/src/sys/component_manager/src/lib.rs b/src/sys/component_manager/src/lib.rs
index 23c8975..b83cf24 100644
--- a/src/sys/component_manager/src/lib.rs
+++ b/src/sys/component_manager/src/lib.rs
@@ -18,7 +18,6 @@
pub(crate) mod capability_ready_notifier;
pub(crate) mod channel;
-pub(crate) mod component_tree_stats;
pub(crate) mod framework;
pub(crate) mod fuchsia_boot_resolver;
pub(crate) mod fuchsia_pkg_resolver;
@@ -27,3 +26,4 @@
mod builtin;
mod constants;
+mod diagnostics;