blob: e86d349118834125c89486d4178d527b0001b2e2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context, Error},
diagnostics_hierarchy::{ArrayContent, DiagnosticsHierarchy, Property},
diagnostics_reader::{ArchiveReader, Inspect},
CobaltEvent, CountEvent, EventPayload, HistogramBucket as CobaltHistogramBucket,
LoggerFactoryMarker, LoggerFactoryProxy, LoggerProxy,
HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker,
MetricEventLoggerFactoryProxy, MetricEventLoggerProxy, MetricEventPayload, ProjectSpec,
fuchsia_async::{self as fasync, futures::StreamExt},
fuchsia_inspect::{self as inspect, NumericProperty},
fuchsia_zircon as zx,
futures::{channel::oneshot, future::join_all, select, stream::FuturesUnordered},
DataType, MetricConfig, ParsedSelector, ProjectConfig, SamplerConfig, SelectorList,
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
tracing::{error, info, warn},
// Data structure and algorithm overview
// Cobalt is organized into Projects, each of which contains several Metrics.
// MetricConfig - defined in src/diagnostics/lib/sampler-config/src/
// This is deserialized from Sampler config files or created by FIRE by interpolating
// component information into FIRE config files. It contains
// - selectors: SelectorList
// - metric_id
// - metric_type: DataType
// - event_codes: Vec<u32>
// - upload_once boolean
// - use_legacy_cobalt boolean
// ** NOTE ** Multiple selectors can be provided in a single metric. Only one selector is
// expected to fetch/match data. When one does, the other selectors will be disabled for
// efficiency.
// ProjectConfig - defined in src/diagnostics/lib/sampler-config/src/
// This encodes the contents of a single config file:
// - project_id
// - customer_id
// - poll_rate_sec
// - metrics: Vec<MetricConfig>
// SamplerConfig - defined in src/diagnostics/lib/sampler-config/src/
// The entire config for Sampler. Contains
// - list of ProjectConfig
// - minimum sample rate
// ProjectSampler - defined in src/diagnostics/sampler/src/
// This contains
// - several MetricConfig's
// - an ArchiveReader configured with all active selectors
// - a cache of previous Diagnostic values, indexed by selector strings
// - FIDL proxies for Cobalt and MetricEvent loggers
// - these loggers are configured with project_id and customer_id
// - Poll rate
// - Inspect stats (struct ProjectSamplerStats)
// - moniker_to_selector_map (see below * )
// ProjectSampler is stored in:
// - TaskCancellation: execution_context: fasync::Task<Vec<ProjectSampler>>,
// - RebootSnapshotProcessor: project_samplers: Vec<ProjectSampler>,
// - SamplerExecutor: project_samplers: Vec<ProjectSampler>,
// - ProjectSamplerTaskExit::RebootTriggered(ProjectSampler),
// SamplerExecutor (defined in src/diagnostics/sampler/src/) is built from a single SamplerConfig.
// SamplerExecutor contains
// - a list of ProjectSamplers
// - an Inspect stats structure
// SamplerExecutor only has one member function execute() which calls spawn() on each
// project sampler, passing it a receiver-oneshot to cancel it. The collection of
// oneshot-senders and spawned-tasks builds the returned TaskCancellation.
// TaskCancellation is then passed to a reboot_watcher (in src/diagnostics/sampler/src/)
// which does nothing until the reboot service either closes (in which case it calls
// run_without_cancellation()) or sends a message (in which case it calls perform_reboot_cleanup()).
// ProjectSampler.spawn() calls fasync::Task::spawn to create a task that starts a timer, then loops
// listening to that timer and to the reboot_oneshot. When the timer triggers, it calls
// self.process_next_snapshot(). If the reboot oneshot arrives, the task returns
// ProjectSamplerTaskExit::RebootTriggered(self).
// * moniker_to_selector_map
// When a ProjectSampler retrieves data, the data arrives from the Archivist organized by moniker.
// For each moniker, we need to visit the selectors that may find data in it. Those selectors may
// be scattered throughout the MetricConfig's in the ProjectSampler.
// moniker_to_selector_map contains a map from moniker to SelectorIndexes, a struct containing the
// index of the MetricConfig in the ProjectSampler's list, and the selector in the MetricConfig's
// list.
// ** NOTE ** The moniker portion of the selectors must match exactly the moniker returned from
// the Archivist. No wildcards.
// Selectors will become unused, either because of upload_once, or because data was found by a
// different selector. Rather than implement deletion in the moniker_to_selector_map and the vec's,
// which would add lots of bug surface and maintenance debt, each selector is an Option<> so that
// selectors can be deleted/disabled without changing the rest of the data structure.
// Once all Diagnostic data is processed, the structure is rebuilt if any selectors
// have been disabled; rebuilding less often would be
// premature optimization at this point.
// perform_reboot_cleanup() builds a moniker_to_project_map for use by the RebootSnapshotProcessor.
// This has a similar purpose to moniker_to_selector_map, since monikers' data may
// be relevant to any number of projects. When not rebooting, each project fetches
// its own data from Archivist, so there's no need for a moniker_to_project_map. But
// on reboot, all projects' data is fetched at once, and needs to be sorted out.
/// Handle for the running sampler tasks, returned by `SamplerExecutor::execute()`.
///
/// It is consumed by either `run_without_cancellation()` or
/// `perform_reboot_cleanup()`.
pub struct TaskCancellation {
// One oneshot sender per spawned ProjectSampler; sending on it tells that
// sampler that a reboot is pending.
senders: Vec<oneshot::Sender<()>>,
// Clone of the executor's stats handle. It is not read by the methods of this
// type; presumably it is held only to keep the stats alive -- TODO confirm.
_sampler_executor_stats: Arc<SamplerExecutorStats>,
// Task that resolves to the ProjectSamplers handed back by the sampler tasks.
execution_context: fasync::Task<Vec<ProjectSampler>>,
impl TaskCancellation {
// It's possible the reboot register goes down. If that
// happens, we want to continue driving execution with
// no consideration for cancellation. This does that.
pub async fn run_without_cancellation(self) {
/// Handles a pending reboot: signals every sampler task over its oneshot,
/// waits for the samplers to be handed back, and builds a
/// `RebootSnapshotProcessor` over them for the final snapshot.
///
/// Failures are logged as warnings; nothing is returned to the caller.
pub async fn perform_reboot_cleanup(self) {
// Let every sampler know that a reboot is pending and they should exit.
self.senders.into_iter().for_each(|sender| {
.unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err))
// Get the most recently updated project samplers from all the futures. They hold the
// cache with the most recent values for all the metrics we need to sample and diff!
let project_samplers: Vec<ProjectSampler> = self.execution_context.await;
// Maps a moniker to the indices of the project_samplers vec which contain at least
// one selector for that moniker.
let mut moniker_to_projects_map: HashMap<String, HashSet<usize>> = HashMap::new();
// Selectors gathered from all projects (stored in a Vec, despite the name).
let mut mondo_selectors_set = Vec::new();
for (index, project_sampler) in project_samplers.iter().enumerate() {
for metric in &project_sampler.metrics {
// Disabled selectors are None and are skipped.
for selector_opt in metric.selectors.iter() {
if let Some(selector) = selector_opt {
// Make sure the moniker has an entry before it is used.
if moniker_to_projects_map.get(&selector.moniker).is_none() {
.insert(selector.moniker.to_string(), HashSet::new());
let mut reader = ArchiveReader::new();
let mut reboot_processor =
RebootSnapshotProcessor { reader, project_samplers, moniker_to_projects_map };
// Log errors encountered in final snapshot, but always swallow errors so we can gracefully
// notify RebootMethodsWatcherRegister that we yield our remaining time.
.unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e));
/// State needed to take the final, pre-reboot snapshot on behalf of every
/// ProjectSampler at once. Built by `TaskCancellation::perform_reboot_cleanup()`.
struct RebootSnapshotProcessor {
// Reader constructed from the union of selectors
// for every ProjectSampler config.
reader: ArchiveReader,
// Vector of mutable ProjectSamplers that will
// process their final samples.
project_samplers: Vec<ProjectSampler>,
// Mapping from a moniker to a set of indices into
// the project_samplers, where each indexed ProjectSampler has
// at least one selector that uses that moniker.
moniker_to_projects_map: HashMap<String, HashSet<usize>>,
impl RebootSnapshotProcessor {
/// Takes one Inspect snapshot through `self.reader` and routes each returned
/// packet: packets without a payload have their errors reported through
/// `process_schema_errors`, packets with a payload go to
/// `process_single_payload`.
///
/// Returns an error if taking the snapshot fails.
pub async fn process_reboot_sample(&mut self) -> Result<(), Error> {
let snapshot_data = self.reader.snapshot::<Inspect>().await?;
for data_packet in snapshot_data {
let moniker = data_packet.moniker;
match data_packet.payload {
None => {
process_schema_errors(&data_packet.metadata.errors, &moniker);
Some(payload) => self.process_single_payload(payload, &moniker).await,
/// Hands one component's hierarchy to every ProjectSampler that is registered
/// for `moniker` in `moniker_to_projects_map`.
///
/// Per-sampler failures and unknown monikers are logged as warnings; this
/// function returns nothing.
async fn process_single_payload(
&mut self,
hierarchy: DiagnosticsHierarchy<String>,
moniker: &String,
) {
if let Some(project_indexes) = self.moniker_to_projects_map.get(moniker) {
for index in project_indexes {
let project_sampler = &mut self.project_samplers[*index];
if let Err(err) = project_sampler.process_component_data(&hierarchy, moniker).await
// If processing the final sample failed, just log the
// error and proceed, everything's getting shut down
// soon anyway.
warn!(?err, "A project sampler failed to process a reboot sample");
} else {
warn!(%moniker, "A moniker was not found in the project_samplers map");
/// Owner of the sampler execution context.
pub struct SamplerExecutor {
// One sampler for each project config that `new()` managed to construct.
project_samplers: Vec<ProjectSampler>,
// Executor-level Inspect stats; `execute()` clones this handle for the
// TaskCancellation and for the execution context.
sampler_executor_stats: Arc<SamplerExecutorStats>,
impl SamplerExecutor {
/// Instantiate connection to the cobalt logger and map ProjectConfigurations
/// to ProjectSampler plans.
///
/// Fails if connecting to either logger factory fails. A project whose
/// ProjectSampler cannot be constructed is logged and skipped; it does not
/// fail the whole executor.
pub async fn new(sampler_config: SamplerConfig) -> Result<Self, Error> {
let logger_factory: Arc<LoggerFactoryProxy> = Arc::new(
.context("Failed to connect to the Cobalt LoggerFactory")?,
let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new(
.context("Failed to connect to the Metric LoggerFactory")?,
let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec;
// Failing to attach the stats to Inspect is only logged.
let sampler_executor_stats = Arc::new(
.with_inspect(inspect::component::inspector().root(), "sampler_executor_stats")
.unwrap_or_else(|err| {
warn!(?err, "Failed to attach inspector to SamplerExecutorStats struct");
.add(sampler_config.project_configs.len() as u64);
// Stats handles keyed by a u32 id; presumably the project id -- TODO confirm.
let mut project_to_stats_map: HashMap<u32, Arc<ProjectSamplerStats>> = HashMap::new();
// TODO(42067): Create only one ArchiveReader for each unique poll rate so we
// can avoid redundant snapshots.
let project_sampler_futures =
sampler_config.project_configs.into_iter().map(|project_config| {
let project_sampler_stats =
format!("project_{:?}", project_config.project_id,),
.unwrap_or_else(|err| {
"Failed to attach inspector to ProjectSamplerStats struct"
// Await all constructions together, keeping only the samplers that succeeded.
let mut project_samplers: Vec<ProjectSampler> = Vec::new();
for project_sampler in join_all(project_sampler_futures).await.into_iter() {
match project_sampler {
Ok(project_sampler) => project_samplers.push(project_sampler),
Err(e) => {
warn!("ProjectSampler construction failed: {:?}", e);
Ok(SamplerExecutor { project_samplers, sampler_executor_stats })
/// Turn each ProjectSampler plan into an fasync::Task which executes its associated plan,
/// and process errors if any tasks exit unexpectedly.
///
/// Consumes the executor and returns a `TaskCancellation` holding one oneshot
/// sender per sampler plus the task that collects the samplers' results.
pub fn execute(self) -> TaskCancellation {
// Take ownership of the inspect struct so we can give it to the execution context. We do this
// so that the execution context can return the struct when it's halted by reboot, which allows inspect
// properties to survive through the reboot flow.
let task_cancellation_owned_stats = self.sampler_executor_stats.clone();
let execution_context_owned_stats = self.sampler_executor_stats.clone();
// Pair every sampler with a fresh oneshot: the sender is kept for cancellation,
// the receiver is moved into the spawned sampler task.
let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self
.map(|project_sampler| {
let (sender, receiver) = oneshot::channel::<()>();
(sender, project_sampler.spawn(receiver))
// Collects the outcome of each sampler task as it finishes.
let execution_context = fasync::Task::spawn(async move {
let mut healthily_exited_samplers = Vec::new();
while let Some(sampler_result) = {
match sampler_result {
Err(e) => {
// TODO(42067): Consider restarting the failed sampler depending on
// failure mode.
warn!("A spawned sampler has failed: {:?}", e);
Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => {
Ok(ProjectSamplerTaskExit::WorkCompleted) => {
info!("A sampler completed its workload, and exited.");
TaskCancellation {
_sampler_executor_stats: task_cancellation_owned_stats,
/// Samples Inspect data for one project's metrics and exports the results
/// through the Cobalt and/or fuchsia.metrics loggers.
pub struct ProjectSampler {
// Reader used to snapshot Inspect data for this project.
archive_reader: ArchiveReader,
// The metrics used by this Project. Indexed by moniker_to_selector_map.
metrics: Vec<MetricConfig>,
// Cache from Inspect selector to last sampled property. This is the selector from
// MetricConfig; it may contain wildcards.
metric_cache: HashMap<String, Property>,
// Cobalt logger proxy using this ProjectSampler's project id.
cobalt_logger: Option<LoggerProxy>,
// fuchsia.metrics logger proxy using this ProjectSampler's project id.
metrics_logger: Option<MetricEventLoggerProxy>,
// Map from moniker to relevant selectors.
moniker_to_selector_map: HashMap<String, Vec<SelectorIndexes>>,
// The frequency with which we snapshot Inspect properties
// for this project.
poll_rate_sec: i64,
// Inspect stats on a node namespaced by this project's associated id.
// It's an arc since a single project can have multiple samplers at
// different frequencies, but we want a single project to have one node.
project_sampler_stats: Arc<ProjectSamplerStats>,
/// Location of one selector inside a ProjectSampler: which metric it belongs
/// to, and its position in that metric's selector list. Stored as the values
/// of `moniker_to_selector_map`.
#[derive(Clone, Debug)]
struct SelectorIndexes {
// The index of the metric in ProjectSampler's `metrics` list.
metric_index: usize,
// The index of the selector in the MetricConfig's `selectors` list.
selector_index: usize,
/// Successful exit reasons of the task created by `ProjectSampler::spawn()`.
pub enum ProjectSamplerTaskExit {
// RebootTriggered:
// The project sampler processed a
// reboot signal on its oneshot and
// yielded to the final-snapshot.
// WorkCompleted:
// The project sampler has no more
// work to complete; perhaps all
// metrics were "upload_once"?
/// What happened in one iteration of the select loop in `ProjectSampler::spawn()`.
/// That loop matches on `TimerTriggered`, `TimerDied`, `RebootTriggered` and
/// `RebootChannelClosed(_)`.
pub enum ProjectSamplerEvent {
// Indicates whether a selector in the project has been removed (set to None), in which case the
// ArchiveAccessor should be reconfigured.
// The selector lists may be consolidated (and thus the maps would be rebuilt), but
// the program will run correctly whether they are or not.
// The code in this file uses the values `SelectorsChanged` and `SelectorsUnchanged`.
enum SnapshotOutcome {
impl ProjectSampler {
/// Builds the sampler for one project config.
///
/// Returns an error if the project's poll rate is below
/// `minimum_sample_rate_sec`, or if creating the logger endpoints fails.
/// A Cobalt logger is requested only when at least one metric sets
/// `use_legacy_cobalt` to true; a fuchsia.metrics logger only when at least
/// one metric does not.
pub async fn new(
config: ProjectConfig,
cobalt_logger_factory: Arc<LoggerFactoryProxy>,
metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>,
minimum_sample_rate_sec: i64,
project_sampler_stats: Arc<ProjectSamplerStats>,
) -> Result<ProjectSampler, Error> {
let customer_id = config.customer_id;
let project_id = config.project_id;
let poll_rate_sec = config.poll_rate_sec;
// Reject projects that poll faster than the configured minimum allows.
if poll_rate_sec < minimum_sample_rate_sec {
return Err(format_err!(
"Project with id: {:?} uses a polling rate:",
" {:?} below minimum configured poll rate: {:?}"
// Count how many metrics go to each logger; a missing `use_legacy_cobalt`
// is treated as false.
let mut cobalt_logged: i64 = 0;
let mut metrics_logged: i64 = 0;
for metric_config in &config.metrics {
if metric_config.use_legacy_cobalt.unwrap_or(false) {
cobalt_logged += 1;
} else {
metrics_logged += 1;
project_sampler_stats.metrics_configured.add(config.metrics.len() as u64);
let cobalt_logger = if cobalt_logged > 0 {
let (cobalt_logger_proxy, cobalt_server_end) =
fidl::endpoints::create_proxy().context("Failed to create endpoints")?;
.create_logger_from_project_spec(customer_id, project_id, cobalt_server_end)
} else {
let metrics_logger = if metrics_logged > 0 {
let (metrics_logger_proxy, metrics_server_end) =
fidl::endpoints::create_proxy().context("Failed to create endpoints")?;
let mut project_spec = ProjectSpec::EMPTY;
project_spec.customer_id = Some(customer_id);
project_spec.project_id = Some(project_id);
.create_metric_event_logger(project_spec, metrics_server_end)
} else {
// Walk the selectors that are still enabled (Some) across all metrics.
let mut all_selectors = Vec::<String>::new();
for metric in &config.metrics {
for selector_opt in metric.selectors.iter() {
if let Some(selector) = selector_opt {
let mut archive_reader = ArchiveReader::new();
let mut project_sampler = ProjectSampler {
archive_reader: ArchiveReader::new(),
moniker_to_selector_map: HashMap::new(),
metrics: config.metrics,
metric_cache: HashMap::new(),
// Fill in archive_reader and moniker_to_selector_map
/// Consumes the sampler and runs it as an async task that loops on a periodic
/// timer and on `reboot_oneshot`.
///
/// The task resolves to:
/// - `Ok(ProjectSamplerTaskExit::RebootTriggered(self))` when the reboot
///   oneshot delivers its message,
/// - `Ok(ProjectSamplerTaskExit::WorkCompleted)` when `is_all_done()` is true
///   on a timer tick,
/// - `Err(..)` when the timer dies or the reboot oneshot reports an error.
pub fn spawn(
mut self,
mut reboot_oneshot: oneshot::Receiver<()>,
) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> {
fasync::Task::spawn(async move {
let mut periodic_timer =
loop {
// Wait for whichever happens first: a timer tick or the reboot signal.
let done = select! {
opt = => {
if opt.is_some() {
} else {
oneshot_res = reboot_oneshot => {
match oneshot_res {
Ok(()) => {
Err(e) => {
format_err!("Oneshot closure error: {:?}", e))
match done {
ProjectSamplerEvent::TimerDied => {
return Err(format_err!(concat!(
"The ProjectSampler timer died, something went wrong.",
ProjectSamplerEvent::RebootChannelClosed(e) => {
// TODO(42067): Consider differentiating errors if
// we ever want to recover a sampler after a oneshot channel death.
return Err(format_err!(
"The Reboot signaling oneshot died, something went wrong: {:?}",
ProjectSamplerEvent::RebootTriggered => {
// The reboot oneshot triggered, meaning it's time to perform
// our final snapshot. Return self to reuse most recent cache.
return Ok(ProjectSamplerTaskExit::RebootTriggered(self));
ProjectSamplerEvent::TimerTriggered => {
// Check whether this sampler
// still needs to run (perhaps all metrics were
// "upload_once"?). If it doesn't, we want to be
// sure that it is not included in the reboot-workload.
if self.is_all_done() {
return Ok(ProjectSamplerTaskExit::WorkCompleted);
/// Takes one Inspect snapshot with this sampler's reader and processes every
/// returned packet. Packets without a payload only have their errors reported.
///
/// Returns an error if the snapshot fails or if processing any payload fails.
async fn process_next_snapshot(&mut self) -> Result<(), Error> {
let snapshot_data = self.archive_reader.snapshot::<Inspect>().await?;
// Set once any payload causes a selector to be disabled.
let mut selectors_changed = false;
for data_packet in snapshot_data.iter() {
match &data_packet.payload {
None => {
process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker);
Some(payload) => {
if self.process_component_data(payload, &data_packet.moniker).await?
== SnapshotOutcome::SelectorsChanged
selectors_changed = true;
// Acted on only after the whole snapshot has been processed.
if selectors_changed {
/// Reports whether this sampler has no more work to do. `spawn()` checks this
/// on every timer tick and exits with `WorkCompleted` when it returns true.
fn is_all_done(&self) -> bool {
/// Rebuilds the selector bookkeeping after selectors have been disabled:
/// each metric's selector list is replaced by its still-enabled entries, the
/// archive reader is replaced by a fresh one, and `moniker_to_selector_map` is
/// recomputed from the new metric/selector positions.
fn rebuild_selector_data_structures(&mut self) {
let mut all_selectors = vec![];
for mut metric in &mut self.metrics {
// TODO: Using Box<ParsedSelector> could reduce copying.
// Keep only the selectors that are still enabled (Some).
let active_selectors = metric
.filter(|selector| selector.is_some())
.map(|selector| selector.to_owned())
for selector in active_selectors.iter() {
// unwrap() is OK since we filtered for Some
metric.selectors = SelectorList(active_selectors);
self.archive_reader = ArchiveReader::new();
self.moniker_to_selector_map = HashMap::new();
for (metric_index, metric) in self.metrics.iter().enumerate() {
for (selector_index, selector) in metric.selectors.iter().enumerate() {
// unwrap() relies on the filtering above: every remaining selector is Some.
let moniker = &selector.as_ref().unwrap().moniker;
if self.moniker_to_selector_map.get(moniker).is_none() {
self.moniker_to_selector_map.insert(moniker.clone(), Vec::new());
.push(SelectorIndexes { metric_index, selector_index });
/// Applies every selector registered for `moniker` to `payload`.
///
/// A selector that matches exactly one value has that value processed and the
/// metric's selector list updated; zero matches are ignored and more than one
/// match is logged as a warning. An unknown moniker is logged and yields
/// `SelectorsUnchanged`.
///
/// Returns `SelectorsChanged` if any selector was disabled along the way.
async fn process_component_data(
&mut self,
payload: &DiagnosticsHierarchy,
moniker: &String,
) -> Result<SnapshotOutcome, Error> {
let indexes_opt = &self.moniker_to_selector_map.get(moniker);
let selector_indexes = match indexes_opt {
None => {
warn!(%moniker, "Moniker not found in map");
return Ok(SnapshotOutcome::SelectorsUnchanged);
Some(indexes) => indexes.to_vec(),
// We cloned the selector indexes. Whatever we do in this function must not invalidate them.
let mut selectors_changed = false;
for index_info in selector_indexes.iter() {
let SelectorIndexes { metric_index, selector_index } = index_info;
let metric = &self.metrics[*metric_index];
// It's fine if a selector has been removed and is None.
if let Some(ParsedSelector { selector, selector_string, .. }) =
let found_values = diagnostics_hierarchy::select_from_hierarchy(
match found_values.len() {
// Maybe the data hasn't been published yet. Maybe another selector in this
// metric is the correct one to find the data. Either way, not-found is fine.
0 => {}
1 => {
// export_sample() needs mut self, so we can't pass in values directly from
// metric, since metric is a ref into data contained in self;
// we have to copy them out first.
let metric_type = metric.metric_type;
let metric_id = metric.metric_id;
let codes = metric.event_codes.clone();
let selector_string = selector_string.to_owned();
let use_cobalt = metric.use_legacy_cobalt == Some(true);
// NOTE(review): `||` short-circuits, so once `selectors_changed` is true
// update_metric_selectors() is no longer called for later matches in this
// pass and their selectors are left untouched. Confirm this is intended.
selectors_changed = selectors_changed
|| self.update_metric_selectors(index_info)
== SnapshotOutcome::SelectorsChanged;
too_many => {
warn!(%too_many, %selector_string, "Too many matches for selector")
match selectors_changed {
true => Ok(SnapshotOutcome::SelectorsChanged),
false => Ok(SnapshotOutcome::SelectorsUnchanged),
/// Disables selectors of the metric at `index_info.metric_index` after the
/// selector at `index_info.selector_index` has produced data.
///
/// For an `upload_once` metric every selector is set to None and
/// `SelectorsChanged` is returned. Otherwise every *other* selector that is
/// still enabled is set to None; the result is `SelectorsChanged` only if at
/// least one selector was actually disabled.
fn update_metric_selectors(&mut self, index_info: &SelectorIndexes) -> SnapshotOutcome {
let metric = &mut self.metrics[index_info.metric_index];
// upload_once: the metric is finished, so drop all of its selectors.
if let Some(true) = metric.upload_once {
for selector in metric.selectors.iter_mut() {
*selector = None;
return SnapshotOutcome::SelectorsChanged;
// Keep only the selector that found the data.
let mut deleted = false;
for (index, selector) in metric.selectors.iter_mut().enumerate() {
if index != index_info.selector_index && *selector != None {
*selector = None;
deleted = true;
match deleted {
true => SnapshotOutcome::SelectorsChanged,
false => SnapshotOutcome::SelectorsUnchanged,
/// Converts one sampled property into a metric payload and logs it.
///
/// The previous sample for `selector` (if cached) is passed to
/// `process_sample_for_data_type`. When that yields a payload, the cache is
/// updated as appropriate for `metric_type` and the event is sent either to the
/// legacy Cobalt logger (`use_legacy_logger == true`) or to the fuchsia.metrics
/// logger.
async fn export_sample(
&mut self,
metric_type: DataType,
metric_id: u32,
event_codes: Vec<u32>,
selector: &String,
new_sample: &Property,
use_legacy_logger: bool,
) -> Result<(), Error> {
let previous_sample_opt: Option<&Property> = self.metric_cache.get(selector);
if let Some(payload) =
process_sample_for_data_type(new_sample, previous_sample_opt, selector, &metric_type)
self.maybe_update_cache(new_sample, &metric_type, selector);
if use_legacy_logger {
let transformed_payload: EventPayload =
let mut cobalt_event = CobaltEvent {
payload: transformed_payload,
component: None,
// unwrap() panics if no Cobalt logger exists; presumably `new()` creates one
// whenever a metric uses legacy Cobalt -- TODO confirm.
self.cobalt_logger.as_ref().unwrap().log_cobalt_event(&mut cobalt_event).await?;
} else {
let mut metric_events = vec![MetricEvent { metric_id, event_codes, payload }];
// Note: The MetricEvent vector can't be marked send because it
// is a dyn object stream and rust can't confirm that it doesn't have handles. This
// is fine because we don't actually need to "send" to make the API call. But if we chain
// the creation of the future with the await on the future, rust interprets all variables
// including the reference to the event vector as potentially being needed across the await.
// So we have to split the creation of the future out from the await on the future. :(
let log_future = self
.log_metric_events(&mut metric_events.iter_mut());
/// Stores `new_sample` in `metric_cache` under `selector` for the data types
/// that are processed against their previous sample (Occurrence and
/// IntHistogram). Integer samples are not cached.
fn maybe_update_cache(
&mut self,
new_sample: &Property,
data_type: &DataType,
selector: &String,
) {
match data_type {
DataType::Occurrence | DataType::IntHistogram => {
self.metric_cache.insert(selector.clone(), new_sample.clone());
// Deliberately a no-op for Integer.
DataType::Integer => (),
// This is only called for Cobalt 1.0 metrics.
// Maps a fuchsia.metrics payload onto the legacy Cobalt payload type:
// Count -> EventCount, IntegerValue -> MemoryBytesUsed, and Histogram buckets
// -> CobaltHistogramBucket (index and count copied across). Any other payload
// variant hits unreachable!().
fn transform_metrics_payload_to_cobalt(payload: MetricEventPayload) -> EventPayload {
match payload {
MetricEventPayload::Count(count) => {
// Safe to unwrap because we use cobalt v1.0 sanitization when constructing the Count metric event payload.
EventPayload::EventCount(CountEvent {
count: count.try_into().unwrap(),
period_duration_micros: 0,
// Cobalt 1.0 doesn't have Integer values, MEMORY_USED is the closest approximation.
MetricEventPayload::IntegerValue(value) => EventPayload::MemoryBytesUsed(value),
MetricEventPayload::Histogram(hist) => {
let legacy_histogram = hist
.map(|metric_bucket| CobaltHistogramBucket {
index: metric_bucket.index,
count: metric_bucket.count,
_ => unreachable!("We only support count, int, and histogram"),
/// Dispatches a sample to the processing function for its `data_type` and
/// converts the result into an optional payload.
///
/// A processing error is logged as a warning instead of being propagated to
/// the caller.
fn process_sample_for_data_type(
new_sample: &Property,
previous_sample_opt: Option<&Property>,
selector: &String,
data_type: &DataType,
) -> Option<MetricEventPayload> {
let event_payload_res = match data_type {
DataType::Occurrence => process_occurence(new_sample, previous_sample_opt, selector),
DataType::IntHistogram => process_int_histogram(new_sample, previous_sample_opt, selector),
DataType::Integer => {
// If we previously cached a metric with an int-type, log an error and ignore it.
// This may be a case of using a single selector for two metrics, one event count
// and one int.
if previous_sample_opt.is_some() {
error!("Sampler has erroneously cached an Int type metric: {:?}", selector);
process_int(new_sample, selector)
match event_payload_res {
Ok(payload_opt) => payload_opt,
Err(e) => {
warn!(concat!("Failed to process Inspect property for cobalt: {:?}"), e);
// It's possible for Inspect numerical properties to experience overflows/conversion
// errors when being mapped to cobalt types. Sanitize these numericals, and provide
// meaningful errors.
// Converts `diff` to i64 with try_into(); a value that does not fit produces
// an error instead of a silently wrapped number.
fn sanitize_unsigned_numerical(diff: u64, selector: &str) -> Result<i64, Error> {
match diff.try_into() {
Ok(diff) => Ok(diff),
Err(e) => {
return Err(format_err!(
"Selector used for EventCount type",
" refered to an unsigned int property,",
" but cobalt requires i64, and casting introduced overflow",
" which produces a negative int: {:?}. This could be due to",
" a single sample being larger than i64, or a diff between",
" samples being larger than i64. Error: {:?}"
/// Computes the histogram buckets to report for `new_sample`.
///
/// With no previous sample, or a previous sample of a different Property
/// variant, the new histogram is converted as-is. Otherwise the per-bucket
/// difference between the new and previous samples is computed. Conversion and
/// diff errors are propagated.
fn process_int_histogram(
new_sample: &Property,
prev_sample_opt: Option<&Property>,
selector: &String,
) -> Result<Option<MetricEventPayload>, Error> {
let diff = match prev_sample_opt {
None => convert_inspect_histogram_to_cobalt_histogram(new_sample, selector)?,
Some(prev_sample) => {
// If the data type changed then we just reset the cache.
if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) {
compute_histogram_diff(new_sample, prev_sample, selector)?
} else {
convert_inspect_histogram_to_cobalt_histogram(new_sample, selector)?
// Distinguish a diff with at least one non-zero bucket from an all-zero one.
if diff.iter().any(|v| v.count != 0) {
} else {
/// Computes the per-bucket count difference between two histogram samples.
///
/// Both samples are first converted to bucket vectors. Returns an error if the
/// two vectors have different lengths or if any bucket's count decreased.
/// Each resulting bucket carries the new sample's index and
/// `new.count - old.count`.
fn compute_histogram_diff(
new_sample: &Property,
old_sample: &Property,
selector: &String,
) -> Result<Vec<HistogramBucket>, Error> {
let new_histogram_buckets =
convert_inspect_histogram_to_cobalt_histogram(new_sample, selector)?;
let old_histogram_buckets =
convert_inspect_histogram_to_cobalt_histogram(old_sample, selector)?;
if old_histogram_buckets.len() != new_histogram_buckets.len() {
return Err(format_err!(
"Selector referenced an Inspect IntArray",
" that was specified as an IntHistogram type ",
" but the histogram bucket count changed between",
" samples, which is incompatible with Cobalt.",
" Selector: {:?}, Inspect type: {}"
.map(|(new_bucket, old_bucket)| {
// Checked before subtracting, so the unsigned subtraction below cannot underflow.
if new_bucket.count < old_bucket.count {
return Err(format_err!(
"Selector referenced an Inspect IntArray",
" that was specified as an IntHistogram type ",
" but atleast one bucket saw the count decrease",
" between samples, which is incompatible with Cobalt's",
" need for monotonically increasing counts.",
" Selector: {:?}, Inspect type: {}"
Ok(HistogramBucket {
count: new_bucket.count - old_bucket.count,
index: new_bucket.index,
.collect::<Result<Vec<HistogramBucket>, Error>>()
/// Converts an Inspect histogram property into a vector of `HistogramBucket`s.
///
/// Accepts `IntArray` and `UintArray` properties whose content is `Buckets`.
/// Returns an error for any other property, for a negative bucket count in an
/// `IntArray`, or for a bucket index that does not fit in a u32.
fn convert_inspect_histogram_to_cobalt_histogram(
inspect_histogram: &Property,
selector: &String,
) -> Result<Vec<HistogramBucket>, Error> {
// Builds one bucket, narrowing the index to u32.
// NOTE(review): the error below fires when the *index* does not fit in a u32,
// but its message describes a negative count.
let histogram_bucket_constructor =
|index: usize, count: u64| -> Result<HistogramBucket, Error> {
match u32::try_from(index) {
Ok(index) => Ok(HistogramBucket { index, count }),
Err(_) => Err(format_err!(
"Selector referenced an Inspect IntArray",
" that was specified as an IntHistogram type ",
" but a bucket contained a negative count. This",
" is incompatible with Cobalt histograms which only",
" support positive histogram counts.",
" vector. Selector: {:?}, Inspect type: {}"
match inspect_histogram {
Property::IntArray(_, ArrayContent::Buckets(bucket_vec)) => bucket_vec
.map(|(index, bucket)| {
if bucket.count < 0 {
return Err(format_err!(
"Selector referenced an Inspect IntArray",
" that was specified as an IntHistogram type ",
" but a bucket contained a negative count. This",
" is incompatible with Cobalt histograms which only",
" support positive histogram counts.",
" vector. Selector: {:?}, Inspect type: {}"
// Count is a non-negative i64, so casting with `as` is safe from
// truncations.
histogram_bucket_constructor(index, bucket.count as u64)
.collect::<Result<Vec<HistogramBucket>, Error>>(),
Property::UintArray(_, ArrayContent::Buckets(bucket_vec)) => bucket_vec
.map(|(index, bucket)| histogram_bucket_constructor(index, bucket.count))
.collect::<Result<Vec<HistogramBucket>, Error>>(),
_ => {
// TODO(42067): Does cobalt support floors or step counts that are
// not ints? if so, we can support that as well with double arrays if the
// actual counts are whole numbers.
return Err(format_err!(
"Selector referenced an Inspect property",
" that was specified as an IntHistogram type ",
" but is unable to be encoded in a cobalt HistogramBucket",
" vector. Selector: {:?}, Inspect type: {}"
/// Reads `new_sample` as an i64 for an Integer metric.
///
/// `Int` values are used directly; `Uint` values go through
/// `sanitize_unsigned_numerical`, which fails if the value does not fit in an
/// i64. Any other property type is an error.
fn process_int(
new_sample: &Property,
selector: &String,
) -> Result<Option<MetricEventPayload>, Error> {
let sampled_int = match new_sample {
Property::Uint(_, val) => sanitize_unsigned_numerical(val.clone(), selector)?,
Property::Int(_, val) => val.clone(),
_ => {
return Err(format_err!(
"Selector referenced an Inspect property",
" that was specified as an Int type ",
" but is unable to be encoded in an i64",
" Selector: {:?}, Inspect type: {}"
/// Computes the event count to report for an Occurrence metric.
///
/// Without a previous sample the count is the new sample's value; otherwise
/// it is the difference between the new and previous samples. Returns an error
/// for a negative result, `Ok(None)` when nothing changed, and
/// `Ok(Some(Count(..)))` otherwise.
fn process_occurence(
new_sample: &Property,
prev_sample_opt: Option<&Property>,
selector: &String,
) -> Result<Option<MetricEventPayload>, Error> {
let diff = match prev_sample_opt {
None => compute_initial_event_count(new_sample, selector)?,
Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, selector)?,
if diff < 0 {
return Err(format_err!(
"Event count must be monotonically increasing,",
" but we observed a negative event count diff for: {:?}"
// No new events since the last sample: nothing to log.
if diff == 0 {
return Ok(None);
// TODO(42067): Once fuchsia.cobalt is gone, we don't need to preserve
// occurrence counts "fitting" into i64s.
// `diff` is strictly positive here, so the cast to u64 does not change the value.
Ok(Some(MetricEventPayload::Count(diff as u64)))
/// Reads the first observation of an event counter as an i64.
///
/// `Uint` values must fit in an i64 (checked by `sanitize_unsigned_numerical`);
/// `Int` values are returned as-is. Any other property type is an error.
fn compute_initial_event_count(new_sample: &Property, selector: &String) -> Result<i64, Error> {
match new_sample {
Property::Uint(_, val) => sanitize_unsigned_numerical(val.clone(), selector),
Property::Int(_, val) => Ok(val.clone()),
_ => Err(format_err!(
"Selector referenced an Inspect property",
" that is not compatible with cached",
" transformation to an event count.",
" Selector: {:?}, {}"
/// Computes how much an event counter grew between two samples.
///
/// Int/Int and Uint/Uint pairs are subtracted. If the old sample is an Int or
/// Uint but the pair does not match, the new sample is treated as a first
/// observation. Any other combination is an error.
fn compute_event_count_diff(
new_sample: &Property,
old_sample: &Property,
selector: &String,
) -> Result<i64, Error> {
match (new_sample, old_sample) {
// We don't need to validate that old_count and new_count are positive here.
// If new_count was negative, and old_count was positive, then the diff would be
// negative, which is an errorful state. It's impossible for old_count to be negative
// as either it was the first sample which would make a negative diff which is an error,
// or it was a negative new_count with a positive old_count, which we've already shown will
// produce an errorful state.
(Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count),
(Property::Uint(_, new_count), Property::Uint(_, old_count)) => {
// NOTE(review): this is an unsigned subtraction. If the counter decreased
// (new_count < old_count) it underflows: a panic when overflow checks are on,
// a wrapped value otherwise. Consider checked_sub().
sanitize_unsigned_numerical(new_count - old_count, selector)
// The old sample was a Uint or an Int, but the pair didn't match either of the above
// cases, so the new sample changed types compared to the old sample. We should just
// restart the cache, and treat the new sample as a first observation. The new sample's
// own type is validated by compute_initial_event_count().
(_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => {
"Inspect type of sampled data changed between samples. Restarting cache. {}",
compute_initial_event_count(new_sample, selector)
_ => Err(format_err!(
"Inspect type of sampled data changed between samples",
" to a type incompatible with event counters.",
" Selector: {:?}, New type: {:?}"
/// Logs the errors attached to a snapshot packet that carried no payload.
///
/// Each error is logged as a warning, except those whose message contains
/// "Inspect hierarchy was fully filtered". A packet with neither payload nor
/// errors is also logged as a warning.
fn process_schema_errors(errors: &Option<Vec<diagnostics_data::Error>>, moniker: &String) {
match errors {
Some(errors) => {
for error in errors {
if !error.message.contains("Inspect hierarchy was fully filtered") {
warn!("Moniker: {}, Error: {:?}", moniker, error);
None => {
warn!("Moniker: {} encountered null payload and no errors.", moniker);
mod tests {
use super::*;
use diagnostics_hierarchy::{hierarchy, Bucket};
use futures::executor;
/// Checks how `process_component_data` treats selectors when the hierarchy contains a
/// node name ("path/to") and a property name ("value/with:escapes") with characters
/// that need escaping in a selector.
///
/// Three metrics are pushed one after another. The first two are expected to yield
/// `SnapshotOutcome::SelectorsChanged`; the last, which uses an unescaped path, is
/// expected to yield `SnapshotOutcome::SelectorsUnchanged`.
fn test_process_payload_with_escapes() {
// Inserting a string into the hierarchy that requires escaping.
let unescaped: String = "path/to".to_string();
let hierarchy = hierarchy! {
root: {
var unescaped: {
value: 0,
"value/with:escapes": 0,
// The sampler starts with no loggers and no metrics; each case below pushes its own
// metric before processing the hierarchy.
let mut sampler = ProjectSampler {
archive_reader: ArchiveReader::new(),
moniker_to_selector_map: HashMap::new(),
metrics: vec![],
metric_cache: HashMap::new(),
cobalt_logger: None,
metrics_logger: None,
poll_rate_sec: 3600,
project_sampler_stats: Arc::new(ProjectSamplerStats::new()),
// Case 1: the '/' inside the node name is escaped in the selector.
let selector: String = "my/component:root/path\\/to:value".to_string();
sampler.metrics.push(MetricConfig {
selectors: SelectorList(vec![sampler_config::parse_selector_for_test(&selector)]),
metric_id: 1,
// Occurrence type with a value of zero will not attempt to use any loggers.
metric_type: DataType::Occurrence,
event_codes: Vec::new(),
// upload_once means that process_component_data will return SelectorsChanged if
// it is found in the map.
upload_once: Some(true),
use_legacy_cobalt: Some(false),
match executor::block_on(
sampler.process_component_data(&hierarchy, &"my/component".to_string()),
) {
// This selector will be found and removed from the map, resulting in a
// SelectorsChanged response.
Ok(SnapshotOutcome::SelectorsChanged) => (),
_ => panic!("Expecting SelectorsChanged from process_component_data."),
// Case 2: NOTE(review): the selector literal is not visible in this view; judging by the
// variable name it escapes characters in the property name — confirm.
let selector_with_escaped_property: String =
sampler.metrics.push(MetricConfig {
selectors: SelectorList(vec![sampler_config::parse_selector_for_test(
metric_id: 1,
// Occurrence type with a value of zero will not attempt to use any loggers.
metric_type: DataType::Occurrence,
event_codes: Vec::new(),
// upload_once means that the method will return SelectorsChanged if it is found
// in the map.
upload_once: Some(true),
use_legacy_cobalt: Some(false),
match executor::block_on(
sampler.process_component_data(&hierarchy, &"my/component".to_string()),
) {
// This selector will be found and removed from the map, resulting in a
// SelectorsChanged response.
Ok(SnapshotOutcome::SelectorsChanged) => (),
_ => panic!("Expecting SelectorsChanged from process_component_data."),
// Case 3: the same path as case 1, but without escaping the '/' in the node name.
let selector_unfound: String = "my/component:root/path/to:value".to_string();
sampler.metrics.push(MetricConfig {
selectors: SelectorList(vec![sampler_config::parse_selector_for_test(
metric_id: 1,
// Occurrence type with a value of zero will not attempt to use any loggers.
metric_type: DataType::Occurrence,
event_codes: Vec::new(),
// upload_once means that the method will return SelectorsChanged if it is found
// in the map.
upload_once: Some(true),
use_legacy_cobalt: Some(false),
match executor::block_on(
sampler.process_component_data(&hierarchy, &"my/component".to_string()),
) {
// This selector will not be found, so nothing is removed from the map and the
// result is SelectorsUnchanged.
Ok(SnapshotOutcome::SelectorsUnchanged) => (),
_ => panic!("Expecting SelectorsUnchanged from process_component_data."),
/// Inputs and expectations for one `process_occurence_tester` case.
struct EventCountTesterParams {
// Sample passed to `process_occurence` as the new observation.
new_val: Property,
// Previously observed sample, if any; passed to `process_occurence` as the old value.
old_val: Option<Property>,
// Presumably whether `process_occurence` is expected to succeed; the assertion that
// consumes this flag is not visible in this view.
process_ok: bool,
// Presumably whether a successful call is expected to produce an event; the assertion
// that consumes this flag is not visible in this view.
event_made: bool,
// Count the produced event is compared against. Cases that expect no event or an error
// pass a placeholder value.
diff: i64,
/// Runs `process_occurence` on the samples in `params` and checks the resulting event.
///
/// When an event is inspected it must be a `MetricEventPayload::Count` equal to
/// `params.diff`, and converting it with `transform_metrics_payload_to_cobalt` must give
/// an `EventPayload::EventCount` whose `count` equals `params.diff`.
fn process_occurence_tester(params: EventCountTesterParams) {
let selector: String = "test:root:count".to_string();
let event_res = process_occurence(&params.new_val, params.old_val.as_ref(), &selector);
// NOTE(review): the statements guarded by this check are not visible in this view;
// presumably the failure case is asserted and the helper returns early — confirm.
if !params.process_ok {
let event_opt = event_res.unwrap();
// NOTE(review): the statements guarded by this check are not visible in this view;
// presumably the no-event case is asserted and the helper returns early — confirm.
if !params.event_made {
let event = event_opt.unwrap();
match event {
MetricEventPayload::Count(count) => {
// `diff` is stored as i64 in the params; the payload count is compared as u64.
assert_eq!(count, params.diff as u64);
_ => panic!("Expecting event counts."),
// The same event must survive conversion to the other payload representation.
let transformed_event = transform_metrics_payload_to_cobalt(event);
match transformed_event {
EventPayload::EventCount(count_event) => {
assert_eq!(count_event.count, params.diff);
_ => panic!("Expecting count events."),
/// Covers the ordinary `process_occurence` cases with signed integer samples.
fn test_normal_process_occurence() {
// First observation of 1 with no previous sample: an event with count 1 is expected.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), 1),
old_val: None,
process_ok: true,
event_made: true,
diff: 1,
// New sample equals the previous one: processing succeeds but no event is expected
// (`diff` is a placeholder here).
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), 1),
old_val: Some(Property::Int("count".to_string(), 1)),
process_ok: true,
event_made: false,
diff: -1,
// Sample grew from 1 to 3: an event with count 2 is expected.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), 3),
old_val: Some(Property::Int("count".to_string(), 1)),
process_ok: true,
event_made: true,
diff: 2,
/// Covers `process_occurence` when the property type differs between samples.
fn test_data_type_changing_process_occurence() {
// First observation as a signed integer: count 1 is expected.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), 1),
old_val: None,
process_ok: true,
event_made: true,
diff: 1,
// First observation as an unsigned integer: count 1 is expected.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Uint("count".to_string(), 1),
old_val: None,
process_ok: true,
event_made: true,
diff: 1,
// Old sample was Int, new sample is Uint: the expected count is the full new value (3)
// rather than the difference from the old sample.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Uint("count".to_string(), 3),
old_val: Some(Property::Int("count".to_string(), 1)),
process_ok: true,
event_made: true,
diff: 3,
// New sample is a string: the case is marked as not OK (`diff` is a placeholder).
process_occurence_tester(EventCountTesterParams {
new_val: Property::String("count".to_string(), "big_oof".to_string()),
old_val: Some(Property::Int("count".to_string(), 1)),
process_ok: false,
event_made: false,
diff: -1,
/// Covers `process_occurence` with negative samples, decreasing samples, and unsigned
/// values at and beyond the `i64` range.
fn test_event_count_negatives_and_overflows() {
// Negative first observation: marked as not OK.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), -11),
old_val: None,
process_ok: false,
event_made: false,
diff: -1,
// Sample decreased from 10 to 9: marked as not OK.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Int("count".to_string(), 9),
old_val: Some(Property::Int("count".to_string(), 10)),
process_ok: false,
event_made: false,
diff: -1,
// Unsigned first observation of u64::MAX: marked as not OK.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Uint("count".to_string(), std::u64::MAX),
old_val: None,
process_ok: false,
event_made: false,
diff: -1,
let i64_max_in_u64: u64 = std::i64::MAX.try_into().unwrap();
// Unsigned difference of exactly i64::MAX ((i64::MAX + 1) - 1): expected to succeed
// with count i64::MAX.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
old_val: Some(Property::Uint("count".to_string(), 1)),
process_ok: true,
event_made: true,
diff: std::i64::MAX,
// Unsigned difference of i64::MAX + 1: marked as not OK.
process_occurence_tester(EventCountTesterParams {
new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2),
old_val: Some(Property::Uint("count".to_string(), 1)),
process_ok: false,
event_made: false,
diff: -1,
/// Inputs and expectations for one `process_int_tester` case.
struct IntTesterParams {
// Sample passed to `process_int`.
new_val: Property,
// Presumably whether `process_int` is expected to succeed; the assertion that consumes
// this flag is not visible in this view.
process_ok: bool,
// Value the produced event is compared against. Cases marked not OK pass a placeholder.
sample: i64,
/// Runs `process_int` on `params.new_val` and checks the resulting event.
///
/// The event must be a `MetricEventPayload::IntegerValue` equal to `params.sample`, and
/// converting it with `transform_metrics_payload_to_cobalt` must give an
/// `EventPayload::MemoryBytesUsed` with the same value.
fn process_int_tester(params: IntTesterParams) {
let selector: String = "test:root:count".to_string();
let event_res = process_int(&params.new_val, &selector);
// NOTE(review): the statements guarded by this check are not visible in this view;
// presumably the failure case is asserted and the helper returns early — confirm.
if !params.process_ok {
let event_opt = event_res.unwrap();
let event = event_opt.unwrap();
match event {
MetricEventPayload::IntegerValue(val) => {
assert_eq!(val, params.sample);
// NOTE(review): the panic text mentions "event counts" although this match checks for
// an integer value payload.
_ => panic!("Expecting event counts."),
let transformed_event = transform_metrics_payload_to_cobalt(event);
match transformed_event {
EventPayload::MemoryBytesUsed(value) => {
assert_eq!(value, params.sample);
// NOTE(review): the panic text mentions "count events" although this match checks for
// a `MemoryBytesUsed` payload.
_ => panic!("Expecting count events."),
/// Covers the ordinary `process_int` cases: positive, negative, zero, unsigned, and a
/// non-numeric property.
fn test_normal_process_int() {
// Positive signed value is passed through unchanged.
process_int_tester(IntTesterParams {
new_val: Property::Int("count".to_string(), 13),
process_ok: true,
sample: 13,
// Negative signed value is passed through unchanged.
process_int_tester(IntTesterParams {
new_val: Property::Int("count".to_string(), -13),
process_ok: true,
sample: -13,
// Zero still yields an integer event with value 0.
process_int_tester(IntTesterParams {
new_val: Property::Int("count".to_string(), 0),
process_ok: true,
sample: 0,
// Unsigned value within range is expected as the same number.
process_int_tester(IntTesterParams {
new_val: Property::Uint("count".to_string(), 13),
process_ok: true,
sample: 13,
// String property: marked as not OK (`sample` is a placeholder).
process_int_tester(IntTesterParams {
new_val: Property::String("count".to_string(), "big_oof".to_string()),
process_ok: false,
sample: -1,
/// Covers `process_int` at the limits of `i64` and just beyond them for unsigned input.
fn test_int_edge_cases() {
// Largest signed value.
process_int_tester(IntTesterParams {
new_val: Property::Int("count".to_string(), std::i64::MAX),
process_ok: true,
sample: std::i64::MAX,
// Smallest signed value.
process_int_tester(IntTesterParams {
new_val: Property::Int("count".to_string(), std::i64::MIN),
process_ok: true,
sample: std::i64::MIN,
let i64_max_in_u64: u64 = std::i64::MAX.try_into().unwrap();
// Unsigned value equal to i64::MAX is expected to succeed.
process_int_tester(IntTesterParams {
new_val: Property::Uint("count".to_string(), i64_max_in_u64),
process_ok: true,
sample: std::i64::MAX,
// Unsigned value one past i64::MAX: marked as not OK (`sample` is a placeholder).
process_int_tester(IntTesterParams {
new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1),
process_ok: false,
sample: -1,
/// Builds Inspect histogram buckets from `hist`, using each value as the floor, ceiling
/// and count of its bucket.
///
/// NOTE(review): the head of the iterator chain and the collecting call are not visible
/// in this view.
fn create_inspect_bucket_vec<T: Copy>(hist: Vec<T>) -> Vec<Bucket<T>> {
.map(|val| Bucket {
// Cobalt doesn't use the Inspect floor and ceiling, so
// let's use val for them since it's the only thing available
// with type T.
floor: *val,
ceiling: *val,
count: *val,
/// Wraps `hist` as a `Property::IntArray` named "Bloop" whose content is the bucket list
/// produced by `create_inspect_bucket_vec`.
fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property<String> {
let bucket_vec = create_inspect_bucket_vec::<i64>(hist);
Property::IntArray("Bloop".to_string(), ArrayContent::Buckets(bucket_vec))
/// Wraps `hist` as a `Property::UintArray` named "Bloop" whose content is the bucket list
/// produced by `create_inspect_bucket_vec`.
fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> {
let bucket_vec = create_inspect_bucket_vec::<u64>(hist);
Property::UintArray("Bloop".to_string(), ArrayContent::Buckets(bucket_vec))
/// Inputs and expectations for one `process_int_histogram_tester` case.
struct IntHistogramTesterParams {
// Histogram sample passed to `process_int_histogram` as the new observation.
new_val: Property,
// Previously observed histogram, if any; passed as the old value.
old_val: Option<Property>,
// Presumably whether `process_int_histogram` is expected to succeed; the assertion that
// consumes this flag is not visible in this view.
process_ok: bool,
// Presumably whether a successful call is expected to produce an event; the assertion
// that consumes this flag is not visible in this view.
event_made: bool,
// Per-bucket counts the produced histogram is compared against. Cases that expect no
// event or an error pass an empty vector.
diff: Vec<u64>,
/// Runs `process_int_histogram` on the samples in `params` and checks the resulting event.
///
/// The event must be a `MetricEventPayload::Histogram` with as many buckets as
/// `params.diff` and must equal the expected bucket list. The same checks are repeated on
/// the `EventPayload::IntHistogram` produced by `transform_metrics_payload_to_cobalt`.
fn process_int_histogram_tester(params: IntHistogramTesterParams) {
let selector: String = "test:root:count".to_string();
let event_res = process_int_histogram(&params.new_val, params.old_val.as_ref(), &selector);
// NOTE(review): the statements guarded by this check are not visible in this view;
// presumably the failure case is asserted and the helper returns early — confirm.
if !params.process_ok {
let event_opt = event_res.unwrap();
// NOTE(review): the statements guarded by this check are not visible in this view;
// presumably the no-event case is asserted and the helper returns early — confirm.
if !params.event_made {
let event = event_opt.unwrap();
// The event is cloned here because it is consumed again by the transform below.
match event.clone() {
MetricEventPayload::Histogram(histogram_buckets) => {
assert_eq!(histogram_buckets.len(), params.diff.len());
// NOTE(review): parts of this iterator chain are not visible in this view; the
// closure shows each (index, count) pair becoming one expected bucket.
let expected_histogram_buckets = params
.map(|(index, count)| HistogramBucket {
index: u32::try_from(index).unwrap(),
count: *count,
assert_eq!(histogram_buckets, expected_histogram_buckets);
_ => panic!("Expecting int histogram."),
// Repeat the comparison on the converted payload, using the Cobalt bucket type.
let transformed_event = transform_metrics_payload_to_cobalt(event);
match transformed_event {
EventPayload::IntHistogram(histogram_buckets) => {
assert_eq!(histogram_buckets.len(), params.diff.len());
let expected_histogram_buckets = params
.map(|(index, count)| CobaltHistogramBucket {
index: u32::try_from(index).unwrap(),
count: *count,
assert_eq!(histogram_buckets, expected_histogram_buckets);
_ => panic!("Expecting int histogram."),
/// Covers the successful `process_int_histogram` cases: first samples of both histogram
/// types, boundary values, empty and all-zero histograms, increasing histograms, and a
/// change of histogram type between samples.
fn test_normal_process_int_histogram() {
// Test that simple in-bounds first-samples of both types of Inspect histograms
// produce correct event types.
let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]);
let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_i64_sample,
old_val: None,
process_ok: true,
event_made: true,
diff: vec![1, 1, 1, 1],
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: None,
process_ok: true,
event_made: true,
diff: vec![1, 1, 1, 1],
// Test that an Inspect uint histogram at the boundaries of the type produces valid
// cobalt events.
let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]);
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: None,
process_ok: true,
event_made: true,
diff: vec![u64::MAX, u64::MAX, u64::MAX],
// Test that an empty Inspect histogram produces no event.
let new_u64_sample = convert_vector_to_uint_histogram(Vec::new());
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: None,
process_ok: true,
event_made: false,
diff: Vec::new(),
// Test that a histogram whose buckets are all zero also produces no event.
let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]);
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: None,
process_ok: true,
event_made: false,
diff: Vec::new(),
// Test that monotonically increasing histograms are accepted and that the event
// carries the per-bucket difference from the previous sample.
let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1]));
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: old_u64_sample,
process_ok: true,
event_made: true,
diff: vec![1, 0, 0, 0],
// Same check with signed histograms.
let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_i64_sample,
old_val: old_i64_sample,
process_ok: true,
event_made: true,
diff: vec![4, 1, 0, 2],
// Test that changing the histogram type resets the cache: the expected buckets are the
// full new sample rather than a difference from the old one.
let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]);
let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1]));
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: old_i64_sample,
process_ok: true,
event_made: true,
diff: vec![2, 1, 1, 1],
/// Covers `process_int_histogram` cases that are marked as not OK: a changed bucket
/// count, a negative bucket value, and a bucket that decreased between samples.
fn test_errorful_process_int_histogram() {
// Test that changing the histogram length is an error.
let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]);
let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1]));
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_u64_sample,
old_val: old_u64_sample,
process_ok: false,
event_made: false,
diff: Vec::new(),
// Test that new samples can't have negative values.
let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, -1, 1]);
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_i64_sample,
old_val: None,
process_ok: false,
event_made: false,
diff: Vec::new(),
// Test that histograms must be monotonically increasing: the first bucket drops from
// 6 to 5 here.
let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]);
let old_i64_sample = Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1]));
process_int_histogram_tester(IntHistogramTesterParams {
new_val: new_i64_sample,
old_val: old_i64_sample,
process_ok: false,
event_made: false,
diff: Vec::new(),