| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| |
| #include <functional> |
| #include <memory> |
| |
| #include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/event_vector_index.h" |
| #include "src/logging.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/registry/metric_definition.pb.h" |
| #include "src/registry/report_definition.pb.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> AggregationProcedure::Get( |
| const MetricDefinition &metric, const ReportDefinition &report) { |
| VLOG(5) << "Getting aggregation procedure for report type " << report.report_type(); |
| switch (report.report_type()) { |
| case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS: |
| return {std::make_unique<CountAggregationProcedure>(metric, report)}; |
| case ReportDefinition::UNIQUE_DEVICE_COUNTS: |
| switch (report.local_aggregation_procedure()) { |
| case ReportDefinition::AT_LEAST_ONCE: |
| return {std::make_unique<AtLeastOnceAggregationProcedure>(metric, report)}; |
| case ReportDefinition::SELECT_FIRST: |
| return {std::make_unique<SelectFirstAggregationProcedure>(metric, report)}; |
| case ReportDefinition::SELECT_MOST_COMMON: |
| return {std::make_unique<SelectMostCommonAggregationProcedure>(metric, report)}; |
| default: |
| return util::StatusBuilder( |
| StatusCode::FAILED_PRECONDITION, |
| "Report of type UNIQUE_DEVICE_COUNTS does not support selected aggregation " |
| "procedure: ") |
| .AppendMsg(report.local_aggregation_procedure()) |
| .WithContext("Customer", metric.customer_name()) |
| .WithContext("Project", metric.project_name()) |
| .WithContexts(metric, report) |
| .LogError() |
| .Build(); |
| } |
| break; |
| case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS: |
| case ReportDefinition::HOURLY_VALUE_HISTOGRAMS: |
| case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS: |
| case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS: |
| switch (metric.metric_type()) { |
| case MetricDefinition::OCCURRENCE: |
| return {std::make_unique<CountAggregationProcedure>(metric, report)}; |
| case MetricDefinition::INTEGER: |
| return {NumericStatAggregationProcedure::New(metric, report)}; |
| default: |
| return util::StatusBuilder( |
| StatusCode::FAILED_PRECONDITION, |
| "Report of type UNIQUE_DEVICE_HISTOGRAMS is not valid for metric of type: ") |
| .AppendMsg(metric.metric_type()) |
| .WithContext("Customer", metric.customer_name()) |
| .WithContext("Project", metric.project_name()) |
| .WithContexts(metric, report) |
| .LogError() |
| .Build(); |
| } |
| break; |
| case ReportDefinition::FLEETWIDE_HISTOGRAMS: |
| return {std::make_unique<IntegerHistogramAggregationProcedure>(metric, report)}; |
| case ReportDefinition::FLEETWIDE_MEANS: |
| return {std::make_unique<SumAndCountAggregationProcedure>(metric, report)}; |
| case ReportDefinition::STRING_COUNTS: |
| return {std::make_unique<StringHistogramAggregationProcedure>(metric, report)}; |
| case ReportDefinition::UNIQUE_DEVICE_STRING_COUNTS: |
| return {std::make_unique<AtLeastOnceStringAggregationProcedure>(metric, report)}; |
| default: |
| // This is a non cobalt 1.1 report type, should be silently ignored. |
| return {nullptr}; |
| } |
| } |
| |
| AggregationProcedure::AggregationProcedure(const MetricDefinition &metric, |
| const ReportDefinition &report) |
| : metric_type_(metric.metric_type()), |
| report_type_(report.report_type()), |
| system_profile_selection_policy_(report.system_profile_selection()) { |
| if (report.event_vector_buffer_max() > 0) { |
| event_vector_buffer_max_ = report.event_vector_buffer_max(); |
| } else { |
| event_vector_buffer_max_ = logger::GetNumEventVectors(metric.metric_dimensions()); |
| } |
| // TODO(fxbug.dev/53691): Switch when local_aggregation_period has been |
| // switched to OnDeviceAggregationWindow. |
| window_.set_days(static_cast<AggregationDays>(report.local_aggregation_period())); |
| } |
| |
| AggregationPeriodBucket &AggregationProcedure::GetAggregationPeriodBucket( |
| const util::TimeInfo &time, ReportAggregate &aggregate) const { |
| if (IsDaily()) { |
| return (*aggregate.mutable_daily()->mutable_by_day_index())[time.day_index]; |
| } |
| return (*aggregate.mutable_hourly()->mutable_by_hour_id())[time.hour_id]; |
| } |
| |
| bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const { |
| switch (metric_type_) { |
| case MetricDefinition::OCCURRENCE: |
| return type == Event::TypeCase::kOccurrenceEvent; |
| case MetricDefinition::INTEGER: |
| return type == Event::TypeCase::kIntegerEvent; |
| case MetricDefinition::INTEGER_HISTOGRAM: |
| return type == Event::TypeCase::kIntegerHistogramEvent; |
| case MetricDefinition::STRING: |
| return type == Event::TypeCase::kStringEvent; |
| default: |
| LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_ |
| << " does not appear to be a cobalt 1.1 metric"; |
| return false; |
| } |
| } |
| |
| lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> |
| AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record, |
| AggregationPeriodBucket &bucket, |
| uint64_t system_profile_hash, |
| std::chrono::system_clock::time_point system_time) const { |
| google::protobuf::RepeatedField<uint32_t> event_codes; |
| |
| switch (metric_type_) { |
| case MetricDefinition::OCCURRENCE: |
| event_codes = event_record.event()->occurrence_event().event_code(); |
| break; |
| case MetricDefinition::INTEGER: |
| event_codes = event_record.event()->integer_event().event_code(); |
| break; |
| case MetricDefinition::INTEGER_HISTOGRAM: |
| event_codes = event_record.event()->integer_histogram_event().event_code(); |
| break; |
| case MetricDefinition::STRING: |
| event_codes = event_record.event()->string_event().event_code(); |
| break; |
| default: |
| return util::StatusBuilder(StatusCode::INVALID_ARGUMENT) |
| .AppendMsg(DebugString()) |
| .AppendMsg(": Metric of type") |
| .AppendMsg(metric_type_) |
| .AppendMsg(" does not appear to be a Cobalt 1.1 metric") |
| .LogError() |
| .Build(); |
| } |
| |
| int64_t system_timestamp = util::ToUnixSeconds(system_time); |
| |
| if (system_profile_selection_policy_ == SELECT_FIRST || |
| system_profile_selection_policy_ == SELECT_LAST || |
| system_profile_selection_policy_ == SELECT_DEFAULT) { |
| SystemProfileAggregate *system_profile_aggregate; |
| if (bucket.system_profile_aggregates_size() >= 1) { |
| if (bucket.system_profile_aggregates_size() > 1) { |
| LOG(ERROR) << "There are " << bucket.system_profile_aggregates_size() |
| << " system profile aggregates for a report with system_profile_selection " |
| << system_profile_selection_policy_ << ". There should be only one."; |
| // Use the first aggregate anyway. |
| } |
| system_profile_aggregate = bucket.mutable_system_profile_aggregates(0); |
| if ((system_profile_selection_policy_ == SELECT_LAST || |
| system_profile_selection_policy_ == SELECT_DEFAULT) && |
| system_profile_aggregate->system_profile_hash() != system_profile_hash) { |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash); |
| } |
| system_profile_aggregate->set_last_seen_timestamp(system_timestamp); |
| } else { |
| system_profile_aggregate = bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash); |
| system_profile_aggregate->set_first_seen_timestamp(system_timestamp); |
| system_profile_aggregate->set_last_seen_timestamp(system_timestamp); |
| } |
| |
| return GetAggregateData(*system_profile_aggregate, event_codes); |
| } |
| |
| if (system_profile_selection_policy_ == REPORT_ALL) { |
| SystemProfileAggregate *system_profile_aggregate = nullptr; |
| for (SystemProfileAggregate &aggregate : *(bucket.mutable_system_profile_aggregates())) { |
| if (aggregate.system_profile_hash() == system_profile_hash) { |
| system_profile_aggregate = &aggregate; |
| } |
| } |
| if (system_profile_aggregate == nullptr) { |
| system_profile_aggregate = bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash); |
| system_profile_aggregate->set_first_seen_timestamp(system_timestamp); |
| } |
| system_profile_aggregate->set_last_seen_timestamp(system_timestamp); |
| |
| return GetAggregateData(*system_profile_aggregate, event_codes); |
| } |
| |
| return util::StatusBuilder(StatusCode::NOT_FOUND, "Unknown system_profile_selection_policy ") |
| .AppendMsg(system_profile_selection_policy_) |
| .LogError() |
| .Build(); |
| } |
| |
| lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> |
| AggregationProcedure::GetAggregateData( |
| SystemProfileAggregate &system_profile_aggregate, |
| google::protobuf::RepeatedField<uint32_t> event_codes) const { |
| for (EventCodesAggregateData &aggregate_data : |
| *system_profile_aggregate.mutable_by_event_code()) { |
| // Find the event codes that match the event's. |
| if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(), |
| event_codes.begin(), event_codes.end())) { |
| return std::ref(*aggregate_data.mutable_data()); |
| } |
| } |
| // Event codes were not found, so add them as a new entry. |
| if (system_profile_aggregate.by_event_code_size() < event_vector_buffer_max_) { |
| EventCodesAggregateData *aggregate_data = system_profile_aggregate.add_by_event_code(); |
| aggregate_data->mutable_event_codes()->CopyFrom(event_codes); |
| return std::ref(*aggregate_data->mutable_data()); |
| } |
| |
| return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED, |
| "SystemProfileAggregate has reached event_vector_buffer_max") |
| .WithContext("event_vector_buffer_max", event_vector_buffer_max_) |
| .Build(); |
| } |
| |
| void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record, |
| ReportAggregate &aggregate, uint64_t system_profile_hash, |
| std::chrono::system_clock::time_point system_time) { |
| if (!IsValidEventType(event_record.event()->type_case())) { |
| LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case() |
| << " with aggregation procedure of type " << DebugString() << "."; |
| return; |
| } |
| |
| std::reference_wrapper<AggregationPeriodBucket> bucket = |
| GetAggregationPeriodBucket(event_record.GetTimeInfo(), aggregate); |
| |
| if (lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> aggregate_data = |
| GetAggregateData(event_record, bucket, system_profile_hash, system_time); |
| aggregate_data.ok()) { |
| UpdateAggregateData(event_record, aggregate_data.ConsumeValueOrDie(), bucket); |
| } |
| } |
| |
| void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate, |
| const SystemProfileAggregate &aggregate) { |
| if (system_profile_selection_policy_ == SELECT_FIRST) { |
| if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) { |
| merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash()); |
| merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp()); |
| } |
| if (aggregate.last_seen_timestamp() > merged_aggregate.last_seen_timestamp()) { |
| merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp()); |
| } |
| } else { // SELECT_LAST or SELECT_DEFAULT |
| if (aggregate.last_seen_timestamp() >= merged_aggregate.last_seen_timestamp()) { |
| merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash()); |
| merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp()); |
| } |
| if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) { |
| merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp()); |
| } |
| } |
| |
| for (const EventCodesAggregateData &aggregate_data : aggregate.by_event_code()) { |
| // Find or create the corresponding AggregateData in the merged system profile aggregate. |
| lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> data = |
| GetAggregateData(merged_aggregate, aggregate_data.event_codes()); |
| // A non-OK status here means that there is no room in event_vector_buffer_max_ for the event |
| // codes. |
| if (data.ok()) { |
| MergeAggregateData(data.ValueOrDie(), aggregate_data.data()); |
| } |
| } |
| } |
| |
| util::TimeInfo AggregationProcedure::GetStartTimeInfo( |
| const util::TimeInfo ¤t_time_info) const { |
| if (IsDaily()) { |
| // For daily aggregates, use the current day's day index, for multi-day aggregates (e.g. 7-day) |
| // the start is the oldest day index in the aggregation window. |
| uint32_t start_day_index = |
| current_time_info.day_index - static_cast<uint32_t>(window_.days()) + 1; |
| return util::TimeInfo::FromDayIndex(start_day_index); |
| } |
| // For hourly, always use the current time info's hour_id. |
| return current_time_info; |
| } |
| |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> |
| AggregationProcedure::GetAggregateDataToGenerate(const util::TimeInfo &time_info, |
| ReportAggregate &aggregate) const { |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate; |
| |
| util::TimeInfo start_time_info = GetStartTimeInfo(time_info); |
| if (IsDaily()) { |
| if (system_profile_selection_policy_ == SELECT_LAST || |
| system_profile_selection_policy_ == SELECT_FIRST || |
| system_profile_selection_policy_ == SELECT_DEFAULT) { |
| uint64_t system_profile_hash; |
| uint32_t last_seen_timestamp = 0; |
| uint32_t first_seen_timestamp = UINT32_MAX; |
| std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile; |
| for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) { |
| if (!aggregate.daily().by_day_index().contains(i)) { |
| continue; |
| } |
| AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i]; |
| AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()}; |
| for (SystemProfileAggregate &system_profile_aggregate : |
| *agg->mutable_system_profile_aggregates()) { |
| // For SELECT_FIRST and SELECT_LAST there should only be one SystemProfileAggregate, but |
| // for multi-day reports the correct system profile to use has to be determined from the |
| // multiple AggregationPeriodBuckets for the days in the aggregation window. |
| if ((system_profile_selection_policy_ == SELECT_LAST || |
| system_profile_selection_policy_ == SELECT_DEFAULT) && |
| system_profile_aggregate.last_seen_timestamp() >= last_seen_timestamp) { |
| system_profile_hash = system_profile_aggregate.system_profile_hash(); |
| last_seen_timestamp = system_profile_aggregate.last_seen_timestamp(); |
| } else if (system_profile_selection_policy_ == SELECT_FIRST && |
| system_profile_aggregate.first_seen_timestamp() < first_seen_timestamp) { |
| system_profile_hash = system_profile_aggregate.system_profile_hash(); |
| first_seen_timestamp = system_profile_aggregate.first_seen_timestamp(); |
| } |
| for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) { |
| agg_to_generate.aggregate_data.push_back(data); |
| } |
| } |
| if (!agg_to_generate.aggregate_data.empty()) { |
| data_to_generate_for_system_profile.emplace_back(std::move(agg_to_generate)); |
| } |
| } |
| if (!data_to_generate_for_system_profile.empty()) { |
| data_to_generate[system_profile_hash] = std::move(data_to_generate_for_system_profile); |
| } |
| } else if (system_profile_selection_policy_ == REPORT_ALL) { |
| for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) { |
| if (!aggregate.daily().by_day_index().contains(i)) { |
| continue; |
| } |
| AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i]; |
| for (SystemProfileAggregate &system_profile_aggregate : |
| *agg->mutable_system_profile_aggregates()) { |
| AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()}; |
| for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) { |
| agg_to_generate.aggregate_data.push_back(data); |
| } |
| data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back( |
| std::move(agg_to_generate)); |
| } |
| } |
| } |
| } else { |
| if (aggregate.hourly().by_hour_id().contains(start_time_info.hour_id)) { |
| AggregationPeriodBucket *agg = |
| &(*aggregate.mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id]; |
| for (SystemProfileAggregate &system_profile_aggregate : |
| *agg->mutable_system_profile_aggregates()) { |
| AggregateDataToGenerate agg_to_generate{.string_hashes = agg->string_hashes()}; |
| for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) { |
| agg_to_generate.aggregate_data.push_back(data); |
| } |
| data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back( |
| std::move(agg_to_generate)); |
| } |
| } |
| } |
| return data_to_generate; |
| } |
| |
| lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> |
| AggregationProcedure::GenerateObservations(const util::TimeInfo &time_info, |
| ReportAggregate &aggregate) { |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate = |
| GetAggregateDataToGenerate(time_info, aggregate); |
| |
| std::vector<ObservationAndSystemProfile> observations; |
| for (const auto &[system_profile_hash, aggregates] : data_to_generate) { |
| CB_ASSIGN_OR_RETURN(std::unique_ptr<Observation> observation, |
| GenerateSingleObservation( |
| aggregates, SelectEventVectorsForObservation(aggregates), time_info)); |
| observations.push_back( |
| {.system_profile_hash = system_profile_hash, .observation = std::move(observation)}); |
| } |
| |
| return observations; |
| } |
| |
| void AggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate, |
| util::TimeInfo time_info, |
| uint64_t system_profile_hash) const { |
| util::TimeInfo delete_before = GetStartTimeInfo(time_info); |
| |
| // Clean up aggregates that will never be used again. |
| if (IsDaily()) { |
| std::vector<uint32_t> days_to_delete; |
| for (const auto &day : aggregate.daily().by_day_index()) { |
| if (day.first <= delete_before.day_index) { |
| days_to_delete.push_back(day.first); |
| } |
| } |
| |
| for (auto day : days_to_delete) { |
| aggregate.mutable_daily()->mutable_by_day_index()->erase(day); |
| } |
| } else { |
| std::vector<uint32_t> hours_to_delete; |
| for (const auto &hour : aggregate.hourly().by_hour_id()) { |
| if (hour.first <= delete_before.hour_id) { |
| hours_to_delete.push_back(hour.first); |
| } |
| } |
| |
| for (auto hour : hours_to_delete) { |
| aggregate.mutable_hourly()->mutable_by_hour_id()->erase(hour); |
| } |
| } |
| |
| if (IsDaily()) { |
| aggregate.mutable_daily()->set_last_day_index(time_info.day_index); |
| } else { |
| aggregate.mutable_hourly()->set_last_hour_id(time_info.hour_id); |
| } |
| } |
| |
| std::set<std::vector<uint32_t>> AggregationProcedure::SelectEventVectorsForObservation( |
| const std::vector<AggregateDataToGenerate> &buckets) const { |
| std::set<std::vector<uint32_t>> event_vectors; |
| for (const AggregateDataToGenerate &bucket : buckets) { |
| for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) { |
| if (event_vectors.size() == event_vector_buffer_max_) { |
| return event_vectors; |
| } |
| event_vectors.insert( |
| {aggregate_data.event_codes().begin(), aggregate_data.event_codes().end()}); |
| } |
| } |
| return event_vectors; |
| } |
| |
| } // namespace cobalt::local_aggregation |