blob: b793fe810153493005b00d03436ab76623a606b5 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/aggregate_store.h"
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "src/algorithms/rappor/rappor_config_helper.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/proto_util.h"
#include "src/lib/util/status.h"
#include "src/local_aggregation/aggregation_utils.h"
#include "src/logger/internal_metrics.h"
#include "src/registry/packed_event_codes.h"
namespace cobalt::local_aggregation {
using google::protobuf::RepeatedField;
using logger::Encoder;
using logger::kInvalidArguments;
using logger::kOK;
using logger::kOther;
using logger::MetricRef;
using logger::ObservationWriter;
using logger::ProjectContext;
using logger::Status;
using rappor::RapporConfigHelper;
using util::ConsistentProtoStore;
using util::SerializeToBase64;
using util::StatusCode;
namespace {
////// General helper functions.
// Populates a ReportAggregationKey proto message and then populates a string
// with the base64 encoding of the serialized proto.
bool PopulateReportKey(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
uint32_t report_id, std::string* key) {
ReportAggregationKey key_data;
key_data.set_customer_id(customer_id);
key_data.set_project_id(project_id);
key_data.set_metric_id(metric_id);
key_data.set_report_id(report_id);
return SerializeToBase64(key_data, key);
}
////// Helper functions used by the constructor and UpdateAggregationConfigs().
// Validates the window sizes and/or aggregation windows of |report| and appends them to
// |aggregation_config| as aggregation windows: hourly windows first, in increasing order, followed
// by daily windows (including converted window sizes), in increasing order.
//
// Returns false, after logging an error, if |report| has neither a window size nor an aggregation
// window, if any window is zero or exceeds the maximum allowed for its unit, or if an
// OnDeviceAggregationWindow has an unrecognized unit. |aggregation_config| is left untouched
// unless every window is valid.
//
// TODO(pesk): Stop looking at the window_size field of |report| once all reports have been updated
// to have OnDeviceAggregationWindows only.
bool GetSortedAggregationWindowsFromReport(const ReportDefinition& report,
                                           AggregationConfig* aggregation_config) {
  if (report.window_size_size() == 0 && report.aggregation_window_size() == 0) {
    LOG(ERROR) << "Report must have at least one window size or aggregation window.";
    return false;
  }
  std::vector<uint32_t> aggregation_days;
  std::vector<uint32_t> aggregation_hours;
  // Legacy window sizes are interpreted as a number of days.
  for (const uint32_t window_size : report.window_size()) {
    if (window_size == 0 || window_size > kMaxAllowedAggregationDays) {
      LOG(ERROR) << "Window size must be positive and cannot exceed " << kMaxAllowedAggregationDays;
      return false;
    }
    aggregation_days.push_back(window_size);
  }
  for (const auto& window : report.aggregation_window()) {
    switch (window.units_case()) {
      case OnDeviceAggregationWindow::kDays: {
        uint32_t num_days = window.days();
        if (num_days == 0 || num_days > kMaxAllowedAggregationDays) {
          LOG(ERROR) << "Daily windows must contain at least 1 and no more than "
                     << kMaxAllowedAggregationDays << " days";
          return false;
        }
        aggregation_days.push_back(num_days);
        break;
      }
      case OnDeviceAggregationWindow::kHours: {
        uint32_t num_hours = window.hours();
        if (num_hours == 0 || num_hours > kMaxAllowedAggregationHours) {
          LOG(ERROR) << "Hourly windows must contain at least 1 and no more than "
                     << kMaxAllowedAggregationHours << " hours";
          return false;
        }
        aggregation_hours.push_back(num_hours);
        break;
      }
      default:
        // A window with an unset or unknown unit fails validation. Previously the error was logged
        // but the window was silently dropped and the function still reported success.
        LOG(ERROR) << "Invalid OnDeviceAggregationWindow type " << window.units_case();
        return false;
    }
  }
  // Only reached when every window is valid: emit hourly windows, then daily windows, each sorted.
  std::sort(aggregation_hours.begin(), aggregation_hours.end());
  std::sort(aggregation_days.begin(), aggregation_days.end());
  for (auto num_hours : aggregation_hours) {
    *aggregation_config->add_aggregation_window() = MakeHourWindow(num_hours);
  }
  for (auto num_days : aggregation_days) {
    *aggregation_config->add_aggregation_window() = MakeDayWindow(num_days);
  }
  return true;
}
// Creates an AggregationConfig from a ProjectContext, MetricDefinition, and
// ReportDefinition and populates the aggregation_config field of a specified
// ReportAggregates. Also sets the type of the ReportAggregates based on the
// ReportDefinition's type.
//
// Accepts ReportDefinitions with either at least one WindowSize, or at least one
// OnDeviceAggregationWindow with units in days.
bool PopulateReportAggregates(const ProjectContext& project_context, const MetricDefinition& metric,
const ReportDefinition& report, ReportAggregates* report_aggregates) {
if (report.window_size_size() == 0 && report.aggregation_window_size() == 0) {
}
AggregationConfig* aggregation_config = report_aggregates->mutable_aggregation_config();
*aggregation_config->mutable_project() = project_context.project();
*aggregation_config->mutable_metric() = *project_context.GetMetric(metric.id());
*aggregation_config->mutable_report() = report;
if (!GetSortedAggregationWindowsFromReport(report, aggregation_config)) {
return false;
}
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
report_aggregates->set_allocated_unique_actives_aggregates(new UniqueActivesReportAggregates);
return true;
}
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
report_aggregates->set_allocated_numeric_aggregates(new PerDeviceNumericAggregates);
return true;
}
default:
return false;
}
}
// Moves all items from the |window_size| field to the |aggregation_window| field of each
// AggregationConfig in |store|, preserving the order of the items. Each window size becomes a daily
// aggregation window.
//
// The |aggregation_window| field should be empty if the |window_size| field is nonempty. If that is
// not the case, an error is logged and the existing |aggregation_window| contents are discarded and
// replaced with the migrated |window_size| values.
void ConvertWindowSizesToAggregationDays(LocalAggregateStore* store) {
  // Iterate over the mutable map by reference: this avoids deep-copying every ReportAggregates and
  // re-looking up each key just to reach its config.
  for (auto& entry : *store->mutable_by_report_key()) {
    AggregationConfig* config = entry.second.mutable_aggregation_config();
    if (config->window_size_size() > 0 && config->aggregation_window_size() > 0) {
      LOG(ERROR) << "Config has both a window_size and an aggregation_window; discarding all "
                    "aggregation_windows";
      config->clear_aggregation_window();
    }
    for (auto window_size : config->window_size()) {
      *config->add_aggregation_window() = MakeDayWindow(window_size);
    }
    config->clear_window_size();
  }
}
// Brings a version-0 LocalAggregateStore up to |kCurrentLocalAggregateStoreVersion|: first migrates
// legacy window sizes to aggregation windows, then stamps the store with the current version.
// Always returns kOK.
Status UpgradeLocalAggregateStoreFromVersion0(LocalAggregateStore* store) {
  const Status upgrade_result = kOK;
  ConvertWindowSizesToAggregationDays(store);
  store->set_version(kCurrentLocalAggregateStoreVersion);
  return upgrade_result;
}
} // namespace
// Constructs an AggregateStore and restores its in-memory state from the two proto stores.
//
// |encoder|, |observation_writer|, |local_aggregate_proto_store| and |obs_history_proto_store| are
// stored in the corresponding members. |backfill_days| is CHECKed to be at most
// |kMaxAllowedBackfillDays| before it is stored.
//
// The LocalAggregateStore and the AggregatedObservationHistoryStore are each read from their proto
// store and then passed to the matching MaybeUpgrade*() method. Read failures and upgrade failures
// are logged and handled by falling back to a newly made store, as detailed inline.
AggregateStore::AggregateStore(const Encoder* encoder, const ObservationWriter* observation_writer,
ConsistentProtoStore* local_aggregate_proto_store,
ConsistentProtoStore* obs_history_proto_store,
const size_t backfill_days)
: encoder_(encoder),
observation_writer_(observation_writer),
local_aggregate_proto_store_(local_aggregate_proto_store),
obs_history_proto_store_(obs_history_proto_store),
internal_metrics_(logger::InternalMetrics::NewWithLogger(nullptr)) {
CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
backfill_days_ = backfill_days;
// |locked_store| is the handle returned by protected_aggregate_store_.lock(); it stays in scope for
// the remainder of the constructor.
auto locked_store = protected_aggregate_store_.lock();
// Template store that DeleteData() later copies over the live store.
locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore();
// Restore the LocalAggregateStore from its proto store.
auto restore_aggregates_status =
local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read LocalAggregateStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
// A missing file is not treated as an error: start from a new store.
VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
"with empty LocalAggregateStore. File will be created on "
"first snapshot of the LocalAggregateStore.";
locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
break;
}
default: {
// Any other read failure: log the details and start from a new store.
LOG(ERROR) << "Read to local_aggregate_proto_store failed with status code: "
<< restore_aggregates_status.error_code()
<< "\nError message: " << restore_aggregates_status.error_message()
<< "\nError details: " << restore_aggregates_status.error_details()
<< "\nProceeding with empty LocalAggregateStore.";
locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
}
}
// If the restored store cannot be upgraded, discard it and start from a new store.
if (auto status = MaybeUpgradeLocalAggregateStore(&(locked_store->local_aggregate_store));
status != kOK) {
LOG(ERROR) << "Failed to upgrade LocalAggregateStore to current version with status " << status
<< ".\nProceeding with empty "
"LocalAggregateStore.";
locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
}
// Restore the AggregatedObservationHistoryStore from its proto store.
auto locked_obs_history = protected_obs_history_.lock();
auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
// NOTE(review): unlike the LocalAggregateStore case above, the history store is not reassigned
// here; whatever Read() left in |obs_history| is handed to the upgrade step below.
VLOG(4) << "No file found for obs_history_proto_store. Proceeding "
"with empty AggregatedObservationHistoryStore. File will be "
"created on first snapshot of the AggregatedObservationHistoryStore.";
break;
}
default: {
// Any other read failure: log the details and start from a new history store.
LOG(ERROR) << "Read to obs_history_proto_store failed with status code: "
<< restore_history_status.error_code()
<< "\nError message: " << restore_history_status.error_message()
<< "\nError details: " << restore_history_status.error_details()
<< "\nProceeding with empty AggregatedObservationHistoryStore.";
locked_obs_history->obs_history = MakeNewObservationHistoryStore();
}
}
// If the restored history cannot be upgraded, discard it and start from a new history store.
if (auto status = MaybeUpgradeObservationHistoryStore(&locked_obs_history->obs_history);
status != kOK) {
LOG(ERROR)
<< "Failed to upgrade AggregatedObservationHistoryStore to current version with status "
<< status << ".\nProceeding with empty AggregatedObservationHistoryStore.";
locked_obs_history->obs_history = MakeNewObservationHistoryStore();
}
}
// Ensures that both the live LocalAggregateStore and the 'empty' template store contain an entry
// for the report identified by |project_context|, |metric| and |report|. Entries that already
// exist are left untouched.
//
// Returns kInvalidArguments if the report key cannot be built or if a new ReportAggregates cannot
// be populated for the report; returns kOK otherwise.
Status AggregateStore::MaybeInsertReportConfig(const ProjectContext& project_context,
                                               const MetricDefinition& metric,
                                               const ReportDefinition& report) {
  auto locked = protected_aggregate_store_.lock();
  std::string key;
  const bool have_key =
      PopulateReportKey(project_context.project().customer_id(),
                        project_context.project().project_id(), metric.id(), report.id(), &key);
  if (!have_key) {
    return kInvalidArguments;
  }

  // Adds a freshly populated ReportAggregates under |key| unless |store| already has one.
  // Returns false only if populating the new entry fails.
  const auto insert_if_absent = [&](auto* store) -> bool {
    if (store->by_report_key().count(key) != 0) {
      return true;
    }
    ReportAggregates fresh_aggregates;
    if (!PopulateReportAggregates(project_context, metric, report, &fresh_aggregates)) {
      return false;
    }
    (*store->mutable_by_report_key())[key] = fresh_aggregates;
    return true;
  };

  if (!insert_if_absent(&locked->local_aggregate_store)) {
    return kInvalidArguments;
  }
  // Make sure that the 'empty' store has the key as well.
  if (!insert_if_absent(&locked->empty_local_aggregate_store)) {
    return kInvalidArguments;
  }
  return kOK;
}
// Marks |event_code| as active on |day_index| for the UNIQUE_N_DAY_ACTIVES report identified by the
// given IDs.
//
// Does nothing and returns kOK while the store is disabled. Returns kInvalidArguments if the report
// key cannot be built, if the store has no entry for the key, or if the entry does not hold
// UniqueActivesReportAggregates.
Status AggregateStore::SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
                                 uint32_t report_id, uint64_t event_code, uint32_t day_index) {
  if (is_disabled_) {
    return kOK;
  }
  std::string key;
  if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &key)) {
    return kInvalidArguments;
  }

  auto locked = protected_aggregate_store_.lock();
  auto* aggregates_by_report = locked->local_aggregate_store.mutable_by_report_key();
  auto entry = aggregates_by_report->find(key);
  if (entry == aggregates_by_report->end()) {
    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
    return kInvalidArguments;
  }
  auto& report_aggregates = entry->second;
  if (!report_aggregates.has_unique_actives_aggregates()) {
    LOG(ERROR) << "The local aggregates for this report key are not of type "
                  "UniqueActivesReportAggregates.";
    return kInvalidArguments;
  }

  // Walk down event code -> day index, creating intermediate entries as needed, then set the flag.
  auto& aggregates_by_event_code =
      *report_aggregates.mutable_unique_actives_aggregates()->mutable_by_event_code();
  auto& aggregates_by_day = *aggregates_by_event_code[event_code].mutable_by_day_index();
  aggregates_by_day[day_index].mutable_activity_daily_aggregate()->set_activity_indicator(true);
  return kOK;
}
// Folds |value| into the numeric daily aggregate stored for (|component|, |event_code|,
// |day_index|) under the report identified by the given IDs, using the report's aggregation type.
//
// Does nothing and returns kOK while the store is disabled. Returns kInvalidArguments if the report
// key cannot be built, if the store has no entry for the key, or if the entry does not hold numeric
// aggregates. Otherwise returns the status produced by GetUpdatedAggregate(); the stored value is
// only overwritten when that status is kOK.
Status AggregateStore::UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id,
                                              uint32_t metric_id, uint32_t report_id,
                                              const std::string& component, uint64_t event_code,
                                              uint32_t day_index, int64_t value) {
  if (is_disabled_) {
    return kOK;
  }
  std::string report_key;
  if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &report_key)) {
    return kInvalidArguments;
  }

  auto locked = protected_aggregate_store_.lock();
  auto* aggregates_by_report = locked->local_aggregate_store.mutable_by_report_key();
  auto entry = aggregates_by_report->find(report_key);
  if (entry == aggregates_by_report->end()) {
    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
    return kInvalidArguments;
  }
  auto& report_aggregates = entry->second;
  if (!report_aggregates.has_numeric_aggregates()) {
    LOG(ERROR) << "The local aggregates for this report key are not of a "
                  "compatible type.";
    return kInvalidArguments;
  }

  auto& aggregates_by_component =
      *report_aggregates.mutable_numeric_aggregates()->mutable_by_component();
  auto& aggregates_by_event_code = *aggregates_by_component[component].mutable_by_event_code();
  auto& daily_aggregates = *aggregates_by_event_code[event_code].mutable_by_day_index();

  // Presence must be checked before operator[] below, which inserts an entry when none exists.
  const bool already_stored = daily_aggregates.find(day_index) != daily_aggregates.end();
  auto* day_aggregate = daily_aggregates[day_index].mutable_numeric_daily_aggregate();

  auto [update_status, new_value] = GetUpdatedAggregate(
      report_aggregates.aggregation_config().report().aggregation_type(),
      already_stored ? std::optional<int64_t>{day_aggregate->value()} : std::nullopt, value);
  if (update_status != kOK) {
    return update_status;
  }
  day_aggregate->set_value(new_value);
  return kOK;
}
// Unpacks |packed_event_codes| with config::UnpackEventCodes() and returns the resulting codes, in
// the same order, as a RepeatedField.
RepeatedField<uint32_t> UnpackEventCodesProto(uint64_t packed_event_codes) {
  RepeatedField<uint32_t> unpacked;
  const auto& codes = config::UnpackEventCodes(packed_event_codes);
  for (const uint32_t code : codes) {
    unpacked.Add(code);
  }
  return unpacked;
}
// Writes a snapshot of the LocalAggregateStore to |local_aggregate_proto_store_| and reports the
// snapshot's serialized size to the internal metrics. Returns kOther, after logging the error, if
// the write fails; returns kOK otherwise.
Status AggregateStore::BackUpLocalAggregateStore() {
  // Work on a copy obtained from CopyLocalAggregateStore() rather than on the live store.
  auto snapshot = CopyLocalAggregateStore();
  const auto snapshot_bytes = static_cast<int64_t>(snapshot.ByteSizeLong());
  // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::AggregateStore,
                                    snapshot_bytes);

  auto write_status = local_aggregate_proto_store_->Write(snapshot);
  if (write_status.ok()) {
    return kOK;
  }
  LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: "
             << write_status.error_code() << "\nError message: " << write_status.error_message()
             << "\nError details: " << write_status.error_details();
  return kOther;
}
// Writes a copy of the AggregatedObservationHistoryStore to |obs_history_proto_store_| and reports
// the copy's serialized size to the internal metrics. Returns kOther, after logging the error, if
// the write fails; returns kOK otherwise.
Status AggregateStore::BackUpObservationHistory() {
  // Copy the history out of the protected fields; the write below operates on the copy.
  auto history_snapshot = protected_obs_history_.lock()->obs_history;
  const auto snapshot_bytes = static_cast<int64_t>(history_snapshot.ByteSizeLong());
  // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
  internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationHistory,
                                    snapshot_bytes);

  auto write_status = obs_history_proto_store_->Write(history_snapshot);
  if (write_status.ok()) {
    return kOK;
  }
  LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
                "::cobalt::util::Status error code: "
             << write_status.error_code() << "\nError message: " << write_status.error_message()
             << "\nError details: " << write_status.error_details();
  return kOther;
}
////////////////////// GarbageCollect and helper functions //////////////////
namespace {
// Removes from |report_aggregates| every daily aggregate whose day index is less than or equal to
// |day_index| - |backfill_days| - |max_aggregation_days|, then removes every event code that is
// left without any daily aggregates.
//
// If |backfill_days| + |max_aggregation_days| exceeds |day_index|, the threshold would be negative,
// so no daily aggregate is removed. The sum is computed in 64 bits so that it cannot wrap around
// and wrongly expire every bucket.
void GarbageCollectUniqueActivesReportAggregates(uint32_t day_index, uint32_t max_aggregation_days,
                                                 uint32_t backfill_days,
                                                 UniqueActivesReportAggregates* report_aggregates) {
  // The expiry threshold is loop-invariant, so compute it once up front.
  const uint64_t retained_days = uint64_t{backfill_days} + max_aggregation_days;
  const bool any_day_can_expire = retained_days <= day_index;
  const uint32_t last_expired_day =
      any_day_can_expire ? day_index - static_cast<uint32_t>(retained_days) : 0u;

  auto* map_by_event_code = report_aggregates->mutable_by_event_code();
  for (auto event_code = map_by_event_code->begin(); event_code != map_by_event_code->end();) {
    auto* map_by_day = event_code->second.mutable_by_day_index();
    for (auto day = map_by_day->begin(); day != map_by_day->end();) {
      if (any_day_can_expire && day->first <= last_expired_day) {
        day = map_by_day->erase(day);
      } else {
        ++day;
      }
    }
    // Drop event codes that no longer have any daily aggregates.
    if (map_by_day->empty()) {
      event_code = map_by_event_code->erase(event_code);
    } else {
      ++event_code;
    }
  }
}
// Removes from |report_aggregates| every daily aggregate whose day index is less than or equal to
// |day_index| - |backfill_days| - |max_aggregation_days|, then removes every event code and every
// component that is left empty as a result.
//
// If |backfill_days| + |max_aggregation_days| exceeds |day_index|, the threshold would be negative,
// so no daily aggregate is removed. The sum is computed in 64 bits so that it cannot wrap around
// and wrongly expire every bucket.
void GarbageCollectNumericReportAggregates(uint32_t day_index, uint32_t max_aggregation_days,
                                           uint32_t backfill_days,
                                           PerDeviceNumericAggregates* report_aggregates) {
  // The expiry threshold is loop-invariant, so compute it once up front.
  const uint64_t retained_days = uint64_t{backfill_days} + max_aggregation_days;
  const bool any_day_can_expire = retained_days <= day_index;
  const uint32_t last_expired_day =
      any_day_can_expire ? day_index - static_cast<uint32_t>(retained_days) : 0u;

  auto* map_by_component = report_aggregates->mutable_by_component();
  for (auto component = map_by_component->begin(); component != map_by_component->end();) {
    auto* map_by_event_code = component->second.mutable_by_event_code();
    for (auto event_code = map_by_event_code->begin(); event_code != map_by_event_code->end();) {
      auto* map_by_day = event_code->second.mutable_by_day_index();
      for (auto day = map_by_day->begin(); day != map_by_day->end();) {
        if (any_day_can_expire && day->first <= last_expired_day) {
          day = map_by_day->erase(day);
        } else {
          ++day;
        }
      }
      // Drop event codes that no longer have any daily aggregates.
      if (map_by_day->empty()) {
        event_code = map_by_event_code->erase(event_code);
      } else {
        ++event_code;
      }
    }
    // Drop components that no longer have any event codes.
    if (map_by_event_code->empty()) {
      component = map_by_component->erase(component);
    } else {
      ++component;
    }
  }
}
} // namespace
// Removes old daily aggregates from the LocalAggregateStore.
//
// For each report, the reference day index is |day_index_utc| or |day_index_local|, chosen by the
// TimeZonePolicy of the report's metric. Daily aggregates whose day index is less than or equal to
// (reference day index) - |backfill_days_| - (the report's maximum aggregation days) are removed.
// If |day_index_local| is 0, |day_index_utc| is used in its place.
//
// Both day indices are CHECKed to be less than UINT32_MAX and at least
// |kMaxAllowedAggregationDays| + |backfill_days_|. Reports with an invalid time zone policy, no
// aggregation window, an out-of-range maximum window, or an unrecognized aggregate type are
// skipped. Always returns kOK.
Status AggregateStore::GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local) {
if (day_index_local == 0u) {
day_index_local = day_index_utc;
}
CHECK_LT(day_index_utc, UINT32_MAX);
CHECK_LT(day_index_local, UINT32_MAX);
CHECK_GE(day_index_utc, kMaxAllowedAggregationDays + backfill_days_);
CHECK_GE(day_index_local, kMaxAllowedAggregationDays + backfill_days_);
auto locked = protected_aggregate_store_.lock();
for (const auto& [report_key, aggregates] : locked->local_aggregate_store.by_report_key()) {
// Select the reference day index according to the metric's time zone policy.
uint32_t day_index;
const auto& config = aggregates.aggregation_config();
switch (config.metric().time_zone_policy()) {
case MetricDefinition::UTC: {
day_index = day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
day_index = day_index_local;
break;
}
default:
LOG_FIRST_N(ERROR, 10) << "The TimeZonePolicy of this MetricDefinition is invalid.";
continue;
}
if (aggregates.aggregation_config().aggregation_window_size() == 0) {
LOG_FIRST_N(ERROR, 10) << "This ReportDefinition does not have an aggregation window.";
continue;
}
// PopulateReportAggregates ensured that aggregation_window has at least one element, that all
// aggregation windows are <= kMaxAllowedAggregationDays, and that config.aggregation_window()
// is sorted in increasing order.
// The last window is therefore the largest. If it is not a daily window, |max_aggregation_days|
// keeps its default of 1.
uint32_t max_aggregation_days = 1u;
const OnDeviceAggregationWindow& largest_window =
config.aggregation_window(config.aggregation_window_size() - 1);
if (largest_window.units_case() == OnDeviceAggregationWindow::kDays) {
max_aggregation_days = largest_window.days();
}
if (max_aggregation_days == 0u || max_aggregation_days > day_index) {
LOG_FIRST_N(ERROR, 10) << "The maximum number of aggregation days " << max_aggregation_days
<< " of this ReportDefinition is out of range.";
continue;
}
// For each ReportAggregates, descend to and iterate over the sub-map of
// local aggregates keyed by day index. Keep buckets with day indices
// greater than |day_index| - |backfill_days_| - |max_aggregation_days|, and
// remove all buckets with smaller day indices.
// The loop iterates over the const view of the map, so the mutable entry for |report_key| is
// fetched again with at() before it is handed to the helper.
switch (aggregates.type_case()) {
case ReportAggregates::kUniqueActivesAggregates: {
GarbageCollectUniqueActivesReportAggregates(
day_index, max_aggregation_days, backfill_days_,
locked->local_aggregate_store.mutable_by_report_key()
->at(report_key)
.mutable_unique_actives_aggregates());
break;
}
case ReportAggregates::kNumericAggregates: {
GarbageCollectNumericReportAggregates(day_index, max_aggregation_days, backfill_days_,
locked->local_aggregate_store.mutable_by_report_key()
->at(report_key)
.mutable_numeric_aggregates());
break;
}
default:
// Aggregates of any other (or unset) type are left untouched.
continue;
}
}
return kOK;
}
// Generates observations for every report in a copy of the LocalAggregateStore.
//
// For each report, the final day index is |final_day_index_utc| or |final_day_index_local|, chosen
// by the TimeZonePolicy of the report's metric. If |final_day_index_local| is 0,
// |final_day_index_utc| is used in its place. Both day indices are CHECKed to be less than
// UINT32_MAX and at least |kMaxAllowedAggregationDays| + |backfill_days_|.
//
// Observations are generated by GenerateUniqueActivesObservations() for UNIQUE_N_DAY_ACTIVES
// reports of EVENT_OCCURRED metrics, and by GenerateObsFromNumericAggregates() for
// PER_DEVICE_NUMERIC_STATS and PER_DEVICE_HISTOGRAM reports of EVENT_COUNT, ELAPSED_TIME,
// FRAME_RATE and MEMORY_USAGE metrics. Every other metric/report combination is skipped, as are
// reports with an invalid time zone policy, no aggregation window, or an out-of-range maximum
// window.
//
// Returns the first non-kOK status produced by a generator, at which point processing stops;
// returns kOK otherwise.
Status AggregateStore::GenerateObservations(uint32_t final_day_index_utc,
uint32_t final_day_index_local) {
if (final_day_index_local == 0u) {
final_day_index_local = final_day_index_utc;
}
CHECK_LT(final_day_index_utc, UINT32_MAX);
CHECK_LT(final_day_index_local, UINT32_MAX);
CHECK_GE(final_day_index_utc, kMaxAllowedAggregationDays + backfill_days_);
CHECK_GE(final_day_index_local, kMaxAllowedAggregationDays + backfill_days_);
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
const auto& config = aggregates.aggregation_config();
const auto& metric = config.metric();
auto metric_ref = MetricRef(&config.project(), &metric);
// Select the final day index according to the metric's time zone policy.
uint32_t final_day_index;
switch (metric.time_zone_policy()) {
case MetricDefinition::UTC: {
final_day_index = final_day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
final_day_index = final_day_index_local;
break;
}
default:
LOG_FIRST_N(ERROR, 10) << "The TimeZonePolicy of this MetricDefinition is invalid.";
continue;
}
const auto& report = config.report();
// PopulateReportAggregates ensured that aggregation_window has at least one element, that all
// aggregation windows are <= kMaxAllowedAggregationDays, and that config.aggregation_window()
// is sorted in increasing order.
if (config.aggregation_window_size() == 0u) {
LOG_FIRST_N(ERROR, 10) << "No aggregation_window found for this report.";
continue;
}
// The last window is the largest. If it is not a daily window, |max_aggregation_days| keeps its
// default of 1.
uint32_t max_aggregation_days = 1u;
const OnDeviceAggregationWindow& largest_window =
config.aggregation_window(config.aggregation_window_size() - 1);
if (largest_window.units_case() == OnDeviceAggregationWindow::kDays) {
max_aggregation_days = largest_window.days();
}
if (max_aggregation_days == 0u || max_aggregation_days > final_day_index) {
LOG_FIRST_N(ERROR, 10) << "The maximum number of aggregation days " << max_aggregation_days
<< " of this ReportDefinition is out of range.";
continue;
}
// Dispatch on metric type, then on report type. In the inner switches, `continue` moves on to
// the next report.
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
auto num_event_codes = RapporConfigHelper::BasicRapporNumCategories(metric);
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
auto status = GenerateUniqueActivesObservations(metric_ref, report_key, aggregates,
num_event_codes, final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
case MetricDefinition::EVENT_COUNT:
case MetricDefinition::ELAPSED_TIME:
case MetricDefinition::FRAME_RATE:
case MetricDefinition::MEMORY_USAGE: {
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
case ReportDefinition::PER_DEVICE_HISTOGRAM: {
auto status = GenerateObsFromNumericAggregates(metric_ref, report_key, aggregates,
final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
default:
// Metric types without locally aggregated reports are skipped.
continue;
}
}
return kOK;
}
////////// GenerateUniqueActivesObservations and helper methods ////////////////
namespace {
// Searches the aggregation window of |aggregation_days| days ending on |obs_day_index| for the
// earliest day on which |daily_aggregates| records activity, and returns that day index. Returns 0
// if no day in the window has an activity indicator set.
uint32_t FirstActiveDayIndexInWindow(const DailyAggregates& daily_aggregates,
                                     uint32_t obs_day_index, uint32_t aggregation_days) {
  const auto& aggregates_by_day = daily_aggregates.by_day_index();
  const uint32_t window_start = obs_day_index - aggregation_days + 1;
  for (uint32_t candidate = window_start; candidate <= obs_day_index; candidate++) {
    const auto found = aggregates_by_day.find(candidate);
    if (found == aggregates_by_day.end()) {
      continue;
    }
    if (found->second.activity_daily_aggregate().activity_indicator()) {
      return candidate;
    }
  }
  return 0u;
}
// Reports whether an event occurrence on |active_day_index| lies inside the aggregation window of
// |aggregation_days| days that ends on |obs_day_index|, i.e. whether
// |obs_day_index| - |aggregation_days| < |active_day_index| <= |obs_day_index|.
// The subtraction is performed in uint32_t arithmetic.
bool IsActivityInWindow(uint32_t active_day_index, uint32_t obs_day_index,
                        uint32_t aggregation_days) {
  if (active_day_index > obs_day_index) {
    return false;
  }
  const uint32_t day_before_window = obs_day_index - aggregation_days;
  return day_before_window < active_day_index;
}
} // namespace
// Returns the day index recorded in the observation history as the last one for which a
// UniqueActives observation was generated for (|report_key|, |event_code|, |aggregation_days|).
// Returns 0 if the history has no entry for the report, the event code, or the window size.
//
// The history is read in place through the handle returned by const_lock(), which is kept for the
// duration of the lookup, instead of deep-copying the whole history store on every call.
uint32_t AggregateStore::GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
                                                               uint32_t event_code,
                                                               uint32_t aggregation_days) const {
  auto locked = protected_obs_history_.const_lock();
  const auto& histories_by_report = locked->obs_history.by_report_key();
  const auto report_history = histories_by_report.find(report_key);
  if (report_history == histories_by_report.end()) {
    return 0u;
  }
  const auto& histories_by_event_code =
      report_history->second.unique_actives_history().by_event_code();
  const auto event_code_history = histories_by_event_code.find(event_code);
  if (event_code_history == histories_by_event_code.end()) {
    return 0u;
  }
  const auto& last_generated_by_window = event_code_history->second.by_window_size();
  const auto window_history = last_generated_by_window.find(aggregation_days);
  if (window_history == last_generated_by_window.end()) {
    return 0u;
  }
  return window_history->second;
}
// Records |value| in the observation history as the last generated day index for
// (|report_key|, |event_code|, |aggregation_days|), creating intermediate history entries as
// needed.
void AggregateStore::SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
                                                           uint32_t event_code,
                                                           uint32_t aggregation_days,
                                                           uint32_t value) {
  auto locked = protected_obs_history_.lock();
  auto& report_history = (*locked->obs_history.mutable_by_report_key())[report_key];
  auto& event_code_history =
      (*report_history.mutable_unique_actives_history()->mutable_by_event_code())[event_code];
  auto& last_generated_by_window = *event_code_history.mutable_by_window_size();
  last_generated_by_window[aggregation_days] = value;
}
// Encodes one UniqueActives observation for the given report, day index, event code and window,
// and hands it to the observation writer.
//
// Returns the encoder's status if encoding fails, kOther if the encoder reports success but yields
// a null observation or null metadata, the writer's status if writing fails, and kOK otherwise.
Status AggregateStore::GenerateSingleUniqueActivesObservation(
    const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
    uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const {
  auto encoded = encoder_->EncodeUniqueActivesObservation(metric_ref, report, obs_day_index,
                                                          event_code, was_active, window);
  if (encoded.status != kOK) {
    return encoded.status;
  }

  const bool fully_encoded = encoded.observation != nullptr && encoded.metadata != nullptr;
  if (!fully_encoded) {
    LOG(ERROR) << "Failed to encode UniqueActivesObservation";
    return kOther;
  }

  auto write_status =
      observation_writer_->WriteObservation(encoded.observation, std::move(encoded.metadata));
  if (write_status != kOK) {
    return write_status;
  }
  return kOK;
}
// Generates UniqueActives observations for one report.
//
// For every event code in [0, |num_event_codes|) and every daily aggregation window of the report
// that does not exceed |kMaxAllowedAggregationDays|, one observation is generated for each day
// index from max(last generated day index + 1, |final_day_index| - |backfill_days_|) through
// |final_day_index|. After each observation is generated, its day index is recorded as the last
// generated day index for that report, event code and window size.
//
// CHECKs that |final_day_index| is greater than |backfill_days_|. Returns the first non-kOK status
// from GenerateSingleUniqueActivesObservation(), at which point generation stops; returns kOK
// otherwise.
Status AggregateStore::GenerateUniqueActivesObservations(const MetricRef metric_ref,
const std::string& report_key,
const ReportAggregates& report_aggregates,
uint32_t num_event_codes,
uint32_t final_day_index) {
CHECK_GT(final_day_index, backfill_days_);
// The earliest day index for which we might need to generate an
// Observation.
auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
auto daily_aggregates =
report_aggregates.unique_actives_aggregates().by_event_code().find(event_code);
// Have any events ever been logged for this report and event code?
bool found_event_code =
(daily_aggregates != report_aggregates.unique_actives_aggregates().by_event_code().end());
for (const auto& window : report_aggregates.aggregation_config().aggregation_window()) {
// Skip all hourly windows, and all daily windows which are larger than
// kMaxAllowedAggregationDays.
//
// TODO(pesk): Generate observations for hourly windows.
if (window.units_case() != OnDeviceAggregationWindow::kDays) {
LOG(INFO) << "Skipping unsupported aggregation window.";
continue;
}
if (window.days() > kMaxAllowedAggregationDays) {
LOG(WARNING) << "GenerateUniqueActivesObservations ignoring a window "
"size exceeding the maximum allowed value";
continue;
}
// Find the earliest day index for which an Observation has not yet
// been generated for this report, event code, and window size. If
// that day index is later than |final_day_index|, no Observation is
// generated on this invocation.
auto last_gen = GetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days());
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
// The latest day index on which |event_code| is known to have
// occurred, so far. This value will be updated as we search
// forward from the earliest day index belonging to a window of
// interest. The initial value 0 means that no occurrence is known yet.
uint32_t active_day_index = 0u;
// Iterate over the day indices |obs_day_index| for which we need
// to generate Observations. On each iteration, generate an
// Observation for the |window| ending on |obs_day_index|.
for (uint32_t obs_day_index = first_day_index; obs_day_index <= final_day_index;
obs_day_index++) {
// If no events were ever logged for this event code, an Observation of inactivity is
// generated.
bool was_active = false;
if (found_event_code) {
// If the current value of |active_day_index| falls within the
// window, generate an Observation of activity. If not, search
// forward in the window, update |active_day_index|, and generate an
// Observation of activity or inactivity depending on the result of
// the search.
if (IsActivityInWindow(active_day_index, obs_day_index, window.days())) {
was_active = true;
} else {
active_day_index =
FirstActiveDayIndexInWindow(daily_aggregates->second, obs_day_index, window.days());
was_active = IsActivityInWindow(active_day_index, obs_day_index, window.days());
}
}
auto status = GenerateSingleUniqueActivesObservation(
metric_ref, &report_aggregates.aggregation_config().report(), obs_day_index, event_code,
window, was_active);
if (status != kOK) {
return status;
}
// Record progress only after the Observation has been generated successfully.
SetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days(), obs_day_index);
}
}
}
return kOK;
}
////////// GenerateObsFromNumericAggregates and helper methods /////////////
// Returns the day index recorded in the observation history as the last one for which a
// PerDeviceNumeric observation was generated for (|report_key|, |component|, |event_code|,
// |aggregation_days|). Returns 0 if the history has no entry for the report, if the report's
// history has no per-device numeric history, or if there is no entry for the component, the event
// code, or the window size.
//
// The history is read in place through the handle returned by const_lock(), which is kept for the
// duration of the lookup, instead of deep-copying the whole history store on every call.
uint32_t AggregateStore::GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
                                                                  const std::string& component,
                                                                  uint32_t event_code,
                                                                  uint32_t aggregation_days) const {
  auto locked = protected_obs_history_.const_lock();
  const auto& histories_by_report = locked->obs_history.by_report_key();
  const auto report_history = histories_by_report.find(report_key);
  if (report_history == histories_by_report.end()) {
    return 0u;
  }
  if (!report_history->second.has_per_device_numeric_history()) {
    return 0u;
  }
  const auto& histories_by_component =
      report_history->second.per_device_numeric_history().by_component();
  const auto component_history = histories_by_component.find(component);
  if (component_history == histories_by_component.end()) {
    return 0u;
  }
  const auto& histories_by_event_code = component_history->second.by_event_code();
  const auto event_code_history = histories_by_event_code.find(event_code);
  if (event_code_history == histories_by_event_code.end()) {
    return 0u;
  }
  const auto& last_generated_by_window = event_code_history->second.by_window_size();
  const auto window_history = last_generated_by_window.find(aggregation_days);
  if (window_history == last_generated_by_window.end()) {
    return 0u;
  }
  return window_history->second;
}
// Records |value| in the observation history as the last generated day index for
// (|report_key|, |component|, |event_code|, |aggregation_days|), creating intermediate history
// entries as needed.
void AggregateStore::SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
                                                              const std::string& component,
                                                              uint32_t event_code,
                                                              uint32_t aggregation_days,
                                                              uint32_t value) {
  auto locked = protected_obs_history_.lock();
  auto& report_history = (*locked->obs_history.mutable_by_report_key())[report_key];
  auto& component_history =
      (*report_history.mutable_per_device_numeric_history()->mutable_by_component())[component];
  auto& event_code_history = (*component_history.mutable_by_event_code())[event_code];
  auto& last_generated_by_window = *event_code_history.mutable_by_window_size();
  last_generated_by_window[aggregation_days] = value;
}
// Returns the last_generated value of the report participation history stored for |report_key|,
// or 0 if the observation history has no entry for the report.
//
// The history is read in place through the handle returned by const_lock(), which is kept for the
// duration of the lookup, instead of deep-copying the whole history store on every call.
uint32_t AggregateStore::GetReportParticipationLastGeneratedDayIndex(
    const std::string& report_key) const {
  auto locked = protected_obs_history_.const_lock();
  const auto& histories_by_report = locked->obs_history.by_report_key();
  const auto report_history = histories_by_report.find(report_key);
  if (report_history == histories_by_report.end()) {
    return 0u;
  }
  return report_history->second.report_participation_history().last_generated();
}
// Records |value| as the last_generated day index of the report participation history for
// |report_key|, creating the report's history entry if it does not exist yet.
void AggregateStore::SetReportParticipationLastGeneratedDayIndex(const std::string& report_key,
                                                                 uint32_t value) {
  auto locked = protected_obs_history_.lock();
  auto& report_history = (*locked->obs_history.mutable_by_report_key())[report_key];
  auto* participation_history = report_history.mutable_report_participation_history();
  participation_history->set_last_generated(value);
}
// Discards all stored data: the live LocalAggregateStore is overwritten with the 'empty' template
// store, and the observation history is replaced with a newly made history store.
void AggregateStore::DeleteData() {
  LOG(INFO) << "AggregateStore: Deleting stored data";
  {
    // Scoped so that this handle is released before the history store is touched.
    auto locked_aggregates = protected_aggregate_store_.lock();
    const auto& empty_template = locked_aggregates->empty_local_aggregate_store;
    locked_aggregates->local_aggregate_store = empty_template;
  }
  // Build the replacement first, then move it into place under the history lock.
  auto fresh_history = MakeNewObservationHistoryStore();
  protected_obs_history_.lock()->obs_history = std::move(fresh_history);
}
// Sets |is_disabled_| to |is_disabled| and logs whether event aggregate storage is being disabled
// or enabled.
void AggregateStore::Disable(bool is_disabled) {
  const char* const action = is_disabled ? "Disabling" : "Enabling";
  LOG(INFO) << "AggregateStore: " << action << " event aggregate storage.";
  is_disabled_ = is_disabled;
}
// Encodes one PerDeviceNumeric observation for the given report, day index, component, event code,
// window and value, and hands it to the observation writer. |event_code| is unpacked with
// UnpackEventCodesProto() before encoding.
//
// Returns the encoder's status if encoding fails, kOther if the encoder reports success but yields
// a null observation or null metadata, the writer's status if writing fails, and kOK otherwise.
Status AggregateStore::GenerateSinglePerDeviceNumericObservation(
    const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
    const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
    int64_t value) const {
  Encoder::Result encoded =
      encoder_->EncodePerDeviceNumericObservation(metric_ref, report, obs_day_index, component,
                                                  UnpackEventCodesProto(event_code), value, window);
  if (encoded.status != kOK) {
    return encoded.status;
  }

  const bool fully_encoded = encoded.observation != nullptr && encoded.metadata != nullptr;
  if (!fully_encoded) {
    LOG(ERROR) << "Failed to encode PerDeviceNumericObservation";
    return kOther;
  }

  const auto& write_status =
      observation_writer_->WriteObservation(encoded.observation, std::move(encoded.metadata));
  if (write_status != kOK) {
    return write_status;
  }
  return kOK;
}
// Encodes a single PerDeviceHistogramObservation from the given arguments and hands it to the
// ObservationWriter.
//
// Returns the encoder's status if encoding fails, kOther if the encoder reports success but
// leaves the observation or its metadata null, and otherwise the writer's status.
Status AggregateStore::GenerateSinglePerDeviceHistogramObservation(
    const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
    const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
    int64_t value) const {
  Encoder::Result encoder_result = encoder_->EncodePerDeviceHistogramObservation(
      metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value,
      window);
  if (encoder_result.status != kOK) {
    return encoder_result.status;
  }
  if (encoder_result.observation == nullptr || encoder_result.metadata == nullptr) {
    // Name the observation type that was actually being encoded, so this failure can be told
    // apart from one in the numeric-observation path.
    LOG(ERROR) << "Failed to encode PerDeviceHistogramObservation";
    return kOther;
  }
  const auto& writer_status = observation_writer_->WriteObservation(
      encoder_result.observation, std::move(encoder_result.metadata));
  if (writer_status != kOK) {
    return writer_status;
  }
  return kOK;
}
// Encodes a single ReportParticipationObservation for |report| on |obs_day_index| and hands it
// to the ObservationWriter.
//
// Returns the encoder's status if encoding fails, kOther if the encoder reports success but
// leaves the observation or its metadata null, and otherwise the writer's status.
Status AggregateStore::GenerateSingleReportParticipationObservation(const MetricRef metric_ref,
                                                                    const ReportDefinition* report,
                                                                    uint32_t obs_day_index) const {
  auto encoded = encoder_->EncodeReportParticipationObservation(metric_ref, report, obs_day_index);
  if (encoded.status != kOK) {
    return encoded.status;
  }

  const bool missing_output = encoded.observation == nullptr || encoded.metadata == nullptr;
  if (missing_output) {
    LOG(ERROR) << "Failed to encode ReportParticipationObservation";
    return kOther;
  }

  // The writer's status is the final result, whether or not it is kOK.
  return observation_writer_->WriteObservation(encoded.observation, std::move(encoded.metadata));
}
// Generates the Observations that are due for one report from its stored numeric aggregates.
//
// For each (component, event code) pair in |report_aggregates| and each day-based aggregation
// window of the report, an Observation is considered for every day index in the range
// [|final_day_index| - |backfill_days_|, |final_day_index|] that is later than the last day index
// recorded in the observation history for that report, component, event code and window size.
// A per-device observation (numeric stats or histogram, depending on the report type) is
// generated only if at least one daily value exists inside the window; the history is updated
// either way. Afterwards, a ReportParticipationObservation is generated for each day in the
// backfill range that has not yet been recorded in the report participation history.
//
// Returns kOK on success, kInvalidArguments if the report's aggregation type or report type is
// not handled here, and otherwise the first non-kOK status returned while generating an
// Observation. On a non-kOK return, the history is not advanced for the failed Observation.
Status AggregateStore::GenerateObsFromNumericAggregates(const MetricRef metric_ref,
                                                        const std::string& report_key,
                                                        const ReportAggregates& report_aggregates,
                                                        uint32_t final_day_index) {
  CHECK_GT(final_day_index, backfill_days_);
  // The first day index for which we might have to generate an Observation.
  const auto backfill_period_start = static_cast<uint32_t>(final_day_index - backfill_days_);

  // These do not change while iterating, so look them up once.
  const ReportDefinition* report = &report_aggregates.aggregation_config().report();
  const auto aggregation_type = report->aggregation_type();

  // Generate any necessary per-device Observations for this report.
  for (const auto& [component, event_code_aggregates] :
       report_aggregates.numeric_aggregates().by_component()) {
    for (const auto& [event_code, daily_aggregates] : event_code_aggregates.by_event_code()) {
      // Populate a helper map keyed by day indices which belong to the range
      // [|backfill_period_start|, |final_day_index|]. The value at a day
      // index is the list of windows, in increasing size order, for which an
      // Observation should be generated for that day index.
      std::map<uint32_t, std::vector<OnDeviceAggregationWindow>> windows_by_obs_day;
      for (const auto& window : report_aggregates.aggregation_config().aggregation_window()) {
        if (window.units_case() != OnDeviceAggregationWindow::kDays) {
          LOG(INFO) << "Skipping unsupported aggregation window.";
          continue;
        }
        auto last_gen = GetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code,
                                                                 window.days());
        auto first_day_index = std::max(last_gen + 1, backfill_period_start);
        for (auto obs_day_index = first_day_index; obs_day_index <= final_day_index;
             obs_day_index++) {
          windows_by_obs_day[obs_day_index].push_back(window);
        }
      }

      const auto& aggregates_by_day = daily_aggregates.by_day_index();

      // Iterate over the day indices |obs_day_index| for which we might need
      // to generate an Observation. For each day index, generate an
      // Observation for each needed window.
      //
      // More precisely, for each needed window, iterate over the days within that window. If at
      // least one numeric event was logged during the window, compute the aggregate of the numeric
      // values over the window and generate a per-device Observation. Whether or not a numeric
      // event was found, update the AggregatedObservationHistory for this report, component, event
      // code, and window size with |obs_day_index| as the most recent date of Observation
      // generation. This reflects the fact that all needed Observations were generated for the
      // window ending on that date.
      for (auto obs_day_index = backfill_period_start; obs_day_index <= final_day_index;
           obs_day_index++) {
        const auto windows = windows_by_obs_day.find(obs_day_index);
        if (windows == windows_by_obs_day.end()) {
          continue;
        }
        // The running aggregate is deliberately shared across the windows of one day: the windows
        // are visited in increasing size order, so each larger window only needs to fold in the
        // days that the previous (smaller) windows have not covered yet.
        bool found_value_for_window = false;
        int64_t window_aggregate = 0;
        uint32_t num_days = 0;
        for (const auto& window : windows->second) {
          while (num_days < window.days()) {
            const auto day_aggregates = aggregates_by_day.find(obs_day_index - num_days);
            const bool found_value_for_day = day_aggregates != aggregates_by_day.end();
            const int64_t day_value =
                found_value_for_day ? day_aggregates->second.numeric_daily_aggregate().value() : 0;
            switch (aggregation_type) {
              case ReportDefinition::SUM:
                if (found_value_for_day) {
                  window_aggregate += day_value;
                  found_value_for_window = true;
                }
                break;
              case ReportDefinition::MAX:
                if (found_value_for_day) {
                  // Seed the maximum from the first value found rather than from the initial 0,
                  // so that a window containing only negative values is aggregated correctly.
                  window_aggregate = found_value_for_window
                                         ? std::max(window_aggregate, day_value)
                                         : day_value;
                  found_value_for_window = true;
                }
                break;
              case ReportDefinition::MIN:
                if (found_value_for_day) {
                  // Seed the minimum from the first value found rather than from the initial 0.
                  window_aggregate = found_value_for_window
                                         ? std::min(window_aggregate, day_value)
                                         : day_value;
                  found_value_for_window = true;
                }
                break;
              default:
                LOG(ERROR) << "Unexpected aggregation type " << aggregation_type;
                return kInvalidArguments;
            }
            num_days++;
          }
          if (found_value_for_window) {
            Status status = kOK;
            switch (report->report_type()) {
              case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
                status = GenerateSinglePerDeviceNumericObservation(
                    metric_ref, report, obs_day_index, component, event_code, window,
                    window_aggregate);
                break;
              case ReportDefinition::PER_DEVICE_HISTOGRAM:
                status = GenerateSinglePerDeviceHistogramObservation(
                    metric_ref, report, obs_day_index, component, event_code, window,
                    window_aggregate);
                break;
              default:
                LOG(ERROR) << "Unexpected report type " << report->report_type();
                return kInvalidArguments;
            }
            // Bail out before the history is updated so that a failed Observation is not
            // recorded as generated.
            if (status != kOK) {
              return status;
            }
          }
          SetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days(),
                                                   obs_day_index);
        }
      }
    }
  }

  // Generate any necessary ReportParticipationObservations for this report.
  auto participation_last_gen = GetReportParticipationLastGeneratedDayIndex(report_key);
  auto participation_first_day_index = std::max(participation_last_gen + 1, backfill_period_start);
  for (auto obs_day_index = participation_first_day_index; obs_day_index <= final_day_index;
       obs_day_index++) {
    // Do not mark the day as generated if the Observation could not be encoded or written;
    // propagate the failure instead, as is done for the per-device Observations above.
    const Status status =
        GenerateSingleReportParticipationObservation(metric_ref, report, obs_day_index);
    if (status != kOK) {
      return status;
    }
    SetReportParticipationLastGeneratedDayIndex(report_key, obs_day_index);
  }
  return kOK;
}
// Returns a default-constructed LocalAggregateStore whose version field is set to |version|.
LocalAggregateStore AggregateStore::MakeNewLocalAggregateStore(uint32_t version) {
  LocalAggregateStore fresh_store;
  fresh_store.set_version(version);
  return fresh_store;
}
// Returns a default-constructed AggregatedObservationHistoryStore whose version field is set to
// |version|.
AggregatedObservationHistoryStore AggregateStore::MakeNewObservationHistoryStore(uint32_t version) {
  AggregatedObservationHistoryStore fresh_store;
  fresh_store.set_version(version);
  return fresh_store;
}
// Brings |store| up to the current LocalAggregateStore version if needed. A store that is
// already at the current version is left untouched; version 0 is the only other version that
// can be upgraded.
//
// Returns kOK if |store| is already current, the result of the version-0 upgrade if |store| is
// at version 0, and kInvalidArguments for any other version.
Status AggregateStore::MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
  const uint32_t stored_version = store->version();
  if (stored_version == kCurrentLocalAggregateStoreVersion) {
    return kOK;
  }

  VLOG(4) << "Attempting to upgrade LocalAggregateStore from version " << stored_version;
  if (stored_version == 0u) {
    return UpgradeLocalAggregateStoreFromVersion0(store);
  }

  LOG(ERROR) << "Cannot upgrade LocalAggregateStore from version " << stored_version;
  return kInvalidArguments;
}
// Checks the version of |store|. The current version is the earliest version, so there is
// nothing to upgrade from: any version other than the current one is rejected.
//
// Returns kOK if |store| is at the current version and kInvalidArguments otherwise.
Status AggregateStore::MaybeUpgradeObservationHistoryStore(
    AggregatedObservationHistoryStore* store) {
  const uint32_t stored_version = store->version();
  if (stored_version != kCurrentObservationHistoryStoreVersion) {
    LOG(ERROR) << "Cannot upgrade AggregatedObservationHistoryStore from version "
               << stored_version;
    return kInvalidArguments;
  }
  return kOK;
}
} // namespace cobalt::local_aggregation