blob: 01c3fca07ad760110bb9ee827373b5fb4aa120ff [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include <functional>
#include <memory>
#include <optional>
#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/local_aggregation/aggregation_procedures/at_least_once_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/at_least_once_string_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/count_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/integer_histogram_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/numeric_stat_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/select_first_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/select_most_common_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/string_histogram_aggregation_procedure.h"
#include "src/local_aggregation/aggregation_procedures/sum_and_count_aggregation_procedure.h"
#include "src/local_aggregation/civil_time_manager.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/event_vector_index.h"
#include "src/logging.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Factory for AggregationProcedure implementations.
//
// The implementation is chosen from |report|'s report type and, depending on that type, from
// either the report's local aggregation procedure or |metric|'s metric type.
//
// Returns a FAILED_PRECONDITION status (logged as an error and annotated with the customer name,
// project name, metric and report) when the combination is not handled below.
lib::statusor::StatusOr<util::NotNullUniquePtr<AggregationProcedure>> AggregationProcedure::Get(
    const std::string &customer_name, const std::string &project_name,
    const MetricDefinition &metric, const ReportDefinition &report) {
  VLOG(5) << "Getting aggregation procedure for report type " << report.report_type();
  switch (report.report_type()) {
    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
      return {util::MakeNotNullUniquePtr<CountAggregationProcedure>(metric, report)};
    case ReportDefinition::UNIQUE_DEVICE_COUNTS:
      // The concrete procedure depends on the report's local aggregation procedure.
      switch (report.local_aggregation_procedure()) {
        case ReportDefinition::AT_LEAST_ONCE:
          return {util::MakeNotNullUniquePtr<AtLeastOnceAggregationProcedure>(metric, report)};
        case ReportDefinition::SELECT_FIRST:
          return {util::MakeNotNullUniquePtr<SelectFirstAggregationProcedure>(metric, report)};
        case ReportDefinition::SELECT_MOST_COMMON:
          return {
              util::MakeNotNullUniquePtr<SelectMostCommonAggregationProcedure>(metric, report)};
        default:
          return util::StatusBuilder(
                     StatusCode::FAILED_PRECONDITION,
                     "Report of type UNIQUE_DEVICE_COUNTS does not support selected aggregation "
                     "procedure: ")
              .AppendMsg(report.local_aggregation_procedure())
              .WithContext("Customer", customer_name)
              .WithContext("Project", project_name)
              .WithContexts(metric, report)
              .LogError()
              .Build();
      }
      break;
    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
      // These four report types share one mapping, keyed on the metric type.
      switch (metric.metric_type()) {
        case MetricDefinition::OCCURRENCE:
          return {util::MakeNotNullUniquePtr<CountAggregationProcedure>(metric, report)};
        case MetricDefinition::INTEGER:
          return {
              NumericStatAggregationProcedure::New(customer_name, project_name, metric, report)};
        default:
          // Report the actual report type: this branch is reached for any of the four report
          // types listed above, not only for UNIQUE_DEVICE_HISTOGRAMS.
          return util::StatusBuilder(StatusCode::FAILED_PRECONDITION, "Report of type ")
              .AppendMsg(report.report_type())
              .AppendMsg(" is not valid for metric of type: ")
              .AppendMsg(metric.metric_type())
              .WithContext("Customer", customer_name)
              .WithContext("Project", project_name)
              .WithContexts(metric, report)
              .LogError()
              .Build();
      }
      break;
    case ReportDefinition::FLEETWIDE_HISTOGRAMS:
      return {util::MakeNotNullUniquePtr<IntegerHistogramAggregationProcedure>(metric, report)};
    case ReportDefinition::FLEETWIDE_MEANS:
      return {util::MakeNotNullUniquePtr<SumAndCountAggregationProcedure>(metric, report)};
    case ReportDefinition::STRING_COUNTS:
      return {util::MakeNotNullUniquePtr<StringHistogramAggregationProcedure>(metric, report)};
    case ReportDefinition::UNIQUE_DEVICE_STRING_COUNTS:
      return {util::MakeNotNullUniquePtr<AtLeastOnceStringAggregationProcedure>(metric, report)};
    default:
      return util::StatusBuilder(StatusCode::FAILED_PRECONDITION, "Report of unexpected type: ")
          .AppendMsg(report.report_type())
          .WithContext("Customer", customer_name)
          .WithContext("Project", project_name)
          .WithContexts(metric, report)
          .LogError()
          .Build();
  }
}
// Captures the pieces of |metric| and |report| that the procedure reads later: the metric itself,
// the report's local aggregation period, the metric and report types, and the report's system
// profile selection policy.
AggregationProcedure::AggregationProcedure(const MetricDefinition &metric,
                                           const ReportDefinition &report)
    : metric_(metric),
      window_(report.local_aggregation_period()),
      metric_type_(metric.metric_type()),
      report_type_(report.report_type()),
      system_profile_selection_policy_(report.system_profile_selection()) {
  // A positive event_vector_buffer_max on the report takes precedence.
  const auto configured_max = report.event_vector_buffer_max();
  if (configured_max > 0) {
    event_vector_buffer_max_ = configured_max;
    return;
  }
  // Otherwise derive the limit from the metric's dimensions.
  event_vector_buffer_max_ = logger::GetNumEventVectors(metric.metric_dimensions());
}
// Returns the ReportAggregate stored under |report_id| in |metric_aggregate|, inserting it if it
// is not present yet.
//
// A newly inserted aggregate has its last day index (daily procedures) or last hour id (hourly
// procedures) seeded from |civil_time_mgr|. If the time lookup fails, its status is returned.
lib::statusor::StatusOr<ReportAggregate *> AggregationProcedure::GetReportAggregate(
    MetricAggregate *metric_aggregate, uint32_t report_id, CivilTimeManager *civil_time_mgr) const {
  auto &aggregates_by_report = *metric_aggregate->mutable_by_report_id();
  // The presence check must happen before operator[] below, which inserts a missing entry.
  const bool already_present = aggregates_by_report.count(report_id) != 0;
  ReportAggregate *report_aggregate = &aggregates_by_report[report_id];
  if (already_present) {
    return report_aggregate;
  }

  // For a newly found report, set the initial last day_index/hour_id to the last completed
  // aggregation period. This path is only taken if the report is new since the last aggregation
  // run, which should have been at most an hour earlier, and it has not received any data since
  // then.
  if (IsDaily()) {
    CB_ASSIGN_OR_RETURN(util::TimeInfo current_info, civil_time_mgr->GetTimeInfo(0, metric_));
    report_aggregate->mutable_daily()->set_last_day_index(current_info.day_index - 1);
    return report_aggregate;
  }
  CB_ASSIGN_OR_RETURN(util::TimeInfo previous_info, civil_time_mgr->GetTimeInfo(1, metric_));
  report_aggregate->mutable_hourly()->set_last_hour_id(previous_info.hour_id);
  return report_aggregate;
}
// Returns the bucket of |aggregate| that |time| falls into: keyed by day index for daily
// procedures and by hour id otherwise. The lookup uses the map's operator[].
AggregationPeriodBucket &AggregationProcedure::GetAggregationPeriodBucket(
    const util::TimeInfo &time, ReportAggregate &aggregate) const {
  if (!IsDaily()) {
    auto &buckets_by_hour = *aggregate.mutable_hourly()->mutable_by_hour_id();
    return buckets_by_hour[time.hour_id];
  }
  auto &buckets_by_day = *aggregate.mutable_daily()->mutable_by_day_index();
  return buckets_by_day[time.day_index];
}
// Returns true when |type| is the event type that belongs to this procedure's metric type.
// For a metric type not listed here an error is logged and false is returned.
bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
  if (metric_type_ == MetricDefinition::OCCURRENCE) {
    return type == Event::TypeCase::kOccurrenceEvent;
  }
  if (metric_type_ == MetricDefinition::INTEGER) {
    return type == Event::TypeCase::kIntegerEvent;
  }
  if (metric_type_ == MetricDefinition::INTEGER_HISTOGRAM) {
    return type == Event::TypeCase::kIntegerHistogramEvent;
  }
  if (metric_type_ == MetricDefinition::STRING) {
    return type == Event::TypeCase::kStringEvent;
  }
  LOG(ERROR) << DebugString() << ": Metric of unexpected type: " << metric_type_;
  return false;
}
// Finds (creating it when needed) the AggregateData in |bucket| that |event_record| should be
// aggregated into, and refreshes the seen-timestamps of the owning SystemProfileAggregate.
//
// |system_profile_hash| identifies the system profile the event was logged under and
// |system_time| is converted to Unix seconds for the first/last seen timestamps.
//
// Behavior by system profile selection policy:
//  - SELECT_FIRST / SELECT_LAST / SELECT_DEFAULT: the bucket's first SystemProfileAggregate is
//    used (one is added if the bucket has none). For SELECT_LAST and SELECT_DEFAULT its hash is
//    replaced by |system_profile_hash|.
//  - REPORT_ALL: the SystemProfileAggregate whose hash equals |system_profile_hash| is used, or a
//    new one is added.
//
// Returns INVALID_ARGUMENT for an unexpected metric type, NOT_FOUND for an unknown selection
// policy, or whatever the event-code overload of GetAggregateData returns.
lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record,
                                       AggregationPeriodBucket &bucket,
                                       uint64_t system_profile_hash,
                                       std::chrono::system_clock::time_point system_time) const {
  // Refer to the event's codes in place instead of copying them into a local; the only copy is
  // the one made when they are passed by value to the overload below.
  const google::protobuf::RepeatedField<uint32_t> *event_codes = nullptr;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      event_codes = &event_record.event()->occurrence_event().event_code();
      break;
    case MetricDefinition::INTEGER:
      event_codes = &event_record.event()->integer_event().event_code();
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      event_codes = &event_record.event()->integer_histogram_event().event_code();
      break;
    case MetricDefinition::STRING:
      event_codes = &event_record.event()->string_event().event_code();
      break;
    default:
      return util::StatusBuilder(StatusCode::INVALID_ARGUMENT)
          .AppendMsg(DebugString())
          .AppendMsg(": Unexpected metric of type ")
          .AppendMsg(metric_type_)
          .LogError()
          .Build();
  }

  const int64_t system_timestamp = util::ToUnixSeconds(system_time);

  if (system_profile_selection_policy_ == SELECT_FIRST ||
      system_profile_selection_policy_ == SELECT_LAST ||
      system_profile_selection_policy_ == SELECT_DEFAULT) {
    SystemProfileAggregate *system_profile_aggregate = nullptr;
    if (bucket.system_profile_aggregates_size() >= 1) {
      if (bucket.system_profile_aggregates_size() > 1) {
        LOG(ERROR) << "There are " << bucket.system_profile_aggregates_size()
                   << " system profile aggregates for a report with system_profile_selection "
                   << system_profile_selection_policy_ << ". There should be only one.";
        // Use the first aggregate anyway.
      }
      system_profile_aggregate = bucket.mutable_system_profile_aggregates(0);
      // SELECT_LAST and SELECT_DEFAULT track the most recently seen system profile.
      if ((system_profile_selection_policy_ == SELECT_LAST ||
           system_profile_selection_policy_ == SELECT_DEFAULT) &&
          system_profile_aggregate->system_profile_hash() != system_profile_hash) {
        system_profile_aggregate->set_system_profile_hash(system_profile_hash);
      }
      system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
    } else {
      system_profile_aggregate = bucket.add_system_profile_aggregates();
      system_profile_aggregate->set_system_profile_hash(system_profile_hash);
      system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
      system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
    }
    return GetAggregateData(*system_profile_aggregate, *event_codes);
  }

  if (system_profile_selection_policy_ == REPORT_ALL) {
    SystemProfileAggregate *system_profile_aggregate = nullptr;
    // No early exit: if several aggregates carry the same hash, the last one is used.
    for (SystemProfileAggregate &aggregate : *(bucket.mutable_system_profile_aggregates())) {
      if (aggregate.system_profile_hash() == system_profile_hash) {
        system_profile_aggregate = &aggregate;
      }
    }
    if (system_profile_aggregate == nullptr) {
      system_profile_aggregate = bucket.add_system_profile_aggregates();
      system_profile_aggregate->set_system_profile_hash(system_profile_hash);
      system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
    }
    system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
    return GetAggregateData(*system_profile_aggregate, *event_codes);
  }

  return util::StatusBuilder(StatusCode::NOT_FOUND, "Unknown system_profile_selection_policy ")
      .AppendMsg(system_profile_selection_policy_)
      .LogError()
      .Build();
}
// Returns the AggregateData of the entry in |system_profile_aggregate| whose event codes equal
// |event_codes|. If there is no such entry, one is appended, provided the number of entries is
// still below event_vector_buffer_max_; otherwise a RESOURCE_EXHAUSTED status is returned.
lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
AggregationProcedure::GetAggregateData(
    SystemProfileAggregate &system_profile_aggregate,
    google::protobuf::RepeatedField<uint32_t> event_codes) const {
  auto &entries = *system_profile_aggregate.mutable_by_event_code();

  // Look for an existing entry with exactly these event codes.
  const auto match = std::find_if(
      entries.begin(), entries.end(), [&event_codes](const EventCodesAggregateData &entry) {
        return std::equal(entry.event_codes().begin(), entry.event_codes().end(),
                          event_codes.begin(), event_codes.end());
      });
  if (match != entries.end()) {
    return std::ref(*match->mutable_data());
  }

  // Not found: refuse to grow once the buffer limit has been reached.
  if (system_profile_aggregate.by_event_code_size() >= event_vector_buffer_max_) {
    return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
                               "SystemProfileAggregate has reached event_vector_buffer_max")
        .WithContext("event_vector_buffer_max", event_vector_buffer_max_)
        .Build();
  }

  // Otherwise record the event codes as a new entry.
  EventCodesAggregateData *new_entry = system_profile_aggregate.add_by_event_code();
  new_entry->mutable_event_codes()->CopyFrom(event_codes);
  return std::ref(*new_entry->mutable_data());
}
// Folds |event_record| into |aggregate|.
//
// Events whose type does not match this procedure's metric type are logged and dropped. If no
// AggregateData can be obtained for the event, the event is dropped without further action here.
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
                                           ReportAggregate &aggregate, uint64_t system_profile_hash,
                                           std::chrono::system_clock::time_point system_time) {
  if (!IsValidEventType(event_record.event()->type_case())) {
    LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case()
               << " with aggregation procedure of type " << DebugString() << ".";
    return;
  }

  AggregationPeriodBucket &bucket =
      GetAggregationPeriodBucket(event_record.GetTimeInfo(), aggregate);

  lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> aggregate_data =
      GetAggregateData(event_record, bucket, system_profile_hash, system_time);
  if (!aggregate_data.ok()) {
    return;
  }
  UpdateAggregateData(event_record, aggregate_data.value(), bucket);
}
// Merges |aggregate| into |merged_aggregate|.
//
// The merged first/last seen timestamps are widened to cover both aggregates. The merged system
// profile hash follows the selection policy: under SELECT_FIRST it is taken from |aggregate| when
// |aggregate| was first seen strictly earlier; under every other policy it is taken from
// |aggregate| when |aggregate| was last seen at the same time or later. Finally, the data for
// each event-code entry of |aggregate| is merged into the matching entry of |merged_aggregate|.
void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
                                                        const SystemProfileAggregate &aggregate) {
  // Each comparison reads a timestamp that has not been modified yet, so evaluating this one up
  // front does not change the outcome.
  const bool seen_earlier =
      aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp();

  if (system_profile_selection_policy_ == SELECT_FIRST) {
    const bool seen_later =
        aggregate.last_seen_timestamp() > merged_aggregate.last_seen_timestamp();
    if (seen_earlier) {
      merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
      merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
    }
    if (seen_later) {
      merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
    }
  } else {  // SELECT_LAST or SELECT_DEFAULT
    const bool seen_as_late =
        aggregate.last_seen_timestamp() >= merged_aggregate.last_seen_timestamp();
    if (seen_as_late) {
      merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
      merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
    }
    if (seen_earlier) {
      merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
    }
  }

  for (const EventCodesAggregateData &source : aggregate.by_event_code()) {
    // Find or create the corresponding AggregateData in the merged system profile aggregate.
    lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> destination =
        GetAggregateData(merged_aggregate, source.event_codes());
    // A non-OK status here means that there is no room in event_vector_buffer_max_ for the event
    // codes; such entries are skipped.
    if (!destination.ok()) {
      continue;
    }
    MergeAggregateData(destination.value(), source.data());
  }
}
// Returns the TimeInfo at which the aggregation window that ends at |current_time_info| starts.
util::TimeInfo AggregationProcedure::GetStartTimeInfo(
    const util::TimeInfo &current_time_info) const {
  // For hourly, always use the current time info's hour_id.
  if (!IsDaily()) {
    return current_time_info;
  }
  // For daily aggregates the window covers window_ days ending on the current day, so a 1-day
  // window starts on the current day index and a multi-day (e.g. 7-day) window starts on the
  // oldest day index in the window.
  const uint32_t window_days = static_cast<uint32_t>(window_);
  const uint32_t oldest_day_index = current_time_info.day_index - window_days + 1;
  return util::TimeInfo::FromDayIndex(oldest_day_index);
}
namespace {
// Creates the aggregate data needed to generate a single string histogram observation for an
// aggregation period bucket. Legacy hashes are used if they're present; Farmhash Fingerprint 64
// hashes are used otherwise.
//
// TODO(https://fxbug.dev/322409910): Delete usage of legacy hash after clients no longer
// store them.
AggregateDataToGenerate GetAggregateDataToGenerateForFF64Migration(AggregationPeriodBucket *agg) {
  const bool has_legacy_hashes = !agg->string_hashes().empty();
  if (has_legacy_hashes) {
    return AggregateDataToGenerate(agg->string_hashes(), /*use_legacy_hash=*/true);
  }
  return AggregateDataToGenerate(agg->string_hashes_ff64(), /*use_legacy_hash=*/false);
}
}  // namespace
// Collects, per system profile hash, the aggregate data from which observations for the
// aggregation window ending at |time_info| are generated.
//
// Hourly procedures: one entry per SystemProfileAggregate in the bucket for the window's hour id.
//
// Daily procedures walk the days of the window from newest to oldest, skipping days that have no
// bucket:
//  - REPORT_ALL: one entry per SystemProfileAggregate per day, keyed by its own hash.
//  - SELECT_FIRST / SELECT_LAST / SELECT_DEFAULT: at most one entry per day containing the data of
//    all of that day's SystemProfileAggregates; all days are stored under a single hash. That hash
//    comes from the aggregate with the smallest first-seen timestamp (SELECT_FIRST) or the largest
//    last-seen timestamp (SELECT_LAST / SELECT_DEFAULT) across the window.
//  - Any other policy yields an empty result.
std::map<uint64_t, std::vector<AggregateDataToGenerate>>
AggregationProcedure::GetAggregateDataToGenerate(const util::TimeInfo &time_info,
                                                 ReportAggregate &aggregate) const {
  std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate;
  const util::TimeInfo start_time_info = GetStartTimeInfo(time_info);

  // Appends every per-event-code entry of |profile| to |out|.
  const auto append_event_codes = [](SystemProfileAggregate &profile,
                                     AggregateDataToGenerate &out) {
    for (EventCodesAggregateData &data : *profile.mutable_by_event_code()) {
      out.aggregate_data.push_back(data);
    }
  };

  // Adds one entry to the result for each system profile found in |bucket|, keyed by its hash.
  const auto emit_per_system_profile = [&data_to_generate,
                                        &append_event_codes](AggregationPeriodBucket *bucket) {
    for (SystemProfileAggregate &profile : *bucket->mutable_system_profile_aggregates()) {
      AggregateDataToGenerate to_generate = GetAggregateDataToGenerateForFF64Migration(bucket);
      append_event_codes(profile, to_generate);
      data_to_generate[profile.system_profile_hash()].emplace_back(std::move(to_generate));
    }
  };

  if (!IsDaily()) {
    // Check with the const accessor first so that a missing bucket is not created by operator[].
    if (aggregate.hourly().by_hour_id().contains(start_time_info.hour_id)) {
      emit_per_system_profile(
          &(*aggregate.mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id]);
    }
    return data_to_generate;
  }

  const bool single_system_profile = system_profile_selection_policy_ == SELECT_LAST ||
                                     system_profile_selection_policy_ == SELECT_FIRST ||
                                     system_profile_selection_policy_ == SELECT_DEFAULT;
  if (!single_system_profile && system_profile_selection_policy_ != REPORT_ALL) {
    return data_to_generate;
  }
  const bool prefer_earliest = system_profile_selection_policy_ == SELECT_FIRST;

  // State for the single-system-profile policies. The hash stays empty until a system profile
  // aggregate has been seen, so it is never read uninitialized. The timestamp is tracked in 64
  // bits so that large values are neither truncated nor excluded from selection.
  std::optional<uint64_t> selected_hash;
  int64_t selected_timestamp = 0;
  std::vector<AggregateDataToGenerate> selected_data;

  // Iterate newest to oldest with a signed 64-bit counter: an unsigned counter could never drop
  // below a start index of 0, so the loop would not terminate.
  const auto oldest_day = static_cast<int64_t>(start_time_info.day_index);
  for (int64_t day = static_cast<int64_t>(time_info.day_index); day >= oldest_day; --day) {
    const auto day_index = static_cast<uint32_t>(day);
    if (!aggregate.daily().by_day_index().contains(day_index)) {
      continue;
    }
    AggregationPeriodBucket *bucket =
        &(*aggregate.mutable_daily()->mutable_by_day_index())[day_index];

    if (!single_system_profile) {  // REPORT_ALL
      emit_per_system_profile(bucket);
      continue;
    }

    AggregateDataToGenerate to_generate = GetAggregateDataToGenerateForFF64Migration(bucket);
    for (SystemProfileAggregate &profile : *bucket->mutable_system_profile_aggregates()) {
      // For SELECT_FIRST and SELECT_LAST there should only be one SystemProfileAggregate, but
      // for multi-day reports the correct system profile to use has to be determined from the
      // multiple AggregationPeriodBuckets for the days in the aggregation window.
      const auto candidate_timestamp = static_cast<int64_t>(
          prefer_earliest ? profile.first_seen_timestamp() : profile.last_seen_timestamp());
      const bool is_better = prefer_earliest ? candidate_timestamp < selected_timestamp
                                             : candidate_timestamp >= selected_timestamp;
      if (!selected_hash.has_value() || is_better) {
        selected_hash = profile.system_profile_hash();
        selected_timestamp = candidate_timestamp;
      }
      append_event_codes(profile, to_generate);
    }
    if (!to_generate.aggregate_data.empty()) {
      selected_data.emplace_back(std::move(to_generate));
    }
  }

  if (selected_hash.has_value() && !selected_data.empty()) {
    data_to_generate[*selected_hash] = std::move(selected_data);
  }
  return data_to_generate;
}
// Generates one observation per system profile hash for the aggregation window ending at
// |time_info|, using the data collected by GetAggregateDataToGenerate.
//
// If generating any single observation fails, that status is returned and no observations are
// produced.
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>>
AggregationProcedure::GenerateObservations(const util::TimeInfo &time_info,
                                           ReportAggregate &aggregate) {
  const std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_by_system_profile =
      GetAggregateDataToGenerate(time_info, aggregate);

  std::vector<ObservationAndSystemProfile> results;
  for (const auto &entry : data_by_system_profile) {
    const uint64_t profile_hash = entry.first;
    const std::vector<AggregateDataToGenerate> &profile_data = entry.second;
    CB_ASSIGN_OR_RETURN(std::unique_ptr<Observation> observation,
                        GenerateSingleObservation(
                            profile_data, SelectEventVectorsForObservation(profile_data),
                            time_info));
    results.push_back(
        {.system_profile_hash = profile_hash, .observation = std::move(observation)});
  }
  return results;
}
// Called once the observations for |time_info| have been committed. Erases every bucket of
// |aggregate| whose day index / hour id is at or before the start of the aggregation window
// ending at |time_info|, then records |time_info| as the last day index / hour id.
//
// |system_profile_hash| is not used by this implementation.
void AggregationProcedure::ObservationsCommitted(
    ReportAggregate &aggregate, util::TimeInfo time_info,
    std::optional<uint64_t> system_profile_hash) const {
  const util::TimeInfo cutoff = GetStartTimeInfo(time_info);

  if (IsDaily()) {
    // Collect the keys first; entries are not erased while the map is being iterated.
    std::vector<uint32_t> expired_days;
    for (const auto &entry : aggregate.daily().by_day_index()) {
      if (entry.first <= cutoff.day_index) {
        expired_days.push_back(entry.first);
      }
    }
    auto *buckets_by_day = aggregate.mutable_daily()->mutable_by_day_index();
    for (const uint32_t day_index : expired_days) {
      buckets_by_day->erase(day_index);
    }
    aggregate.mutable_daily()->set_last_day_index(time_info.day_index);
    return;
  }

  // Hourly: same clean-up, keyed by hour id.
  std::vector<uint32_t> expired_hours;
  for (const auto &entry : aggregate.hourly().by_hour_id()) {
    if (entry.first <= cutoff.hour_id) {
      expired_hours.push_back(entry.first);
    }
  }
  auto *buckets_by_hour = aggregate.mutable_hourly()->mutable_by_hour_id();
  for (const uint32_t hour_id : expired_hours) {
    buckets_by_hour->erase(hour_id);
  }
  aggregate.mutable_hourly()->set_last_hour_id(time_info.hour_id);
}
// Returns the distinct event vectors found in |buckets|, visited in order. Collection stops as
// soon as the set already holds event_vector_buffer_max_ vectors when the next entry is reached.
std::set<std::vector<uint32_t>> AggregationProcedure::SelectEventVectorsForObservation(
    const std::vector<AggregateDataToGenerate> &buckets) const {
  std::set<std::vector<uint32_t>> selected;
  for (const AggregateDataToGenerate &bucket : buckets) {
    for (const EventCodesAggregateData &entry : bucket.aggregate_data) {
      // The limit is checked before inserting, so a full set is returned even if the next
      // entry would have been a duplicate.
      if (selected.size() == event_vector_buffer_max_) {
        return selected;
      }
      std::vector<uint32_t> codes(entry.event_codes().begin(), entry.event_codes().end());
      selected.insert(std::move(codes));
    }
  }
  return selected;
}
} // namespace cobalt::local_aggregation