| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation/aggregate_store.h" |
| |
| #include <algorithm> |
| #include <map> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "src/algorithms/rappor/rappor_config_helper.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/proto_util.h" |
| #include "src/lib/util/status.h" |
| #include "src/local_aggregation/aggregation_utils.h" |
| #include "src/registry/packed_event_codes.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| using google::protobuf::RepeatedField; |
| using logger::Encoder; |
| using logger::kInvalidArguments; |
| using logger::kOK; |
| using logger::kOther; |
| using logger::MetricRef; |
| using logger::ObservationWriter; |
| using logger::ProjectContext; |
| using logger::Status; |
| using rappor::RapporConfigHelper; |
| using util::ConsistentProtoStore; |
| using util::SerializeToBase64; |
| using util::StatusCode; |
| |
| namespace { |
| |
| ////// General helper functions. |
| |
| // Populates a ReportAggregationKey proto message and then populates a string |
| // with the base64 encoding of the serialized proto. |
| bool PopulateReportKey(uint32_t customer_id, uint32_t project_id, uint32_t metric_id, |
| uint32_t report_id, std::string* key) { |
| ReportAggregationKey key_data; |
| key_data.set_customer_id(customer_id); |
| key_data.set_project_id(project_id); |
| key_data.set_metric_id(metric_id); |
| key_data.set_report_id(report_id); |
| return SerializeToBase64(key_data, key); |
| } |
| |
| ////// Helper functions used by the constructor and UpdateAggregationConfigs(). |
| |
| // Gets and validates the window sizes and/or aggregation windows from a ReportDefinition, converts |
| // window sizes to daily aggregation windows, sorts the aggregation windows in increasing order, and |
| // adds them to an AggregationConfig. |
| // |
| // TODO(pesk): Stop looking at the window_size field of |report| once all reports have been updated |
| // to have OnDeviceAggregationWindows only. |
| bool GetSortedAggregationWindowsFromReport(const ReportDefinition& report, |
| AggregationConfig* aggregation_config) { |
| if (report.window_size_size() == 0 && report.aggregation_window_size() == 0) { |
| LOG(ERROR) << "Report must have at least one window size or aggregation window."; |
| return false; |
| } |
| std::vector<uint32_t> aggregation_days; |
| std::vector<uint32_t> aggregation_hours; |
| for (const uint32_t window_size : report.window_size()) { |
| if (window_size == 0 || window_size > kMaxAllowedAggregationDays) { |
| LOG(ERROR) << "Window size must be positive and cannot exceed " << kMaxAllowedAggregationDays; |
| return false; |
| } |
| aggregation_days.push_back(window_size); |
| } |
| for (const auto& window : report.aggregation_window()) { |
| switch (window.units_case()) { |
| case OnDeviceAggregationWindow::kDays: { |
| uint32_t num_days = window.days(); |
| if (num_days == 0 || num_days > kMaxAllowedAggregationDays) { |
| LOG(ERROR) << "Daily windows must contain at least 1 and no more than " |
| << kMaxAllowedAggregationDays << " days"; |
| return false; |
| } |
| aggregation_days.push_back(num_days); |
| break; |
| } |
| case OnDeviceAggregationWindow::kHours: { |
| uint32_t num_hours = window.hours(); |
| if (num_hours == 0 || num_hours > kMaxAllowedAggregationHours) { |
| LOG(ERROR) << "Hourly windows must contain at least 1 and no more than " |
| << kMaxAllowedAggregationHours << " hours"; |
| return false; |
| } |
| aggregation_hours.push_back(num_hours); |
| break; |
| } |
| default: |
| LOG(ERROR) << "Invalid OnDeviceAggregationWindow type " << window.units_case(); |
| } |
| } |
| std::sort(aggregation_hours.begin(), aggregation_hours.end()); |
| std::sort(aggregation_days.begin(), aggregation_days.end()); |
| for (auto num_hours : aggregation_hours) { |
| *aggregation_config->add_aggregation_window() = MakeHourWindow(num_hours); |
| } |
| for (auto num_days : aggregation_days) { |
| *aggregation_config->add_aggregation_window() = MakeDayWindow(num_days); |
| } |
| return true; |
| } |
| |
| // Creates an AggregationConfig from a ProjectContext, MetricDefinition, and |
| // ReportDefinition and populates the aggregation_config field of a specified |
| // ReportAggregates. Also sets the type of the ReportAggregates based on the |
| // ReportDefinition's type. |
| // |
| // Accepts ReportDefinitions with either at least one WindowSize, or at least one |
| // OnDeviceAggregationWindow with units in days. |
| bool PopulateReportAggregates(const ProjectContext& project_context, const MetricDefinition& metric, |
| const ReportDefinition& report, ReportAggregates* report_aggregates) { |
| if (report.window_size_size() == 0 && report.aggregation_window_size() == 0) { |
| } |
| AggregationConfig* aggregation_config = report_aggregates->mutable_aggregation_config(); |
| *aggregation_config->mutable_project() = project_context.project(); |
| *aggregation_config->mutable_metric() = *project_context.GetMetric(metric.id()); |
| *aggregation_config->mutable_report() = report; |
| if (!GetSortedAggregationWindowsFromReport(report, aggregation_config)) { |
| return false; |
| } |
| switch (report.report_type()) { |
| case ReportDefinition::UNIQUE_N_DAY_ACTIVES: { |
| report_aggregates->set_allocated_unique_actives_aggregates(new UniqueActivesReportAggregates); |
| return true; |
| } |
| case ReportDefinition::PER_DEVICE_NUMERIC_STATS: |
| case ReportDefinition::PER_DEVICE_HISTOGRAM: { |
| report_aggregates->set_allocated_numeric_aggregates(new PerDeviceNumericAggregates); |
| return true; |
| } |
| default: |
| return false; |
| } |
| } |
| |
| // Move all items from the |window_size| field to the |aggregation_window| field |
| // of each AggregationConfig, preserving the order of the items. The |aggregation_window| field |
| // should be empty if the |window_size| field is nonempty. If for some reason this is not true, log |
| // an error and discard the contents of |aggregation_window| and replace them with the migrated |
| // |window_size| values. |
| void ConvertWindowSizesToAggregationDays(LocalAggregateStore* store) { |
| for (auto [key, aggregates] : store->by_report_key()) { |
| auto config = (*store->mutable_by_report_key())[key].mutable_aggregation_config(); |
| if (config->window_size_size() > 0 && config->aggregation_window_size() > 0) { |
| LOG(ERROR) << "Config has both a window_size and an aggregation_window; discarding all " |
| "aggregation_windows"; |
| config->clear_aggregation_window(); |
| } |
| for (auto window_size : config->window_size()) { |
| *config->add_aggregation_window() = MakeDayWindow(window_size); |
| } |
| config->clear_window_size(); |
| } |
| } |
| |
| // Upgrades the LocalAggregateStore from version 0 to |kCurrentLocalAggregateStoreVersion|. |
| Status UpgradeLocalAggregateStoreFromVersion0(LocalAggregateStore* store) { |
| ConvertWindowSizesToAggregationDays(store); |
| store->set_version(kCurrentLocalAggregateStoreVersion); |
| return kOK; |
| } |
| |
| } // namespace |
| |
| AggregateStore::AggregateStore(const Encoder* encoder, const ObservationWriter* observation_writer, |
| ConsistentProtoStore* local_aggregate_proto_store, |
| ConsistentProtoStore* obs_history_proto_store, |
| const size_t backfill_days) |
| : encoder_(encoder), |
| observation_writer_(observation_writer), |
| local_aggregate_proto_store_(local_aggregate_proto_store), |
| obs_history_proto_store_(obs_history_proto_store) { |
| CHECK_LE(backfill_days, kMaxAllowedBackfillDays) |
| << "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays; |
| backfill_days_ = backfill_days; |
| auto locked_store = protected_aggregate_store_.lock(); |
| locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore(); |
| auto restore_aggregates_status = |
| local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store)); |
| switch (restore_aggregates_status.error_code()) { |
| case StatusCode::OK: { |
| VLOG(4) << "Read LocalAggregateStore from disk."; |
| break; |
| } |
| case StatusCode::NOT_FOUND: { |
| VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding " |
| "with empty LocalAggregateStore. File will be created on " |
| "first snapshot of the LocalAggregateStore."; |
| locked_store->local_aggregate_store = MakeNewLocalAggregateStore(); |
| break; |
| } |
| default: { |
| LOG(ERROR) << "Read to local_aggregate_proto_store failed with status code: " |
| << restore_aggregates_status.error_code() |
| << "\nError message: " << restore_aggregates_status.error_message() |
| << "\nError details: " << restore_aggregates_status.error_details() |
| << "\nProceeding with empty LocalAggregateStore."; |
| locked_store->local_aggregate_store = MakeNewLocalAggregateStore(); |
| } |
| } |
| if (auto status = MaybeUpgradeLocalAggregateStore(&(locked_store->local_aggregate_store)); |
| status != kOK) { |
| LOG(ERROR) << "Failed to upgrade LocalAggregateStore to current version with status " << status |
| << ".\nProceeding with empty " |
| "LocalAggregateStore."; |
| locked_store->local_aggregate_store = MakeNewLocalAggregateStore(); |
| } |
| |
| auto locked_obs_history = protected_obs_history_.lock(); |
| auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history); |
| switch (restore_history_status.error_code()) { |
| case StatusCode::OK: { |
| VLOG(4) << "Read AggregatedObservationHistoryStore from disk."; |
| break; |
| } |
| case StatusCode::NOT_FOUND: { |
| VLOG(4) << "No file found for obs_history_proto_store. Proceeding " |
| "with empty AggregatedObservationHistoryStore. File will be " |
| "created on first snapshot of the AggregatedObservationHistoryStore."; |
| break; |
| } |
| default: { |
| LOG(ERROR) << "Read to obs_history_proto_store failed with status code: " |
| << restore_history_status.error_code() |
| << "\nError message: " << restore_history_status.error_message() |
| << "\nError details: " << restore_history_status.error_details() |
| << "\nProceeding with empty AggregatedObservationHistoryStore."; |
| locked_obs_history->obs_history = MakeNewObservationHistoryStore(); |
| } |
| } |
| if (auto status = MaybeUpgradeObservationHistoryStore(&locked_obs_history->obs_history); |
| status != kOK) { |
| LOG(ERROR) |
| << "Failed to upgrade AggregatedObservationHistoryStore to current version with status " |
| << status << ".\nProceeding with empty AggregatedObservationHistoryStore."; |
| locked_obs_history->obs_history = MakeNewObservationHistoryStore(); |
| } |
| } |
| |
| Status AggregateStore::MaybeInsertReportConfig(const ProjectContext& project_context, |
| const MetricDefinition& metric, |
| const ReportDefinition& report) { |
| auto locked = protected_aggregate_store_.lock(); |
| std::string key; |
| if (!PopulateReportKey(project_context.project().customer_id(), |
| project_context.project().project_id(), metric.id(), report.id(), &key)) { |
| return kInvalidArguments; |
| } |
| ReportAggregates report_aggregates; |
| if (locked->local_aggregate_store.by_report_key().count(key) == 0) { |
| if (!PopulateReportAggregates(project_context, metric, report, &report_aggregates)) { |
| return kInvalidArguments; |
| } |
| (*locked->local_aggregate_store.mutable_by_report_key())[key] = report_aggregates; |
| } |
| |
| // Make sure that the 'empty' store has the key as well. |
| ReportAggregates empty_report_aggregates; |
| if (locked->empty_local_aggregate_store.by_report_key().count(key) == 0) { |
| if (!PopulateReportAggregates(project_context, metric, report, &empty_report_aggregates)) { |
| return kInvalidArguments; |
| } |
| (*locked->empty_local_aggregate_store.mutable_by_report_key())[key] = empty_report_aggregates; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id, |
| uint32_t report_id, uint64_t event_code, uint32_t day_index) { |
| if (is_disabled_) { |
| return kOK; |
| } |
| std::string key; |
| if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &key)) { |
| return kInvalidArguments; |
| } |
| |
| auto locked = protected_aggregate_store_.lock(); |
| auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(key); |
| if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) { |
| LOG(ERROR) << "The Local Aggregate Store received an unexpected key."; |
| return kInvalidArguments; |
| } |
| if (!aggregates->second.has_unique_actives_aggregates()) { |
| LOG(ERROR) << "The local aggregates for this report key are not of type " |
| "UniqueActivesReportAggregates."; |
| return kInvalidArguments; |
| } |
| (*(*aggregates->second.mutable_unique_actives_aggregates()->mutable_by_event_code())[event_code] |
| .mutable_by_day_index())[day_index] |
| .mutable_activity_daily_aggregate() |
| ->set_activity_indicator(true); |
| return kOK; |
| } |
| |
| Status AggregateStore::UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id, |
| uint32_t metric_id, uint32_t report_id, |
| const std::string& component, uint64_t event_code, |
| uint32_t day_index, int64_t value) { |
| if (is_disabled_) { |
| return kOK; |
| } |
| std::string report_key; |
| if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &report_key)) { |
| return kInvalidArguments; |
| } |
| |
| auto locked = protected_aggregate_store_.lock(); |
| auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(report_key); |
| if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) { |
| LOG(ERROR) << "The Local Aggregate Store received an unexpected key."; |
| return kInvalidArguments; |
| } |
| if (!aggregates->second.has_numeric_aggregates()) { |
| LOG(ERROR) << "The local aggregates for this report key are not of a " |
| "compatible type."; |
| return kInvalidArguments; |
| } |
| |
| auto aggregates_by_day = |
| (*(*aggregates->second.mutable_numeric_aggregates()->mutable_by_component())[component] |
| .mutable_by_event_code())[event_code] |
| .mutable_by_day_index(); |
| bool has_stored_aggregate = ((*aggregates_by_day).find(day_index) != aggregates_by_day->end()); |
| auto day_aggregate = (*aggregates_by_day)[day_index].mutable_numeric_daily_aggregate(); |
| |
| auto [status, updated_value] = GetUpdatedAggregate( |
| aggregates->second.aggregation_config().report().aggregation_type(), |
| has_stored_aggregate ? std::optional<int64_t>{day_aggregate->value()} : std::nullopt, value); |
| if (status == kOK) { |
| day_aggregate->set_value(updated_value); |
| return kOK; |
| } |
| |
| return status; |
| } |
| |
| RepeatedField<uint32_t> UnpackEventCodesProto(uint64_t packed_event_codes) { |
| RepeatedField<uint32_t> fields; |
| for (auto code : config::UnpackEventCodes(packed_event_codes)) { |
| *fields.Add() = code; |
| } |
| return fields; |
| } |
| |
| Status AggregateStore::BackUpLocalAggregateStore() { |
| // Lock, copy the LocalAggregateStore, and release the lock. Write the copy |
| // to |local_aggregate_proto_store_|. |
| auto local_aggregate_store = CopyLocalAggregateStore(); |
| auto status = local_aggregate_proto_store_->Write(local_aggregate_store); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: " |
| << status.error_code() << "\nError message: " << status.error_message() |
| << "\nError details: " << status.error_details(); |
| return kOther; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::BackUpObservationHistory() { |
| auto obs_history = protected_obs_history_.lock()->obs_history; |
| auto status = obs_history_proto_store_->Write(obs_history); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. " |
| "::cobalt::util::Status error code: " |
| << status.error_code() << "\nError message: " << status.error_message() |
| << "\nError details: " << status.error_details(); |
| return kOther; |
| } |
| return kOK; |
| } |
| |
| ////////////////////// GarbageCollect and helper functions ////////////////// |
| |
| namespace { |
| |
| void GarbageCollectUniqueActivesReportAggregates(uint32_t day_index, uint32_t max_aggregation_days, |
| uint32_t backfill_days, |
| UniqueActivesReportAggregates* report_aggregates) { |
| auto map_by_event_code = report_aggregates->mutable_by_event_code(); |
| for (auto event_code = map_by_event_code->begin(); event_code != map_by_event_code->end();) { |
| auto map_by_day = event_code->second.mutable_by_day_index(); |
| for (auto day = map_by_day->begin(); day != map_by_day->end();) { |
| if (day->first <= day_index - backfill_days - max_aggregation_days) { |
| day = map_by_day->erase(day); |
| } else { |
| ++day; |
| } |
| } |
| if (map_by_day->empty()) { |
| event_code = map_by_event_code->erase(event_code); |
| } else { |
| ++event_code; |
| } |
| } |
| } |
| |
| void GarbageCollectNumericReportAggregates(uint32_t day_index, uint32_t max_aggregation_days, |
| uint32_t backfill_days, |
| PerDeviceNumericAggregates* report_aggregates) { |
| auto map_by_component = report_aggregates->mutable_by_component(); |
| for (auto component = map_by_component->begin(); component != map_by_component->end();) { |
| auto map_by_event_code = component->second.mutable_by_event_code(); |
| for (auto event_code = map_by_event_code->begin(); event_code != map_by_event_code->end();) { |
| auto map_by_day = event_code->second.mutable_by_day_index(); |
| for (auto day = map_by_day->begin(); day != map_by_day->end();) { |
| if (day->first <= day_index - backfill_days - max_aggregation_days) { |
| day = map_by_day->erase(day); |
| } else { |
| ++day; |
| } |
| } |
| if (map_by_day->empty()) { |
| event_code = map_by_event_code->erase(event_code); |
| } else { |
| ++event_code; |
| } |
| } |
| if (map_by_event_code->empty()) { |
| component = map_by_component->erase(component); |
| } else { |
| ++component; |
| } |
| } |
| } |
| |
| } // namespace |
| |
| Status AggregateStore::GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local) { |
| if (day_index_local == 0u) { |
| day_index_local = day_index_utc; |
| } |
| CHECK_LT(day_index_utc, UINT32_MAX); |
| CHECK_LT(day_index_local, UINT32_MAX); |
| CHECK_GE(day_index_utc, kMaxAllowedAggregationDays + backfill_days_); |
| CHECK_GE(day_index_local, kMaxAllowedAggregationDays + backfill_days_); |
| |
| auto locked = protected_aggregate_store_.lock(); |
| for (const auto& [report_key, aggregates] : locked->local_aggregate_store.by_report_key()) { |
| uint32_t day_index; |
| const auto& config = aggregates.aggregation_config(); |
| switch (config.metric().time_zone_policy()) { |
| case MetricDefinition::UTC: { |
| day_index = day_index_utc; |
| break; |
| } |
| case MetricDefinition::LOCAL: { |
| day_index = day_index_local; |
| break; |
| } |
| default: |
| LOG_FIRST_N(ERROR, 10) << "The TimeZonePolicy of this MetricDefinition is invalid."; |
| continue; |
| } |
| if (aggregates.aggregation_config().aggregation_window_size() == 0) { |
| LOG_FIRST_N(ERROR, 10) << "This ReportDefinition does not have an aggregation window."; |
| continue; |
| } |
| // PopulateReportAggregates ensured that aggregation_window has at least one element, that all |
| // aggregation windows are <= kMaxAllowedAggregationDays, and that config.aggregation_window() |
| // is sorted in increasing order. |
| uint32_t max_aggregation_days = 1u; |
| const OnDeviceAggregationWindow& largest_window = |
| config.aggregation_window(config.aggregation_window_size() - 1); |
| if (largest_window.units_case() == OnDeviceAggregationWindow::kDays) { |
| max_aggregation_days = largest_window.days(); |
| } |
| if (max_aggregation_days == 0u || max_aggregation_days > day_index) { |
| LOG_FIRST_N(ERROR, 10) << "The maximum number of aggregation days " << max_aggregation_days |
| << " of this ReportDefinition is out of range."; |
| continue; |
| } |
| // For each ReportAggregates, descend to and iterate over the sub-map of |
| // local aggregates keyed by day index. Keep buckets with day indices |
| // greater than |day_index| - |backfill_days_| - |max_aggregation_days|, and |
| // remove all buckets with smaller day indices. |
| switch (aggregates.type_case()) { |
| case ReportAggregates::kUniqueActivesAggregates: { |
| GarbageCollectUniqueActivesReportAggregates( |
| day_index, max_aggregation_days, backfill_days_, |
| locked->local_aggregate_store.mutable_by_report_key() |
| ->at(report_key) |
| .mutable_unique_actives_aggregates()); |
| break; |
| } |
| case ReportAggregates::kNumericAggregates: { |
| GarbageCollectNumericReportAggregates(day_index, max_aggregation_days, backfill_days_, |
| locked->local_aggregate_store.mutable_by_report_key() |
| ->at(report_key) |
| .mutable_numeric_aggregates()); |
| break; |
| } |
| default: |
| continue; |
| } |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::GenerateObservations(uint32_t final_day_index_utc, |
| uint32_t final_day_index_local) { |
| if (final_day_index_local == 0u) { |
| final_day_index_local = final_day_index_utc; |
| } |
| CHECK_LT(final_day_index_utc, UINT32_MAX); |
| CHECK_LT(final_day_index_local, UINT32_MAX); |
| CHECK_GE(final_day_index_utc, kMaxAllowedAggregationDays + backfill_days_); |
| CHECK_GE(final_day_index_local, kMaxAllowedAggregationDays + backfill_days_); |
| |
| // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to |
| // generate observations. |
| auto local_aggregate_store = CopyLocalAggregateStore(); |
| for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) { |
| const auto& config = aggregates.aggregation_config(); |
| |
| const auto& metric = config.metric(); |
| auto metric_ref = MetricRef(&config.project(), &metric); |
| uint32_t final_day_index; |
| switch (metric.time_zone_policy()) { |
| case MetricDefinition::UTC: { |
| final_day_index = final_day_index_utc; |
| break; |
| } |
| case MetricDefinition::LOCAL: { |
| final_day_index = final_day_index_local; |
| break; |
| } |
| default: |
| LOG_FIRST_N(ERROR, 10) << "The TimeZonePolicy of this MetricDefinition is invalid."; |
| continue; |
| } |
| |
| const auto& report = config.report(); |
| // PopulateReportAggregates ensured that aggregation_window has at least one element, that all |
| // aggregation windows are <= kMaxAllowedAggregationDays, and that config.aggregation_window() |
| // is sorted in increasing order. |
| if (config.aggregation_window_size() == 0u) { |
| LOG_FIRST_N(ERROR, 10) << "No aggregation_window found for this report."; |
| continue; |
| } |
| uint32_t max_aggregation_days = 1u; |
| const OnDeviceAggregationWindow& largest_window = |
| config.aggregation_window(config.aggregation_window_size() - 1); |
| if (largest_window.units_case() == OnDeviceAggregationWindow::kDays) { |
| max_aggregation_days = largest_window.days(); |
| } |
| if (max_aggregation_days == 0u || max_aggregation_days > final_day_index) { |
| LOG_FIRST_N(ERROR, 10) << "The maximum number of aggregation days " << max_aggregation_days |
| << " of this ReportDefinition is out of range."; |
| continue; |
| } |
| switch (metric.metric_type()) { |
| case MetricDefinition::EVENT_OCCURRED: { |
| auto num_event_codes = RapporConfigHelper::BasicRapporNumCategories(metric); |
| |
| switch (report.report_type()) { |
| case ReportDefinition::UNIQUE_N_DAY_ACTIVES: { |
| auto status = GenerateUniqueActivesObservations(metric_ref, report_key, aggregates, |
| num_event_codes, final_day_index); |
| if (status != kOK) { |
| return status; |
| } |
| break; |
| } |
| default: |
| continue; |
| } |
| break; |
| } |
| case MetricDefinition::EVENT_COUNT: |
| case MetricDefinition::ELAPSED_TIME: |
| case MetricDefinition::FRAME_RATE: |
| case MetricDefinition::MEMORY_USAGE: { |
| switch (report.report_type()) { |
| case ReportDefinition::PER_DEVICE_NUMERIC_STATS: |
| case ReportDefinition::PER_DEVICE_HISTOGRAM: { |
| auto status = GenerateObsFromNumericAggregates(metric_ref, report_key, aggregates, |
| final_day_index); |
| if (status != kOK) { |
| return status; |
| } |
| break; |
| } |
| default: |
| continue; |
| } |
| break; |
| } |
| default: |
| continue; |
| } |
| } |
| return kOK; |
| } |
| |
| ////////// GenerateUniqueActivesObservations and helper methods //////////////// |
| |
| namespace { |
| |
| // Given the set of daily aggregates for a fixed event code, and the size and |
| // end date of an aggregation window, returns the first day index within that |
| // window on which the event code occurred. Returns 0 if the event code did |
| // not occur within the window. |
| uint32_t FirstActiveDayIndexInWindow(const DailyAggregates& daily_aggregates, |
| uint32_t obs_day_index, uint32_t aggregation_days) { |
| for (uint32_t day_index = obs_day_index - aggregation_days + 1; day_index <= obs_day_index; |
| day_index++) { |
| auto day_aggregate = daily_aggregates.by_day_index().find(day_index); |
| if (day_aggregate != daily_aggregates.by_day_index().end() && |
| day_aggregate->second.activity_daily_aggregate().activity_indicator()) { |
| return day_index; |
| } |
| } |
| return 0u; |
| } |
| |
| // Given the day index of an event occurrence and the size and end date |
| // of an aggregation window, returns true if the occurrence falls within |
| // the window and false if not. |
| bool IsActivityInWindow(uint32_t active_day_index, uint32_t obs_day_index, |
| uint32_t aggregation_days) { |
| return (active_day_index <= obs_day_index && active_day_index > obs_day_index - aggregation_days); |
| } |
| |
| } // namespace |
| |
| uint32_t AggregateStore::GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, |
| uint32_t event_code, |
| uint32_t aggregation_days) const { |
| auto obs_history = protected_obs_history_.const_lock()->obs_history; |
| auto report_history = obs_history.by_report_key().find(report_key); |
| if (report_history == obs_history.by_report_key().end()) { |
| return 0u; |
| } |
| auto event_code_history = |
| report_history->second.unique_actives_history().by_event_code().find(event_code); |
| if (event_code_history == report_history->second.unique_actives_history().by_event_code().end()) { |
| return 0u; |
| } |
| auto window_history = event_code_history->second.by_window_size().find(aggregation_days); |
| if (window_history == event_code_history->second.by_window_size().end()) { |
| return 0u; |
| } |
| return window_history->second; |
| } |
| |
| void AggregateStore::SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, |
| uint32_t event_code, |
| uint32_t aggregation_days, |
| uint32_t value) { |
| auto locked = protected_obs_history_.lock(); |
| (*(*(*locked->obs_history.mutable_by_report_key())[report_key] |
| .mutable_unique_actives_history() |
| ->mutable_by_event_code())[event_code] |
| .mutable_by_window_size())[aggregation_days] = value; |
| } |
| |
| Status AggregateStore::GenerateSingleUniqueActivesObservation( |
| const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index, |
| uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const { |
| auto encoder_result = encoder_->EncodeUniqueActivesObservation(metric_ref, report, obs_day_index, |
| event_code, was_active, window); |
| if (encoder_result.status != kOK) { |
| return encoder_result.status; |
| } |
| if (encoder_result.observation == nullptr || encoder_result.metadata == nullptr) { |
| LOG(ERROR) << "Failed to encode UniqueActivesObservation"; |
| return kOther; |
| } |
| |
| auto writer_status = observation_writer_->WriteObservation(encoder_result.observation, |
| std::move(encoder_result.metadata)); |
| if (writer_status != kOK) { |
| return writer_status; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::GenerateUniqueActivesObservations(const MetricRef metric_ref, |
| const std::string& report_key, |
| const ReportAggregates& report_aggregates, |
| uint32_t num_event_codes, |
| uint32_t final_day_index) { |
| CHECK_GT(final_day_index, backfill_days_); |
| // The earliest day index for which we might need to generate an |
| // Observation. |
| auto backfill_period_start = uint32_t(final_day_index - backfill_days_); |
| |
| for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) { |
| auto daily_aggregates = |
| report_aggregates.unique_actives_aggregates().by_event_code().find(event_code); |
| // Have any events ever been logged for this report and event code? |
| bool found_event_code = |
| (daily_aggregates != report_aggregates.unique_actives_aggregates().by_event_code().end()); |
| for (const auto& window : report_aggregates.aggregation_config().aggregation_window()) { |
| // Skip all hourly windows, and all daily windows which are larger than |
| // kMaxAllowedAggregationDays. |
| // |
| // TODO(pesk): Generate observations for hourly windows. |
| if (window.units_case() != OnDeviceAggregationWindow::kDays) { |
| LOG(INFO) << "Skipping unsupported aggregation window."; |
| continue; |
| } |
| if (window.days() > kMaxAllowedAggregationDays) { |
| LOG(WARNING) << "GenerateUniqueActivesObservations ignoring a window " |
| "size exceeding the maximum allowed value"; |
| continue; |
| } |
| // Find the earliest day index for which an Observation has not yet |
| // been generated for this report, event code, and window size. If |
| // that day index is later than |final_day_index|, no Observation is |
| // generated on this invocation. |
| auto last_gen = GetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days()); |
| auto first_day_index = std::max(last_gen + 1, backfill_period_start); |
| // The latest day index on which |event_type| is known to have |
| // occurred, so far. This value will be updated as we search |
| // forward from the earliest day index belonging to a window of |
| // interest. |
| uint32_t active_day_index = 0u; |
| // Iterate over the day indices |obs_day_index| for which we need |
| // to generate Observations. On each iteration, generate an |
| // Observation for the |window| ending on |obs_day_index|. |
| for (uint32_t obs_day_index = first_day_index; obs_day_index <= final_day_index; |
| obs_day_index++) { |
| bool was_active = false; |
| if (found_event_code) { |
| // If the current value of |active_day_index| falls within the |
| // window, generate an Observation of activity. If not, search |
| // forward in the window, update |active_day_index|, and generate an |
| // Observation of activity or inactivity depending on the result of |
| // the search. |
| if (IsActivityInWindow(active_day_index, obs_day_index, window.days())) { |
| was_active = true; |
| } else { |
| active_day_index = |
| FirstActiveDayIndexInWindow(daily_aggregates->second, obs_day_index, window.days()); |
| was_active = IsActivityInWindow(active_day_index, obs_day_index, window.days()); |
| } |
| } |
| auto status = GenerateSingleUniqueActivesObservation( |
| metric_ref, &report_aggregates.aggregation_config().report(), obs_day_index, event_code, |
| window, was_active); |
| if (status != kOK) { |
| return status; |
| } |
| |
| SetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days(), obs_day_index); |
| } |
| } |
| } |
| return kOK; |
| } |
| |
| ////////// GenerateObsFromNumericAggregates and helper methods ///////////// |
| |
| uint32_t AggregateStore::GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key, |
| const std::string& component, |
| uint32_t event_code, |
| uint32_t aggregation_days) const { |
| auto obs_history = protected_obs_history_.const_lock()->obs_history; |
| const auto& report_history = obs_history.by_report_key().find(report_key); |
| if (report_history == obs_history.by_report_key().end()) { |
| return 0u; |
| } |
| if (!report_history->second.has_per_device_numeric_history()) { |
| return 0u; |
| } |
| const auto& component_history = |
| report_history->second.per_device_numeric_history().by_component().find(component); |
| if (component_history == |
| report_history->second.per_device_numeric_history().by_component().end()) { |
| return 0u; |
| } |
| const auto& event_code_history = component_history->second.by_event_code().find(event_code); |
| if (event_code_history == component_history->second.by_event_code().end()) { |
| return 0u; |
| } |
| const auto& window_history = event_code_history->second.by_window_size().find(aggregation_days); |
| if (window_history == event_code_history->second.by_window_size().end()) { |
| return 0u; |
| } |
| return window_history->second; |
| } |
| |
| void AggregateStore::SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key, |
| const std::string& component, |
| uint32_t event_code, |
| uint32_t aggregation_days, |
| uint32_t value) { |
| auto locked = protected_obs_history_.lock(); |
| (*(*(*(*locked->obs_history.mutable_by_report_key())[report_key] |
| .mutable_per_device_numeric_history() |
| ->mutable_by_component())[component] |
| .mutable_by_event_code())[event_code] |
| .mutable_by_window_size())[aggregation_days] = value; |
| } |
| |
| uint32_t AggregateStore::GetReportParticipationLastGeneratedDayIndex( |
| const std::string& report_key) const { |
| auto obs_history = protected_obs_history_.const_lock()->obs_history; |
| const auto& report_history = obs_history.by_report_key().find(report_key); |
| if (report_history == obs_history.by_report_key().end()) { |
| return 0u; |
| } |
| return report_history->second.report_participation_history().last_generated(); |
| } |
| |
| void AggregateStore::SetReportParticipationLastGeneratedDayIndex(const std::string& report_key, |
| uint32_t value) { |
| auto locked = protected_obs_history_.lock(); |
| (*locked->obs_history.mutable_by_report_key())[report_key] |
| .mutable_report_participation_history() |
| ->set_last_generated(value); |
| } |
| |
| void AggregateStore::DeleteData() { |
| LOG(INFO) << "AggregateStore: Deleting stored data"; |
| |
| { |
| auto locked = protected_aggregate_store_.lock(); |
| locked->local_aggregate_store = locked->empty_local_aggregate_store; |
| } |
| protected_obs_history_.lock()->obs_history = MakeNewObservationHistoryStore(); |
| } |
| |
| void AggregateStore::Disable(bool is_disabled) { |
| LOG(INFO) << "AggregateStore: " << (is_disabled ? "Disabling" : "Enabling") |
| << " event aggregate storage."; |
| is_disabled_ = is_disabled; |
| } |
| |
| Status AggregateStore::GenerateSinglePerDeviceNumericObservation( |
| const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index, |
| const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window, |
| int64_t value) const { |
| Encoder::Result encoder_result = |
| encoder_->EncodePerDeviceNumericObservation(metric_ref, report, obs_day_index, component, |
| UnpackEventCodesProto(event_code), value, window); |
| if (encoder_result.status != kOK) { |
| return encoder_result.status; |
| } |
| if (encoder_result.observation == nullptr || encoder_result.metadata == nullptr) { |
| LOG(ERROR) << "Failed to encode PerDeviceNumericObservation"; |
| return kOther; |
| } |
| |
| const auto& writer_status = observation_writer_->WriteObservation( |
| encoder_result.observation, std::move(encoder_result.metadata)); |
| if (writer_status != kOK) { |
| return writer_status; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::GenerateSinglePerDeviceHistogramObservation( |
| const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index, |
| const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window, |
| int64_t value) const { |
| Encoder::Result encoder_result = encoder_->EncodePerDeviceHistogramObservation( |
| metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value, |
| window); |
| |
| if (encoder_result.status != kOK) { |
| return encoder_result.status; |
| } |
| if (encoder_result.observation == nullptr || encoder_result.metadata == nullptr) { |
| LOG(ERROR) << "Failed to encode PerDeviceNumericObservation"; |
| return kOther; |
| } |
| |
| const auto& writer_status = observation_writer_->WriteObservation( |
| encoder_result.observation, std::move(encoder_result.metadata)); |
| if (writer_status != kOK) { |
| return writer_status; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::GenerateSingleReportParticipationObservation(const MetricRef metric_ref, |
| const ReportDefinition* report, |
| uint32_t obs_day_index) const { |
| auto encoder_result = |
| encoder_->EncodeReportParticipationObservation(metric_ref, report, obs_day_index); |
| if (encoder_result.status != kOK) { |
| return encoder_result.status; |
| } |
| if (encoder_result.observation == nullptr || encoder_result.metadata == nullptr) { |
| LOG(ERROR) << "Failed to encode ReportParticipationObservation"; |
| return kOther; |
| } |
| |
| const auto& writer_status = observation_writer_->WriteObservation( |
| encoder_result.observation, std::move(encoder_result.metadata)); |
| if (writer_status != kOK) { |
| return writer_status; |
| } |
| return kOK; |
| } |
| |
| Status AggregateStore::GenerateObsFromNumericAggregates(const MetricRef metric_ref, |
| const std::string& report_key, |
| const ReportAggregates& report_aggregates, |
| uint32_t final_day_index) { |
| CHECK_GT(final_day_index, backfill_days_); |
| // The first day index for which we might have to generate an Observation. |
| auto backfill_period_start = uint32_t(final_day_index - backfill_days_); |
| |
| // Generate any necessary PerDeviceNumericObservations for this report. |
| for (const auto& [component, event_code_aggregates] : |
| report_aggregates.numeric_aggregates().by_component()) { |
| for (const auto& [event_code, daily_aggregates] : event_code_aggregates.by_event_code()) { |
| // Populate a helper map keyed by day indices which belong to the range |
| // [|backfill_period_start|, |final_day_index|]. The value at a day |
| // index is the list of windows, in increasing size order, for which an |
| // Observation should be generated for that day index. |
| std::map<uint32_t, std::vector<OnDeviceAggregationWindow>> windows_by_obs_day; |
| for (const auto& window : report_aggregates.aggregation_config().aggregation_window()) { |
| if (window.units_case() != OnDeviceAggregationWindow::kDays) { |
| LOG(INFO) << "Skipping unsupported aggregation window."; |
| continue; |
| } |
| auto last_gen = GetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, |
| window.days()); |
| auto first_day_index = std::max(last_gen + 1, backfill_period_start); |
| for (auto obs_day_index = first_day_index; obs_day_index <= final_day_index; |
| obs_day_index++) { |
| windows_by_obs_day[obs_day_index].push_back(window); |
| } |
| } |
| // Iterate over the day indices |obs_day_index| for which we might need |
| // to generate an Observation. For each day index, generate an |
| // Observation for each needed window. |
| // |
| // More precisely, for each needed window, iterate over the days within that window. If at |
| // least one numeric event was logged during the window, compute the aggregate of the numeric |
| // values over the window and generate a PerDeviceNumericObservation. Whether or not a numeric |
| // event was found, update the AggregatedObservationHistory for this report, component, event |
| // code, and window size with |obs_day_index| as the most recent date of Observation |
| // generation. This reflects the fact that all needed Observations were generated for the |
| // window ending on that date. |
| for (auto obs_day_index = backfill_period_start; obs_day_index <= final_day_index; |
| obs_day_index++) { |
| const auto& windows = windows_by_obs_day.find(obs_day_index); |
| if (windows == windows_by_obs_day.end()) { |
| continue; |
| } |
| bool found_value_for_window = false; |
| int64_t window_aggregate = 0; |
| uint32_t num_days = 0; |
| for (const auto& window : windows->second) { |
| while (num_days < window.days()) { |
| bool found_value_for_day = false; |
| const auto& day_aggregates = |
| daily_aggregates.by_day_index().find(obs_day_index - num_days); |
| if (day_aggregates != daily_aggregates.by_day_index().end()) { |
| found_value_for_day = true; |
| } |
| const auto& aggregation_type = |
| report_aggregates.aggregation_config().report().aggregation_type(); |
| switch (aggregation_type) { |
| case ReportDefinition::SUM: |
| if (found_value_for_day) { |
| window_aggregate += day_aggregates->second.numeric_daily_aggregate().value(); |
| found_value_for_window = true; |
| } |
| break; |
| case ReportDefinition::MAX: |
| if (found_value_for_day) { |
| window_aggregate = std::max( |
| window_aggregate, day_aggregates->second.numeric_daily_aggregate().value()); |
| found_value_for_window = true; |
| } |
| break; |
| case ReportDefinition::MIN: |
| if (found_value_for_day && !found_value_for_window) { |
| window_aggregate = day_aggregates->second.numeric_daily_aggregate().value(); |
| found_value_for_window = true; |
| } else if (found_value_for_day) { |
| window_aggregate = std::min( |
| window_aggregate, day_aggregates->second.numeric_daily_aggregate().value()); |
| } |
| break; |
| default: |
| LOG(ERROR) << "Unexpected aggregation type " << aggregation_type; |
| return kInvalidArguments; |
| } |
| num_days++; |
| } |
| if (found_value_for_window) { |
| Status status; |
| const ReportDefinition* report = &report_aggregates.aggregation_config().report(); |
| switch (report->report_type()) { |
| case ReportDefinition::PER_DEVICE_NUMERIC_STATS: { |
| status = GenerateSinglePerDeviceNumericObservation( |
| metric_ref, report, obs_day_index, component, event_code, window, |
| window_aggregate); |
| if (status != kOK) { |
| return status; |
| } |
| break; |
| } |
| case ReportDefinition::PER_DEVICE_HISTOGRAM: { |
| auto status = GenerateSinglePerDeviceHistogramObservation( |
| metric_ref, report, obs_day_index, component, event_code, window, |
| window_aggregate); |
| if (status != kOK) { |
| return status; |
| } |
| break; |
| } |
| default: |
| LOG(ERROR) << "Unexpected report type " << report->report_type(); |
| return kInvalidArguments; |
| } |
| } |
| |
| SetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days(), |
| obs_day_index); |
| } |
| } |
| } |
| } |
| // Generate any necessary ReportParticipationObservations for this report. |
| auto participation_last_gen = GetReportParticipationLastGeneratedDayIndex(report_key); |
| auto participation_first_day_index = std::max(participation_last_gen + 1, backfill_period_start); |
| for (auto obs_day_index = participation_first_day_index; obs_day_index <= final_day_index; |
| obs_day_index++) { |
| GenerateSingleReportParticipationObservation( |
| metric_ref, &report_aggregates.aggregation_config().report(), obs_day_index); |
| SetReportParticipationLastGeneratedDayIndex(report_key, obs_day_index); |
| } |
| return kOK; |
| } |
| |
| LocalAggregateStore AggregateStore::MakeNewLocalAggregateStore(uint32_t version) { |
| LocalAggregateStore store; |
| store.set_version(version); |
| return store; |
| } |
| |
| AggregatedObservationHistoryStore AggregateStore::MakeNewObservationHistoryStore(uint32_t version) { |
| AggregatedObservationHistoryStore store; |
| store.set_version(version); |
| return store; |
| } |
| |
| // We can upgrade from v0, but no other versions. |
| Status AggregateStore::MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) { |
| uint32_t version = store->version(); |
| if (version == kCurrentLocalAggregateStoreVersion) { |
| return kOK; |
| } |
| VLOG(4) << "Attempting to upgrade LocalAggregateStore from version " << version; |
| switch (version) { |
| case 0u: |
| return UpgradeLocalAggregateStoreFromVersion0(store); |
| default: |
| LOG(ERROR) << "Cannot upgrade LocalAggregateStore from version " << version; |
| return kInvalidArguments; |
| } |
| } |
| |
| // The current version is the earliest version, so no other versions are accepted. |
| Status AggregateStore::MaybeUpgradeObservationHistoryStore( |
| AggregatedObservationHistoryStore* store) { |
| uint32_t version = store->version(); |
| if (version == kCurrentObservationHistoryStoreVersion) { |
| return kOK; |
| } |
| LOG(ERROR) << "Cannot upgrade AggregatedObservationHistoryStore from version " << version; |
| return kInvalidArguments; |
| } |
| |
| } // namespace cobalt::local_aggregation |