blob: ac3e43f41a515a2377dd060943713ffff152853c [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "logger/event_aggregator.h"
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "algorithms/rappor/rappor_config_helper.h"
#include "config/metric_definition.pb.h"
#include "config/packed_event_codes.h"
#include "logger/project_context.h"
#include "util/datetime_util.h"
#include "util/proto_util.h"
#include "util/status.h"
namespace cobalt {
using google::protobuf::RepeatedField;
using rappor::RapporConfigHelper;
using util::ConsistentProtoStore;
using util::SerializeToBase64;
using util::StatusCode;
using util::SystemClock;
using util::TimeToDayIndex;
namespace logger {
namespace {
// Creates an AggregationConfig from a ProjectContext, MetricDefinition, and
// ReportDefinition and populates the aggregation_config field of a specified
// ReportAggregates. Also sets the type of the ReportAggregates based on the
// ReportDefinition's type.
bool PopulateReportAggregates(const ProjectContext& project_context,
const MetricDefinition& metric,
const ReportDefinition& report,
ReportAggregates* report_aggregates) {
if (report.window_size_size() == 0) {
LOG(ERROR) << "Report must have at least one window size.";
return false;
}
AggregationConfig* aggregation_config =
report_aggregates->mutable_aggregation_config();
*aggregation_config->mutable_project() = project_context.project();
*aggregation_config->mutable_metric() =
*project_context.GetMetric(metric.id());
*aggregation_config->mutable_report() = report;
std::vector<uint32_t> window_sizes;
for (const uint32_t window_size : report.window_size()) {
if (window_size == 0 ||
window_size > EventAggregator::kMaxAllowedAggregationWindowSize) {
LOG(ERROR) << "Window size must be positive and cannot exceed "
<< EventAggregator::kMaxAllowedAggregationWindowSize;
return false;
}
window_sizes.push_back(window_size);
}
std::sort(window_sizes.begin(), window_sizes.end());
for (const uint32_t window_size : window_sizes) {
aggregation_config->add_window_size(window_size);
}
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
report_aggregates->set_allocated_unique_actives_aggregates(
new UniqueActivesReportAggregates);
return true;
}
case ReportDefinition::PER_DEVICE_NUMERIC_STATS: {
report_aggregates->set_allocated_numeric_aggregates(
new PerDeviceNumericAggregates);
return true;
}
default:
return false;
}
}
// Populates a ReportAggregationKey proto message and then populates a string
// with the base64 encoding of the serialized proto.
bool PopulateReportKey(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t report_id,
std::string* key) {
ReportAggregationKey key_data;
key_data.set_customer_id(customer_id);
key_data.set_project_id(project_id);
key_data.set_metric_id(metric_id);
key_data.set_report_id(report_id);
return SerializeToBase64(key_data, key);
}
// Given a ProjectContext, MetricDefinition, and ReportDefinition and a pointer
// to the LocalAggregateStore, checks whether a key with the same customer,
// project, metric, and report ID already exists in the LocalAggregateStore. If
// not, creates and inserts a new key and value. Returns kInvalidArguments if
// creation of the key or value fails, and kOK otherwise. The caller should hold
// the mutex protecting the LocalAggregateStore.
Status MaybeInsertReportConfigLocked(const ProjectContext& project_context,
const MetricDefinition& metric,
const ReportDefinition& report,
LocalAggregateStore* store) {
std::string key;
if (!PopulateReportKey(project_context.project().customer_id(),
project_context.project().project_id(), metric.id(),
report.id(), &key)) {
return kInvalidArguments;
}
ReportAggregates report_aggregates;
if (store->by_report_key().count(key) == 0) {
if (!PopulateReportAggregates(project_context, metric, report,
&report_aggregates)) {
return kInvalidArguments;
}
(*store->mutable_by_report_key())[key] = report_aggregates;
}
return kOK;
}
RepeatedField<uint32_t> UnpackEventCodesProto(uint64_t packed_event_codes) {
RepeatedField<uint32_t> fields;
for (auto code : config::UnpackEventCodes(packed_event_codes)) {
*fields.Add() = code;
}
return fields;
}
} // namespace
EventAggregator::EventAggregator(
const Encoder* encoder, const ObservationWriter* observation_writer,
ConsistentProtoStore* local_aggregate_proto_store,
ConsistentProtoStore* obs_history_proto_store, const size_t backfill_days,
const std::chrono::seconds aggregate_backup_interval,
const std::chrono::seconds generate_obs_interval,
const std::chrono::seconds gc_interval)
: encoder_(encoder),
observation_writer_(observation_writer),
local_aggregate_proto_store_(local_aggregate_proto_store),
obs_history_proto_store_(obs_history_proto_store) {
CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count())
<< "aggregate_backup_interval must be less than or equal to "
"generate_obs_interval";
CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
<< "aggregate_backup_interval must be less than or equal to gc_interval";
CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to "
<< kMaxAllowedBackfillDays;
aggregate_backup_interval_ = aggregate_backup_interval;
generate_obs_interval_ = generate_obs_interval;
gc_interval_ = gc_interval;
backfill_days_ = backfill_days;
auto locked = protected_aggregate_store_.lock();
auto restore_aggregates_status =
local_aggregate_proto_store_->Read(&(locked->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read LocalAggregateStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
"with empty LocalAggregateStore. File will be created on "
"first snapshot of the LocalAggregateStore.";
locked->local_aggregate_store.Clear();
break;
}
default: {
LOG(ERROR)
<< "Read to local_aggregate_proto_store failed with status code: "
<< restore_aggregates_status.error_code()
<< "\nError message: " << restore_aggregates_status.error_message()
<< "\nError details: " << restore_aggregates_status.error_details()
<< "\nProceeding with empty LocalAggregateStore.";
locked->local_aggregate_store.Clear();
}
}
auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
VLOG(4) << "No file found for obs_history_proto_store. Proceeding "
"with empty AggregatedObservationHistoryStore. File will be "
"created on first snapshot of the "
"AggregatedObservationHistoryStore.";
break;
}
default: {
LOG(ERROR) << "Read to obs_history_proto_store failed with status code: "
<< restore_history_status.error_code() << "\nError message: "
<< restore_history_status.error_message()
<< "\nError details: "
<< restore_history_status.error_details()
<< "\nProceeding with empty AggregatedObservationHistory.";
obs_history_ = AggregatedObservationHistoryStore();
}
}
clock_.reset(new SystemClock());
}
void EventAggregator::Start() {
auto locked = protected_shutdown_flag_.lock();
locked->shut_down = false;
std::thread t([this] { this->Run(); });
worker_thread_ = std::move(t);
}
// TODO(pesk): update the EventAggregator's view of a Metric
// or ReportDefinition when appropriate.
Status EventAggregator::UpdateAggregationConfigs(
const ProjectContext& project_context) {
auto locked = protected_aggregate_store_.lock();
Status status;
for (const auto& metric : project_context.metrics()) {
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
status = MaybeInsertReportConfigLocked(
project_context, metric, report,
&(locked->local_aggregate_store));
if (status != kOK) {
return status;
}
}
default:
continue;
}
}
}
case MetricDefinition::EVENT_COUNT:
case MetricDefinition::ELAPSED_TIME:
case MetricDefinition::FRAME_RATE:
case MetricDefinition::MEMORY_USAGE: {
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS: {
status = MaybeInsertReportConfigLocked(
project_context, metric, report,
&(locked->local_aggregate_store));
if (status != kOK) {
return status;
}
}
default:
continue;
}
}
}
default:
continue;
}
}
return kOK;
}
Status EventAggregator::LogUniqueActivesEvent(uint32_t report_id,
EventRecord* event_record) {
if (!event_record->event->has_occurrence_event()) {
LOG(ERROR) << "EventAggregator::LogUniqueActivesEvent can only "
"accept OccurrenceEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
auto locked = protected_aggregate_store_.lock();
auto aggregates =
locked->local_aggregate_store.mutable_by_report_key()->find(key);
if (aggregates ==
locked->local_aggregate_store.mutable_by_report_key()->end()) {
LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
return kInvalidArguments;
}
if (!aggregates->second.has_unique_actives_aggregates()) {
LOG(ERROR) << "The local aggregates for this report key are not of type "
"UniqueActivesReportAggregates.";
return kInvalidArguments;
}
(*(*aggregates->second.mutable_unique_actives_aggregates()
->mutable_by_event_code())[event_record->event->occurrence_event()
.event_code()]
.mutable_by_day_index())[event_record->event->day_index()]
.mutable_activity_daily_aggregate()
->set_activity_indicator(true);
return kOK;
}
Status EventAggregator::LogCountEvent(uint32_t report_id,
EventRecord* event_record) {
if (event_record->event->type_case() != Event::kCountEvent) {
LOG(ERROR) << "EventAggregator: LogCountEvent can only accept "
"CountEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
const CountEvent& count_event = event_record->event->count_event();
return LogNumericEvent(
key, event_record->event->day_index(), count_event.component(),
config::PackEventCodes(count_event.event_code()), count_event.count());
}
Status EventAggregator::LogElapsedTimeEvent(uint32_t report_id,
EventRecord* event_record) {
if (event_record->event->type_case() != Event::kElapsedTimeEvent) {
LOG(ERROR) << "EventAggregator: LogElapsedTimeEvent can only accept "
"ElapsedTimeEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
const ElapsedTimeEvent& elapsed_time_event =
event_record->event->elapsed_time_event();
return LogNumericEvent(
key, event_record->event->day_index(), elapsed_time_event.component(),
config::PackEventCodes(elapsed_time_event.event_code()),
elapsed_time_event.elapsed_micros());
}
Status EventAggregator::LogFrameRateEvent(uint32_t report_id,
EventRecord* event_record) {
if (event_record->event->type_case() != Event::kFrameRateEvent) {
LOG(ERROR) << "EventAggregator: LogFrameRateEvent can only accept "
"FrameRateEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
const FrameRateEvent& frame_rate_event =
event_record->event->frame_rate_event();
return LogNumericEvent(key, event_record->event->day_index(),
frame_rate_event.component(),
config::PackEventCodes(frame_rate_event.event_code()),
frame_rate_event.frames_per_1000_seconds());
}
Status EventAggregator::LogMemoryUsageEvent(uint32_t report_id,
EventRecord* event_record) {
if (event_record->event->type_case() != Event::kMemoryUsageEvent) {
LOG(ERROR) << "EventAggregator: LogMemoryUsageEvent can only accept "
"MemoryUsageEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
const MemoryUsageEvent& memory_usage_event =
event_record->event->memory_usage_event();
return LogNumericEvent(
key, event_record->event->day_index(), memory_usage_event.component(),
config::PackEventCodes(memory_usage_event.event_code()),
memory_usage_event.bytes());
}
Status EventAggregator::LogNumericEvent(const std::string& report_key,
uint32_t day_index,
const std::string& component,
uint64_t event_code, int64_t value) {
auto locked = protected_aggregate_store_.lock();
auto aggregates =
locked->local_aggregate_store.mutable_by_report_key()->find(report_key);
if (aggregates ==
locked->local_aggregate_store.mutable_by_report_key()->end()) {
LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
return kInvalidArguments;
}
if (!aggregates->second.has_numeric_aggregates()) {
LOG(ERROR) << "The local aggregates for this report key are not of a "
"compatible type.";
return kInvalidArguments;
}
auto aggregates_by_day = (*(*aggregates->second.mutable_numeric_aggregates()
->mutable_by_component())[component]
.mutable_by_event_code())[event_code]
.mutable_by_day_index();
bool first_event_today =
((*aggregates_by_day).find(day_index) == aggregates_by_day->end());
auto day_aggregate =
(*aggregates_by_day)[day_index].mutable_numeric_daily_aggregate();
const auto& aggregation_type =
aggregates->second.aggregation_config().report().aggregation_type();
switch (aggregation_type) {
case ReportDefinition::SUM:
day_aggregate->set_value(value + day_aggregate->value());
return kOK;
case ReportDefinition::MAX:
day_aggregate->set_value(std::max(value, day_aggregate->value()));
return kOK;
case ReportDefinition::MIN:
if (first_event_today) {
day_aggregate->set_value(value);
} else {
day_aggregate->set_value(std::min(value, day_aggregate->value()));
}
return kOK;
default:
LOG(ERROR) << "Unexpected aggregation type " << aggregation_type;
return kInvalidArguments;
}
}
Status EventAggregator::GenerateObservationsNoWorker(
uint32_t final_day_index_utc, uint32_t final_day_index_local) {
if (worker_thread_.joinable()) {
LOG(ERROR) << "GenerateObservationsNoWorker() was called while "
"worker thread was running.";
return kOther;
}
return GenerateObservations(final_day_index_utc, final_day_index_local);
}
Status EventAggregator::BackUpLocalAggregateStore() {
// Lock, copy the LocalAggregateStore, and release the lock. Write the copy
// to |local_aggregate_proto_store_|.
auto local_aggregate_store = CopyLocalAggregateStore();
auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: "
<< status.error_code()
<< "\nError message: " << status.error_message()
<< "\nError details: " << status.error_details();
return kOther;
}
return kOK;
}
Status EventAggregator::BackUpObservationHistory() {
auto status = obs_history_proto_store_->Write(obs_history_);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
"::cobalt::util::Status error code: "
<< status.error_code()
<< "\nError message: " << status.error_message()
<< "\nError details: " << status.error_details();
return kOther;
}
return kOK;
}
void EventAggregator::ShutDown() {
if (worker_thread_.joinable()) {
{
auto locked = protected_shutdown_flag_.lock();
locked->shut_down = true;
locked->shutdown_notifier.notify_all();
}
worker_thread_.join();
} else {
protected_shutdown_flag_.lock()->shut_down = true;
}
}
void EventAggregator::Run() {
auto current_time = clock_->now();
// Schedule Observation generation to happen in the first cycle.
next_generate_obs_ = current_time;
// Schedule garbage collection to happen |gc_interval_| seconds from now.
next_gc_ = current_time + gc_interval_;
// Acquire the mutex protecting the shutdown flag and condition variable.
auto locked = protected_shutdown_flag_.lock();
while (true) {
// If shutdown has been requested, back up the LocalAggregateStore and
// exit.
if (locked->shut_down) {
BackUpLocalAggregateStore();
return;
}
// Sleep until the next scheduled backup of the LocalAggregateStore or
// until notified of shutdown. Back up the LocalAggregateStore after
// waking.
auto shutdown_requested = locked.wait_for_with(
&(locked->shutdown_notifier), aggregate_backup_interval_,
[&locked]() { return locked->shut_down; });
BackUpLocalAggregateStore();
// If the worker thread was woken up by a shutdown request, exit.
// Otherwise, complete any scheduled Observation generation and garbage
// collection.
if (shutdown_requested) {
return;
}
// Check whether it is time to generate Observations or to garbage-collect
// the LocalAggregate store. If so, do that task and schedule the next
// occurrence.
DoScheduledTasks(clock_->now());
}
}
void EventAggregator::DoScheduledTasks(
std::chrono::system_clock::time_point current_time) {
auto current_time_t = std::chrono::system_clock::to_time_t(current_time);
auto current_day_index_utc =
TimeToDayIndex(current_time_t, MetricDefinition::UTC);
auto current_day_index_local =
TimeToDayIndex(current_time_t, MetricDefinition::LOCAL);
// Skip the tasks (but do schedule a retry) if either day index is too small.
uint32_t min_allowed_day_index =
kMaxAllowedAggregationWindowSize + backfill_days_ + 1;
bool skip_tasks = (current_day_index_utc < min_allowed_day_index ||
current_day_index_local < min_allowed_day_index);
if (current_time >= next_generate_obs_) {
next_generate_obs_ += generate_obs_interval_;
if (skip_tasks) {
LOG_FIRST_N(ERROR, 10)
<< "EventAggregator is skipping Observation generation because the "
"current day index is too small.";
} else {
auto obs_status = GenerateObservations(current_day_index_utc - 1,
current_day_index_local - 1);
if (obs_status == kOK) {
BackUpObservationHistory();
} else {
LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
}
}
}
if (current_time >= next_gc_) {
next_gc_ += gc_interval_;
if (skip_tasks) {
LOG_FIRST_N(ERROR, 10)
<< "EventAggregator is skipping garbage collection because the "
"current day index is too small.";
} else {
auto gc_status = GarbageCollect(current_day_index_utc - 1,
current_day_index_local - 1);
if (gc_status == kOK) {
BackUpLocalAggregateStore();
} else {
LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
}
}
}
}
////////////////////// GarbageCollect and helper functions /////////////////////
namespace {
void GarbageCollectUniqueActivesReportAggregates(
uint32_t day_index, uint32_t max_window_size, uint32_t backfill_days,
UniqueActivesReportAggregates* report_aggregates) {
auto map_by_event_code = report_aggregates->mutable_by_event_code();
for (auto event_code = map_by_event_code->begin();
event_code != map_by_event_code->end();) {
auto map_by_day = event_code->second.mutable_by_day_index();
for (auto day = map_by_day->begin(); day != map_by_day->end();) {
if (day->first <= day_index - backfill_days - max_window_size) {
day = map_by_day->erase(day);
} else {
++day;
}
}
if (map_by_day->empty()) {
event_code = map_by_event_code->erase(event_code);
} else {
++event_code;
}
}
}
void GarbageCollectNumericReportAggregates(
uint32_t day_index, uint32_t max_window_size, uint32_t backfill_days,
PerDeviceNumericAggregates* report_aggregates) {
auto map_by_component = report_aggregates->mutable_by_component();
for (auto component = map_by_component->begin();
component != map_by_component->end();) {
auto map_by_event_code = component->second.mutable_by_event_code();
for (auto event_code = map_by_event_code->begin();
event_code != map_by_event_code->end();) {
auto map_by_day = event_code->second.mutable_by_day_index();
for (auto day = map_by_day->begin(); day != map_by_day->end();) {
if (day->first <= day_index - backfill_days - max_window_size) {
day = map_by_day->erase(day);
} else {
++day;
}
}
if (map_by_day->empty()) {
event_code = map_by_event_code->erase(event_code);
} else {
++event_code;
}
}
if (map_by_event_code->empty()) {
component = map_by_component->erase(component);
} else {
++component;
}
}
}
} // namespace
Status EventAggregator::GarbageCollect(uint32_t day_index_utc,
uint32_t day_index_local) {
if (day_index_local == 0u) {
day_index_local = day_index_utc;
}
CHECK_LT(day_index_utc, UINT32_MAX);
CHECK_LT(day_index_local, UINT32_MAX);
CHECK_GE(day_index_utc, kMaxAllowedAggregationWindowSize + backfill_days_);
CHECK_GE(day_index_local, kMaxAllowedAggregationWindowSize + backfill_days_);
auto locked = protected_aggregate_store_.lock();
for (auto pair : locked->local_aggregate_store.by_report_key()) {
uint32_t day_index;
switch (pair.second.aggregation_config().metric().time_zone_policy()) {
case MetricDefinition::UTC: {
day_index = day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
day_index = day_index_local;
break;
}
default:
LOG_FIRST_N(ERROR, 10)
<< "The TimeZonePolicy of this MetricDefinition is invalid.";
continue;
}
if (pair.second.aggregation_config().window_size_size() == 0) {
LOG_FIRST_N(ERROR, 10)
<< "This ReportDefinition does not have a window size.";
continue;
}
uint32_t max_window_size = pair.second.aggregation_config().window_size(
pair.second.aggregation_config().window_size_size() - 1);
if (max_window_size == 0u || max_window_size > day_index) {
LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
<< " of this ReportDefinition is out of range.";
continue;
}
// For each ReportAggregates, descend to and iterate over the sub-map of
// local aggregates keyed by day index. Keep buckets with day indices
// greater than |day_index| - |backfill_days_| - |max_window_size|, and
// remove all buckets with smaller day indices.
switch (pair.second.type_case()) {
case ReportAggregates::kUniqueActivesAggregates: {
GarbageCollectUniqueActivesReportAggregates(
day_index, max_window_size, backfill_days_,
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_unique_actives_aggregates());
break;
}
case ReportAggregates::kNumericAggregates: {
GarbageCollectNumericReportAggregates(
day_index, max_window_size, backfill_days_,
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_numeric_aggregates());
break;
}
default:
continue;
}
}
return kOK;
}
Status EventAggregator::GenerateObservations(uint32_t final_day_index_utc,
uint32_t final_day_index_local) {
if (final_day_index_local == 0u) {
final_day_index_local = final_day_index_utc;
}
CHECK_LT(final_day_index_utc, UINT32_MAX);
CHECK_LT(final_day_index_local, UINT32_MAX);
CHECK_GE(final_day_index_utc,
kMaxAllowedAggregationWindowSize + backfill_days_);
CHECK_GE(final_day_index_local,
kMaxAllowedAggregationWindowSize + backfill_days_);
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
for (auto& pair : local_aggregate_store.by_report_key()) {
const auto& config = pair.second.aggregation_config();
const auto& metric = config.metric();
auto metric_ref = MetricRef(&config.project(), &metric);
uint32_t final_day_index;
switch (metric.time_zone_policy()) {
case MetricDefinition::UTC: {
final_day_index = final_day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
final_day_index = final_day_index_local;
break;
}
default:
LOG_FIRST_N(ERROR, 10)
<< "The TimeZonePolicy of this MetricDefinition is invalid.";
continue;
}
const auto& report = config.report();
// PopulateReportAggregates has checked that there is at least one window
// size and that all window sizes are positive and <=
// kMaxAllowedAggregationWindowSize, and has sorted the elements of
// config.window_size() in increasing order.
if (config.window_size_size() == 0u) {
LOG_FIRST_N(ERROR, 10) << "This ReportDefinition has no window_size.";
continue;
}
uint32_t max_window_size =
config.window_size(config.window_size_size() - 1);
if (max_window_size == 0u || max_window_size > final_day_index) {
LOG_FIRST_N(ERROR, 10) << "The maximum window size " << max_window_size
<< " of this ReportDefinition is out of range.";
continue;
}
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
auto num_event_codes =
RapporConfigHelper::BasicRapporNumCategories(metric);
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
auto status = GenerateUniqueActivesObservations(
metric_ref, pair.first, pair.second, num_event_codes,
final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
case MetricDefinition::EVENT_COUNT:
case MetricDefinition::ELAPSED_TIME:
case MetricDefinition::FRAME_RATE:
case MetricDefinition::MEMORY_USAGE: {
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_NUMERIC_STATS: {
auto status = GeneratePerDeviceNumericObservations(
metric_ref, pair.first, pair.second, final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
default:
continue;
}
}
return kOK;
}
////////// GenerateUniqueActivesObservations and helper methods ////////////////
namespace {
// Given the set of daily aggregates for a fixed event code, and the size and
// end date of an aggregation window, returns the first day index within that
// window on which the event code occurred. Returns 0 if the event code did
// not occur within the window.
uint32_t FirstActiveDayIndexInWindow(const DailyAggregates& daily_aggregates,
uint32_t obs_day_index,
uint32_t window_size) {
for (uint32_t day_index = obs_day_index - window_size + 1;
day_index <= obs_day_index; day_index++) {
auto day_aggregate = daily_aggregates.by_day_index().find(day_index);
if (day_aggregate != daily_aggregates.by_day_index().end() &&
day_aggregate->second.activity_daily_aggregate().activity_indicator() ==
true) {
return day_index;
}
}
return 0u;
}
// Given the day index of an event occurrence and the size and end date
// of an aggregation window, returns true if the occurrence falls within
// the window and false if not.
bool IsActivityInWindow(uint32_t active_day_index, uint32_t obs_day_index,
uint32_t window_size) {
return (active_day_index <= obs_day_index &&
active_day_index > obs_day_index - window_size);
}
} // namespace
uint32_t EventAggregator::UniqueActivesLastGeneratedDayIndex(
const std::string& report_key, uint32_t event_code,
uint32_t window_size) const {
auto report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
auto event_code_history =
report_history->second.unique_actives_history().by_event_code().find(
event_code);
if (event_code_history ==
report_history->second.unique_actives_history().by_event_code().end()) {
return 0u;
}
auto window_size_history =
event_code_history->second.by_window_size().find(window_size);
if (window_size_history ==
event_code_history->second.by_window_size().end()) {
return 0u;
}
return window_size_history->second;
}
Status EventAggregator::GenerateSingleUniqueActivesObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index, uint32_t event_code, uint32_t window_size,
bool was_active) const {
auto encoder_result = encoder_->EncodeUniqueActivesObservation(
metric_ref, report, obs_day_index, event_code, was_active, window_size);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode UniqueActivesObservation";
return kOther;
}
auto writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
Status EventAggregator::GenerateUniqueActivesObservations(
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t num_event_codes,
uint32_t final_day_index) {
CHECK_GT(final_day_index, backfill_days_);
// The earliest day index for which we might need to generate an
// Observation.
auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
auto daily_aggregates =
report_aggregates.unique_actives_aggregates().by_event_code().find(
event_code);
// Have any events ever been logged for this report and event code?
bool found_event_code =
(daily_aggregates !=
report_aggregates.unique_actives_aggregates().by_event_code().end());
for (uint32_t window_size :
report_aggregates.aggregation_config().report().window_size()) {
// Skip any window size larger than
// kMaxAllowedAggregationWindowSize.
if (window_size > kMaxAllowedAggregationWindowSize) {
LOG(WARNING) << "GenerateUniqueActivesObservations ignoring a window "
"size exceeding the maximum allowed value";
continue;
}
// Find the earliest day index for which an Observation has not yet
// been generated for this report, event code, and window size. If
// that day index is later than |final_day_index|, no Observation is
// generated on this invocation.
auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
window_size);
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
// The latest day index on which |event_type| is known to have
// occurred, so far. This value will be updated as we search
// forward from the earliest day index belonging to a window of
// interest.
uint32_t active_day_index = 0u;
// Iterate over the day indices |obs_day_index| for which we need
// to generate Observations. On each iteration, generate an
// Observation for the window of size |window_size| ending on
// |obs_day_index|.
for (uint32_t obs_day_index = first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
bool was_active = false;
if (found_event_code) {
// If the current value of |active_day_index| falls within the
// window, generate an Observation of activity. If not, search
// forward in the window, update |active_day_index|, and generate an
// Observation of activity or inactivity depending on the result of
// the search.
if (IsActivityInWindow(active_day_index, obs_day_index,
window_size)) {
was_active = true;
} else {
active_day_index = FirstActiveDayIndexInWindow(
daily_aggregates->second, obs_day_index, window_size);
was_active = IsActivityInWindow(active_day_index, obs_day_index,
window_size);
}
}
auto status = GenerateSingleUniqueActivesObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index, event_code, window_size, was_active);
if (status != kOK) {
return status;
}
// Update |obs_history_| with the latest date of Observation
// generation for this report, event code, and window size.
(*(*(*obs_history_.mutable_by_report_key())[report_key]
.mutable_unique_actives_history()
->mutable_by_event_code())[event_code]
.mutable_by_window_size())[window_size] = obs_day_index;
}
}
}
return kOK;
}
////////// GeneratePerDeviceNumericObservations and helper methods /////////////
uint32_t EventAggregator::PerDeviceNumericLastGeneratedDayIndex(
const std::string& report_key, const std::string& component,
uint32_t event_code, uint32_t window_size) const {
const auto& report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
if (!report_history->second.has_per_device_numeric_history()) {
return 0u;
}
const auto& component_history =
report_history->second.per_device_numeric_history().by_component().find(
component);
if (component_history == report_history->second.per_device_numeric_history()
.by_component()
.end()) {
return 0u;
}
const auto& event_code_history =
component_history->second.by_event_code().find(event_code);
if (event_code_history == component_history->second.by_event_code().end()) {
return 0u;
}
const auto& window_size_history =
event_code_history->second.by_window_size().find(window_size);
if (window_size_history ==
event_code_history->second.by_window_size().end()) {
return 0u;
}
return window_size_history->second;
}
uint32_t EventAggregator::ReportParticipationLastGeneratedDayIndex(
const std::string& report_key) const {
const auto& report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
return report_history->second.report_participation_history().last_generated();
}
Status EventAggregator::GenerateSinglePerDeviceNumericObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index, const std::string& component, uint32_t event_code,
uint32_t window_size, int64_t value) const {
auto encoder_result = encoder_->EncodePerDeviceNumericObservation(
metric_ref, report, obs_day_index, component,
UnpackEventCodesProto(event_code), value, window_size);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode PerDeviceNumericObservation";
return kOther;
}
const auto& writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
Status EventAggregator::GenerateSingleReportParticipationObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index) const {
auto encoder_result = encoder_->EncodeReportParticipationObservation(
metric_ref, report, obs_day_index);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode ReportParticipationObservation";
return kOther;
}
const auto& writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
Status EventAggregator::GeneratePerDeviceNumericObservations(
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t final_day_index) {
CHECK_GT(final_day_index, backfill_days_);
// The first day index for which we might have to generate an Observation.
auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
// Generate any necessary PerDeviceNumericObservations for this report.
for (const auto& component_pair :
report_aggregates.numeric_aggregates().by_component()) {
const auto& component = component_pair.first;
for (const auto& event_code_pair : component_pair.second.by_event_code()) {
auto event_code = event_code_pair.first;
const auto& event_code_aggregates = event_code_pair.second;
// Populate a helper map keyed by day indices which belong to the range
// [|backfill_period_start|, |final_day_index|]. The value at a day
// index is the list of window sizes, in increasing order, for which an
// Observation should be generated for that day index.
std::map<uint32_t, std::vector<uint32_t>> window_sizes_by_obs_day;
for (auto window_size :
report_aggregates.aggregation_config().window_size()) {
auto last_gen = PerDeviceNumericLastGeneratedDayIndex(
report_key, component, event_code, window_size);
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
for (auto obs_day_index = first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
window_sizes_by_obs_day[obs_day_index].push_back(window_size);
}
}
// Iterate over the day indices |obs_day_index| for which we might need
// to generate an Observation. For each day index, generate an
// Observation for each needed window size.
//
// More precisely, for each needed window size, iterate over the days
// within that window. If at least one numeric event was logged during the
// window, compute the aggregate of the numeric values over the window and
// generate a PerDeviceNumericObservation. Whether or not a numeric event
// was found, update the AggregatedObservationHistory for this report,
// component, event code, and window size with |obs_day_index| as the most
// recent date of Observation generation. This reflects the fact that all
// needed Observations were generated for the window ending on that date.
for (auto obs_day_index = backfill_period_start;
obs_day_index <= final_day_index; obs_day_index++) {
const auto& window_sizes = window_sizes_by_obs_day.find(obs_day_index);
if (window_sizes == window_sizes_by_obs_day.end()) {
continue;
}
bool found_value_for_window = false;
int64_t window_aggregate = 0;
uint32_t num_days = 0;
for (auto window_size : window_sizes->second) {
while (num_days < window_size) {
bool found_value_for_day = false;
const auto& day_aggregates =
event_code_aggregates.by_day_index().find(obs_day_index -
num_days);
if (day_aggregates != event_code_aggregates.by_day_index().end()) {
found_value_for_day = true;
}
const auto& aggregation_type =
report_aggregates.aggregation_config()
.report()
.aggregation_type();
switch (aggregation_type) {
case ReportDefinition::SUM:
if (found_value_for_day) {
window_aggregate +=
day_aggregates->second.numeric_daily_aggregate().value();
found_value_for_window = true;
}
break;
case ReportDefinition::MAX:
if (found_value_for_day) {
window_aggregate = std::max(
window_aggregate,
day_aggregates->second.numeric_daily_aggregate().value());
found_value_for_window = true;
}
break;
case ReportDefinition::MIN:
if (found_value_for_day && !found_value_for_window) {
window_aggregate =
day_aggregates->second.numeric_daily_aggregate().value();
found_value_for_window = true;
} else if (found_value_for_day) {
window_aggregate = std::min(
window_aggregate,
day_aggregates->second.numeric_daily_aggregate().value());
}
break;
default:
LOG(ERROR) << "Unexpected aggregation type "
<< aggregation_type;
return kInvalidArguments;
}
num_days++;
}
if (found_value_for_window) {
auto status = GenerateSinglePerDeviceNumericObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index, component, event_code, window_size,
window_aggregate);
if (status != kOK) {
return status;
}
}
// Update |obs_history_| with the latest date of Observation
// generation for this report, component, event code, and window
// size.
(*(*(*(*obs_history_.mutable_by_report_key())[report_key]
.mutable_per_device_numeric_history()
->mutable_by_component())[component]
.mutable_by_event_code())[event_code]
.mutable_by_window_size())[window_size] = obs_day_index;
}
}
}
}
// Generate any necessary ReportParticipationObservations for this report.
auto participation_last_gen =
ReportParticipationLastGeneratedDayIndex(report_key);
auto participation_first_day_index =
std::max(participation_last_gen + 1, backfill_period_start);
for (auto obs_day_index = participation_first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
GenerateSingleReportParticipationObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index);
(*obs_history_.mutable_by_report_key())[report_key]
.mutable_report_participation_history()
->set_last_generated(obs_day_index);
}
return kOK;
}
} // namespace logger
} // namespace cobalt