[component_manager] Code organization and additional unit test for cpu stats

Bug: 56570
Tested: existing tests + new tests

Change-Id: I3325a1f8c7cbf9e133af2bf56572285d542a1217
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/507701
Commit-Queue: Miguel Flores <miguelfrde@google.com>
Fuchsia-Auto-Submit: Miguel Flores <miguelfrde@google.com>
Reviewed-by: Gary Bressler <geb@google.com>
diff --git a/src/sys/component_manager/BUILD.gn b/src/sys/component_manager/BUILD.gn
index 7a8b467..0b71664 100644
--- a/src/sys/component_manager/BUILD.gn
+++ b/src/sys/component_manager/BUILD.gn
@@ -112,9 +112,16 @@
     "src/capability.rs",
     "src/capability_ready_notifier.rs",
     "src/channel.rs",
-    "src/component_tree_stats.rs",
     "src/config.rs",
     "src/constants.rs",
+    "src/diagnostics/component_stats.rs",
+    "src/diagnostics/component_tree_stats.rs",
+    "src/diagnostics/constants.rs",
+    "src/diagnostics/measurement.rs",
+    "src/diagnostics/mod.rs",
+    "src/diagnostics/runtime_stats_source.rs",
+    "src/diagnostics/task_info.rs",
+    "src/diagnostics/testing.rs",
     "src/elf_runner/component.rs",
     "src/elf_runner/config.rs",
     "src/elf_runner/error.rs",
diff --git a/src/sys/component_manager/src/builtin_environment.rs b/src/sys/component_manager/src/builtin_environment.rs
index 3fb0f78..95c92cb 100644
--- a/src/sys/component_manager/src/builtin_environment.rs
+++ b/src/sys/component_manager/src/builtin_environment.rs
@@ -25,8 +25,8 @@
             vmex_resource::VmexResource,
         },
         capability_ready_notifier::CapabilityReadyNotifier,
-        component_tree_stats::ComponentTreeStats,
         config::RuntimeConfig,
+        diagnostics::ComponentTreeStats,
         elf_runner::ElfRunner,
         framework::RealmCapabilityHost,
         fuchsia_boot_resolver, fuchsia_pkg_resolver,
diff --git a/src/sys/component_manager/src/diagnostics/component_stats.rs b/src/sys/component_manager/src/diagnostics/component_stats.rs
new file mode 100644
index 0000000..a6597c4
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/component_stats.rs
@@ -0,0 +1,96 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::diagnostics::{
+        measurement::Measurement, runtime_stats_source::RuntimeStatsSource, task_info::TaskInfo,
+    },
+    fidl_fuchsia_diagnostics_types::{ComponentTasks, Task},
+    fuchsia_async as fasync, fuchsia_inspect as inspect,
+    futures::future::join_all,
+};
+
+pub struct ComponentStats<T: RuntimeStatsSource> {
+    tasks: Vec<TaskInfo<T>>,
+    _task: Option<fasync::Task<()>>,
+    is_measuring: bool,
+}
+
+impl<T: RuntimeStatsSource> ComponentStats<T> {
+    /// Creates a new `ComponentStats` awaiting the component tasks not ready to take measurements
+    /// yet.
+    pub fn pending(task: fasync::Task<()>) -> Self {
+        Self { tasks: Vec::new(), _task: Some(task), is_measuring: false }
+    }
+
+    /// Creates a new `ComponentStats` and starts taking measurements.
+    pub fn ready(task: TaskInfo<T>) -> Self {
+        Self { tasks: vec![task], is_measuring: true, _task: None }
+    }
+
+    /// Takes a runtime info measurement and records it. Drops old ones if the maximum amount is
+    /// exceeded.
+    pub async fn measure(&mut self) -> Measurement {
+        let measurements = join_all(self.tasks.iter_mut().map(|task| task.measure())).await;
+        let mut result = Measurement::default();
+        for measurement in measurements {
+            if let Some(m) = measurement {
+                result += m;
+            }
+        }
+        result
+    }
+
+    /// A `ComponentStats` is alive when:
+    /// - It has not started measuring yet: this means we are still waiting for the diagnostics
+    ///   data to arrive from the runner, or
+    /// - Any of its tasks are alive.
+    pub fn is_alive(&self) -> bool {
+        !self.is_measuring || self.tasks.iter().any(|task| task.is_alive())
+    }
+
+    /// Removes all tasks that are not alive.
+    pub fn clean_stale(&mut self) {
+        self.tasks.retain(|task| task.is_alive());
+    }
+
+    /// Writes the stats to inspect under the given node. Returns the number of tasks that were
+    /// written.
+    pub fn record_to_node(&self, node: &inspect::Node) -> u64 {
+        let mut task_count = 0;
+        for task in &self.tasks {
+            task.record_to_node(&node);
+            task_count += 1;
+        }
+        task_count
+    }
+
+    pub fn is_measuring(&self) -> bool {
+        self.is_measuring
+    }
+
+    #[cfg(test)]
+    pub fn tasks_mut(&mut self) -> &mut [TaskInfo<T>] {
+        &mut self.tasks
+    }
+
+    #[cfg(test)]
+    pub fn total_measurements(&self) -> usize {
+        self.tasks.iter().map(|task| task.total_measurements()).sum()
+    }
+}
+
+impl ComponentStats<Task> {
+    /// Start reading CPU stats.
+    pub async fn start_measuring(&mut self, tasks: ComponentTasks) {
+        if let Some(task) = tasks.component_task.and_then(|task| TaskInfo::try_from(task).ok()) {
+            self.tasks.push(task);
+            self.measure().await;
+        }
+        // Still mark is_measuring as true, if we failed to convert to a TaskInfo it means the
+        // component already died since we couldn't query its basic info so we should make this
+        // true so at least we clean up.
+        self.is_measuring = true;
+    }
+}
diff --git a/src/sys/component_manager/src/component_tree_stats.rs b/src/sys/component_manager/src/diagnostics/component_tree_stats.rs
similarity index 67%
rename from src/sys/component_manager/src/component_tree_stats.rs
rename to src/sys/component_manager/src/diagnostics/component_tree_stats.rs
index 105cff6..176e5a1 100644
--- a/src/sys/component_manager/src/component_tree_stats.rs
+++ b/src/sys/component_manager/src/diagnostics/component_tree_stats.rs
@@ -4,34 +4,30 @@
 
 use {
     crate::{
+        diagnostics::{
+            component_stats::ComponentStats, constants::*, measurement::Measurement,
+            runtime_stats_source::RuntimeStatsSource, task_info::TaskInfo,
+        },
         model::error::ModelError,
         model::hooks::{
             Event, EventPayload, EventType, HasEventType, Hook, HooksRegistration, RuntimeInfo,
         },
     },
     async_trait::async_trait,
-    fidl_fuchsia_diagnostics_types::{
-        ComponentDiagnostics, ComponentTasks, Task as DiagnosticsTask, TaskUnknown,
-    },
+    fidl_fuchsia_diagnostics_types::{ComponentDiagnostics, Task as DiagnosticsTask},
     fuchsia_async as fasync,
     fuchsia_inspect::{self as inspect, HistogramProperty},
     fuchsia_inspect_contrib::nodes::BoundedListNode,
-    fuchsia_zircon::{self as zx, AsHandleRef, HandleBased, Task},
-    fuchsia_zircon_sys as zx_sys,
+    fuchsia_zircon::{self as zx, HandleBased},
     futures::{lock::Mutex, FutureExt},
     log::warn,
     moniker::{AbsoluteMoniker, ExtendedMoniker},
     std::{
-        collections::{BTreeMap, VecDeque},
-        ops::{AddAssign, Deref, DerefMut},
+        collections::BTreeMap,
         sync::{Arc, Weak},
-        time::Duration,
     },
 };
 
-const CPU_SAMPLE_PERIOD_SECONDS: Duration = Duration::from_secs(60);
-const COMPONENT_CPU_MAX_SAMPLES: usize = 60;
-
 /// Provides stats for all components running in the system.
 pub struct ComponentTreeStats<T: RuntimeStatsSource> {
     /// Map from a moniker of a component running in the system to its stats.
@@ -50,7 +46,7 @@
     totals: Mutex<AggregatedStats>,
 }
 
-impl<T: 'static + RuntimeStatsSource + Send> ComponentTreeStats<T> {
+impl<T: 'static + RuntimeStatsSource + Send + Sync> ComponentTreeStats<T> {
     pub async fn new(node: inspect::Node) -> Arc<Self> {
         let processing_times = node.create_int_exponential_histogram(
             "processing_times_ns",
@@ -107,7 +103,7 @@
     async fn track_ready(&self, moniker: ExtendedMoniker, source: T) {
         if let Ok(task_info) = TaskInfo::try_from(source) {
             let mut stats = ComponentStats::ready(task_info);
-            stats.measure();
+            stats.measure().await;
             self.tree.lock().await.insert(moniker.clone(), Arc::new(Mutex::new(stats)));
         }
     }
@@ -136,7 +132,7 @@
         let mut task_count = 0;
         for (moniker, stats) in self.tree.lock().await.iter() {
             let stats_guard = stats.lock().await;
-            if stats_guard.is_measuring {
+            if stats_guard.is_measuring() {
                 // TODO(fxbug.dev/73169): unify diagnostics and component manager monikers.
                 let key = match moniker {
                     ExtendedMoniker::ComponentManager => moniker.to_string(),
@@ -149,7 +145,7 @@
                     }
                 };
                 let child = node.create_child(key);
-                task_count += stats_guard.write_inspect_to(&child);
+                task_count += stats_guard.record_to_node(&child);
                 node.record(child);
             }
         }
@@ -166,7 +162,7 @@
         let mut to_remove = Vec::new();
         for (moniker, stat) in stats.into_iter() {
             let mut stat_guard = stat.lock().await;
-            aggregated += &stat_guard.measure();
+            aggregated += &stat_guard.measure().await;
             stat_guard.clean_stale();
             if !stat_guard.is_alive() {
                 to_remove.push(moniker);
@@ -247,7 +243,7 @@
                 if let Some(ComponentDiagnostics { tasks: Some(tasks), .. }) = receiver.await.ok() {
                     if let Some(this) = weak_self.upgrade() {
                         if let Some(stats) = this.tree.lock().await.get_mut(&moniker_for_fut) {
-                            stats.lock().await.start_measuring(tasks);
+                            stats.lock().await.start_measuring(tasks).await;
                         }
                     }
                 }
@@ -299,278 +295,23 @@
         // automatically a timestamp for the entry.
         let mut node_writer = self.node.create_entry();
         node_writer
-            .create_int("timestamp", measurement.timestamp.into_nanos())
-            .create_int("cpu_time", measurement.cpu_time.into_nanos())
-            .create_int("queue_time", measurement.queue_time.into_nanos());
+            .create_int("timestamp", measurement.timestamp().into_nanos())
+            .create_int("cpu_time", measurement.cpu_time().into_nanos())
+            .create_int("queue_time", measurement.queue_time().into_nanos());
         self.previous_measurement = self.recent_measurement.take();
         self.recent_measurement = Some(measurement);
     }
 
     fn write_recents_to(&self, node: &inspect::Node) {
         if let Some(measurement) = &self.previous_measurement {
-            node.record_int("previous_cpu_time", measurement.cpu_time.into_nanos());
-            node.record_int("previous_queue_time", measurement.queue_time.into_nanos());
-            node.record_int("previous_timestamp", measurement.timestamp.into_nanos());
+            node.record_int("previous_cpu_time", measurement.cpu_time().into_nanos());
+            node.record_int("previous_queue_time", measurement.queue_time().into_nanos());
+            node.record_int("previous_timestamp", measurement.timestamp().into_nanos());
         }
         if let Some(measurement) = &self.recent_measurement {
-            node.record_int("recent_cpu_time", measurement.cpu_time.into_nanos());
-            node.record_int("recent_queue_time", measurement.queue_time.into_nanos());
-            node.record_int("recent_timestamp", measurement.timestamp.into_nanos());
-        }
-    }
-}
-
-struct ComponentStats<T: RuntimeStatsSource> {
-    tasks: Vec<TaskInfo<T>>,
-    _task: Option<fasync::Task<()>>,
-    is_measuring: bool,
-}
-
-impl<T: RuntimeStatsSource> ComponentStats<T> {
-    /// Creates a new `ComponentStats` awaiting the component tasks not ready to take measurements
-    /// yet.
-    pub fn pending(task: fasync::Task<()>) -> Self {
-        Self { tasks: Vec::new(), _task: Some(task), is_measuring: false }
-    }
-
-    /// Creates a new `ComponentStats` and starts taking measurements.
-    pub fn ready(task: TaskInfo<T>) -> Self {
-        Self { tasks: vec![task], is_measuring: true, _task: None }
-    }
-
-    /// Takes a runtime info measurement and records it. Drops old ones if the maximum amount is
-    /// exceeded.
-    pub fn measure(&mut self) -> Measurement {
-        let mut result = Measurement::default();
-        for task in self.tasks.iter_mut() {
-            task.measure().map(|measurement| {
-                result += measurement;
-            });
-        }
-
-        result
-    }
-
-    /// A `ComponentStats` is alive when:
-    /// - It has not started measuring yet: this means we are still waiting for the diagnostics
-    ///   data to arrive from the runner, or
-    /// - Any of its tasks are alive.
-    pub fn is_alive(&self) -> bool {
-        !self.is_measuring || self.tasks.iter().any(|task| task.is_alive())
-    }
-
-    /// Removes all tasks that are not alive.
-    pub fn clean_stale(&mut self) {
-        self.tasks.retain(|task| task.is_alive());
-    }
-
-    /// Writes the stats to inspect under the given node. Returns the number of tasks that were
-    /// written.
-    pub fn write_inspect_to(&self, node: &inspect::Node) -> u64 {
-        let mut task_count = 0;
-        for task in &self.tasks {
-            task.write_inspect_to(&node);
-            task_count += 1;
-        }
-        task_count
-    }
-
-    #[cfg(test)]
-    fn total_measurements(&self) -> usize {
-        self.tasks.iter().map(|task| task.measurements.len()).sum()
-    }
-}
-
-impl ComponentStats<DiagnosticsTask> {
-    /// Start reading CPU stats.
-    pub fn start_measuring(&mut self, tasks: ComponentTasks) {
-        if let Some(task) = tasks.component_task.and_then(|task| TaskInfo::try_from(task).ok()) {
-            self.tasks.push(task);
-            self.measure();
-        }
-        // Still mark is_measuring as true, if we failed to convert to a TaskInfo it means the
-        // component already died since we couldn't query its basic info so we should make this
-        // true so at least we clean up.
-        self.is_measuring = true;
-    }
-}
-
-#[derive(Default)]
-struct Measurement {
-    timestamp: zx::Time,
-    cpu_time: zx::Duration,
-    queue_time: zx::Duration,
-}
-
-impl Measurement {
-    fn empty(timestamp: zx::Time) -> Self {
-        Self {
-            timestamp,
-            cpu_time: zx::Duration::from_nanos(0),
-            queue_time: zx::Duration::from_nanos(0),
-        }
-    }
-}
-
-impl AddAssign<&Measurement> for Measurement {
-    fn add_assign(&mut self, other: &Measurement) {
-        *self = Self {
-            timestamp: self.timestamp,
-            cpu_time: self.cpu_time + other.cpu_time,
-            queue_time: self.queue_time + other.queue_time,
-        };
-    }
-}
-
-impl From<zx::TaskRuntimeInfo> for Measurement {
-    fn from(info: zx::TaskRuntimeInfo) -> Self {
-        Self {
-            timestamp: zx::Time::get_monotonic(),
-            cpu_time: zx::Duration::from_nanos(info.cpu_time),
-            queue_time: zx::Duration::from_nanos(info.queue_time),
-        }
-    }
-}
-
-struct TaskInfo<T: RuntimeStatsSource> {
-    koid: zx_sys::zx_koid_t,
-    task: T,
-    measurements: MeasurementsQueue,
-    should_drop_old_measurements: bool,
-    post_invalidation_measurements: usize,
-}
-
-impl<T: RuntimeStatsSource> TaskInfo<T> {
-    /// Creates a new `TaskInfo` from the given cpu stats provider.
-    // Due to https://github.com/rust-lang/rust/issues/50133 we cannot just derive TryFrom on a
-    // generic type given a collision with the blanket implementation.
-    pub fn try_from(task: T) -> Result<Self, zx::Status> {
-        Ok(Self {
-            koid: task.koid()?,
-            task,
-            measurements: MeasurementsQueue::new(),
-            should_drop_old_measurements: false,
-            post_invalidation_measurements: 0,
-        })
-    }
-
-    /// Take a new measurement. If the handle of this task is invalid, then it keeps track of how
-    /// many measurements would have been done. When the maximum amount allowed is hit, then it
-    /// drops the oldest measurement.
-    pub fn measure(&mut self) -> Option<&Measurement> {
-        if self.task.handle_is_invalid() {
-            if self.should_drop_old_measurements {
-                self.measurements.pop_front();
-            } else {
-                self.post_invalidation_measurements += 1;
-                self.should_drop_old_measurements = self.post_invalidation_measurements
-                    + self.measurements.len()
-                    >= COMPONENT_CPU_MAX_SAMPLES;
-            }
-            return None;
-        }
-        if let Ok(runtime_info) = self.task.get_runtime_info() {
-            let measurement = runtime_info.into();
-            self.measurements.insert(measurement);
-            return self.measurements.back();
-        }
-        None
-    }
-
-    /// A task is alive when:
-    /// - Its handle is valid, or
-    /// - There's at least one measurement saved.
-    pub fn is_alive(&self) -> bool {
-        return !self.task.handle_is_invalid() || !self.measurements.is_empty();
-    }
-
-    /// Writes the task measurements under the given inspect node `parent`.
-    pub fn write_inspect_to(&self, parent: &inspect::Node) {
-        let node = parent.create_child(self.koid.to_string());
-        let samples = node.create_child("@samples");
-        for (i, measurement) in self.measurements.iter().enumerate() {
-            let child = samples.create_child(i.to_string());
-            child.record_int("timestamp", measurement.timestamp.into_nanos());
-            child.record_int("cpu_time", measurement.cpu_time.into_nanos());
-            child.record_int("queue_time", measurement.queue_time.into_nanos());
-            samples.record(child);
-        }
-        node.record(samples);
-        parent.record(node);
-    }
-}
-
-struct MeasurementsQueue {
-    values: VecDeque<Measurement>,
-}
-
-impl Deref for MeasurementsQueue {
-    type Target = VecDeque<Measurement>;
-    fn deref(&self) -> &Self::Target {
-        &self.values
-    }
-}
-
-impl DerefMut for MeasurementsQueue {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.values
-    }
-}
-
-impl MeasurementsQueue {
-    pub fn new() -> Self {
-        Self { values: VecDeque::new() }
-    }
-
-    pub fn insert(&mut self, measurement: Measurement) {
-        self.values.push_back(measurement);
-        while self.values.len() > COMPONENT_CPU_MAX_SAMPLES {
-            self.values.pop_front();
-        }
-    }
-}
-
-pub trait RuntimeStatsSource {
-    /// The koid of the Cpu stats source.
-    fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status>;
-    /// Whether the handle backing up this source is invalid.
-    fn handle_is_invalid(&self) -> bool;
-    /// Provides the runtime info containing the stats.
-    fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status>;
-}
-
-impl RuntimeStatsSource for DiagnosticsTask {
-    fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
-        let info = match &self {
-            DiagnosticsTask::Job(job) => job.basic_info(),
-            DiagnosticsTask::Process(process) => process.basic_info(),
-            DiagnosticsTask::Thread(thread) => thread.basic_info(),
-            TaskUnknown!() => {
-                unreachable!("only jobs, threads and processes are tasks");
-            }
-        }?;
-        Ok(info.koid.raw_koid())
-    }
-
-    fn handle_is_invalid(&self) -> bool {
-        match &self {
-            DiagnosticsTask::Job(job) => job.as_handle_ref().is_invalid(),
-            DiagnosticsTask::Process(process) => process.as_handle_ref().is_invalid(),
-            DiagnosticsTask::Thread(thread) => thread.as_handle_ref().is_invalid(),
-            TaskUnknown!() => {
-                unreachable!("only jobs, threads and processes are tasks");
-            }
-        }
-    }
-
-    fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
-        match &self {
-            DiagnosticsTask::Job(job) => job.get_runtime_info(),
-            DiagnosticsTask::Process(process) => process.get_runtime_info(),
-            DiagnosticsTask::Thread(thread) => thread.get_runtime_info(),
-            TaskUnknown!() => {
-                unreachable!("only jobs, threads and processes are tasks");
-            }
+            node.record_int("recent_cpu_time", measurement.cpu_time().into_nanos());
+            node.record_int("recent_queue_time", measurement.queue_time().into_nanos());
+            node.record_int("recent_timestamp", measurement.timestamp().into_nanos());
         }
     }
 }
@@ -579,85 +320,22 @@
 mod tests {
     use {
         super::*,
-        crate::model::{
-            actions::{ActionSet, StopAction},
-            testing::{
-                routing_test_helpers::RoutingTest,
-                test_helpers::{component_decl_with_test_runner, ComponentDeclBuilder},
+        crate::{
+            diagnostics::testing::FakeTask,
+            model::{
+                actions::{ActionSet, StopAction},
+                testing::{
+                    routing_test_helpers::RoutingTest,
+                    test_helpers::{component_decl_with_test_runner, ComponentDeclBuilder},
+                },
             },
         },
         diagnostics_hierarchy::DiagnosticsHierarchy,
         fuchsia_inspect::testing::{assert_inspect_tree, AnyProperty},
+        fuchsia_zircon::AsHandleRef,
         moniker::AbsoluteMoniker,
     };
 
-    #[derive(Default)]
-    struct FakeTask {
-        values: Arc<std::sync::Mutex<VecDeque<zx::TaskRuntimeInfo>>>,
-        koid: zx_sys::zx_koid_t,
-        invalid_handle: bool,
-    }
-
-    impl FakeTask {
-        fn new(koid: zx_sys::zx_koid_t, values: Vec<zx::TaskRuntimeInfo>) -> Self {
-            Self {
-                koid,
-                invalid_handle: false,
-                values: Arc::new(std::sync::Mutex::new(values.into())),
-            }
-        }
-    }
-
-    impl RuntimeStatsSource for FakeTask {
-        fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
-            Ok(self.koid.clone())
-        }
-        fn handle_is_invalid(&self) -> bool {
-            self.invalid_handle
-        }
-        fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
-            Ok(self.values.lock().unwrap().pop_front().unwrap_or(zx::TaskRuntimeInfo::default()))
-        }
-    }
-
-    #[fuchsia::test]
-    async fn rotates_measurements_per_task() {
-        // Set up test
-        let mut task: TaskInfo<FakeTask> = TaskInfo::try_from(FakeTask::default()).unwrap();
-        assert!(task.is_alive());
-
-        // Take three measurements.
-        task.measure();
-        assert_eq!(task.measurements.len(), 1);
-        task.measure();
-        assert_eq!(task.measurements.len(), 2);
-        task.measure();
-        assert!(task.is_alive());
-        assert_eq!(task.measurements.len(), 3);
-
-        // Invalidate the handle
-        task.task.invalid_handle = true;
-
-        // Allow MAX-N (N=3 here) measurements to be taken until we start dropping.
-        for i in 3..COMPONENT_CPU_MAX_SAMPLES {
-            task.measure();
-            assert_eq!(task.measurements.len(), 3);
-            assert_eq!(task.post_invalidation_measurements, i - 2);
-        }
-
-        task.measure(); // 1 dropped, 2 left
-        assert!(task.is_alive());
-        assert_eq!(task.measurements.len(), 2);
-        task.measure(); // 2 dropped, 1 left
-        assert!(task.is_alive());
-        assert_eq!(task.measurements.len(), 1);
-
-        // Take one last measure.
-        task.measure(); // 3 dropped, 0 left
-        assert!(!task.is_alive());
-        assert_eq!(task.measurements.len(), 0);
-    }
-
     #[fuchsia::test]
     async fn components_are_deleted_when_all_tasks_are_gone() {
         let inspector = inspect::Inspector::new();
@@ -680,8 +358,10 @@
         );
 
         // Invalidate the handle, to simulate that the component stopped.
-        for task in stats.tree.lock().await.get(&moniker).unwrap().lock().await.tasks.iter_mut() {
-            task.task.invalid_handle = true;
+        for task in
+            stats.tree.lock().await.get(&moniker).unwrap().lock().await.tasks_mut().iter_mut()
+        {
+            task.task_mut().invalid_handle = true;
         }
 
         for i in 0..COMPONENT_CPU_MAX_SAMPLES {
@@ -955,11 +635,19 @@
 
         // Verify that after invalidating and an mmediate restart the data stays there if
         // there was no measurement in between.
-        for task in
-            stats.tree.lock().await.get(&extended_moniker).unwrap().lock().await.tasks.iter_mut()
+        for task in stats
+            .tree
+            .lock()
+            .await
+            .get(&extended_moniker)
+            .unwrap()
+            .lock()
+            .await
+            .tasks_mut()
+            .iter_mut()
         {
             let job = zx::Job::from(zx::Handle::invalid());
-            task.task = DiagnosticsTask::Job(job);
+            task.set_task(DiagnosticsTask::Job(job));
         }
         assert_inspect_tree!(test.builtin_environment.inspector, root: contains {
             cpu_stats: contains {
diff --git a/src/sys/component_manager/src/diagnostics/constants.rs b/src/sys/component_manager/src/diagnostics/constants.rs
new file mode 100644
index 0000000..0004687
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/constants.rs
@@ -0,0 +1,8 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::time::Duration;
+
+pub const CPU_SAMPLE_PERIOD_SECONDS: Duration = Duration::from_secs(60);
+pub const COMPONENT_CPU_MAX_SAMPLES: usize = 60;
diff --git a/src/sys/component_manager/src/diagnostics/measurement.rs b/src/sys/component_manager/src/diagnostics/measurement.rs
new file mode 100644
index 0000000..2b1d024
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/measurement.rs
@@ -0,0 +1,102 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::diagnostics::constants::COMPONENT_CPU_MAX_SAMPLES,
+    fuchsia_inspect as inspect, fuchsia_zircon as zx,
+    std::{
+        collections::VecDeque,
+        ops::{AddAssign, Deref, DerefMut},
+    },
+};
+
+#[derive(Default)]
+pub struct Measurement {
+    timestamp: zx::Time,
+    cpu_time: zx::Duration,
+    queue_time: zx::Duration,
+}
+
+impl Measurement {
+    /// An empty measurement (zeros) at the given `timestamp`.
+    pub fn empty(timestamp: zx::Time) -> Self {
+        Self {
+            timestamp,
+            cpu_time: zx::Duration::from_nanos(0),
+            queue_time: zx::Duration::from_nanos(0),
+        }
+    }
+
+    /// Records the measurement data to the given inspect `node`.
+    pub fn record_to_node(&self, node: &inspect::Node) {
+        node.record_int("timestamp", self.timestamp.into_nanos());
+        node.record_int("cpu_time", self.cpu_time.into_nanos());
+        node.record_int("queue_time", self.queue_time.into_nanos());
+    }
+
+    /// The measured cpu time.
+    pub fn cpu_time(&self) -> &zx::Duration {
+        &self.cpu_time
+    }
+
+    /// The measured queue time.
+    pub fn queue_time(&self) -> &zx::Duration {
+        &self.queue_time
+    }
+
+    /// Time when the measurement was taken.
+    pub fn timestamp(&self) -> &zx::Time {
+        &self.timestamp
+    }
+}
+
+impl AddAssign<&Measurement> for Measurement {
+    fn add_assign(&mut self, other: &Measurement) {
+        *self = Self {
+            timestamp: self.timestamp,
+            cpu_time: self.cpu_time + other.cpu_time,
+            queue_time: self.queue_time + other.queue_time,
+        };
+    }
+}
+
+impl From<zx::TaskRuntimeInfo> for Measurement {
+    fn from(info: zx::TaskRuntimeInfo) -> Self {
+        Self {
+            timestamp: zx::Time::get_monotonic(),
+            cpu_time: zx::Duration::from_nanos(info.cpu_time),
+            queue_time: zx::Duration::from_nanos(info.queue_time),
+        }
+    }
+}
+
+pub struct MeasurementsQueue {
+    values: VecDeque<Measurement>,
+}
+
+impl Deref for MeasurementsQueue {
+    type Target = VecDeque<Measurement>;
+    fn deref(&self) -> &Self::Target {
+        &self.values
+    }
+}
+
+impl DerefMut for MeasurementsQueue {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.values
+    }
+}
+
+impl MeasurementsQueue {
+    pub fn new() -> Self {
+        Self { values: VecDeque::new() }
+    }
+
+    pub fn insert(&mut self, measurement: Measurement) {
+        self.values.push_back(measurement);
+        while self.values.len() > COMPONENT_CPU_MAX_SAMPLES {
+            self.values.pop_front();
+        }
+    }
+}
diff --git a/src/sys/component_manager/src/diagnostics/mod.rs b/src/sys/component_manager/src/diagnostics/mod.rs
new file mode 100644
index 0000000..2d83a81
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/mod.rs
@@ -0,0 +1,13 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+pub use crate::diagnostics::component_tree_stats::ComponentTreeStats;
+
+mod component_stats;
+mod component_tree_stats;
+mod constants;
+mod measurement;
+pub mod runtime_stats_source;
+mod task_info;
+mod testing;
diff --git a/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs b/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs
new file mode 100644
index 0000000..43ec3a3
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/runtime_stats_source.rs
@@ -0,0 +1,57 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    async_trait::async_trait,
+    fidl_fuchsia_diagnostics_types::{Task as DiagnosticsTask, TaskUnknown},
+    fuchsia_zircon::{self as zx, AsHandleRef, Task},
+    fuchsia_zircon_sys as zx_sys,
+};
+
+#[async_trait]
+pub trait RuntimeStatsSource {
+    /// The koid of the Cpu stats source.
+    fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status>;
+    /// Whether the handle backing up this source is invalid.
+    fn handle_is_invalid(&self) -> bool;
+    /// Provides the runtime info containing the stats.
+    async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status>;
+}
+
+#[async_trait]
+impl RuntimeStatsSource for DiagnosticsTask {
+    fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
+        let info = match &self {
+            DiagnosticsTask::Job(job) => job.basic_info(),
+            DiagnosticsTask::Process(process) => process.basic_info(),
+            DiagnosticsTask::Thread(thread) => thread.basic_info(),
+            TaskUnknown!() => {
+                unreachable!("only jobs, threads and processes are tasks");
+            }
+        }?;
+        Ok(info.koid.raw_koid())
+    }
+
+    fn handle_is_invalid(&self) -> bool {
+        match &self {
+            DiagnosticsTask::Job(job) => job.as_handle_ref().is_invalid(),
+            DiagnosticsTask::Process(process) => process.as_handle_ref().is_invalid(),
+            DiagnosticsTask::Thread(thread) => thread.as_handle_ref().is_invalid(),
+            TaskUnknown!() => {
+                unreachable!("only jobs, threads and processes are tasks");
+            }
+        }
+    }
+
+    async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
+        match &self {
+            DiagnosticsTask::Job(job) => job.get_runtime_info(),
+            DiagnosticsTask::Process(process) => process.get_runtime_info(),
+            DiagnosticsTask::Thread(thread) => thread.get_runtime_info(),
+            TaskUnknown!() => {
+                unreachable!("only jobs, threads and processes are tasks");
+            }
+        }
+    }
+}
diff --git a/src/sys/component_manager/src/diagnostics/task_info.rs b/src/sys/component_manager/src/diagnostics/task_info.rs
new file mode 100644
index 0000000..6175b52
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/task_info.rs
@@ -0,0 +1,172 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use {
+    crate::diagnostics::{
+        constants::COMPONENT_CPU_MAX_SAMPLES,
+        measurement::{Measurement, MeasurementsQueue},
+        runtime_stats_source::RuntimeStatsSource,
+    },
+    fuchsia_inspect as inspect, fuchsia_zircon as zx, fuchsia_zircon_sys as zx_sys,
+};
+
+pub struct TaskInfo<T: RuntimeStatsSource> {
+    koid: zx_sys::zx_koid_t,
+    task: T,
+    measurements: MeasurementsQueue,
+    should_drop_old_measurements: bool,
+    post_invalidation_measurements: usize,
+}
+
+impl<T: RuntimeStatsSource> TaskInfo<T> {
+    /// Creates a new `TaskInfo` from the given cpu stats provider.
+    // Due to https://github.com/rust-lang/rust/issues/50133 we cannot just derive TryFrom on a
+    // generic type given a collision with the blanket implementation.
+    pub fn try_from(task: T) -> Result<Self, zx::Status> {
+        Ok(Self {
+            koid: task.koid()?,
+            task,
+            measurements: MeasurementsQueue::new(),
+            should_drop_old_measurements: false,
+            post_invalidation_measurements: 0,
+        })
+    }
+
+    /// Take a new measurement. If the handle of this task is invalid, then it keeps track of how
+    /// many measurements would have been done. When the maximum amount allowed is hit, then it
+    /// drops the oldest measurement.
+    pub async fn measure(&mut self) -> Option<&Measurement> {
+        if self.task.handle_is_invalid() {
+            if self.should_drop_old_measurements {
+                self.measurements.pop_front();
+            } else {
+                self.post_invalidation_measurements += 1;
+                self.should_drop_old_measurements = self.post_invalidation_measurements
+                    + self.measurements.len()
+                    >= COMPONENT_CPU_MAX_SAMPLES;
+            }
+            return None;
+        }
+        if let Ok(runtime_info) = self.task.get_runtime_info().await {
+            let measurement = runtime_info.into();
+            self.measurements.insert(measurement);
+            return self.measurements.back();
+        }
+        None
+    }
+
+    /// A task is alive when:
+    /// - Its handle is valid, or
+    /// - There's at least one measurement saved.
+    pub fn is_alive(&self) -> bool {
+        return !self.task.handle_is_invalid() || !self.measurements.is_empty();
+    }
+
+    /// Writes the task measurements under the given inspect node `parent`.
+    pub fn record_to_node(&self, parent: &inspect::Node) {
+        let node = parent.create_child(self.koid.to_string());
+        let samples = node.create_child("@samples");
+        for (i, measurement) in self.measurements.iter().enumerate() {
+            let child = samples.create_child(i.to_string());
+            measurement.record_to_node(&child);
+            samples.record(child);
+        }
+        node.record(samples);
+        parent.record(node);
+    }
+
+    #[cfg(test)]
+    pub fn total_measurements(&self) -> usize {
+        self.measurements.len()
+    }
+
+    #[cfg(test)]
+    pub fn set_task(&mut self, task: T) {
+        self.task = task;
+    }
+
+    #[cfg(test)]
+    pub fn task_mut(&mut self) -> &mut T {
+        &mut self.task
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::diagnostics::testing::FakeTask;
+    use fuchsia_inspect::testing::{assert_inspect_tree, AnyProperty};
+
+    #[fuchsia::test]
+    async fn rotates_measurements_per_task() {
+        // Set up test
+        let mut task: TaskInfo<FakeTask> = TaskInfo::try_from(FakeTask::default()).unwrap();
+        assert!(task.is_alive());
+
+        // Take three measurements.
+        task.measure().await;
+        assert_eq!(task.measurements.len(), 1);
+        task.measure().await;
+        assert_eq!(task.measurements.len(), 2);
+        task.measure().await;
+        assert!(task.is_alive());
+        assert_eq!(task.measurements.len(), 3);
+
+        // Invalidate the handle
+        task.task.invalid_handle = true;
+
+        // Allow MAX-N (N=3 here) measurements to be taken until we start dropping.
+        for i in 3..COMPONENT_CPU_MAX_SAMPLES {
+            task.measure().await;
+            assert_eq!(task.measurements.len(), 3);
+            assert_eq!(task.post_invalidation_measurements, i - 2);
+        }
+
+        task.measure().await; // 1 dropped, 2 left
+        assert!(task.is_alive());
+        assert_eq!(task.measurements.len(), 2);
+        task.measure().await; // 2 dropped, 1 left
+        assert!(task.is_alive());
+        assert_eq!(task.measurements.len(), 1);
+
+        // Take one last measure.
+        task.measure().await; // 3 dropped, 0 left
+        assert!(!task.is_alive());
+        assert_eq!(task.measurements.len(), 0);
+    }
+
+    #[fuchsia::test]
+    async fn write_inspect() {
+        let mut task = TaskInfo::try_from(FakeTask::new(
+            1,
+            vec![
+                zx::TaskRuntimeInfo { cpu_time: 2, queue_time: 4 },
+                zx::TaskRuntimeInfo { cpu_time: 6, queue_time: 8 },
+            ],
+        ))
+        .unwrap();
+
+        task.measure().await;
+        task.measure().await;
+
+        let inspector = inspect::Inspector::new();
+        task.record_to_node(inspector.root());
+        assert_inspect_tree!(inspector, root: {
+            "1": {
+                "@samples": {
+                    "0": {
+                        cpu_time: 2i64,
+                        queue_time: 4i64,
+                        timestamp: AnyProperty,
+                    },
+                    "1": {
+                        cpu_time: 6i64,
+                        queue_time: 8i64,
+                        timestamp: AnyProperty,
+                    }
+                }
+            }
+        });
+    }
+}
diff --git a/src/sys/component_manager/src/diagnostics/testing.rs b/src/sys/component_manager/src/diagnostics/testing.rs
new file mode 100644
index 0000000..80d50571
--- /dev/null
+++ b/src/sys/component_manager/src/diagnostics/testing.rs
@@ -0,0 +1,39 @@
+// Copyright 2021 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#![cfg(test)]
+
+use {
+    crate::diagnostics::runtime_stats_source::RuntimeStatsSource,
+    async_trait::async_trait,
+    fuchsia_zircon as zx, fuchsia_zircon_sys as zx_sys,
+    futures::lock::Mutex,
+    std::{collections::VecDeque, sync::Arc},
+};
+
+#[derive(Default)]
+pub struct FakeTask {
+    values: Arc<Mutex<VecDeque<zx::TaskRuntimeInfo>>>,
+    koid: zx_sys::zx_koid_t,
+    pub invalid_handle: bool,
+}
+
+impl FakeTask {
+    pub fn new(koid: zx_sys::zx_koid_t, values: Vec<zx::TaskRuntimeInfo>) -> Self {
+        Self { koid, invalid_handle: false, values: Arc::new(Mutex::new(values.into())) }
+    }
+}
+
+#[async_trait]
+impl RuntimeStatsSource for FakeTask {
+    fn koid(&self) -> Result<zx_sys::zx_koid_t, zx::Status> {
+        Ok(self.koid.clone())
+    }
+    fn handle_is_invalid(&self) -> bool {
+        self.invalid_handle
+    }
+    async fn get_runtime_info(&self) -> Result<zx::TaskRuntimeInfo, zx::Status> {
+        Ok(self.values.lock().await.pop_front().unwrap_or(zx::TaskRuntimeInfo::default()))
+    }
+}
diff --git a/src/sys/component_manager/src/lib.rs b/src/sys/component_manager/src/lib.rs
index 23c8975..b83cf24 100644
--- a/src/sys/component_manager/src/lib.rs
+++ b/src/sys/component_manager/src/lib.rs
@@ -18,7 +18,6 @@
 
 pub(crate) mod capability_ready_notifier;
 pub(crate) mod channel;
-pub(crate) mod component_tree_stats;
 pub(crate) mod framework;
 pub(crate) mod fuchsia_boot_resolver;
 pub(crate) mod fuchsia_pkg_resolver;
@@ -27,3 +26,4 @@
 
 mod builtin;
 mod constants;
+mod diagnostics;