| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| //! # Data Structure and Algorithm Overview |
| //! |
| //! Cobalt is organized into Projects, each of which contains several Metrics. |
| //! |
| //! [`MetricConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs |
| //! This is deserialized from Sampler config files or created by FIRE by interpolating |
| //! component information into FIRE config files. It contains |
| //! - selectors: SelectorList |
| //! - metric_id |
| //! - metric_type: DataType |
| //! - event_codes: Vec<u32> |
| //! - upload_once boolean |
| //! |
| //! **NOTE:** Multiple selectors can be provided in a single metric. Only one selector is |
| //! expected to fetch/match data. When one does, the other selectors will be disabled for |
| //! efficiency. |
| //! |
| //! [`ProjectConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs |
| //! This encodes the contents of a single config file: |
| //! - project_id |
| //! - customer_id (defaults to 1) |
| //! - poll_rate_sec |
| //! - metrics: Vec<MetricConfig> |
| //! |
| //! [`SamplerConfig`] - defined in src/diagnostics/lib/sampler-config/src/lib.rs |
| //! The entire config for Sampler. Contains |
| //! - list of ProjectConfig |
| //! - minimum sample rate |
| //! |
| //! [`ProjectSampler`] - defined in src/diagnostics/sampler/src/executor.rs |
| //! This contains |
| //! - several MetricConfig's |
| //! - an ArchiveReader configured with all active selectors |
| //! - a cache of previous Diagnostic values, indexed by selector strings |
| //! - FIDL proxies for Cobalt and MetricEvent loggers |
| //! - these loggers are configured with project_id and customer_id |
| //! - Poll rate |
| //! - Inspect stats (struct ProjectSamplerStats) |
| //! - moniker_to_selector_map (see below * ) |
| //! |
| //! [`ProjectSampler`] is stored in: |
| //! - [`TaskCancellation`]: execution_context: fasync::Task<Vec<ProjectSampler>>, |
| //! - [`RebootSnapshotProcessor`]: project_samplers: Vec<ProjectSampler>, |
| //! - [`SamplerExecutor`]: project_samplers: Vec<ProjectSampler>, |
| //! - [`ProjectSamplerTaskExit::RebootTriggered(ProjectSampler)`], |
| //! |
| //! [`SamplerExecutor`] (defined in executor.rs) is built from a single [`SamplerConfig`]. |
| //! [`SamplerExecutor`] contains |
| //! - a list of ProjectSamplers |
| //! - an Inspect stats structure |
| //! [`SamplerExecutor`] only has one member function execute() which calls spawn() on each |
| //! project sampler, passing it a receiver-oneshot to cancel it. The collection of |
| //! oneshot-senders and spawned-tasks builds the returned TaskCancellation. |
| //! |
| //! [`TaskCancellation`] is then passed to a reboot_watcher (in src/diagnostics/sampler/src/lib.rs) |
| //! which does nothing until the reboot service either closes (calling |
| //! run_without_cancellation()) or sends a message (calling perform_reboot_cleanup()). |
| //! |
| //! [`ProjectSampler`] calls fasync::Task::spawn to create a task that starts a timer, then loops |
| //! listening to that timer and to the reboot_oneshot. When the timer triggers, it calls |
| //! self.process_next_snapshot(). If the reboot oneshot arrives, the task returns |
| //! [`ProjectSamplerTaskExit::RebootTriggered(self)`]. |
| //! |
| //! |
| //! * moniker_to_selector_map |
| //! |
| //! When a [`ProjectSampler`] retrieves data, the data arrives from the Archivist organized by |
| //! moniker. For each moniker, we need to visit the selectors that may find data in it. Those |
| //! selectors may be scattered throughout the [`MetricConfig`]'s in the [`ProjectSampler`]. |
| //! moniker_to_selector_map contains a map from moniker to [`SelectorIndexes`], a struct containing |
| //! the index of the [`MetricConfig`] in the [`ProjectSampler`]'s list, and the selector in the |
| //! [`MetricConfig`]'s list. |
| //! |
| //! **NOTE:** The moniker portion of the selectors must match exactly the moniker returned from |
| //! the Archivist. No wildcards. |
| //! |
| //! Selectors will become unused, either because of upload_once, or because data was found by a |
| //! different selector. Rather than implement deletion in the moniker_to_selector_map and the vec's, |
| //! which would add lots of bug surface and maintenance debt, each selector is an Option<> so that |
| //! selectors can be deleted/disabled without changing the rest of the data structure. |
| //! Once all Diagnostic data is processed, the structure is rebuilt if any selectors |
| //! have been disabled; rebuilding less often would be |
| //! premature optimization at this point. |
| //! |
| //! perform_reboot_cleanup() builds a moniker_to_project_map for use by the |
| //! [`RebootSnapshotProcessor`]. This has a similar purpose to moniker_to_selector_map, since |
| //! monikers' data may be relevant to any number of projects. When not rebooting, each project |
| //! fetches its own data from Archivist, so there's no need for a moniker_to_project_map. But |
| //! on reboot, all projects' data is fetched at once, and needs to be sorted out. |
| |
| use { |
| crate::diagnostics::*, |
| anyhow::{format_err, Context, Error}, |
| diagnostics_data::{Data, InspectHandleName}, |
| diagnostics_hierarchy::{ |
| ArrayContent, DiagnosticsHierarchy, ExponentialHistogram, LinearHistogram, Property, |
| }, |
| diagnostics_reader::{ArchiveReader, Inspect, RetryConfig}, |
| fidl_fuchsia_metrics::{ |
| HistogramBucket, MetricEvent, MetricEventLoggerFactoryMarker, |
| MetricEventLoggerFactoryProxy, MetricEventLoggerProxy, MetricEventPayload, ProjectSpec, |
| }, |
| fuchsia_async as fasync, |
| fuchsia_component::client::connect_to_protocol, |
| fuchsia_inspect::{self as inspect, NumericProperty}, |
| fuchsia_inspect_derive::WithInspect, |
| fuchsia_zircon as zx, |
| futures::{channel::oneshot, future::join_all, select, stream::FuturesUnordered, StreamExt}, |
| sampler_config::{ |
| DataType, MetricConfig, ParsedSelector, ProjectConfig, SamplerConfig, SelectorList, |
| }, |
| std::{ |
| collections::{HashMap, HashSet}, |
| sync::Arc, |
| }, |
| tracing::{info, warn}, |
| }; |
| |
| /// An event to be logged to the cobalt logger. Events are generated first, |
| /// then logged. (This permits unit-testing the code that generates events from |
| /// Diagnostic data.) |
| type EventToLog = (u32, MetricEvent); |
| |
| pub struct TaskCancellation { |
| senders: Vec<oneshot::Sender<()>>, |
| _sampler_executor_stats: Arc<SamplerExecutorStats>, |
| execution_context: fasync::Task<Vec<ProjectSampler>>, |
| } |
| |
| impl TaskCancellation { |
| /// It's possible the reboot register goes down. If that |
| /// happens, we want to continue driving execution with |
| /// no consideration for cancellation. This does that. |
| pub async fn run_without_cancellation(self) { |
| self.execution_context.await; |
| } |
| |
| pub async fn perform_reboot_cleanup(self) { |
| // Let every sampler know that a reboot is pending and they should exit. |
| self.senders.into_iter().for_each(|sender| { |
| sender |
| .send(()) |
| .unwrap_or_else(|err| warn!("Failed to send reboot over oneshot: {:?}", err)) |
| }); |
| |
| // Get the most recently updated project samplers from all the futures. They hold the |
| // cache with the most recent values for all the metrics we need to sample and diff! |
| let project_samplers: Vec<ProjectSampler> = self.execution_context.await; |
| |
| // Maps a selector to the indices of the project_samplers vec which contain configurations |
| // that transform the property defined by the selector. |
| let mut moniker_to_projects_map: HashMap<String, HashSet<usize>> = HashMap::new(); |
| // Set of all selectors from all projects |
| let mut mondo_selectors_set = Vec::new(); |
| |
| for (index, project_sampler) in project_samplers.iter().enumerate() { |
| for metric in &project_sampler.metrics { |
| for selector_opt in metric.selectors.iter() { |
| if let Some(selector) = selector_opt { |
| if moniker_to_projects_map.get(&selector.moniker).is_none() { |
| moniker_to_projects_map |
| .insert(selector.moniker.to_string(), HashSet::new()); |
| } |
| moniker_to_projects_map.get_mut(&selector.moniker).unwrap().insert(index); |
| mondo_selectors_set.push(selector.selector_string.clone()); |
| } |
| } |
| } |
| } |
| |
| let mut reader = ArchiveReader::new(); |
| reader.retry(RetryConfig::never()).add_selectors(mondo_selectors_set.into_iter()); |
| |
| let mut reboot_processor = |
| RebootSnapshotProcessor { reader, project_samplers, moniker_to_projects_map }; |
| // Log errors encountered in final snapshot, but always swallow errors so we can gracefully |
| // notify RebootMethodsWatcherRegister that we yield our remaining time. |
| reboot_processor |
| .process_reboot_sample() |
| .await |
| .unwrap_or_else(|e| warn!("Reboot snapshot failed! {:?}", e)); |
| } |
| } |
| |
| struct RebootSnapshotProcessor { |
| /// Reader constructed from the union of selectors |
| /// for every [`ProjectSampler`] config. |
| reader: ArchiveReader, |
| /// Vector of mutable [`ProjectSampler`] objects that will |
| /// process their final samples. |
| project_samplers: Vec<ProjectSampler>, |
| /// Mapping from a moniker to a vector of indices into |
| /// the project_samplers, where each indexed [`ProjectSampler`] has |
| /// at least one selector that uses that moniker. |
| moniker_to_projects_map: HashMap<String, HashSet<usize>>, |
| } |
| |
| impl RebootSnapshotProcessor { |
| pub async fn process_reboot_sample(&mut self) -> Result<(), Error> { |
| let snapshot_data = self.reader.snapshot::<Inspect>().await?; |
| for data_packet in snapshot_data { |
| let moniker = data_packet.moniker; |
| match data_packet.payload { |
| None => { |
| process_schema_errors(&data_packet.metadata.errors, &moniker); |
| } |
| Some(payload) => { |
| self.process_single_payload(payload, &data_packet.metadata.name, &moniker).await |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
| async fn process_single_payload( |
| &mut self, |
| hierarchy: DiagnosticsHierarchy<String>, |
| diagnostics_filename: &Option<InspectHandleName>, |
| moniker: &String, |
| ) { |
| if let Some(project_indexes) = self.moniker_to_projects_map.get(moniker) { |
| for index in project_indexes { |
| let project_sampler = &mut self.project_samplers[*index]; |
| // If processing the final sample failed, just log the |
| // error and proceed, everything's getting shut down |
| // soon anyway. |
| let maybe_err = match project_sampler |
| .process_component_data(&hierarchy, diagnostics_filename, moniker) |
| .await |
| { |
| Err(err) => Some(err), |
| Ok((_selector_changes, events_to_log)) => { |
| project_sampler.log_events(events_to_log).await.err() |
| } |
| }; |
| if let Some(err) = maybe_err { |
| warn!(?err, "A project sampler failed to process a reboot sample"); |
| } |
| } |
| } else { |
| warn!(%moniker, "A moniker was not found in the project_samplers map"); |
| } |
| } |
| } |
| |
| /// Owner of the sampler execution context. |
| pub struct SamplerExecutor { |
| project_samplers: Vec<ProjectSampler>, |
| sampler_executor_stats: Arc<SamplerExecutorStats>, |
| } |
| |
| impl SamplerExecutor { |
| /// Instantiate connection to the cobalt logger and map ProjectConfigurations |
| /// to [`ProjectSampler`] plans. |
| pub async fn new(sampler_config: Arc<SamplerConfig>) -> Result<Self, Error> { |
| let metric_logger_factory: Arc<MetricEventLoggerFactoryProxy> = Arc::new( |
| connect_to_protocol::<MetricEventLoggerFactoryMarker>() |
| .context("Failed to connect to the Metric LoggerFactory")?, |
| ); |
| |
| let minimum_sample_rate_sec = sampler_config.minimum_sample_rate_sec; |
| |
| let sampler_executor_stats = Arc::new( |
| SamplerExecutorStats::new() |
| .with_inspect(inspect::component::inspector().root(), "sampler_executor_stats") |
| .unwrap_or_else(|err| { |
| warn!(?err, "Failed to attach inspector to SamplerExecutorStats struct"); |
| SamplerExecutorStats::default() |
| }), |
| ); |
| |
| sampler_executor_stats |
| .total_project_samplers_configured |
| .add(sampler_config.project_configs.len() as u64); |
| |
| let mut project_to_stats_map: HashMap<u32, Arc<ProjectSamplerStats>> = HashMap::new(); |
| // TODO(https://fxbug.dev/42118220): Create only one ArchiveReader for each unique poll rate so we |
| // can avoid redundant snapshots. |
| let project_sampler_futures = |
| sampler_config.project_configs.iter().cloned().map(|project_config| { |
| let project_sampler_stats = |
| project_to_stats_map.entry(project_config.project_id).or_insert(Arc::new( |
| ProjectSamplerStats::new() |
| .with_inspect( |
| &sampler_executor_stats.inspect_node, |
| format!("project_{:?}", project_config.project_id,), |
| ) |
| .unwrap_or_else(|err| { |
| warn!( |
| ?err, |
| "Failed to attach inspector to ProjectSamplerStats struct" |
| ); |
| ProjectSamplerStats::default() |
| }), |
| )); |
| ProjectSampler::new( |
| project_config, |
| metric_logger_factory.clone(), |
| minimum_sample_rate_sec, |
| project_sampler_stats.clone(), |
| ) |
| }); |
| |
| let mut project_samplers: Vec<ProjectSampler> = Vec::new(); |
| for project_sampler in join_all(project_sampler_futures).await.into_iter() { |
| match project_sampler { |
| Ok(project_sampler) => project_samplers.push(project_sampler), |
| Err(e) => { |
| warn!("ProjectSampler construction failed: {:?}", e); |
| } |
| } |
| } |
| Ok(SamplerExecutor { project_samplers, sampler_executor_stats }) |
| } |
| |
| /// Turn each [`ProjectSampler`] plan into an [`fasync::Task`] which executes its associated plan, |
| /// and process errors if any tasks exit unexpectedly. |
| pub fn execute(self) -> TaskCancellation { |
| // Take ownership of the inspect struct so we can give it to the execution context. We do this |
| // so that the execution context can return the struct when it's halted by reboot, which allows inspect |
| // properties to survive through the reboot flow. |
| let task_cancellation_owned_stats = self.sampler_executor_stats.clone(); |
| let execution_context_owned_stats = self.sampler_executor_stats.clone(); |
| |
| let (senders, mut spawned_tasks): (Vec<oneshot::Sender<()>>, FuturesUnordered<_>) = self |
| .project_samplers |
| .into_iter() |
| .map(|project_sampler| { |
| let (sender, receiver) = oneshot::channel::<()>(); |
| (sender, project_sampler.spawn(receiver)) |
| }) |
| .unzip(); |
| |
| let execution_context = fasync::Task::spawn(async move { |
| let mut healthily_exited_samplers = Vec::new(); |
| while let Some(sampler_result) = spawned_tasks.next().await { |
| match sampler_result { |
| Err(e) => { |
| // TODO(https://fxbug.dev/42118220): Consider restarting the failed sampler depending on |
| // failure mode. |
| warn!("A spawned sampler has failed: {:?}", e); |
| execution_context_owned_stats.errorfully_exited_samplers.add(1); |
| } |
| Ok(ProjectSamplerTaskExit::RebootTriggered(sampler)) => { |
| healthily_exited_samplers.push(sampler); |
| execution_context_owned_stats.reboot_exited_samplers.add(1); |
| } |
| Ok(ProjectSamplerTaskExit::WorkCompleted) => { |
| info!("A sampler completed its workload, and exited."); |
| execution_context_owned_stats.healthily_exited_samplers.add(1); |
| } |
| } |
| } |
| |
| healthily_exited_samplers |
| }); |
| |
| TaskCancellation { |
| execution_context, |
| senders, |
| _sampler_executor_stats: task_cancellation_owned_stats, |
| } |
| } |
| } |
| |
| pub struct ProjectSampler { |
| archive_reader: ArchiveReader, |
| /// The metrics used by this Project. Indexed by moniker_to_selector_map. |
| metrics: Vec<MetricConfig>, |
| /// Cache from Inspect selector to last sampled property. This is the selector from |
| /// [`MetricConfig`]; it may contain wildcards. |
| metric_cache: HashMap<MetricCacheKey, Property>, |
| /// Cobalt logger proxy using this ProjectSampler's project id. It's an Option so it doesn't |
| /// have to be created for unit tests; it will always be Some() outside unit tests. |
| metric_loggers: HashMap<u32, MetricEventLoggerProxy>, |
| /// Map from moniker to relevant selectors. |
| moniker_to_selector_map: HashMap<String, Vec<SelectorIndexes>>, |
| /// The frequency with which we snapshot Inspect properties |
| /// for this project. |
| poll_rate_sec: i64, |
| /// Inspect stats on a node namespaced by this project's associated id. |
| /// It's an arc since a single project can have multiple samplers at |
| /// different frequencies, but we want a single project to have one node. |
| project_sampler_stats: Arc<ProjectSamplerStats>, |
| /// The id of the project. |
| /// Project ID that metrics are being sampled and forwarded on behalf of. |
| project_id: u32, |
| } |
| |
| #[derive(Debug, Eq, Hash, PartialEq)] |
| struct MetricCacheKey { |
| handle_name: Option<InspectHandleName>, |
| selector: String, |
| } |
| |
/// Locates one selector inside a [`ProjectSampler`]: which metric, and which
/// selector within that metric.
#[derive(Debug, Clone)]
struct SelectorIndexes {
    /// Position of the metric in [`ProjectSampler`]'s `metrics` list.
    metric_index: usize,
    /// Position of the selector in that [`MetricConfig`]'s `selectors` list.
    selector_index: usize,
}
| |
| pub enum ProjectSamplerTaskExit { |
| /// The [`ProjectSampler`] processed a reboot signal on its oneshot, and yielded |
| /// to the final-snapshot. |
| RebootTriggered(ProjectSampler), |
| /// The [`ProjectSampler`] has no more work to complete; perhaps all metrics were "upload_once"? |
| WorkCompleted, |
| } |
| |
| pub enum ProjectSamplerEvent { |
| TimerTriggered, |
| TimerDied, |
| RebootTriggered, |
| RebootChannelClosed(Error), |
| } |
| |
/// Indicates whether a selector in the project has been removed (set to None), in which
/// case the ArchiveAccessor should be reconfigured.
/// The selector lists may be consolidated (and thus the maps would be rebuilt), but
/// the program will run correctly whether they are or not.
#[derive(PartialEq)]
enum SnapshotOutcome {
    /// At least one selector was disabled while processing the data.
    SelectorsChanged,
    /// No selector was disabled.
    SelectorsUnchanged,
}
| |
| impl ProjectSampler { |
| pub async fn new( |
| config: Arc<ProjectConfig>, |
| metric_logger_factory: Arc<MetricEventLoggerFactoryProxy>, |
| minimum_sample_rate_sec: i64, |
| project_sampler_stats: Arc<ProjectSamplerStats>, |
| ) -> Result<ProjectSampler, Error> { |
| let customer_id = config.customer_id(); |
| let project_id = config.project_id; |
| let poll_rate_sec = config.poll_rate_sec; |
| if poll_rate_sec < minimum_sample_rate_sec { |
| return Err(format_err!( |
| concat!( |
| "Project with id: {:?} uses a polling rate:", |
| " {:?} below minimum configured poll rate: {:?}" |
| ), |
| project_id, |
| poll_rate_sec, |
| minimum_sample_rate_sec, |
| )); |
| } |
| |
| project_sampler_stats.project_sampler_count.add(1); |
| project_sampler_stats.metrics_configured.add(config.metrics.len() as u64); |
| |
| let mut metric_loggers = HashMap::new(); |
| // TODO(https://fxbug.dev/42071858): we should remove this once we support batching. There should be |
| // only one metric logger per ProjectSampler. |
| if project_id != 0 { |
| let (metric_logger_proxy, metrics_server_end) = |
| fidl::endpoints::create_proxy().context("Failed to create endpoints")?; |
| let mut project_spec = ProjectSpec::default(); |
| project_spec.customer_id = Some(customer_id); |
| project_spec.project_id = Some(project_id); |
| metric_logger_factory |
| .create_metric_event_logger(&project_spec, metrics_server_end) |
| .await? |
| .map_err(|e| format_err!("error response for project {}: {:?}", project_id, e))?; |
| metric_loggers.insert(project_id, metric_logger_proxy); |
| } |
| let mut all_selectors = Vec::<String>::new(); |
| for metric in &config.metrics { |
| if let Some(metric_project_id) = metric.project_id { |
| if metric_loggers.get(&metric_project_id).is_none() { |
| let (metric_logger_proxy, metrics_server_end) = |
| fidl::endpoints::create_proxy().context("Failed to create endpoints")?; |
| let mut project_spec = ProjectSpec::default(); |
| project_spec.customer_id = Some(customer_id); |
| project_spec.project_id = Some(metric_project_id); |
| metric_logger_factory |
| .create_metric_event_logger(&project_spec, metrics_server_end) |
| .await? |
| .map_err(|e| format_err!("error response for project {} while creating metric logger {}: {:?}", project_id, metric_project_id, e))?; |
| metric_loggers.insert(metric_project_id, metric_logger_proxy); |
| } |
| } |
| for selector_opt in metric.selectors.iter() { |
| if let Some(selector) = selector_opt { |
| all_selectors.push(selector.selector_string.clone()); |
| } |
| } |
| } |
| let mut project_sampler = ProjectSampler { |
| project_id, |
| archive_reader: ArchiveReader::new(), |
| moniker_to_selector_map: HashMap::new(), |
| metrics: config.metrics.clone(), |
| metric_cache: HashMap::new(), |
| metric_loggers, |
| poll_rate_sec, |
| project_sampler_stats, |
| }; |
| // Fill in archive_reader and moniker_to_selector_map |
| project_sampler.rebuild_selector_data_structures(); |
| Ok(project_sampler) |
| } |
| |
| pub fn spawn( |
| mut self, |
| mut reboot_oneshot: oneshot::Receiver<()>, |
| ) -> fasync::Task<Result<ProjectSamplerTaskExit, Error>> { |
| fasync::Task::spawn(async move { |
| let mut periodic_timer = |
| fasync::Interval::new(zx::Duration::from_seconds(self.poll_rate_sec)); |
| loop { |
| let done = select! { |
| opt = periodic_timer.next() => { |
| if opt.is_some() { |
| ProjectSamplerEvent::TimerTriggered |
| } else { |
| ProjectSamplerEvent::TimerDied |
| } |
| }, |
| oneshot_res = reboot_oneshot => { |
| match oneshot_res { |
| Ok(()) => { |
| ProjectSamplerEvent::RebootTriggered |
| }, |
| Err(e) => { |
| ProjectSamplerEvent::RebootChannelClosed( |
| format_err!("Oneshot closure error: {:?}", e)) |
| } |
| } |
| } |
| }; |
| |
| match done { |
| ProjectSamplerEvent::TimerDied => { |
| return Err(format_err!(concat!( |
| "The ProjectSampler timer died, something went wrong.", |
| ))); |
| } |
| ProjectSamplerEvent::RebootChannelClosed(e) => { |
| // TODO(https://fxbug.dev/42118220): Consider differentiating errors if |
| // we ever want to recover a sampler after a oneshot channel death. |
| return Err(format_err!( |
| concat!( |
| "The Reboot signaling oneshot died, something went wrong: {:?}", |
| ), |
| e |
| )); |
| } |
| ProjectSamplerEvent::RebootTriggered => { |
| // The reboot oneshot triggered, meaning it's time to perform |
| // our final snapshot. Return self to reuse most recent cache. |
| return Ok(ProjectSamplerTaskExit::RebootTriggered(self)); |
| } |
| ProjectSamplerEvent::TimerTriggered => { |
| self.process_next_snapshot().await?; |
| // Check whether this sampler |
| // still needs to run (perhaps all metrics were |
| // "upload_once"?). If it doesn't, we want to be |
| // sure that it is not included in the reboot-workload. |
| if self.is_all_done() { |
| return Ok(ProjectSamplerTaskExit::WorkCompleted); |
| } |
| } |
| } |
| } |
| }) |
| } |
| |
| async fn process_next_snapshot(&mut self) -> Result<(), Error> { |
| let snapshot_data = self.archive_reader.snapshot::<Inspect>().await?; |
| let events_to_log = self.process_snapshot(snapshot_data).await?; |
| self.log_events(events_to_log).await?; |
| Ok(()) |
| } |
| |
| async fn process_snapshot( |
| &mut self, |
| snapshot: Vec<Data<Inspect>>, |
| ) -> Result<Vec<EventToLog>, Error> { |
| let mut selectors_changed = false; |
| let mut events_to_log = vec![]; |
| for data_packet in snapshot.iter() { |
| match &data_packet.payload { |
| None => { |
| process_schema_errors(&data_packet.metadata.errors, &data_packet.moniker); |
| } |
| Some(payload) => { |
| let (selector_outcome, mut events) = self |
| .process_component_data( |
| payload, |
| &data_packet.metadata.name, |
| &data_packet.moniker, |
| ) |
| .await?; |
| if selector_outcome == SnapshotOutcome::SelectorsChanged { |
| selectors_changed = true; |
| } |
| events_to_log.append(&mut events); |
| } |
| } |
| } |
| if selectors_changed { |
| self.rebuild_selector_data_structures(); |
| } |
| Ok(events_to_log) |
| } |
| |
| fn is_all_done(&self) -> bool { |
| self.moniker_to_selector_map.is_empty() |
| } |
| |
| fn rebuild_selector_data_structures(&mut self) { |
| let mut all_selectors = vec![]; |
| for metric in &mut self.metrics { |
| // TODO(https://fxbug.dev/42168860): Using Box<ParsedSelector> could reduce copying. |
| let active_selectors = metric |
| .selectors |
| .iter() |
| .filter(|selector| selector.is_some()) |
| .map(|selector| selector.to_owned()) |
| .collect::<Vec<_>>(); |
| for selector in active_selectors.iter() { |
| // unwrap() is OK since we filtered for Some |
| all_selectors.push(selector.as_ref().unwrap().selector_string.to_owned()); |
| } |
| metric.selectors = SelectorList(active_selectors); |
| } |
| self.archive_reader = ArchiveReader::new(); |
| self.archive_reader.retry(RetryConfig::never()).add_selectors(all_selectors.into_iter()); |
| self.moniker_to_selector_map = HashMap::new(); |
| for (metric_index, metric) in self.metrics.iter().enumerate() { |
| for (selector_index, selector) in metric.selectors.iter().enumerate() { |
| let moniker = &selector.as_ref().unwrap().moniker; |
| if self.moniker_to_selector_map.get(moniker).is_none() { |
| self.moniker_to_selector_map.insert(moniker.clone(), Vec::new()); |
| } |
| self.moniker_to_selector_map |
| .get_mut(moniker) |
| .unwrap() |
| .push(SelectorIndexes { metric_index, selector_index }); |
| } |
| } |
| } |
| |
| async fn process_component_data( |
| &mut self, |
| payload: &DiagnosticsHierarchy, |
| diagnostics_filename: &Option<InspectHandleName>, |
| moniker: &String, |
| ) -> Result<(SnapshotOutcome, Vec<EventToLog>), Error> { |
| let indexes_opt = &self.moniker_to_selector_map.get(moniker); |
| let selector_indexes = match indexes_opt { |
| None => { |
| warn!(%moniker, "Moniker not found in map"); |
| return Ok((SnapshotOutcome::SelectorsUnchanged, vec![])); |
| } |
| Some(indexes) => indexes.to_vec(), |
| }; |
| // We cloned the selector indexes. Whatever we do in this function must not invalidate them. |
| let mut snapshot_outcome = SnapshotOutcome::SelectorsUnchanged; |
| let mut events_to_log = vec![]; |
| for index_info in selector_indexes.iter() { |
| let SelectorIndexes { metric_index, selector_index } = index_info; |
| let metric = &self.metrics[*metric_index]; |
| let project_id = metric.project_id.unwrap_or(self.project_id); |
| // It's fine if a selector has been removed and is None. |
| if let Some(ParsedSelector { selector, selector_string, .. }) = |
| &metric.selectors[*selector_index] |
| { |
| let found_properties = |
| diagnostics_hierarchy::select_from_hierarchy(&payload, &selector)?; |
| match found_properties.len() { |
| // Maybe the data hasn't been published yet. Maybe another selector in this |
| // metric is the correct one to find the data. Either way, not-found is fine. |
| 0 => {} |
| 1 => { |
| // export_sample() needs mut self, so we can't pass in values directly from |
| // metric, since metric is a ref into data contained in self; |
| // we have to copy them out first. |
| let metric_type = metric.metric_type; |
| let metric_id = metric.metric_id; |
| let codes = metric.event_codes.clone(); |
| let metric_cache_key = MetricCacheKey { |
| handle_name: diagnostics_filename.clone(), |
| selector: selector_string.to_string(), |
| }; |
| if let Some(event) = self |
| .prepare_sample( |
| metric_type, |
| metric_id, |
| codes, |
| metric_cache_key, |
| &found_properties[0], |
| ) |
| .await? |
| { |
| if let Some(parsed_selector) = |
| &self.metrics[*metric_index].selectors[*selector_index] |
| { |
| parsed_selector.increment_upload_count(); |
| } |
| events_to_log.push((project_id, event)); |
| } |
| if self.update_metric_selectors(index_info) { |
| snapshot_outcome = SnapshotOutcome::SelectorsChanged; |
| } |
| } |
| too_many => warn!(%too_many, %selector_string, "Too many matches for selector"), |
| } |
| } |
| } |
| Ok((snapshot_outcome, events_to_log)) |
| } |
| |
| /// Handle selectors that may be removed (e.g. if upload_once is set). Return true if the |
| /// selector was changed/removed, false otherwise. |
| fn update_metric_selectors(&mut self, index_info: &SelectorIndexes) -> bool { |
| let metric = &mut self.metrics[index_info.metric_index]; |
| if let Some(true) = metric.upload_once { |
| for selector in metric.selectors.iter_mut() { |
| *selector = None; |
| } |
| return true; |
| } |
| let mut deleted = false; |
| for (index, selector) in metric.selectors.iter_mut().enumerate() { |
| if index != index_info.selector_index && *selector != None { |
| *selector = None; |
| deleted = true; |
| } |
| } |
| return deleted; |
| } |
| |
| async fn prepare_sample( |
| &mut self, |
| metric_type: DataType, |
| metric_id: u32, |
| event_codes: Vec<u32>, |
| metric_cache_key: MetricCacheKey, |
| new_sample: &Property, |
| ) -> Result<Option<MetricEvent>, Error> { |
| let previous_sample_opt: Option<&Property> = self.metric_cache.get(&metric_cache_key); |
| |
| if let Some(payload) = process_sample_for_data_type( |
| new_sample, |
| previous_sample_opt, |
| &metric_cache_key, |
| &metric_type, |
| ) { |
| self.maybe_update_cache(new_sample, &metric_type, metric_cache_key); |
| Ok(Some(MetricEvent { metric_id, event_codes, payload })) |
| } else { |
| Ok(None) |
| } |
| } |
| |
| async fn log_events(&mut self, events: Vec<EventToLog>) -> Result<(), Error> { |
| for (project_id, event) in events.into_iter() { |
| self.metric_loggers |
| .get(&project_id) |
| .as_ref() |
| .unwrap() |
| .log_metric_events(&[event]) |
| .await? |
| .map_err(|e| format_err!("error from cobalt: {:?}", e))?; |
| self.project_sampler_stats.cobalt_logs_sent.add(1); |
| } |
| Ok(()) |
| } |
| |
| fn maybe_update_cache( |
| &mut self, |
| new_sample: &Property, |
| data_type: &DataType, |
| metric_cache_key: MetricCacheKey, |
| ) { |
| match data_type { |
| DataType::Occurrence | DataType::IntHistogram => { |
| self.metric_cache.insert(metric_cache_key, new_sample.clone()); |
| } |
| DataType::Integer | DataType::String => (), |
| } |
| } |
| } |
| |
| fn process_sample_for_data_type( |
| new_sample: &Property, |
| previous_sample_opt: Option<&Property>, |
| data_source: &MetricCacheKey, |
| data_type: &DataType, |
| ) -> Option<MetricEventPayload> { |
| let event_payload_res = match data_type { |
| DataType::Occurrence => process_occurence(new_sample, previous_sample_opt, data_source), |
| DataType::IntHistogram => { |
| process_int_histogram(new_sample, previous_sample_opt, data_source) |
| } |
| DataType::Integer => { |
| // If we previously cached a metric with an int-type, log a warning and ignore it. |
| // This may be a case of using a single selector for two metrics, one event count |
| // and one int. |
| if previous_sample_opt.is_some() { |
| warn!("Sampler has erroneously cached an Int type metric: {:?}", data_source); |
| } |
| process_int(new_sample, data_source) |
| } |
| DataType::String => { |
| if previous_sample_opt.is_some() { |
| warn!("Sampler has erroneously cached a String type metric: {:?}", data_source); |
| } |
| process_string(new_sample, data_source) |
| } |
| }; |
| |
| match event_payload_res { |
| Ok(payload_opt) => payload_opt, |
| Err(err) => { |
| warn!(?data_source, ?err, "Failed to process Inspect property for cobalt",); |
| None |
| } |
| } |
| } |
| |
| /// It's possible for Inspect numerical properties to experience overflows/conversion |
| /// errors when being mapped to Cobalt types. Sanitize these numericals, and provide |
| /// meaningful errors. |
| fn sanitize_unsigned_numerical(diff: u64, data_source: &MetricCacheKey) -> Result<i64, Error> { |
| match diff.try_into() { |
| Ok(diff) => Ok(diff), |
| Err(e) => { |
| return Err(format_err!( |
| concat!( |
| "Selector used for EventCount type", |
| " refered to an unsigned int property,", |
| " but cobalt requires i64, and casting introduced overflow", |
| " which produces a negative int: {:?}. This could be due to", |
| " a single sample being larger than i64, or a diff between", |
| " samples being larger than i64. Error: {:?}" |
| ), |
| data_source, |
| e |
| )); |
| } |
| } |
| } |
| |
| fn process_int_histogram( |
| new_sample: &Property, |
| prev_sample_opt: Option<&Property>, |
| data_source: &MetricCacheKey, |
| ) -> Result<Option<MetricEventPayload>, Error> { |
| let diff = match prev_sample_opt { |
| None => convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?, |
| Some(prev_sample) => { |
| // If the data type changed then we just reset the cache. |
| if std::mem::discriminant(new_sample) == std::mem::discriminant(prev_sample) { |
| compute_histogram_diff(new_sample, prev_sample, data_source)? |
| } else { |
| convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)? |
| } |
| } |
| }; |
| |
| let non_empty_diff: Vec<HistogramBucket> = diff.into_iter().filter(|v| v.count != 0).collect(); |
| if !non_empty_diff.is_empty() { |
| Ok(Some(MetricEventPayload::Histogram(non_empty_diff))) |
| } else { |
| Ok(None) |
| } |
| } |
| |
| fn compute_histogram_diff( |
| new_sample: &Property, |
| old_sample: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<Vec<HistogramBucket>, Error> { |
| let new_histogram_buckets = |
| convert_inspect_histogram_to_cobalt_histogram(new_sample, data_source)?; |
| let old_histogram_buckets = |
| convert_inspect_histogram_to_cobalt_histogram(old_sample, data_source)?; |
| |
| if old_histogram_buckets.len() != new_histogram_buckets.len() { |
| return Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect IntArray", |
| " that was specified as an IntHistogram type ", |
| " but the histogram bucket count changed between", |
| " samples, which is incompatible with Cobalt.", |
| " Selector: {:?}, Inspect type: {}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )); |
| } |
| |
| new_histogram_buckets |
| .iter() |
| .zip(old_histogram_buckets) |
| .map(|(new_bucket, old_bucket)| { |
| if new_bucket.count < old_bucket.count { |
| return Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect IntArray", |
| " that was specified as an IntHistogram type ", |
| " but at least one bucket saw the count decrease", |
| " between samples, which is incompatible with Cobalt's", |
| " need for monotonically increasing counts.", |
| " Selector: {:?}, Inspect type: {}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )); |
| } |
| Ok(HistogramBucket { |
| count: new_bucket.count - old_bucket.count, |
| index: new_bucket.index, |
| }) |
| }) |
| .collect::<Result<Vec<HistogramBucket>, Error>>() |
| } |
| |
| fn build_cobalt_histogram(counts: impl Iterator<Item = u64>) -> Vec<HistogramBucket> { |
| counts |
| .enumerate() |
| .map(|(index, count)| HistogramBucket { index: index as u32, count }) |
| .collect() |
| } |
| |
| fn build_sparse_cobalt_histogram<'a>( |
| counts: impl Iterator<Item = u64>, |
| indexes: &[usize], |
| size: usize, |
| ) -> Vec<HistogramBucket> { |
| let mut histogram = |
| Vec::from_iter((0..size).map(|index| HistogramBucket { index: index as u32, count: 0 })); |
| for (index, count) in indexes.iter().zip(counts) { |
| histogram[*index].count = count; |
| } |
| histogram |
| } |
| |
| fn convert_inspect_histogram_to_cobalt_histogram( |
| inspect_histogram: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<Vec<HistogramBucket>, Error> { |
| macro_rules! err {($($message:expr),+) => { |
| Err(format_err!( |
| concat!($($message),+ , " Selector: {:?}, Inspect type: {}"), |
| data_source, |
| inspect_histogram.discriminant_name() |
| )) |
| }} |
| |
| let sanitize_size = |size: usize| -> Result<(), Error> { |
| if size > u32::MAX as usize { |
| return err!( |
| "Selector referenced an Inspect array", |
| " that was specified as a histogram type ", |
| " but contained an index too large for a u32." |
| ); |
| } |
| Ok(()) |
| }; |
| |
| let sanitize_indexes = |indexes: &[usize], size: usize| -> Result<(), Error> { |
| for index in indexes.iter() { |
| if *index >= size { |
| return err!( |
| "Selector referenced an Inspect array", |
| " that was specified as a histogram type ", |
| " but contained an invalid index." |
| ); |
| } |
| } |
| Ok(()) |
| }; |
| |
| let sanitize_counts = |counts: &[i64]| -> Result<(), Error> { |
| for count in counts.iter() { |
| if *count < 0 { |
| return err!( |
| "Selector referenced an Inspect IntArray", |
| " that was specified as an IntHistogram type ", |
| " but a bucket contained a negative count. This", |
| " is incompatible with Cobalt histograms which only", |
| " support positive histogram counts." |
| ); |
| } |
| } |
| Ok(()) |
| }; |
| |
| let histogram = match inspect_histogram { |
| Property::IntArray( |
| _, |
| ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }), |
| ) |
| | Property::IntArray( |
| _, |
| ArrayContent::ExponentialHistogram(ExponentialHistogram { |
| counts, indexes, size, .. |
| }), |
| ) => { |
| sanitize_size(*size)?; |
| sanitize_counts(counts)?; |
| match (indexes, counts) { |
| (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c as u64)), |
| (Some(indexes), counts) => { |
| sanitize_indexes(indexes, *size)?; |
| build_sparse_cobalt_histogram(counts.iter().map(|c| *c as u64), indexes, *size) |
| } |
| } |
| } |
| Property::UintArray( |
| _, |
| ArrayContent::LinearHistogram(LinearHistogram { counts, indexes, size, .. }), |
| ) |
| | Property::UintArray( |
| _, |
| ArrayContent::ExponentialHistogram(ExponentialHistogram { |
| counts, indexes, size, .. |
| }), |
| ) => { |
| sanitize_size(*size)?; |
| match (indexes, counts) { |
| (None, counts) => build_cobalt_histogram(counts.iter().map(|c| *c)), |
| (Some(indexes), counts) => { |
| sanitize_indexes(indexes, *size)?; |
| build_sparse_cobalt_histogram(counts.iter().map(|c| *c), indexes, *size) |
| } |
| } |
| } |
| _ => { |
| // TODO(https://fxbug.dev/42118220): Does cobalt support floors or step counts that are |
| // not ints? if so, we can support that as well with double arrays if the |
| // actual counts are whole numbers. |
| return Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect property", |
| " that was specified as an IntHistogram type ", |
| " but is unable to be encoded in a cobalt HistogramBucket", |
| " vector. Selector: {:?}, Inspect type: {}" |
| ), |
| data_source, |
| inspect_histogram.discriminant_name() |
| )); |
| } |
| }; |
| Ok(histogram) |
| } |
| |
| fn process_int( |
| new_sample: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<Option<MetricEventPayload>, Error> { |
| let sampled_int = match new_sample { |
| Property::Uint(_, val) => sanitize_unsigned_numerical(val.clone(), data_source)?, |
| Property::Int(_, val) => val.clone(), |
| _ => { |
| return Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect property", |
| " that was specified as an Int type ", |
| " but is unable to be encoded in an i64", |
| " Selector: {:?}, Inspect type: {}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )); |
| } |
| }; |
| |
| Ok(Some(MetricEventPayload::IntegerValue(sampled_int))) |
| } |
| |
| fn process_string( |
| new_sample: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<Option<MetricEventPayload>, Error> { |
| let sampled_string = match new_sample { |
| Property::String(_, val) => val.clone(), |
| _ => { |
| return Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect property specified as String", |
| " but property is not type String.", |
| " Selector: {:?}, Inspect type: {}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )); |
| } |
| }; |
| |
| Ok(Some(MetricEventPayload::StringValue(sampled_string))) |
| } |
| |
| fn process_occurence( |
| new_sample: &Property, |
| prev_sample_opt: Option<&Property>, |
| data_source: &MetricCacheKey, |
| ) -> Result<Option<MetricEventPayload>, Error> { |
| let diff = match prev_sample_opt { |
| None => compute_initial_event_count(new_sample, data_source)?, |
| Some(prev_sample) => compute_event_count_diff(new_sample, prev_sample, data_source)?, |
| }; |
| |
| if diff < 0 { |
| return Err(format_err!( |
| concat!( |
| "Event count must be monotonically increasing,", |
| " but we observed a negative event count diff for: {:?}" |
| ), |
| data_source |
| )); |
| } |
| |
| if diff == 0 { |
| return Ok(None); |
| } |
| |
| // TODO(https://fxbug.dev/42118220): Once fuchsia.cobalt is gone, we don't need to preserve |
| // occurrence counts "fitting" into i64s. |
| Ok(Some(MetricEventPayload::Count(diff as u64))) |
| } |
| |
| fn compute_initial_event_count( |
| new_sample: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<i64, Error> { |
| match new_sample { |
| Property::Uint(_, val) => sanitize_unsigned_numerical(val.clone(), data_source), |
| Property::Int(_, val) => Ok(val.clone()), |
| _ => Err(format_err!( |
| concat!( |
| "Selector referenced an Inspect property", |
| " that is not compatible with cached", |
| " transformation to an event count.", |
| " Selector: {:?}, {}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )), |
| } |
| } |
| |
| fn compute_event_count_diff( |
| new_sample: &Property, |
| old_sample: &Property, |
| data_source: &MetricCacheKey, |
| ) -> Result<i64, Error> { |
| match (new_sample, old_sample) { |
| // We don't need to validate that old_count and new_count are positive here. |
| // If new_count was negative, and old_count was positive, then the diff would be |
| // negative, which is an errorful state. It's impossible for old_count to be negative |
| // as either it was the first sample which would make a negative diff which is an error, |
| // or it was a negative new_count with a positive old_count, which we've already shown will |
| // produce an errorful state. |
| (Property::Int(_, new_count), Property::Int(_, old_count)) => Ok(new_count - old_count), |
| (Property::Uint(_, new_count), Property::Uint(_, old_count)) => { |
| // u64::MAX will cause sanitized_unsigned_numerical to build an |
| // appropriate error message for a subtraction underflow. |
| sanitize_unsigned_numerical( |
| new_count.checked_sub(*old_count).unwrap_or(u64::MAX), |
| data_source, |
| ) |
| } |
| // If we have a correctly typed new sample, but it didn't match either of the above cases, |
| // this means the new sample changed types compared to the old sample. We should just |
| // restart the cache, and treat the new sample as a first observation. |
| (_, Property::Uint(_, _)) | (_, Property::Int(_, _)) => { |
| warn!( |
| "Inspect type of sampled data changed between samples. Restarting cache. {:?}", |
| data_source |
| ); |
| compute_initial_event_count(new_sample, data_source) |
| } |
| _ => Err(format_err!( |
| concat!( |
| "Inspect type of sampled data changed between samples", |
| " to a type incompatible with event counters.", |
| " Selector: {:?}, New type: {:?}" |
| ), |
| data_source, |
| new_sample.discriminant_name() |
| )), |
| } |
| } |
| |
| fn process_schema_errors(errors: &Option<Vec<diagnostics_data::InspectError>>, moniker: &String) { |
| match errors { |
| Some(errors) => { |
| for error in errors { |
| if !error.message.contains("Inspect hierarchy was fully filtered") { |
| warn!("Moniker: {}, Error: {:?}", moniker, error); |
| } |
| } |
| } |
| None => { |
| warn!("Moniker: {} encountered null payload and no errors.", moniker); |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use diagnostics_hierarchy::hierarchy; |
| use futures::executor; |
| |
| /// Test inserting a string into the hierarchy that requires escaping. |
| #[fuchsia::test] |
| fn test_process_payload_with_escapes() { |
| let unescaped: String = "path/to".to_string(); |
| let hierarchy = hierarchy! { |
| root: { |
| var unescaped: { |
| value: 0, |
| "value/with:escapes": 0, |
| } |
| } |
| }; |
| |
| let mut sampler = ProjectSampler { |
| archive_reader: ArchiveReader::new(), |
| moniker_to_selector_map: HashMap::new(), |
| metrics: vec![], |
| metric_cache: HashMap::new(), |
| metric_loggers: HashMap::new(), |
| project_id: 1, |
| poll_rate_sec: 3600, |
| project_sampler_stats: Arc::new(ProjectSamplerStats::new()), |
| }; |
| let selector: String = "my/component:root/path\\/to:value".to_string(); |
| sampler.metrics.push(MetricConfig { |
| project_id: None, |
| selectors: SelectorList(vec![sampler_config::parse_selector_for_test(&selector)]), |
| metric_id: 1, |
| // Occurrence type with a value of zero will not attempt to use any loggers. |
| metric_type: DataType::Occurrence, |
| event_codes: Vec::new(), |
| // upload_once means that process_component_data will return SelectorsChanged if |
| // it is found in the map. |
| upload_once: Some(true), |
| }); |
| sampler.rebuild_selector_data_structures(); |
| match executor::block_on(sampler.process_component_data( |
| &hierarchy, |
| &Some(InspectHandleName::filename("a_filename")), |
| &"my/component".to_string(), |
| )) { |
| // This selector will be found and removed from the map, resulting in a |
| // SelectorsChanged response. |
| Ok((SnapshotOutcome::SelectorsChanged, _events)) => (), |
| _ => panic!("Expecting SelectorsChanged from process_component_data."), |
| } |
| |
| let selector_with_escaped_property: String = |
| "my/component:root/path\\/to:value\\/with\\:escapes".to_string(); |
| sampler.metrics.push(MetricConfig { |
| project_id: None, |
| selectors: SelectorList(vec![sampler_config::parse_selector_for_test( |
| &selector_with_escaped_property, |
| )]), |
| metric_id: 1, |
| // Occurrence type with a value of zero will not attempt to use any loggers. |
| metric_type: DataType::Occurrence, |
| event_codes: Vec::new(), |
| // upload_once means that the method will return SelectorsChanged if it is found |
| // in the map. |
| upload_once: Some(true), |
| }); |
| sampler.rebuild_selector_data_structures(); |
| match executor::block_on(sampler.process_component_data( |
| &hierarchy, |
| &Some(InspectHandleName::filename("a_filename")), |
| &"my/component".to_string(), |
| )) { |
| // This selector will be found and removed from the map, resulting in a |
| // SelectorsChanged response. |
| Ok((SnapshotOutcome::SelectorsChanged, _events)) => (), |
| _ => panic!("Expecting SelectorsChanged from process_component_data."), |
| } |
| |
| let selector_unfound: String = "my/component:root/path/to:value".to_string(); |
| sampler.metrics.push(MetricConfig { |
| project_id: None, |
| selectors: SelectorList(vec![sampler_config::parse_selector_for_test( |
| &selector_unfound, |
| )]), |
| metric_id: 1, |
| // Occurrence type with a value of zero will not attempt to use any loggers. |
| metric_type: DataType::Occurrence, |
| event_codes: Vec::new(), |
| // upload_once means that the method will return SelectorsChanged if it is found |
| // in the map. |
| upload_once: Some(true), |
| }); |
| sampler.rebuild_selector_data_structures(); |
| match executor::block_on(sampler.process_component_data( |
| &hierarchy, |
| &Some(InspectHandleName::filename("a_filename")), |
| &"my/component".to_string(), |
| )) { |
| // This selector will not be found and removed from the map, resulting in SelectorsUnchanged. |
| Ok((SnapshotOutcome::SelectorsUnchanged, _events)) => (), |
| _ => panic!("Expecting SelectorsUnchanged from process_component_data."), |
| } |
| } |
| |
| /// Test that a decreasing occurrence type (which is not allowed) doesn't crash due to e.g. |
| /// unchecked unsigned subtraction overflow. |
| #[fuchsia::test] |
| fn decreasing_occurrence_is_correct() { |
| let big_number = Property::Uint("foo".to_string(), 5); |
| let small_number = Property::Uint("foo".to_string(), 2); |
| let key = MetricCacheKey { |
| handle_name: Some(InspectHandleName::filename("some_file")), |
| selector: "sel".to_string(), |
| }; |
| |
| assert_eq!( |
| process_sample_for_data_type( |
| &big_number, |
| Some(&small_number), |
| &key, |
| &DataType::Occurrence |
| ), |
| Some(MetricEventPayload::Count(3)) |
| ); |
| assert_eq!( |
| process_sample_for_data_type( |
| &small_number, |
| Some(&big_number), |
| &key, |
| &DataType::Occurrence |
| ), |
| None |
| ); |
| } |
| |
| /// Test removal of selectors marked with upload_once. |
| #[fuchsia::test] |
| fn test_upload_once() { |
| let hierarchy = hierarchy! { |
| root: { |
| value_one: 0, |
| value_two: 1, |
| } |
| }; |
| |
| let mut sampler = ProjectSampler { |
| archive_reader: ArchiveReader::new(), |
| moniker_to_selector_map: HashMap::new(), |
| metrics: vec![], |
| metric_cache: HashMap::new(), |
| metric_loggers: HashMap::new(), |
| project_id: 1, |
| poll_rate_sec: 3600, |
| project_sampler_stats: Arc::new(ProjectSamplerStats::new()), |
| }; |
| sampler.metrics.push(MetricConfig { |
| project_id: None, |
| selectors: SelectorList(vec![sampler_config::parse_selector_for_test( |
| "my/component:root:value_one", |
| )]), |
| metric_id: 1, |
| metric_type: DataType::Integer, |
| event_codes: Vec::new(), |
| upload_once: Some(true), |
| }); |
| sampler.metrics.push(MetricConfig { |
| project_id: None, |
| selectors: SelectorList(vec![sampler_config::parse_selector_for_test( |
| "my/component:root:value_two", |
| )]), |
| metric_id: 2, |
| metric_type: DataType::Integer, |
| event_codes: Vec::new(), |
| upload_once: Some(true), |
| }); |
| sampler.rebuild_selector_data_structures(); |
| |
| // Both selectors should be found and removed from the map. |
| match executor::block_on(sampler.process_component_data( |
| &hierarchy, |
| &Some(InspectHandleName::filename("a_filename")), |
| &"my/component".to_string(), |
| )) { |
| Ok((SnapshotOutcome::SelectorsChanged, _events)) => (), |
| _ => panic!("Expecting SelectorsChanged from process_component_data."), |
| } |
| |
| let selector_indices = sampler.moniker_to_selector_map.get("my/component").unwrap(); |
| for index_info in selector_indices { |
| let metric = &sampler.metrics[index_info.metric_index]; |
| let selector = &metric.selectors[index_info.selector_index]; |
| assert!(selector.is_none()); |
| } |
| } |
| |
| struct EventCountTesterParams { |
| new_val: Property, |
| old_val: Option<Property>, |
| process_ok: bool, |
| event_made: bool, |
| diff: i64, |
| } |
| |
| fn process_occurence_tester(params: EventCountTesterParams) { |
| let data_source = MetricCacheKey { |
| handle_name: Some(InspectHandleName::filename("foo.file")), |
| selector: "test:root:count".to_string(), |
| }; |
| let event_res = process_occurence(¶ms.new_val, params.old_val.as_ref(), &data_source); |
| |
| if !params.process_ok { |
| assert!(event_res.is_err()); |
| return; |
| } |
| |
| assert!(event_res.is_ok()); |
| |
| let event_opt = event_res.unwrap(); |
| |
| if !params.event_made { |
| assert!(event_opt.is_none()); |
| return; |
| } |
| |
| assert!(event_opt.is_some()); |
| let event = event_opt.unwrap(); |
| match event { |
| MetricEventPayload::Count(count) => { |
| assert_eq!(count, params.diff as u64); |
| } |
| _ => panic!("Expecting event counts."), |
| } |
| } |
| |
| #[fuchsia::test] |
| fn test_normal_process_occurence() { |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), 1), |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: 1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), 1), |
| old_val: Some(Property::Int("count".to_string(), 1)), |
| process_ok: true, |
| event_made: false, |
| diff: -1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), 3), |
| old_val: Some(Property::Int("count".to_string(), 1)), |
| process_ok: true, |
| event_made: true, |
| diff: 2, |
| }); |
| } |
| |
| #[fuchsia::test] |
| fn test_data_type_changing_process_occurence() { |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), 1), |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: 1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Uint("count".to_string(), 1), |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: 1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Uint("count".to_string(), 3), |
| old_val: Some(Property::Int("count".to_string(), 1)), |
| process_ok: true, |
| event_made: true, |
| diff: 3, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::String("count".to_string(), "big_oof".to_string()), |
| old_val: Some(Property::Int("count".to_string(), 1)), |
| process_ok: false, |
| event_made: false, |
| diff: -1, |
| }); |
| } |
| |
| #[fuchsia::test] |
| fn test_event_count_negatives_and_overflows() { |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), -11), |
| old_val: None, |
| process_ok: false, |
| event_made: false, |
| diff: -1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Int("count".to_string(), 9), |
| old_val: Some(Property::Int("count".to_string(), 10)), |
| process_ok: false, |
| event_made: false, |
| diff: -1, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Uint("count".to_string(), u64::MAX), |
| old_val: None, |
| process_ok: false, |
| event_made: false, |
| diff: -1, |
| }); |
| |
| let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap(); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1), |
| old_val: Some(Property::Uint("count".to_string(), 1)), |
| process_ok: true, |
| event_made: true, |
| diff: i64::MAX, |
| }); |
| |
| process_occurence_tester(EventCountTesterParams { |
| new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 2), |
| old_val: Some(Property::Uint("count".to_string(), 1)), |
| process_ok: false, |
| event_made: false, |
| diff: -1, |
| }); |
| } |
| |
| struct IntTesterParams { |
| new_val: Property, |
| process_ok: bool, |
| sample: i64, |
| } |
| |
| fn process_int_tester(params: IntTesterParams) { |
| let data_source = MetricCacheKey { |
| handle_name: Some(InspectHandleName::filename("foo.file")), |
| selector: "test:root:count".to_string(), |
| }; |
| let event_res = process_int(¶ms.new_val, &data_source); |
| |
| if !params.process_ok { |
| assert!(event_res.is_err()); |
| return; |
| } |
| |
| assert!(event_res.is_ok()); |
| |
| let event = event_res.expect("event should be Ok").expect("event should be Some"); |
| match event { |
| MetricEventPayload::IntegerValue(val) => { |
| assert_eq!(val, params.sample); |
| } |
| _ => panic!("Expecting event counts."), |
| } |
| } |
| |
| #[fuchsia::test] |
| fn test_normal_process_int() { |
| process_int_tester(IntTesterParams { |
| new_val: Property::Int("count".to_string(), 13), |
| process_ok: true, |
| sample: 13, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Int("count".to_string(), -13), |
| process_ok: true, |
| sample: -13, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Int("count".to_string(), 0), |
| process_ok: true, |
| sample: 0, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Uint("count".to_string(), 13), |
| process_ok: true, |
| sample: 13, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::String("count".to_string(), "big_oof".to_string()), |
| process_ok: false, |
| sample: -1, |
| }); |
| } |
| |
| #[fuchsia::test] |
| fn test_int_edge_cases() { |
| process_int_tester(IntTesterParams { |
| new_val: Property::Int("count".to_string(), i64::MAX), |
| process_ok: true, |
| sample: i64::MAX, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Int("count".to_string(), std::i64::MIN), |
| process_ok: true, |
| sample: std::i64::MIN, |
| }); |
| |
| let i64_max_in_u64: u64 = i64::MAX.try_into().unwrap(); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Uint("count".to_string(), i64_max_in_u64), |
| process_ok: true, |
| sample: i64::MAX, |
| }); |
| |
| process_int_tester(IntTesterParams { |
| new_val: Property::Uint("count".to_string(), i64_max_in_u64 + 1), |
| process_ok: false, |
| sample: -1, |
| }); |
| } |
| |
| struct StringTesterParams { |
| sample: Property, |
| process_ok: bool, |
| previous_sample: Option<Property>, |
| } |
| |
| fn process_string_tester(params: StringTesterParams) { |
| let metric_cache_key = MetricCacheKey { |
| handle_name: Some(InspectHandleName::filename("foo.file")), |
| selector: "test:root:string_val".to_string(), |
| }; |
| |
| let event = process_sample_for_data_type( |
| ¶ms.sample, |
| params.previous_sample.as_ref(), |
| &metric_cache_key, |
| &DataType::String, |
| ); |
| |
| if !params.process_ok { |
| assert!(event.is_none()); |
| return; |
| } |
| |
| match event.unwrap() { |
| MetricEventPayload::StringValue(val) => { |
| assert_eq!(val.as_str(), params.sample.string().unwrap()); |
| } |
| _ => panic!("Expecting event with StringValue."), |
| } |
| } |
| |
| #[fuchsia::test] |
| fn test_process_string() { |
| process_string_tester(StringTesterParams { |
| sample: Property::String("string_val".to_string(), "Hello, world!".to_string()), |
| process_ok: true, |
| previous_sample: None, |
| }); |
| |
| // Ensure any erroneously cached values are ignored (a warning is logged in this case). |
| |
| process_string_tester(StringTesterParams { |
| sample: Property::String("string_val".to_string(), "Hello, world!".to_string()), |
| process_ok: true, |
| previous_sample: Some(Property::String("string_val".to_string(), "Uh oh!".to_string())), |
| }); |
| |
| // Ensure unsupported property types are not erroneously processed. |
| |
| process_string_tester(StringTesterParams { |
| sample: Property::Int("string_val".to_string(), 123), |
| process_ok: false, |
| previous_sample: None, |
| }); |
| |
| process_string_tester(StringTesterParams { |
| sample: Property::Uint("string_val".to_string(), 123), |
| process_ok: false, |
| previous_sample: None, |
| }); |
| } |
| |
| fn convert_vector_to_int_histogram(hist: Vec<i64>) -> Property { |
| let size = hist.len(); |
| Property::IntArray( |
| "Bloop".to_string(), |
| ArrayContent::LinearHistogram(LinearHistogram { |
| floor: 1, |
| step: 1, |
| counts: hist, |
| size, |
| indexes: None, |
| }), |
| ) |
| } |
| |
| fn convert_vector_to_uint_histogram(hist: Vec<u64>) -> Property<String> { |
| let size = hist.len(); |
| Property::UintArray( |
| "Bloop".to_string(), |
| ArrayContent::LinearHistogram(LinearHistogram { |
| floor: 1, |
| step: 1, |
| counts: hist, |
| size, |
| indexes: None, |
| }), |
| ) |
| } |
| |
| // Produce condensed histograms. Size is arbitrary 100 - indexes must be less than that. |
| fn convert_vectors_to_int_histogram(counts: Vec<i64>, indexes: Vec<usize>) -> Property<String> { |
| let size = 100; |
| Property::IntArray( |
| "Bloop".to_string(), |
| ArrayContent::LinearHistogram(LinearHistogram { |
| floor: 1, |
| step: 1, |
| counts, |
| size, |
| indexes: Some(indexes), |
| }), |
| ) |
| } |
| |
| fn convert_vectors_to_uint_histogram( |
| counts: Vec<u64>, |
| indexes: Vec<usize>, |
| ) -> Property<String> { |
| let size = 100; |
| Property::UintArray( |
| "Bloop".to_string(), |
| ArrayContent::LinearHistogram(LinearHistogram { |
| floor: 1, |
| step: 1, |
| counts, |
| size, |
| indexes: Some(indexes), |
| }), |
| ) |
| } |
| |
| struct IntHistogramTesterParams { |
| new_val: Property, |
| old_val: Option<Property>, |
| process_ok: bool, |
| event_made: bool, |
| diff: Vec<(u32, u64)>, |
| } |
| fn process_int_histogram_tester(params: IntHistogramTesterParams) { |
| let data_source = MetricCacheKey { |
| handle_name: Some(InspectHandleName::filename("foo.file")), |
| selector: "test:root:count".to_string(), |
| }; |
| let event_res = |
| process_int_histogram(¶ms.new_val, params.old_val.as_ref(), &data_source); |
| |
| if !params.process_ok { |
| assert!(event_res.is_err()); |
| return; |
| } |
| |
| assert!(event_res.is_ok()); |
| |
| let event_opt = event_res.unwrap(); |
| if !params.event_made { |
| assert!(event_opt.is_none()); |
| return; |
| } |
| |
| assert!(event_opt.is_some()); |
| |
| let event = event_opt.unwrap(); |
| match event.clone() { |
| MetricEventPayload::Histogram(histogram_buckets) => { |
| assert_eq!(histogram_buckets.len(), params.diff.len()); |
| |
| let expected_histogram_buckets = params |
| .diff |
| .iter() |
| .map(|(index, count)| HistogramBucket { index: *index, count: *count }) |
| .collect::<Vec<HistogramBucket>>(); |
| |
| assert_eq!(histogram_buckets, expected_histogram_buckets); |
| } |
| _ => panic!("Expecting int histogram."), |
| } |
| } |
| |
| /// Test that simple in-bounds first-samples of both types of Inspect histograms |
| /// produce correct event types. |
| #[fuchsia::test] |
| fn test_normal_process_int_histogram() { |
| let new_i64_sample = convert_vector_to_int_histogram(vec![1, 1, 1, 1]); |
| let new_u64_sample = convert_vector_to_uint_histogram(vec![1, 1, 1, 1]); |
| |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_i64_sample, |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)], |
| }); |
| |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, 1), (1, 1), (2, 1), (3, 1)], |
| }); |
| |
| // Test an Inspect uint histogram at the boundaries of the type produce valid |
| // cobalt events. |
| let new_u64_sample = convert_vector_to_uint_histogram(vec![u64::MAX, u64::MAX, u64::MAX]); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: None, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, u64::MAX), (1, u64::MAX), (2, u64::MAX)], |
| }); |
| |
| // Test that an empty Inspect histogram produces no event. |
| let new_u64_sample = convert_vector_to_uint_histogram(Vec::new()); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: None, |
| process_ok: true, |
| event_made: false, |
| diff: Vec::new(), |
| }); |
| |
| let new_u64_sample = convert_vector_to_uint_histogram(vec![0, 0, 0, 0]); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: None, |
| process_ok: true, |
| event_made: false, |
| diff: Vec::new(), |
| }); |
| |
| // Test that monotonically increasing histograms are good!. |
| let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 2, 1]); |
| let old_u64_sample = Some(convert_vector_to_uint_histogram(vec![1, 1, 0, 1])); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: old_u64_sample, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, 1), (2, 2)], |
| }); |
| |
| let new_i64_sample = convert_vector_to_int_histogram(vec![5, 2, 1, 3]); |
| let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1])); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_i64_sample, |
| old_val: old_i64_sample, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, 4), (1, 1), (3, 2)], |
| }); |
| |
| // Test that changing the histogram type resets the cache. |
| let new_u64_sample = convert_vector_to_uint_histogram(vec![2, 1, 1, 1]); |
| let old_i64_sample = Some(convert_vector_to_int_histogram(vec![1, 1, 1, 1])); |
| process_int_histogram_tester(IntHistogramTesterParams { |
| new_val: new_u64_sample, |
| old_val: old_i64_sample, |
| process_ok: true, |
| event_made: true, |
| diff: vec![(0, 2), (1, 1), (2, 1), (3, 1)], |
| }); |
| } |
| |
| // Condensed int and uint histograms must both be processed successfully, even when |
| // their indexes are supplied out of order. |
| #[fuchsia::test] |
| fn test_normal_process_condensed_histograms() { |
|     // Int flavor with a cached previous sample. Judging by the expected diff, the first |
|     // vector carries the counts and the second the bucket indexes: bucket 3 is new |
|     // (count 2) and bucket 5 grew from 1 to 6 (delta 5). |
|     process_int_histogram_tester(IntHistogramTesterParams { |
|         new_val: convert_vectors_to_int_histogram(vec![2, 6], vec![3, 5]), |
|         old_val: Some(convert_vectors_to_int_histogram(vec![1], vec![5])), |
|         process_ok: true, |
|         event_made: true, |
|         diff: vec![(3, 2), (5, 5)], |
|     }); |
| |
|     // Uint flavor with no cached sample and indexes listed in descending order; the |
|     // expected diff is ordered by bucket index. |
|     process_int_histogram_tester(IntHistogramTesterParams { |
|         new_val: convert_vectors_to_uint_histogram(vec![2, 4], vec![5, 3]), |
|         old_val: None, |
|         process_ok: true, |
|         event_made: true, |
|         diff: vec![(3, 4), (5, 2)], |
|     }); |
| } |
| |
| // Each case below expects processing to fail (`process_ok: false`) with no event made. |
| #[fuchsia::test] |
| fn test_errorful_process_int_histogram() { |
|     // Changing the histogram length between samples (5 buckets cached, 4 new) is an error. |
|     process_int_histogram_tester(IntHistogramTesterParams { |
|         new_val: convert_vector_to_uint_histogram(vec![1, 1, 1, 1]), |
|         old_val: Some(convert_vector_to_uint_histogram(vec![1, 1, 1, 1, 1])), |
|         process_ok: false, |
|         event_made: false, |
|         diff: Vec::new(), |
|     }); |
| |
|     // New samples can't contain negative values. |
|     process_int_histogram_tester(IntHistogramTesterParams { |
|         new_val: convert_vector_to_int_histogram(vec![1, 1, -1, 1]), |
|         old_val: None, |
|         process_ok: false, |
|         event_made: false, |
|         diff: Vec::new(), |
|     }); |
| |
|     // Histograms must be monotonically increasing: the first bucket dropping from 6 to 5 |
|     // is rejected even though other buckets grew. |
|     process_int_histogram_tester(IntHistogramTesterParams { |
|         new_val: convert_vector_to_int_histogram(vec![5, 2, 1, 3]), |
|         old_val: Some(convert_vector_to_int_histogram(vec![6, 1, 1, 1])), |
|         process_ok: false, |
|         event_made: false, |
|         diff: Vec::new(), |
|     }); |
| } |
| |
| /// Data that differs only in its metadata filename - same moniker, same selector path - |
| /// must be tracked separately in the previous-value cache. One MetricConfig matches every |
| /// data source, but each occurrence count must be the delta against the previous value |
| /// seen for that same filename. |
| #[fuchsia::test] |
| async fn test_filename_distinguishes_data() { |
|     // Checks that `events` is Ok and holds exactly one event, logged for project 1, whose |
|     // payload is `Count(value)`. `context` labels the step in any failure message. |
|     fn expect_one_metric_event_value( |
|         events: Result<Vec<EventToLog>, Error>, |
|         value: u64, |
|         context: &'static str, |
|     ) { |
|         let logged = events.expect(context); |
|         assert_eq!(logged.len(), 1, "Events len not 1: {}: {}", context, logged.len()); |
|         let (project_id, MetricEvent { payload, .. }) = &logged[0]; |
|         assert_eq!(*project_id, 1); |
|         match payload { |
|             fidl_fuchsia_metrics::MetricEventPayload::Count(count) => { |
|                 assert_eq!( |
|                     count, &value, |
|                     "Wrong payload, expected {} got {} at {}", |
|                     value, count, context |
|                 ); |
|             } |
|             other => { |
|                 panic!("Expected MetricEventPayload::Count at {}, got {:?}", context, other) |
|             } |
|         } |
|     } |
| |
|     let mut sampler = ProjectSampler { |
|         archive_reader: ArchiveReader::new(), |
|         moniker_to_selector_map: HashMap::new(), |
|         metrics: vec![], |
|         metric_cache: HashMap::new(), |
|         metric_loggers: HashMap::new(), |
|         project_id: 1, |
|         poll_rate_sec: 3600, |
|         project_sampler_stats: Arc::new(ProjectSamplerStats::new()), |
|     }; |
| |
|     // A single Occurrence metric whose selector matches the leaf in every snapshot below. |
|     let selector_text: String = "my/component:root/branch:leaf".to_string(); |
|     sampler.metrics.push(MetricConfig { |
|         project_id: None, |
|         selectors: SelectorList(vec![sampler_config::parse_selector_for_test(&selector_text)]), |
|         metric_id: 1, |
|         metric_type: DataType::Occurrence, |
|         event_codes: vec![], |
|         upload_once: Some(false), |
|     }); |
|     sampler.rebuild_selector_data_structures(); |
| |
|     // Four snapshots of the same moniker and property path, two per filename. |
|     let first_file_reads_4 = vec![Data::for_inspect( |
|         "my/component", |
|         Some(hierarchy! { root: {branch: {leaf: 4i32}}}), |
|         0, /* timestamp */ |
|         "component-url", |
|         Some(InspectHandleName::filename("file1")), |
|         vec![], /* errors */ |
|     )]; |
|     let second_file_reads_3 = vec![Data::for_inspect( |
|         "my/component", |
|         Some(hierarchy! { root: {branch: {leaf: 3i32}}}), |
|         0, /* timestamp */ |
|         "component-url", |
|         Some(InspectHandleName::filename("file2")), |
|         vec![], /* errors */ |
|     )]; |
|     let first_file_reads_6 = vec![Data::for_inspect( |
|         "my/component", |
|         Some(hierarchy! { root: {branch: {leaf: 6i32}}}), |
|         0, /* timestamp */ |
|         "component-url", |
|         Some(InspectHandleName::filename("file1")), |
|         vec![], /* errors */ |
|     )]; |
|     let second_file_reads_8 = vec![Data::for_inspect( |
|         "my/component", |
|         Some(hierarchy! { root: {branch: {leaf: 8i32}}}), |
|         0, /* timestamp */ |
|         "component-url", |
|         Some(InspectHandleName::filename("file2")), |
|         vec![], /* errors */ |
|     )]; |
| |
|     // The first sighting of each file reports the raw value (4, then 3). Later sightings |
|     // report the growth relative to that same file: 6 - 4 = 2 and 8 - 3 = 5. |
|     let outcome = sampler.process_snapshot(first_file_reads_4).await; |
|     expect_one_metric_event_value(outcome, 4, "first"); |
|     let outcome = sampler.process_snapshot(second_file_reads_3).await; |
|     expect_one_metric_event_value(outcome, 3, "second"); |
|     let outcome = sampler.process_snapshot(first_file_reads_6).await; |
|     expect_one_metric_event_value(outcome, 2, "third"); |
|     let outcome = sampler.process_snapshot(second_file_reads_8).await; |
|     expect_one_metric_event_value(outcome, 5, "fourth"); |
| } |
| |
| // TODO(https://fxbug.dev/42071858): we should remove this once we support batching. |
| // |
| // A metric that carries its own `project_id` (2) must have its event logged under that |
| // id rather than under the sampler's `project_id` (1). |
| #[fuchsia::test] |
| async fn project_id_can_be_overwritten_by_the_metric_project_id() { |
|     let mut sampler = ProjectSampler { |
|         archive_reader: ArchiveReader::new(), |
|         moniker_to_selector_map: HashMap::new(), |
|         metrics: vec![], |
|         metric_cache: HashMap::new(), |
|         metric_loggers: HashMap::new(), |
|         project_id: 1, |
|         poll_rate_sec: 3600, |
|         project_sampler_stats: Arc::new(ProjectSamplerStats::new()), |
|     }; |
| |
|     // The only metric overrides the project id with 2. |
|     let selector_text: String = "my/component:root/branch:leaf".to_string(); |
|     sampler.metrics.push(MetricConfig { |
|         project_id: Some(2), |
|         selectors: SelectorList(vec![sampler_config::parse_selector_for_test(&selector_text)]), |
|         metric_id: 1, |
|         metric_type: DataType::Occurrence, |
|         event_codes: vec![], |
|         upload_once: Some(false), |
|     }); |
|     sampler.rebuild_selector_data_structures(); |
| |
|     let snapshot = vec![Data::for_inspect( |
|         "my/component", |
|         Some(hierarchy! { root: {branch: {leaf: 4i32}}}), |
|         0, /* timestamp */ |
|         "component-url", |
|         Some(InspectHandleName::filename("file1")), |
|         vec![], /* errors */ |
|     )]; |
| |
|     // Exactly one event is expected, tagged with the metric's project id. |
|     let logged = sampler.process_snapshot(snapshot).await.expect("processed snapshot"); |
|     assert_eq!(logged.len(), 1); |
|     let (project_id, MetricEvent { .. }) = &logged[0]; |
|     assert_eq!(*project_id, 2); |
| } |
| } |