blob: 4baa1465c9c36c57ba2e46a35697a4713ef63bb4 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "logger/event_aggregator.h"
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "algorithms/rappor/rappor_config_helper.h"
#include "config/metric_definition.pb.h"
#include "logger/project_context.h"
#include "util/datetime_util.h"
#include "util/proto_util.h"
#include "util/status.h"
namespace cobalt {
using google::protobuf::RepeatedField;
using rappor::RapporConfigHelper;
using util::ConsistentProtoStore;
using util::SerializeToBase64;
using util::StatusCode;
using util::SystemClock;
using util::TimeToDayIndex;
namespace logger {
namespace {
// Creates an AggregationConfig from a ProjectContext, MetricDefinition, and
// ReportDefinition and populates the aggregation_config field of a specified
// ReportAggregates. Also sets the type of the ReportAggregates based on the
// ReportDefinition's type.
bool PopulateReportAggregates(const ProjectContext& project_context,
const MetricDefinition& metric,
const ReportDefinition& report,
ReportAggregates* report_aggregates) {
AggregationConfig* aggregation_config =
report_aggregates->mutable_aggregation_config();
*aggregation_config->mutable_project() = project_context.project();
*aggregation_config->mutable_metric() =
*project_context.GetMetric(metric.id());
*aggregation_config->mutable_report() = report;
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
report_aggregates->set_allocated_unique_actives_aggregates(
new UniqueActivesReportAggregates);
return true;
}
case ReportDefinition::PER_DEVICE_COUNT_STATS: {
report_aggregates->set_allocated_count_aggregates(
new PerDeviceCountReportAggregates);
return true;
}
default:
return false;
}
}
// Populates a ReportAggregationKey proto message and then populates a string
// with the base64 encoding of the serialized proto.
bool PopulateReportKey(uint32_t customer_id, uint32_t project_id,
uint32_t metric_id, uint32_t report_id,
std::string* key) {
ReportAggregationKey key_data;
key_data.set_customer_id(customer_id);
key_data.set_project_id(project_id);
key_data.set_metric_id(metric_id);
key_data.set_report_id(report_id);
return SerializeToBase64(key_data, key);
}
// Given a ProjectContext, MetricDefinition, and ReportDefinition and a pointer
// to the LocalAggregateStore, checks whether a key with the same customer,
// project, metric, and report ID already exists in the LocalAggregateStore. If
// not, creates and inserts a new key and value. Returns kInvalidArguments if
// creation of the key or value fails, and kOK otherwise. The caller should hold
// the mutex protecting the LocalAggregateStore.
Status MaybeInsertReportConfig(const ProjectContext& project_context,
const MetricDefinition& metric,
const ReportDefinition& report,
LocalAggregateStore* store) {
std::string key;
if (!PopulateReportKey(project_context.project().customer_id(),
project_context.project().project_id(), metric.id(),
report.id(), &key)) {
return kInvalidArguments;
}
ReportAggregates report_aggregates;
if (store->by_report_key().count(key) == 0) {
if (!PopulateReportAggregates(project_context, metric, report,
&report_aggregates)) {
return kInvalidArguments;
}
(*store->mutable_by_report_key())[key] = report_aggregates;
}
return kOK;
}
// PackEventCodes converts a list of event_codes into an int64_t, for putting
// into an Observation.
//
// |event_codes| The list of event_codes to be packed into the int64_t. Each
// value gets a separate section of 10 bits, and as such this must
// be no longer than 5.
uint64_t PackEventCodes(const RepeatedField<uint32_t>& event_codes) {
CHECK_LE(event_codes.size(), 5);
uint64_t event_code = 0;
for (int i = 0; i < event_codes.size(); i++) {
event_code <<= 10;
event_code |= event_codes.Get(i);
}
return event_code;
}
RepeatedField<uint32_t> UnpackEventCodes(uint64_t event_code) {
std::vector<uint32_t> event_codes;
while (event_code != 0) {
event_codes.insert(event_codes.begin(), event_code & 0x3FF);
event_code >>= 10;
}
RepeatedField<uint32_t> fields;
for (auto code : event_codes) {
*fields.Add() = code;
}
return fields;
}
} // namespace
EventAggregator::EventAggregator(
const Encoder* encoder, const ObservationWriter* observation_writer,
ConsistentProtoStore* local_aggregate_proto_store,
ConsistentProtoStore* obs_history_proto_store, const size_t backfill_days,
const std::chrono::seconds aggregate_backup_interval,
const std::chrono::seconds generate_obs_interval,
const std::chrono::seconds gc_interval)
: encoder_(encoder),
observation_writer_(observation_writer),
local_aggregate_proto_store_(local_aggregate_proto_store),
obs_history_proto_store_(obs_history_proto_store) {
CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count())
<< "aggregate_backup_interval must be less than or equal to "
"generate_obs_interval";
CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
<< "aggregate_backup_interval must be less than or equal to gc_interval";
CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
<< "backfill_days must be less than or equal to "
<< kMaxAllowedBackfillDays;
aggregate_backup_interval_ = aggregate_backup_interval;
generate_obs_interval_ = generate_obs_interval;
gc_interval_ = gc_interval;
backfill_days_ = backfill_days;
auto locked = protected_aggregate_store_.lock();
auto restore_aggregates_status =
local_aggregate_proto_store_->Read(&(locked->local_aggregate_store));
switch (restore_aggregates_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read LocalAggregateStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
"with empty LocalAggregateStore. File will be created on "
"first snapshot of the LocalAggregateStore.";
break;
}
default: {
LOG(ERROR)
<< "Read to local_aggregate_proto_store failed with status code: "
<< restore_aggregates_status.error_code()
<< "\nError message: " << restore_aggregates_status.error_message()
<< "\nError details: " << restore_aggregates_status.error_details()
<< "\nProceeding with empty LocalAggregateStore.";
locked->local_aggregate_store = LocalAggregateStore();
}
}
auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
switch (restore_history_status.error_code()) {
case StatusCode::OK: {
VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
break;
}
case StatusCode::NOT_FOUND: {
VLOG(4) << "No file found for obs_history_proto_store. Proceeding "
"with empty AggregatedObservationHistoryStore. File will be "
"created on first snapshot of the "
"AggregatedObservationHistoryStore.";
break;
}
default: {
LOG(ERROR) << "Read to obs_history_proto_store failed with status code: "
<< restore_history_status.error_code() << "\nError message: "
<< restore_history_status.error_message()
<< "\nError details: "
<< restore_history_status.error_details()
<< "\nProceeding with empty AggregatedObservationHistory.";
obs_history_ = AggregatedObservationHistoryStore();
}
}
clock_.reset(new SystemClock());
}
void EventAggregator::Start() {
auto locked = protected_shutdown_flag_.lock();
locked->shut_down = false;
std::thread t([this] { this->Run(); });
worker_thread_ = std::move(t);
}
// TODO(pesk): Have the config parser verify that each locally
// aggregated report has at least one window size and that all window
// sizes are <= |kMaxAllowedAggregationWindowSize|. Additionally, have
// this method filter out any window sizes larger than
// |kMaxAllowedAggregationWindowSize|.
//
// TODO(pesk): update the EventAggregator's view of a Metric
// or ReportDefinition when appropriate.
Status EventAggregator::UpdateAggregationConfigs(
const ProjectContext& project_context) {
auto locked = protected_aggregate_store_.lock();
Status status;
for (const auto& metric : project_context.metrics()) {
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
status =
MaybeInsertReportConfig(project_context, metric, report,
&(locked->local_aggregate_store));
if (status != kOK) {
return status;
}
}
default:
continue;
}
}
}
case MetricDefinition::EVENT_COUNT: {
for (const auto& report : metric.reports()) {
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_COUNT_STATS: {
status =
MaybeInsertReportConfig(project_context, metric, report,
&(locked->local_aggregate_store));
if (status != kOK) {
return status;
}
}
default:
continue;
}
}
}
default:
continue;
}
}
return kOK;
}
Status EventAggregator::LogUniqueActivesEvent(uint32_t report_id,
EventRecord* event_record) {
if (!event_record->event->has_occurrence_event()) {
LOG(ERROR) << "EventAggregator::LogUniqueActivesEvent can only "
"accept OccurrenceEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
auto locked = protected_aggregate_store_.lock();
auto aggregates =
locked->local_aggregate_store.mutable_by_report_key()->find(key);
if (aggregates ==
locked->local_aggregate_store.mutable_by_report_key()->end()) {
LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
return kInvalidArguments;
}
if (!aggregates->second.has_unique_actives_aggregates()) {
LOG(ERROR) << "The local aggregates for this report key are not of type "
"UniqueActivesReportAggregates.";
return kInvalidArguments;
}
(*(*aggregates->second.mutable_unique_actives_aggregates()
->mutable_by_event_code())[event_record->event->occurrence_event()
.event_code()]
.mutable_by_day_index())[event_record->event->day_index()]
.mutable_activity_daily_aggregate()
->set_activity_indicator(true);
return kOK;
}
Status EventAggregator::LogPerDeviceCountEvent(uint32_t report_id,
EventRecord* event_record) {
if (!event_record->event->has_count_event()) {
LOG(ERROR) << "EventAggregator::LogPerDeviceCountEvent can only accept "
"CountEvents.";
return kInvalidArguments;
}
std::string key;
if (!PopulateReportKey(event_record->metric->customer_id(),
event_record->metric->project_id(),
event_record->metric->id(), report_id, &key)) {
return kInvalidArguments;
}
auto locked = protected_aggregate_store_.lock();
auto aggregates =
locked->local_aggregate_store.mutable_by_report_key()->find(key);
if (aggregates ==
locked->local_aggregate_store.mutable_by_report_key()->end()) {
LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
return kInvalidArguments;
}
if (!aggregates->second.has_count_aggregates()) {
LOG(ERROR) << "The local aggregates for this report key are not of type "
"PerDeviceCountReportAggregates.";
return kInvalidArguments;
}
auto daily_aggregate =
(*(*(*aggregates->second.mutable_count_aggregates()
->mutable_by_component())[event_record->event->count_event()
.component()]
.mutable_by_event_code())
[PackEventCodes(event_record->event->count_event().event_code())]
.mutable_by_day_index())[event_record->event->day_index()]
.mutable_count_daily_aggregate();
daily_aggregate->set_count(daily_aggregate->count() +
event_record->event->count_event().count());
return kOK;
}
Status EventAggregator::GenerateObservationsNoWorker(
uint32_t final_day_index_utc, uint32_t final_day_index_local) {
if (worker_thread_.joinable()) {
LOG(ERROR) << "GenerateObservationsNoWorker() was called while "
"worker thread was running.";
return kOther;
}
return GenerateObservations(final_day_index_utc, final_day_index_local);
}
Status EventAggregator::BackUpLocalAggregateStore() {
// Lock, copy the LocalAggregateStore, and release the lock. Write the copy
// to |local_aggregate_proto_store_|.
auto local_aggregate_store = CopyLocalAggregateStore();
auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: "
<< status.error_code()
<< "\nError message: " << status.error_message()
<< "\nError details: " << status.error_details();
return kOther;
}
return kOK;
}
Status EventAggregator::BackUpObservationHistory() {
auto status = obs_history_proto_store_->Write(obs_history_);
if (!status.ok()) {
LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
"::cobalt::util::Status error code: "
<< status.error_code()
<< "\nError message: " << status.error_message()
<< "\nError details: " << status.error_details();
return kOther;
}
return kOK;
}
void EventAggregator::ShutDown() {
if (worker_thread_.joinable()) {
{
auto locked = protected_shutdown_flag_.lock();
locked->shut_down = true;
locked->shutdown_notifier.notify_all();
}
worker_thread_.join();
} else {
protected_shutdown_flag_.lock()->shut_down = true;
}
}
void EventAggregator::Run() {
auto current_time = clock_->now();
// Schedule Observation generation to happen in the first cycle.
next_generate_obs_ = current_time;
// Schedule garbage collection to happen |gc_interval_| seconds from now.
next_gc_ = current_time + gc_interval_;
// Acquire the mutex protecting the shutdown flag and condition variable.
auto locked = protected_shutdown_flag_.lock();
while (true) {
// If shutdown has been requested, back up the LocalAggregateStore and
// exit.
if (locked->shut_down) {
BackUpLocalAggregateStore();
return;
}
// Sleep until the next scheduled backup of the LocalAggregateStore or
// until notified of shutdown. Back up the LocalAggregateStore after
// waking.
auto shutdown_requested = locked.wait_for_with(
&(locked->shutdown_notifier), aggregate_backup_interval_,
[&locked]() { return locked->shut_down; });
BackUpLocalAggregateStore();
// If the worker thread was woken up by a shutdown request, exit.
// Otherwise, complete any scheduled Observation generation and garbage
// collection.
if (shutdown_requested) {
return;
}
// Check whether it is time to generate Observations or to garbage-collect
// the LocalAggregate store. If so, do that task and schedule the next
// occurrence.
DoScheduledTasks(clock_->now());
}
}
void EventAggregator::DoScheduledTasks(
std::chrono::system_clock::time_point current_time) {
auto current_time_t = std::chrono::system_clock::to_time_t(current_time);
auto current_day_index_utc =
TimeToDayIndex(current_time_t, MetricDefinition::UTC);
auto current_day_index_local =
TimeToDayIndex(current_time_t, MetricDefinition::LOCAL);
if (current_time >= next_generate_obs_) {
auto obs_status = GenerateObservations(current_day_index_utc - 1,
current_day_index_local - 1);
if (obs_status == kOK) {
BackUpObservationHistory();
} else {
LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
}
next_generate_obs_ += generate_obs_interval_;
}
if (current_time >= next_gc_) {
auto gc_status =
GarbageCollect(current_day_index_utc - 1, current_day_index_local - 1);
if (gc_status == kOK) {
BackUpLocalAggregateStore();
} else {
LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
}
next_gc_ += gc_interval_;
}
}
Status EventAggregator::GenerateObservations(uint32_t final_day_index_utc,
uint32_t final_day_index_local) {
if (final_day_index_local == 0u) {
final_day_index_local = final_day_index_utc;
}
if (std::min(final_day_index_utc, final_day_index_local) <
backfill_days_ + kMaxAllowedAggregationWindowSize) {
LOG(ERROR) << "GenerateObservations: Day index of Observation must be >= "
"backfill_days_ + kMaxAllowedAggregationWindowSize.";
return kInvalidArguments;
}
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
for (auto pair : local_aggregate_store.by_report_key()) {
const auto& config = pair.second.aggregation_config();
const auto& metric = config.metric();
auto metric_ref = MetricRef(&config.project(), &metric);
uint32_t final_day_index;
switch (metric.time_zone_policy()) {
case MetricDefinition::UTC: {
final_day_index = final_day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
final_day_index = final_day_index_local;
break;
}
default:
LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
return kInvalidConfig;
}
const auto& report = config.report();
auto max_window_size = 0u;
for (uint32_t window_size : report.window_size()) {
if (window_size > kMaxAllowedAggregationWindowSize) {
LOG(WARNING) << "Window size exceeding "
"kMaxAllowedAggregationWindowSize will be "
"ignored by GenerateObservations";
} else if (window_size > max_window_size) {
max_window_size = window_size;
}
}
if (max_window_size == 0) {
LOG(ERROR) << "Each locally aggregated report must specify a positive "
"window size.";
return kInvalidConfig;
}
if (final_day_index < max_window_size) {
LOG(ERROR) << "final_day_index must be >= max_window_size.";
return kInvalidArguments;
}
switch (metric.metric_type()) {
case MetricDefinition::EVENT_OCCURRED: {
auto num_event_codes =
RapporConfigHelper::BasicRapporNumCategories(metric);
switch (report.report_type()) {
case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
auto status = GenerateUniqueActivesObservations(
metric_ref, pair.first, pair.second, num_event_codes,
final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
case MetricDefinition::EVENT_COUNT: {
switch (report.report_type()) {
case ReportDefinition::PER_DEVICE_COUNT_STATS: {
auto status = GeneratePerDeviceCountObservations(
metric_ref, pair.first, pair.second, final_day_index);
if (status != kOK) {
return status;
}
break;
}
default:
continue;
}
break;
}
default:
continue;
}
}
return kOK;
}
Status EventAggregator::GarbageCollect(uint32_t day_index_utc,
uint32_t day_index_local) {
if (day_index_local == 0u) {
day_index_local = day_index_utc;
}
if (std::min(day_index_utc, day_index_local) <
backfill_days_ + kMaxAllowedAggregationWindowSize) {
LOG(ERROR) << "GarbageCollect: Day index must be >= backfill_days_ + "
"kMaxAllowedAggregationWindowSize.";
return kInvalidArguments;
}
auto locked = protected_aggregate_store_.lock();
for (auto pair : locked->local_aggregate_store.by_report_key()) {
uint32_t day_index;
switch (pair.second.aggregation_config().metric().time_zone_policy()) {
case MetricDefinition::UTC: {
day_index = day_index_utc;
break;
}
case MetricDefinition::LOCAL: {
day_index = day_index_local;
break;
}
default:
LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid.";
return kInvalidConfig;
}
// Determine the largest window size in the report associated to this
// key-value pair.
uint32_t max_window_size = 1;
for (uint32_t window_size :
pair.second.aggregation_config().report().window_size()) {
if (window_size > max_window_size &&
window_size <= kMaxAllowedAggregationWindowSize) {
max_window_size = window_size;
}
}
if (max_window_size == 0) {
LOG(ERROR) << "Each locally aggregated report must specify a positive "
"window size.";
return kInvalidConfig;
}
// For each ReportAggregates, descend to and iterate over the sub-map of
// local aggregates keyed by day index. Keep buckets with day indices
// greater than |day_index| - |backfill_days_| - |max_window_size|, and
// remove all buckets with smaller day indices.
switch (pair.second.type_case()) {
case ReportAggregates::kUniqueActivesAggregates: {
for (auto event_code_aggregates :
pair.second.unique_actives_aggregates().by_event_code()) {
for (auto day_aggregates :
event_code_aggregates.second.by_day_index()) {
if (day_aggregates.first <=
day_index - backfill_days_ - max_window_size) {
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_unique_actives_aggregates()
->mutable_by_event_code()
->at(event_code_aggregates.first)
.mutable_by_day_index()
->erase(day_aggregates.first);
}
}
// If the day index map under this event code is empty, remove the
// event code from the event code-keyed map under this
// ReportAggregationKey.
if (locked->local_aggregate_store.by_report_key()
.at(pair.first)
.unique_actives_aggregates()
.by_event_code()
.at(event_code_aggregates.first)
.by_day_index()
.empty()) {
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_unique_actives_aggregates()
->mutable_by_event_code()
->erase(event_code_aggregates.first);
}
}
break;
}
case ReportAggregates::kCountAggregates: {
for (auto component_aggregates :
pair.second.count_aggregates().by_component()) {
for (auto event_code_aggregates :
component_aggregates.second.by_event_code()) {
for (auto day_aggregates :
event_code_aggregates.second.by_day_index()) {
if (day_aggregates.first <=
day_index - backfill_days_ - max_window_size) {
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_count_aggregates()
->mutable_by_component()
->at(component_aggregates.first)
.mutable_by_event_code()
->at(event_code_aggregates.first)
.mutable_by_day_index()
->erase(day_aggregates.first);
}
}
// If the day index map under this event code is empty, remove the
// event code from the event code-keyed map under this
// ReportAggregationKey.
if (locked->local_aggregate_store.by_report_key()
.at(pair.first)
.count_aggregates()
.by_component()
.at(component_aggregates.first)
.by_event_code()
.at(event_code_aggregates.first)
.by_day_index()
.empty()) {
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_count_aggregates()
->mutable_by_component()
->at(component_aggregates.first)
.mutable_by_event_code()
->erase(event_code_aggregates.first);
}
}
// If the event code map under this component string is empty,
// remove the component string from the component-keyed map under
// this ReportAggregationKey.
if (locked->local_aggregate_store.by_report_key()
.at(pair.first)
.count_aggregates()
.by_component()
.at(component_aggregates.first)
.by_event_code()
.empty()) {
locked->local_aggregate_store.mutable_by_report_key()
->at(pair.first)
.mutable_count_aggregates()
->mutable_by_component()
->erase(component_aggregates.first);
}
}
break;
}
default:
continue;
}
}
return kOK;
}
////////// GenerateUniqueActivesObservations and helper methods ///////////////
// Given the set of daily aggregates for a fixed event code, and the size and
// end date of an aggregation window, returns the first day index within that
// window on which the event code occurred. Returns 0 if the event code did
// not occur within the window.
uint32_t FirstActiveDayIndexInWindow(const DailyAggregates& daily_aggregates,
uint32_t obs_day_index,
uint32_t window_size) {
for (uint32_t day_index = obs_day_index - window_size + 1;
day_index <= obs_day_index; day_index++) {
auto day_aggregate = daily_aggregates.by_day_index().find(day_index);
if (day_aggregate != daily_aggregates.by_day_index().end() &&
day_aggregate->second.activity_daily_aggregate().activity_indicator() ==
true) {
return day_index;
}
}
return 0u;
}
// Given the day index of an event occurrence and the size and end date
// of an aggregation window, returns true if the occurrence falls within
// the window and false if not.
bool IsActivityInWindow(uint32_t active_day_index, uint32_t obs_day_index,
uint32_t window_size) {
return (active_day_index <= obs_day_index &&
active_day_index > obs_day_index - window_size);
}
uint32_t EventAggregator::UniqueActivesLastGeneratedDayIndex(
const std::string& report_key, uint32_t event_code,
uint32_t window_size) const {
auto report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
auto event_code_history =
report_history->second.unique_actives_history().by_event_code().find(
event_code);
if (event_code_history ==
report_history->second.unique_actives_history().by_event_code().end()) {
return 0u;
}
auto window_size_history =
event_code_history->second.by_window_size().find(window_size);
if (window_size_history ==
event_code_history->second.by_window_size().end()) {
return 0u;
}
return window_size_history->second;
}
Status EventAggregator::GenerateSingleUniqueActivesObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index, uint32_t event_code, uint32_t window_size,
bool was_active) const {
auto encoder_result = encoder_->EncodeUniqueActivesObservation(
metric_ref, report, obs_day_index, event_code, was_active, window_size);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode UniqueActivesObservation";
return kOther;
}
auto writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
Status EventAggregator::GenerateUniqueActivesObservations(
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t num_event_codes,
uint32_t final_day_index) {
// The earliest day index for which we might need to generate an Observation.
// GenerateObservations() has checked that this value is > 0.
auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
auto daily_aggregates =
report_aggregates.unique_actives_aggregates().by_event_code().find(
event_code);
// Have any events ever been logged for this report and event code?
bool found_event_code =
(daily_aggregates !=
report_aggregates.unique_actives_aggregates().by_event_code().end());
for (uint32_t window_size :
report_aggregates.aggregation_config().report().window_size()) {
// Skip any window size larger than
// kMaxAllowedAggregationWindowSize.
if (window_size > kMaxAllowedAggregationWindowSize) {
LOG(WARNING) << "GenerateUniqueActivesObservations ignoring a window "
"size exceeding the maximum allowed value";
continue;
}
// Find the earliest day index for which an Observation has not yet
// been generated for this report, event code, and window size. If
// that day index is later than |final_day_index|, no Observation is
// generated on this invocation.
auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
window_size);
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
// The latest day index on which |event_type| is known to have
// occurred, so far. This value will be updated as we search
// forward from the earliest day index belonging to a window of
// interest.
uint32_t active_day_index = 0u;
// Iterate over the day indices |obs_day_index| for which we need
// to generate Observations. On each iteration, generate an
// Observation for the window of size |window_size| ending on
// |obs_day_index|.
for (uint32_t obs_day_index = first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
bool was_active = false;
if (found_event_code) {
// If the current value of |active_day_index| falls within the
// window, generate an Observation of activity. If not, search
// forward in the window, update |active_day_index|, and generate an
// Observation of activity or inactivity depending on the result of
// the search.
if (IsActivityInWindow(active_day_index, obs_day_index,
window_size)) {
was_active = true;
} else {
active_day_index = FirstActiveDayIndexInWindow(
daily_aggregates->second, obs_day_index, window_size);
was_active = IsActivityInWindow(active_day_index, obs_day_index,
window_size);
}
}
auto status = GenerateSingleUniqueActivesObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index, event_code, window_size, was_active);
if (status != kOK) {
return status;
}
// Update |obs_history_| with the latest date of Observation
// generation for this report, event code, and window size.
(*(*(*obs_history_.mutable_by_report_key())[report_key]
.mutable_unique_actives_history()
->mutable_by_event_code())[event_code]
.mutable_by_window_size())[window_size] = obs_day_index;
}
}
}
return kOK;
}
////////// GeneratePerDeviceCountObservations and helper methods /////////////
uint32_t EventAggregator::PerDeviceCountLastGeneratedDayIndex(
const std::string& report_key, const std::string& component,
uint32_t event_code, uint32_t window_size) const {
const auto& report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
if (!report_history->second.has_per_device_count_history()) {
return 0u;
}
const auto& component_history =
report_history->second.per_device_count_history().by_component().find(
component);
if (component_history ==
report_history->second.per_device_count_history().by_component().end()) {
return 0u;
}
const auto& event_code_history =
component_history->second.by_event_code().find(event_code);
if (event_code_history == component_history->second.by_event_code().end()) {
return 0u;
}
const auto& window_size_history =
event_code_history->second.by_window_size().find(window_size);
if (window_size_history ==
event_code_history->second.by_window_size().end()) {
return 0u;
}
return window_size_history->second;
}
Status EventAggregator::GenerateSinglePerDeviceCountObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index, const std::string& component, uint32_t event_code,
uint32_t window_size, int64_t count) const {
auto encoder_result = encoder_->EncodePerDeviceCountObservation(
metric_ref, report, obs_day_index, component,
UnpackEventCodes(event_code), count, window_size);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode PerDeviceCountObservation";
return kOther;
}
const auto& writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
Status EventAggregator::GeneratePerDeviceCountObservations(
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t final_day_index) {
// Get the window sizes for this report and sort them in increasing order.
// TODO(pesk): Instead, have UpdateAggregationConfigs() store the window sizes
// in increasing order.
std::vector<uint32_t> window_sizes;
for (uint32_t window_size :
report_aggregates.aggregation_config().report().window_size()) {
if (window_size > kMaxAllowedAggregationWindowSize) {
LOG(WARNING) << "GeneratePerDeviceCountObservations ignoring a window "
"size exceeding the maximum allowed value";
continue;
}
window_sizes.push_back(window_size);
}
std::sort(window_sizes.begin(), window_sizes.end());
// The first day index for which we might have to generate an Observation.
// GenerateObservations() has checked that this value is > 0.
auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
// Generate any necessary PerDeviceCountObservations for this report.
for (const auto& component_pair :
report_aggregates.count_aggregates().by_component()) {
const auto& component = component_pair.first;
for (const auto& event_code_pair : component_pair.second.by_event_code()) {
auto event_code = event_code_pair.first;
const auto& event_code_aggregates = event_code_pair.second;
// Populate a helper map keyed by day indices which belong to the range
// [|backfill_period_start|, |final_day_index|]. The value at a day index
// is the list of window sizes, in increasing order, for which an
// Observation should be generated for that day index.
std::map<uint32_t, std::vector<uint32_t>> window_sizes_by_obs_day;
for (auto window_size : window_sizes) {
auto last_gen = PerDeviceCountLastGeneratedDayIndex(
report_key, component, event_code, window_size);
auto first_day_index = std::max(last_gen + 1, backfill_period_start);
for (auto obs_day_index = first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
window_sizes_by_obs_day[obs_day_index].push_back(window_size);
}
}
// Iterate over the day indices |obs_day_index| for which we might need to
// generate an Observation. For each day index, generate an Observation
// for each needed window size.
//
// More precisely, for each needed window size, compute the count over the
// window and generate a PerDeviceCountObservation only if the count is
// nonzero. Whether or not the count was zero, update the
// AggregatedObservationHistory for this report, component, event code,
// and window size with |obs_day_index| as the most recent date of
// Observation generation. This reflects the fact that the count was
// computed for the window ending on that date, even though an Observation
// is only sent if the count is nonzero.
for (auto obs_day_index = backfill_period_start;
obs_day_index <= final_day_index; obs_day_index++) {
const auto& window_sizes = window_sizes_by_obs_day.find(obs_day_index);
if (window_sizes == window_sizes_by_obs_day.end()) {
continue;
}
int64_t count = 0;
uint32_t num_days = 0;
for (auto window_size : window_sizes->second) {
while (num_days < window_size) {
const auto& day_aggregates =
event_code_aggregates.by_day_index().find(obs_day_index -
num_days);
if (day_aggregates != event_code_aggregates.by_day_index().end()) {
count += day_aggregates->second.count_daily_aggregate().count();
}
num_days++;
}
if (count != 0) {
auto status = GenerateSinglePerDeviceCountObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index, component, event_code, window_size, count);
if (status != kOK) {
return status;
}
}
// Update |obs_history_| with the latest date of Observation
// generation for this report, component, event code, and window
// size.
(*(*(*(*obs_history_.mutable_by_report_key())[report_key]
.mutable_per_device_count_history()
->mutable_by_component())[component]
.mutable_by_event_code())[event_code]
.mutable_by_window_size())[window_size] = obs_day_index;
}
}
}
}
// Generate any necessary ReportParticipationObservations for this report.
auto participation_last_gen =
ReportParticipationLastGeneratedDayIndex(report_key);
auto participation_first_day_index =
std::max(participation_last_gen + 1, backfill_period_start);
for (auto obs_day_index = participation_first_day_index;
obs_day_index <= final_day_index; obs_day_index++) {
GenerateSingleReportParticipationObservation(
metric_ref, &report_aggregates.aggregation_config().report(),
obs_day_index);
(*obs_history_.mutable_by_report_key())[report_key]
.mutable_report_participation_history()
->set_last_generated(obs_day_index);
}
return kOK;
}
uint32_t EventAggregator::ReportParticipationLastGeneratedDayIndex(
const std::string& report_key) const {
const auto& report_history = obs_history_.by_report_key().find(report_key);
if (report_history == obs_history_.by_report_key().end()) {
return 0u;
}
return report_history->second.report_participation_history().last_generated();
}
Status EventAggregator::GenerateSingleReportParticipationObservation(
const MetricRef metric_ref, const ReportDefinition* report,
uint32_t obs_day_index) const {
auto encoder_result = encoder_->EncodeReportParticipationObservation(
metric_ref, report, obs_day_index);
if (encoder_result.status != kOK) {
return encoder_result.status;
}
if (encoder_result.observation == nullptr ||
encoder_result.metadata == nullptr) {
LOG(ERROR) << "Failed to encode ReportParticipationObservation";
return kOther;
}
const auto& writer_status = observation_writer_->WriteObservation(
*encoder_result.observation, std::move(encoder_result.metadata));
if (writer_status != kOK) {
return writer_status;
}
return kOK;
}
} // namespace logger
} // namespace cobalt