| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| |
| #include <memory> |
| |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/hash.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h" |
| #include "src/logging.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/statusor/statusor.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| std::unique_ptr<LocalAggregateStorage> LocalAggregateStorage::New( |
| StorageStrategy strategy, std::string base_directory, util::FileSystem& fs, |
| const logger::ProjectContextFactory& global_project_context_factory, |
| MetadataBuilder& metadata_builder, int64_t per_project_reserved_bytes) { |
| switch (strategy) { |
| case StorageStrategy::Immediate: |
| return std::make_unique<ImmediateLocalAggregateStorage>( |
| std::move(base_directory), fs, global_project_context_factory, metadata_builder, |
| per_project_reserved_bytes); |
| |
| case StorageStrategy::Delayed: |
| return std::make_unique<DelayedLocalAggregateStorage>( |
| std::move(base_directory), fs, global_project_context_factory, metadata_builder, |
| per_project_reserved_bytes); |
| } |
| } |
| |
| namespace { |
| |
| const ReportDefinition* FindReportDefinition(const MetricDefinition& metric_definition, |
| uint32_t report_id) { |
| for (const ReportDefinition& report_def : metric_definition.reports()) { |
| if (report_def.id() == report_id) { |
| return &report_def; |
| } |
| } |
| return nullptr; |
| } |
| |
| } // namespace |
| |
| void LocalAggregateStorage::MigrateAggregationPeriodBucket( |
| AggregationPeriodBucket& bucket, util::TimeInfo time_info, int64_t timestamp, |
| const MetricDefinition& metric_definition, const ReportDefinition& report_def, |
| const MetadataBuilder& metadata_builder) { |
| // Get the system profile that was active at the time of the aggregated events. |
| SystemProfile system_profile = |
| metadata_builder.GetSystemProfile(metric_definition, report_def, time_info); |
| |
| // Build the filtered system profile for this report. |
| SystemProfile filtered_system_profile; |
| MetadataBuilder::FilteredSystemProfile(report_def, system_profile, &filtered_system_profile); |
| uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString()); |
| |
| // Add the new SystemProfileAggregate for this filtered system profile. |
| SystemProfileAggregate* system_profile_agg = bucket.add_system_profile_aggregates(); |
| system_profile_agg->set_system_profile_hash(system_profile_hash); |
| system_profile_agg->set_first_seen_timestamp(timestamp); |
| system_profile_agg->set_last_seen_timestamp(timestamp); |
| |
| // Copy the existing aggregate data into the new SystemProfileAggregate. |
| for (const auto& deprecated_by_event_code : bucket.deprecated_by_event_code()) { |
| *system_profile_agg->add_by_event_code() = deprecated_by_event_code; |
| } |
| // Remove the old deprecated aggregate data. |
| bucket.clear_deprecated_by_event_code(); |
| |
| // Ensure the filtered SystemProfile is present in the aggregate storage. |
| StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile); |
| } |
| |
| bool LocalAggregateStorage::MigrateReportAllData(AggregationPeriodBucket& bucket, |
| const MetricDefinition& metric_definition, |
| const ReportDefinition& report_def) { |
| lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> procedure_or = |
| AggregationProcedure::Get(metric_definition, report_def); |
| if (!procedure_or.ok()) { |
| LOG(ERROR) << "Failed to create aggregation procedure to migrate REPORT_ALL data for report " |
| << metric_definition.id() << "-" << report_def.id() << ": " << procedure_or.status(); |
| return false; |
| } |
| std::unique_ptr<AggregationProcedure> procedure = std::move(procedure_or).value(); |
| if (procedure == nullptr) { |
| // This is a non cobalt 1.1 report type, should be silently ignored. |
| return false; |
| } |
| |
| google::protobuf::RepeatedPtrField<SystemProfileAggregate>* system_profile_aggregates = |
| bucket.mutable_system_profile_aggregates(); |
| auto system_profile_aggregates_it = system_profile_aggregates->begin(); |
| |
| // Merge all the other aggregates into the first entry. |
| SystemProfileAggregate& merged_system_profile_aggregate = *system_profile_aggregates_it; |
| |
| ++system_profile_aggregates_it; |
| while (system_profile_aggregates_it != system_profile_aggregates->end()) { |
| procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate, |
| *system_profile_aggregates_it); |
| |
| // After merging, remove the old aggregate from the repeated field. |
| system_profile_aggregates_it = system_profile_aggregates->erase(system_profile_aggregates_it); |
| } |
| return true; |
| } |
| |
| bool LocalAggregateStorage::MigrateStoredData(MetricAggregate& metric, |
| const MetricDefinition& metric_definition, |
| const MetadataBuilder& metadata_builder) { |
| bool changed = false; |
| |
| if (metric.deprecated_by_system_profile_size() > 0) { |
| // NOTE(fxbug.dev/87271): This is needed to clean up excess by_system_profiles in the aggregate |
| // store. Do not delete unless you are *certain* there are no devices in the wild that still |
| // need this cleanup |
| metric.clear_deprecated_by_system_profile(); |
| changed = true; |
| LOG(INFO) << "Cleared the local aggregate data storage use of the DEPRECATED_by_system_profile " |
| "field."; |
| } |
| |
| // Migrate from AggregationPeriodBucket.deprecated_by_event_code to use system_profile_aggregates. |
| // ReportAggregates for reports not found in the Registry are also removed. |
| auto* by_report_id = metric.mutable_by_report_id(); |
| for (auto report_it = by_report_id->begin(); report_it != by_report_id->end();) { |
| uint32_t report_id = report_it->first; |
| ReportAggregate& report = report_it->second; |
| const ReportDefinition* report_def = FindReportDefinition(metric_definition, report_id); |
| |
| if (report_def == nullptr) { |
| LOG(WARNING) << "Found report that is not present in the registry: (" |
| << metric_definition.customer_id() << ", " << metric_definition.project_id() |
| << ", " << metric_definition.id() << ", " << report_id << "): Deleting it."; |
| changed = true; |
| report_it = by_report_id->erase(report_it); |
| continue; |
| } |
| ++report_it; |
| |
| switch (report.time_period_case()) { |
| case ReportAggregate::kHourly: |
| for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) { |
| if (bucket.deprecated_by_event_code_size() > 0) { |
| changed = true; |
| MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromHourId(hour_id), |
| util::HourIdToUnixSeconds(hour_id), metric_definition, |
| *report_def, metadata_builder); |
| LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles " |
| "for report: " |
| << report_id << " hour id: " << hour_id; |
| } |
| if (report_def->system_profile_selection() != REPORT_ALL && |
| bucket.system_profile_aggregates_size() > 1) { |
| if (MigrateReportAllData(bucket, metric_definition, *report_def)) { |
| changed = true; |
| LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: " |
| << SystemProfileSelectionPolicy_Name( |
| report_def->system_profile_selection()); |
| } |
| } |
| } |
| break; |
| case ReportAggregate::kDaily: |
| for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) { |
| if (bucket.deprecated_by_event_code_size() > 0) { |
| changed = true; |
| MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromDayIndex(day_index), |
| util::DayIndexToUnixSeconds(day_index), |
| metric_definition, *report_def, metadata_builder); |
| LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles " |
| "for report: " |
| << report_id << " day index: " << day_index; |
| } |
| if (report_def->system_profile_selection() != REPORT_ALL && |
| bucket.system_profile_aggregates_size() > 1) { |
| if (MigrateReportAllData(bucket, metric_definition, *report_def)) { |
| changed = true; |
| LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: " |
| << SystemProfileSelectionPolicy_Name( |
| report_def->system_profile_selection()); |
| } |
| } |
| } |
| break; |
| default: |
| // Ignore empty ReportAggregate. |
| break; |
| } |
| } |
| |
| // Migration from deprecated_at_least_once boolean to new AtLeastOnce message. |
| // Must come after the migration to use system_profile_aggregates. |
| for (auto& [report_id, report] : *metric.mutable_by_report_id()) { |
| if (report.has_daily()) { |
| uint32_t last_day_index = report.daily().last_day_index(); |
| for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) { |
| for (SystemProfileAggregate& system_profile_agg : |
| *bucket.mutable_system_profile_aggregates()) { |
| for (EventCodesAggregateData& data : *system_profile_agg.mutable_by_event_code()) { |
| if (data.data().deprecated_at_least_once()) { |
| changed = true; |
| data.mutable_data()->clear_deprecated_at_least_once(); |
| data.mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| data.mutable_data()->mutable_at_least_once()->set_last_day_index(last_day_index); |
| LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce " |
| "message for report: " |
| << report_id |
| << " system profile hash: " << system_profile_agg.system_profile_hash() |
| << " day index: " << day_index; |
| } |
| } |
| } |
| } |
| } |
| if (report.has_hourly()) { |
| for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) { |
| for (SystemProfileAggregate& system_profile_agg : |
| *bucket.mutable_system_profile_aggregates()) { |
| for (EventCodesAggregateData& data : *system_profile_agg.mutable_by_event_code()) { |
| // Migration from deprecated_at_least_once boolean to new AtLeastOnce message. |
| if (data.data().deprecated_at_least_once()) { |
| changed = true; |
| data.mutable_data()->clear_deprecated_at_least_once(); |
| data.mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce " |
| "message for report: " |
| << report_id |
| << " system profile hash: " << system_profile_agg.system_profile_hash() |
| << " hour id: " << hour_id; |
| } |
| } |
| } |
| } |
| } |
| } |
| return changed; |
| } |
| |
| void LocalAggregateStorage::UpdateProjectSizeBy(lib::ProjectIdentifier proj, |
| int64_t size_increase) { |
| // If the size hasn't changed, nothing needs to be done. |
| if (size_increase == 0) { |
| return; |
| } |
| auto locked = byte_tracking_.lock(); |
| locked->total += size_increase; |
| |
| // Subtract current slush usage for project. |
| if (locked->total_per_project[proj] > per_project_reserved_bytes_) { |
| locked->total_slush -= locked->total_per_project[proj] - per_project_reserved_bytes_; |
| } |
| |
| locked->total_per_project[proj] += size_increase; |
| // This shouldn't happen, but a negative total_per_project is very undesirable. |
| if (locked->total_per_project[proj] < 0) { |
| LOG(ERROR) << "More data was deleted from a project than was previously present. This " |
| "represents a bug in Cobalt."; |
| locked->total_per_project[proj] = 0; |
| } |
| |
| // Add current slush usage for project. |
| if (locked->total_per_project[proj] > per_project_reserved_bytes_) { |
| locked->total_slush += locked->total_per_project[proj] - per_project_reserved_bytes_; |
| } |
| } |
| |
| } // namespace cobalt::local_aggregation |