blob: 31c37d4effd9c396364fc77b0b20cbe6b094104f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include <memory>
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_vector_index.h"
#include "src/logging.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Returns the AggregationProcedure that performs local aggregation for `report` of `metric`.
//
// Returns a FAILED_PRECONDITION error (which is also logged) when `report` is a handled report
// type but its local aggregation procedure (UNIQUE_DEVICE_COUNTS) or its metric's type (the
// histogram / numeric-stats report types) is not supported. Returns an OK result holding nullptr
// for report types that are not Cobalt 1.1 report types; those are meant to be ignored.
lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> AggregationProcedure::Get(
    const MetricDefinition &metric, const ReportDefinition &report) {
  VLOG(5) << "Getting aggregation procedure for report type " << report.report_type();
  switch (report.report_type()) {
    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
      return {std::make_unique<CountAggregationProcedure>(metric, report)};
    case ReportDefinition::UNIQUE_DEVICE_COUNTS:
      switch (report.local_aggregation_procedure()) {
        case ReportDefinition::AT_LEAST_ONCE:
          return {std::make_unique<AtLeastOnceAggregationProcedure>(metric, report)};
        case ReportDefinition::SELECT_FIRST:
          return {std::make_unique<SelectFirstAggregationProcedure>(metric, report)};
        case ReportDefinition::SELECT_MOST_COMMON:
          return {std::make_unique<SelectMostCommonAggregationProcedure>(metric, report)};
        default:
          return util::StatusBuilder(
                     StatusCode::FAILED_PRECONDITION,
                     "Report of type UNIQUE_DEVICE_COUNTS does not support selected aggregation "
                     "procedure: ")
              .AppendMsg(report.local_aggregation_procedure())
              .WithContext("Customer", metric.customer_name())
              .WithContext("Project", metric.project_name())
              .WithContexts(metric, report)
              .LogError()
              .Build();
      }
      // Not reached: every case of the inner switch returns.
      break;
    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
      switch (metric.metric_type()) {
        case MetricDefinition::OCCURRENCE:
          return {std::make_unique<CountAggregationProcedure>(metric, report)};
        case MetricDefinition::INTEGER:
          return {NumericStatAggregationProcedure::New(metric, report)};
        default:
          // Four report types share this branch, so report the actual report type instead of
          // hard-coding one of them in the message.
          return util::StatusBuilder(StatusCode::FAILED_PRECONDITION, "Report of type ")
              .AppendMsg(report.report_type())
              .AppendMsg(" is not valid for metric of type: ")
              .AppendMsg(metric.metric_type())
              .WithContext("Customer", metric.customer_name())
              .WithContext("Project", metric.project_name())
              .WithContexts(metric, report)
              .LogError()
              .Build();
      }
      // Not reached: every case of the inner switch returns.
      break;
    case ReportDefinition::FLEETWIDE_HISTOGRAMS:
      return {std::make_unique<IntegerHistogramAggregationProcedure>(metric, report)};
    case ReportDefinition::FLEETWIDE_MEANS:
      return {std::make_unique<SumAndCountAggregationProcedure>(metric, report)};
    case ReportDefinition::STRING_COUNTS:
      return {std::make_unique<StringHistogramAggregationProcedure>(metric, report)};
    default:
      // This is a non cobalt 1.1 report type, should be silently ignored.
      return {nullptr};
  }
}
// Captures the metric/report settings every aggregation procedure needs: the metric and report
// types, the system profile selection policy, the aggregation window length, and the cap on the
// number of distinct event vectors kept per system profile aggregate.
AggregationProcedure::AggregationProcedure(const MetricDefinition &metric,
                                           const ReportDefinition &report)
    : metric_type_(metric.metric_type()),
      report_type_(report.report_type()),
      system_profile_selection_policy_(report.system_profile_selection()) {
  // TODO(fxbug.dev/53691): Switch when local_aggregation_period has been
  // switched to OnDeviceAggregationWindow.
  window_.set_days(static_cast<AggregationDays>(report.local_aggregation_period()));

  // A positive event_vector_buffer_max configured on the report takes precedence; otherwise the
  // cap falls back to logger::GetNumEventVectors() over the metric's dimensions.
  if (const auto configured_max = report.event_vector_buffer_max(); configured_max > 0) {
    event_vector_buffer_max_ = configured_max;
  } else {
    event_vector_buffer_max_ = logger::GetNumEventVectors(metric.metric_dimensions());
  }
}
// Returns the bucket of `aggregate` for period `time`. `time` is a day index for daily reports
// and an hour id otherwise. The map's operator[] creates an empty bucket if none exists yet.
AggregationPeriodBucket *AggregationProcedure::GetAggregationPeriodBucket(
    uint32_t time, ReportAggregate *aggregate) const {
  if (!IsDaily()) {
    auto &buckets_by_hour = *aggregate->mutable_hourly()->mutable_by_hour_id();
    return &buckets_by_hour[time];
  }
  auto &buckets_by_day = *aggregate->mutable_daily()->mutable_by_day_index();
  return &buckets_by_day[time];
}
// Returns whether an event whose oneof case is `type` matches the event kind expected for this
// procedure's metric type. Logs an error and returns false for metric types that are not Cobalt
// 1.1 metric types.
bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
  Event::TypeCase expected_type;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      expected_type = Event::TypeCase::kOccurrenceEvent;
      break;
    case MetricDefinition::INTEGER:
      expected_type = Event::TypeCase::kIntegerEvent;
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      expected_type = Event::TypeCase::kIntegerHistogramEvent;
      break;
    case MetricDefinition::STRING:
      expected_type = Event::TypeCase::kStringEvent;
      break;
    default:
      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
                 << " does not appear to be a cobalt 1.1 metric";
      return false;
  }
  return type == expected_type;
}
// Finds, or creates, the AggregateData in `bucket` into which `event_record` should be folded.
//
// For the SELECT_FIRST, SELECT_LAST and SELECT_DEFAULT system profile selection policies, the
// bucket is expected to hold a single SystemProfileAggregate:
//  - It is created on first use with `system_profile_hash` and `system_time` as both its
//    first-seen and last-seen timestamps.
//  - On later calls its last-seen timestamp is refreshed. Its hash is replaced by
//    `system_profile_hash` under SELECT_LAST / SELECT_DEFAULT only.
// Within that aggregate, the entry whose event codes equal the event's codes is returned. If
// there is none, a new entry is added, provided fewer than event_vector_buffer_max_ entries
// exist.
//
// Returns nullptr in three cases: the metric type is not a Cobalt 1.1 type, the event-vector
// buffer is full, or the selection policy is not one of the above (REPORT_ALL is not supported
// yet).
AggregateData *AggregationProcedure::GetAggregateData(
    const logger::EventRecord &event_record, AggregationPeriodBucket *bucket,
    uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const {
  // Points at the event codes stored in the event owned by `event_record`. Referring to them,
  // instead of copying them into a local RepeatedField, avoids an allocation and a copy for
  // every logged event.
  const google::protobuf::RepeatedField<uint32_t> *event_codes = nullptr;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      event_codes = &event_record.event()->occurrence_event().event_code();
      break;
    case MetricDefinition::INTEGER:
      event_codes = &event_record.event()->integer_event().event_code();
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      event_codes = &event_record.event()->integer_histogram_event().event_code();
      break;
    case MetricDefinition::STRING:
      event_codes = &event_record.event()->string_event().event_code();
      break;
    default:
      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
                 << " does not appear to be a Cobalt 1.1 metric";
      return nullptr;
  }
  int64_t system_timestamp = util::ToUnixSeconds(system_time);
  if (system_profile_selection_policy_ == SELECT_FIRST ||
      system_profile_selection_policy_ == SELECT_LAST ||
      system_profile_selection_policy_ == SELECT_DEFAULT) {
    SystemProfileAggregate *system_profile_aggregate;
    if (bucket->system_profile_aggregates_size() >= 1) {
      if (bucket->system_profile_aggregates_size() > 1) {
        LOG(ERROR) << "There are " << bucket->system_profile_aggregates_size()
                   << " system profile aggregates for a report with system_profile_selection "
                   << system_profile_selection_policy_ << ". There should be only one.";
        // Use the first aggregate anyway.
      }
      system_profile_aggregate = bucket->mutable_system_profile_aggregates(0);
      // SELECT_LAST / SELECT_DEFAULT report the most recent system profile, so keep the
      // aggregate's hash up to date; SELECT_FIRST keeps the hash it was created with.
      if ((system_profile_selection_policy_ == SELECT_LAST ||
           system_profile_selection_policy_ == SELECT_DEFAULT) &&
          system_profile_aggregate->system_profile_hash() != system_profile_hash) {
        system_profile_aggregate->set_system_profile_hash(system_profile_hash);
      }
      system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
    } else {
      system_profile_aggregate = bucket->add_system_profile_aggregates();
      system_profile_aggregate->set_system_profile_hash(system_profile_hash);
      system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
      system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
    }
    for (EventCodesAggregateData &aggregate_data :
         *system_profile_aggregate->mutable_by_event_code()) {
      // Find the event codes that match the event's.
      if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
                     event_codes->begin(), event_codes->end())) {
        return aggregate_data.mutable_data();
      }
    }
    // Event codes were not found, so add them as a new entry if the buffer has room.
    if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
      EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
      aggregate_data->mutable_event_codes()->CopyFrom(*event_codes);
      return aggregate_data->mutable_data();
    }
  }
  // TODO(fxbug.dev/85440): add support for finding or adding the system profile for REPORT_ALL.
  return nullptr;
}
// Folds `event_record` into `aggregate`.
//
// Steps:
//  1. Select the bucket for the event's day index (daily reports) or hour id (hourly reports).
//  2. Look up the AggregateData for the event's system profile and event codes via
//     GetAggregateData().
//  3. Delegate the update itself to UpdateAggregateData().
//
// Events whose type does not match the metric type are logged and dropped. Events for which
// GetAggregateData() yields nullptr are silently dropped.
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
                                           ReportAggregate *aggregate, uint64_t system_profile_hash,
                                           std::chrono::system_clock::time_point system_time) {
  const auto &event = *event_record.event();
  if (!IsValidEventType(event.type_case())) {
    LOG(ERROR) << "Unable to log event of type " << event.type_case()
               << " with aggregation procedure of type " << DebugString() << ".";
    return;
  }

  uint32_t period_index;
  if (IsDaily()) {
    period_index = event.day_index();
  } else {
    period_index = event.hour_id();
  }
  AggregationPeriodBucket *bucket = GetAggregationPeriodBucket(period_index, aggregate);

  AggregateData *aggregate_data =
      GetAggregateData(event_record, bucket, system_profile_hash, system_time);
  if (aggregate_data == nullptr) {
    return;
  }
  UpdateAggregateData(event_record, aggregate_data, bucket);
}
// Returns the TimeInfo at which the aggregation window ending at `current_time_info` starts.
//
// Hourly reports: `current_time_info` is returned unchanged.
// Daily reports: the window covers window_.days() days ending with, and including, the current
// day. It therefore starts window_.days() - 1 days earlier; for a 1-day window that is the
// current day itself.
util::TimeInfo AggregationProcedure::GetStartTimeInfo(
    const util::TimeInfo &current_time_info) const {
  if (!IsDaily()) {
    return current_time_info;
  }
  const auto window_length_days = static_cast<uint32_t>(window_.days());
  const uint32_t first_day_index = current_time_info.day_index - window_length_days + 1;
  return util::TimeInfo::FromDayIndex(first_day_index);
}
// Collects the aggregates to turn into observations for the aggregation window ending at
// `time_info`, keyed by system profile hash.
//
// Daily reports: walks every day of the window, newest first. All event-code aggregates from
// each day's bucket are gathered under a single system profile hash, chosen across the whole
// window by the selection policy:
//  - SELECT_LAST / SELECT_DEFAULT: the aggregate with the latest last-seen timestamp.
//  - SELECT_FIRST: the aggregate with the earliest first-seen timestamp.
// Days that contribute no event-code aggregates are skipped.
//
// Hourly reports: uses only the bucket for the current hour, keying each of its system profile
// aggregates by that aggregate's own hash.
//
// The returned entries hold pointers into `aggregate`'s buckets.
std::map<uint64_t, std::vector<AggregateDataToGenerate>>
AggregationProcedure::GetAggregateDataToGenerate(const util::TimeInfo &time_info,
                                                 ReportAggregate *aggregate) const {
  std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate;
  util::TimeInfo start_time_info = GetStartTimeInfo(time_info);
  if (IsDaily()) {
    // TODO(fxbug.dev/85440): add multiple system profiles to data_to_generate for REPORT_ALL.
    // The aggregate whose system profile hash is reported for the whole window. The candidates'
    // own timestamps are compared directly, rather than being copied into uint32_t locals with
    // sentinel initial values, so the full range of the stored timestamps is respected.
    const SystemProfileAggregate *selected_profile = nullptr;
    std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile;
    // Walk the window from the newest day to the oldest. The counter is signed and 64-bit so the
    // loop terminates even when the window starts at day index 0. A uint32_t counter would wrap
    // around instead of dropping below the lower bound.
    const int64_t first_day = start_time_info.day_index;
    for (int64_t day = time_info.day_index; day >= first_day; --day) {
      const auto day_index = static_cast<uint32_t>(day);
      if (!aggregate->daily().by_day_index().contains(day_index)) {
        continue;
      }
      AggregationPeriodBucket *agg =
          &(*aggregate->mutable_daily()->mutable_by_day_index())[day_index];
      AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
      for (SystemProfileAggregate &system_profile_aggregate :
           *agg->mutable_system_profile_aggregates()) {
        // For SELECT_FIRST and SELECT_LAST there should only be one SystemProfileAggregate per
        // bucket. For multi-day reports, though, the system profile to use has to be determined
        // from the AggregationPeriodBuckets of all the days in the aggregation window.
        if ((system_profile_selection_policy_ == SELECT_LAST ||
             system_profile_selection_policy_ == SELECT_DEFAULT) &&
            (selected_profile == nullptr || system_profile_aggregate.last_seen_timestamp() >=
                                                selected_profile->last_seen_timestamp())) {
          selected_profile = &system_profile_aggregate;
        } else if (system_profile_selection_policy_ == SELECT_FIRST &&
                   (selected_profile == nullptr ||
                    system_profile_aggregate.first_seen_timestamp() <
                        selected_profile->first_seen_timestamp())) {
          selected_profile = &system_profile_aggregate;
        }
        for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
          agg_to_generate.aggregate_data.push_back(&data);
        }
      }
      if (!agg_to_generate.aggregate_data.empty()) {
        data_to_generate_for_system_profile.emplace_back(std::move(agg_to_generate));
      }
    }
    if (!data_to_generate_for_system_profile.empty()) {
      // selected_profile is null here only under a selection policy this method does not handle
      // yet (see the TODO above). In that case, use hash 0 instead of reading an indeterminate
      // value.
      uint64_t system_profile_hash = 0;
      if (selected_profile != nullptr) {
        system_profile_hash = selected_profile->system_profile_hash();
      }
      data_to_generate[system_profile_hash] = std::move(data_to_generate_for_system_profile);
    }
  } else {
    if (aggregate->hourly().by_hour_id().contains(start_time_info.hour_id)) {
      AggregationPeriodBucket *agg =
          &(*aggregate->mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id];
      for (SystemProfileAggregate &system_profile_aggregate :
           *agg->mutable_system_profile_aggregates()) {
        AggregateDataToGenerate agg_to_generate{.string_hashes = agg->string_hashes()};
        for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
          agg_to_generate.aggregate_data.push_back(&data);
        }
        data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
            std::move(agg_to_generate));
      }
    }
  }
  return data_to_generate;
}
// Generates one observation per system profile hash from the aggregates that
// GetAggregateDataToGenerate() selects for `time_info`. A failure from GenerateSingleObservation()
// is propagated through CB_ASSIGN_OR_RETURN.
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>>
AggregationProcedure::GenerateObservations(const util::TimeInfo &time_info,
                                           ReportAggregate *aggregate) {
  std::vector<ObservationAndSystemProfile> observations;
  for (const auto &[system_profile_hash, aggregates] :
       GetAggregateDataToGenerate(time_info, aggregate)) {
    std::set<std::vector<uint32_t>> selected_event_vectors =
        SelectEventVectorsForObservation(aggregates);
    CB_ASSIGN_OR_RETURN(
        std::unique_ptr<Observation> observation,
        GenerateSingleObservation(aggregates, selected_event_vectors, time_info));
    ObservationAndSystemProfile entry{.system_profile_hash = system_profile_hash,
                                      .observation = std::move(observation)};
    observations.push_back(std::move(entry));
  }
  return observations;
}
// Called once the observations for `time_info` have been committed. Does two things:
//  - Drops every period bucket at or before the start of the current aggregation window, since
//    those buckets will never be used again.
//  - Records `time_info` as the last day index (daily reports) or hour id (hourly reports).
// `system_profile_hash` is currently unused.
void AggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
                                                 util::TimeInfo time_info,
                                                 uint64_t system_profile_hash) const {
  const util::TimeInfo window_start = GetStartTimeInfo(time_info);

  // Removes every entry of the period-keyed map `periods` whose key is <= `cutoff`. Keys are
  // gathered first and erased afterwards, so the map is not modified while it is being iterated.
  auto erase_through = [](auto *periods, auto cutoff) {
    std::vector<uint32_t> stale_periods;
    for (const auto &entry : *periods) {
      if (entry.first <= cutoff) {
        stale_periods.push_back(entry.first);
      }
    }
    for (uint32_t period : stale_periods) {
      periods->erase(period);
    }
  };

  if (IsDaily()) {
    auto *daily = aggregate->mutable_daily();
    erase_through(daily->mutable_by_day_index(), window_start.day_index);
    daily->set_last_day_index(time_info.day_index);
  } else {
    auto *hourly = aggregate->mutable_hourly();
    erase_through(hourly->mutable_by_hour_id(), window_start.hour_id);
    hourly->set_last_hour_id(time_info.hour_id);
  }
}
// Collects the distinct event vectors found in `buckets`, visiting buckets and their aggregates
// in order. Collection stops as soon as event_vector_buffer_max_ distinct vectors have been
// gathered.
std::set<std::vector<uint32_t>> AggregationProcedure::SelectEventVectorsForObservation(
    const std::vector<AggregateDataToGenerate> &buckets) const {
  std::set<std::vector<uint32_t>> selected;
  for (const AggregateDataToGenerate &bucket : buckets) {
    for (const EventCodesAggregateData *data : bucket.aggregate_data) {
      // The limit is checked before each insertion, so the result never exceeds it.
      if (selected.size() == event_vector_buffer_max_) {
        return selected;
      }
      const auto &codes = data->event_codes();
      selected.emplace(codes.begin(), codes.end());
    }
  }
  return selected;
}
} // namespace cobalt::local_aggregation