blob: f3091aa863674a1ac0cb9d2c25f25198248d6498 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include <memory>
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/statusor/statusor.h"
namespace cobalt::local_aggregation {
// Creates the LocalAggregateStorage implementation selected by |strategy|.
//
// StorageStrategy::Immediate yields an ImmediateLocalAggregateStorage and StorageStrategy::Delayed
// yields a DelayedLocalAggregateStorage; both are constructed from the same arguments
// (|base_directory| is moved into the new object).
//
// Returns nullptr, after logging an error, if |strategy| holds a value that is not one of the
// enumerators handled below.
std::unique_ptr<LocalAggregateStorage> LocalAggregateStorage::New(
    StorageStrategy strategy, std::string base_directory, util::FileSystem& fs,
    const logger::ProjectContextFactory& global_project_context_factory,
    MetadataBuilder& metadata_builder, int64_t per_project_reserved_bytes) {
  switch (strategy) {
    case StorageStrategy::Immediate:
      return std::make_unique<ImmediateLocalAggregateStorage>(
          std::move(base_directory), fs, global_project_context_factory, metadata_builder,
          per_project_reserved_bytes);
    case StorageStrategy::Delayed:
      return std::make_unique<DelayedLocalAggregateStorage>(
          std::move(base_directory), fs, global_project_context_factory, metadata_builder,
          per_project_reserved_bytes);
  }
  // Only reachable when |strategy| holds an out-of-range value (e.g. one cast from an integer).
  // Without an explicit return here, control would flow off the end of a non-void function,
  // which is undefined behavior.
  LOG(ERROR) << "Unknown local aggregate StorageStrategy: " << static_cast<int>(strategy);
  return nullptr;
}
namespace {
// Searches the reports of |metric_definition| for one whose id equals |report_id|.
//
// Returns a pointer into |metric_definition| for the first matching report, or nullptr when the
// metric defines no report with that id. The pointer is only valid while |metric_definition| is.
const ReportDefinition* FindReportDefinition(const MetricDefinition& metric_definition,
                                             uint32_t report_id) {
  const ReportDefinition* match = nullptr;
  for (const auto& candidate : metric_definition.reports()) {
    if (candidate.id() != report_id) {
      continue;
    }
    match = &candidate;
    break;
  }
  return match;
}
}  // namespace
// Moves the legacy per-event-code data of |bucket| (deprecated_by_event_code) into a newly added
// SystemProfileAggregate.
//
// The new aggregate is tagged with the Farmhash64 hash of the system profile that
// |metadata_builder| returns for |metric_definition|/|report_def| at |time_info|, after filtering
// it for |report_def|. Its first- and last-seen timestamps are both set to |timestamp|. The legacy
// field is cleared afterwards, and the filtered profile is handed to StoreFilteredSystemProfile
// under the same hash.
void LocalAggregateStorage::MigrateAggregationPeriodBucket(
    AggregationPeriodBucket& bucket, util::TimeInfo time_info, int64_t timestamp,
    const MetricDefinition& metric_definition, const ReportDefinition& report_def,
    const MetadataBuilder& metadata_builder) {
  // Reduce the profile that was active when the events were aggregated to the fields this report
  // keeps. The unfiltered profile is only needed inside this scope.
  SystemProfile filtered_profile;
  {
    SystemProfile full_profile =
        metadata_builder.GetSystemProfile(metric_definition, report_def, time_info);
    MetadataBuilder::FilteredSystemProfile(report_def, full_profile, &filtered_profile);
  }
  const uint64_t profile_hash = util::Farmhash64(filtered_profile.SerializeAsString());

  SystemProfileAggregate& migrated = *bucket.add_system_profile_aggregates();
  migrated.set_system_profile_hash(profile_hash);
  migrated.set_first_seen_timestamp(timestamp);
  migrated.set_last_seen_timestamp(timestamp);

  // Copy each legacy entry into the new aggregate, then drop the legacy field.
  const auto& legacy_entries = bucket.deprecated_by_event_code();
  for (int i = 0; i < legacy_entries.size(); ++i) {
    *migrated.add_by_event_code() = legacy_entries.Get(i);
  }
  bucket.clear_deprecated_by_event_code();

  // Make sure the filtered profile referenced by |profile_hash| is present in storage.
  StoreFilteredSystemProfile(profile_hash, filtered_profile);
}
// Collapses all SystemProfileAggregates in |bucket| into its first one.
//
// Used for a report whose system_profile_selection is no longer REPORT_ALL. Each aggregate after
// the first is merged, in order, into the first using the report's AggregationProcedure. The
// merged-in aggregates are then removed, leaving a single aggregate.
//
// Returns false and leaves |bucket| unchanged in these cases:
//   - |bucket| has no aggregates;
//   - the aggregation procedure cannot be created (an error is logged);
//   - the report is not a Cobalt 1.1 report type (AggregationProcedure::Get returned nullptr).
// Otherwise returns true.
bool LocalAggregateStorage::MigrateReportAllData(AggregationPeriodBucket& bucket,
                                                 const MetricDefinition& metric_definition,
                                                 const ReportDefinition& report_def) {
  // A first aggregate is required to merge into; with none, there is nothing to migrate (and
  // dereferencing begin() of an empty field would be undefined behavior).
  if (bucket.system_profile_aggregates_size() == 0) {
    return false;
  }
  lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> procedure_or =
      AggregationProcedure::Get(metric_definition, report_def);
  if (!procedure_or.ok()) {
    LOG(ERROR) << "Failed to create aggregation procedure to migrate REPORT_ALL data for report "
               << metric_definition.id() << "-" << report_def.id() << ": " << procedure_or.status();
    return false;
  }
  std::unique_ptr<AggregationProcedure> procedure = std::move(procedure_or).value();
  if (procedure == nullptr) {
    // This is a non cobalt 1.1 report type, should be silently ignored.
    return false;
  }
  google::protobuf::RepeatedPtrField<SystemProfileAggregate>* system_profile_aggregates =
      bucket.mutable_system_profile_aggregates();
  const int aggregate_count = system_profile_aggregates->size();
  // Merge all the other aggregates, in order, into the first entry.
  SystemProfileAggregate& merged_system_profile_aggregate = (*system_profile_aggregates)[0];
  for (int i = 1; i < aggregate_count; ++i) {
    procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate,
                                            (*system_profile_aggregates)[i]);
  }
  // Remove every merged-in aggregate with one call. Erasing them one at a time shifts the
  // remaining elements on each erase, which is quadratic in the number of aggregates.
  if (aggregate_count > 1) {
    system_profile_aggregates->DeleteSubrange(1, aggregate_count - 1);
  }
  return true;
}
// Upgrades the stored aggregate data for one metric to the current storage format, in place.
//
// |metric| is the stored MetricAggregate for |metric_definition|. |metadata_builder| supplies the
// system profiles attached to migrated legacy per-event-code data. The steps run in this order:
//   1. Clear the deprecated by_system_profile field (fxbug.dev/87271 cleanup).
//   2. For each stored report:
//        - delete it if |metric_definition| has no report with that id;
//        - otherwise, for every hourly/daily bucket, move deprecated_by_event_code data into a
//          SystemProfileAggregate;
//        - if the report's system_profile_selection is not REPORT_ALL and the bucket holds more
//          than one SystemProfileAggregate, collapse them into one.
//   3. Convert the deprecated_at_least_once boolean into the AtLeastOnce message in every
//      SystemProfileAggregate. This step only walks system_profile_aggregates, so it must run
//      after step 2 for the data moved there to be converted too.
//
// Returns true if any step modified |metric| (presumably so the caller knows to persist it —
// confirm against callers).
bool LocalAggregateStorage::MigrateStoredData(MetricAggregate& metric,
                                              const MetricDefinition& metric_definition,
                                              const MetadataBuilder& metadata_builder) {
  // Set to true whenever any migration step modifies |metric|.
  bool changed = false;
  // Step 1: drop any data still stored in the deprecated by_system_profile field.
  if (metric.deprecated_by_system_profile_size() > 0) {
    // NOTE(fxbug.dev/87271): This is needed to clean up excess by_system_profiles in the aggregate
    // store. Do not delete unless you are *certain* there are no devices in the wild that still
    // need this cleanup
    metric.clear_deprecated_by_system_profile();
    changed = true;
    LOG(INFO) << "Cleared the local aggregate data storage use of the DEPRECATED_by_system_profile "
                 "field.";
  }
  // Step 2: Migrate from AggregationPeriodBucket.deprecated_by_event_code to use
  // system_profile_aggregates. ReportAggregates for reports not found in the Registry are also
  // removed.
  auto* by_report_id = metric.mutable_by_report_id();
  // Iterators are advanced manually because entries may be erased during the loop.
  for (auto report_it = by_report_id->begin(); report_it != by_report_id->end();) {
    uint32_t report_id = report_it->first;
    ReportAggregate& report = report_it->second;
    const ReportDefinition* report_def = FindReportDefinition(metric_definition, report_id);
    if (report_def == nullptr) {
      LOG(WARNING) << "Found report that is not present in the registry: ("
                   << metric_definition.customer_id() << ", " << metric_definition.project_id()
                   << ", " << metric_definition.id() << ", " << report_id << "): Deleting it.";
      changed = true;
      // erase() yields the iterator following the removed entry.
      report_it = by_report_id->erase(report_it);
      continue;
    }
    // Advance now; |report| still refers to the current entry, which this iteration never erases.
    ++report_it;
    switch (report.time_period_case()) {
      case ReportAggregate::kHourly:
        for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) {
          if (bucket.deprecated_by_event_code_size() > 0) {
            changed = true;
            // Hourly buckets use the start of the hour as the first/last seen timestamp.
            MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromHourId(hour_id),
                                           util::HourIdToUnixSeconds(hour_id), metric_definition,
                                           *report_def, metadata_builder);
            LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles "
                         "for report: "
                      << report_id << " hour id: " << hour_id;
          }
          // A report that no longer uses REPORT_ALL may still hold one aggregate per system
          // profile from when it did; collapse them into one.
          if (report_def->system_profile_selection() != REPORT_ALL &&
              bucket.system_profile_aggregates_size() > 1) {
            if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
              changed = true;
              LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
                        << SystemProfileSelectionPolicy_Name(
                               report_def->system_profile_selection());
            }
          }
        }
        break;
      case ReportAggregate::kDaily:
        for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) {
          if (bucket.deprecated_by_event_code_size() > 0) {
            changed = true;
            // Daily buckets use the start of the day as the first/last seen timestamp.
            MigrateAggregationPeriodBucket(bucket, util::TimeInfo::FromDayIndex(day_index),
                                           util::DayIndexToUnixSeconds(day_index),
                                           metric_definition, *report_def, metadata_builder);
            LOG(INFO) << "Migrated the local aggregate data to store SystemProfiles "
                         "for report: "
                      << report_id << " day index: " << day_index;
          }
          // Same REPORT_ALL collapse as for hourly buckets.
          if (report_def->system_profile_selection() != REPORT_ALL &&
              bucket.system_profile_aggregates_size() > 1) {
            if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
              changed = true;
              LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
                        << SystemProfileSelectionPolicy_Name(
                               report_def->system_profile_selection());
            }
          }
        }
        break;
      default:
        // Ignore empty ReportAggregate.
        break;
    }
  }
  // Step 3: Migration from deprecated_at_least_once boolean to new AtLeastOnce message.
  // Must come after the migration to use system_profile_aggregates.
  for (auto& [report_id, report] : *metric.mutable_by_report_id()) {
    if (report.has_daily()) {
      // Every migrated daily entry is stamped with the report's current last_day_index.
      uint32_t last_day_index = report.daily().last_day_index();
      for (auto& [day_index, bucket] : *report.mutable_daily()->mutable_by_day_index()) {
        for (SystemProfileAggregate& system_profile_agg :
             *bucket.mutable_system_profile_aggregates()) {
          for (EventCodesAggregateData& data : *system_profile_agg.mutable_by_event_code()) {
            if (data.data().deprecated_at_least_once()) {
              changed = true;
              data.mutable_data()->clear_deprecated_at_least_once();
              data.mutable_data()->mutable_at_least_once()->set_at_least_once(true);
              data.mutable_data()->mutable_at_least_once()->set_last_day_index(last_day_index);
              LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce "
                           "message for report: "
                        << report_id
                        << " system profile hash: " << system_profile_agg.system_profile_hash()
                        << " day index: " << day_index;
            }
          }
        }
      }
    }
    if (report.has_hourly()) {
      for (auto& [hour_id, bucket] : *report.mutable_hourly()->mutable_by_hour_id()) {
        for (SystemProfileAggregate& system_profile_agg :
             *bucket.mutable_system_profile_aggregates()) {
          for (EventCodesAggregateData& data : *system_profile_agg.mutable_by_event_code()) {
            // Migration from deprecated_at_least_once boolean to new AtLeastOnce message.
            // Unlike the daily case, no last_day_index is set for hourly data.
            if (data.data().deprecated_at_least_once()) {
              changed = true;
              data.mutable_data()->clear_deprecated_at_least_once();
              data.mutable_data()->mutable_at_least_once()->set_at_least_once(true);
              LOG(INFO) << "Migrated the local aggregate data to use the new AtLeastOnce "
                           "message for report: "
                        << report_id
                        << " system profile hash: " << system_profile_agg.system_profile_hash()
                        << " hour id: " << hour_id;
            }
          }
        }
      }
    }
  }
  return changed;
}
// Records that the stored data for project |proj| changed size by |size_increase| bytes
// (negative for a decrease).
//
// Under the byte_tracking_ lock, this updates:
//   - the overall total;
//   - the project's total;
//   - total_slush, which accumulates each project's usage above per_project_reserved_bytes_.
// The project's old excess is removed from total_slush before its total changes, and its new
// excess is added back afterwards. A project that is not yet tracked starts at 0.
void LocalAggregateStorage::UpdateProjectSizeBy(lib::ProjectIdentifier proj,
                                                int64_t size_increase) {
  // If the size hasn't changed, nothing needs to be done.
  if (size_increase == 0) {
    return;
  }
  auto locked = byte_tracking_.lock();
  locked->total += size_increase;
  // Look the project up once (operator[] inserts a 0 entry for an untracked project) and work
  // through the reference instead of repeating the map lookup.
  auto& project_total = locked->total_per_project[proj];
  // Subtract current slush usage for project.
  if (project_total > per_project_reserved_bytes_) {
    locked->total_slush -= project_total - per_project_reserved_bytes_;
  }
  project_total += size_increase;
  // This shouldn't happen, but a negative total_per_project is very undesirable.
  if (project_total < 0) {
    LOG(ERROR) << "More data was deleted from a project than was previously present. This "
                  "represents a bug in Cobalt.";
    // NOTE(review): |total| was already adjusted by the full |size_increase| above, so after this
    // clamp it is lower than before by more than the project's total dropped. Confirm whether
    // |total| is expected to stay equal to the sum of the per-project totals.
    project_total = 0;
  }
  // Add current slush usage for project.
  if (project_total > per_project_reserved_bytes_) {
    locked->total_slush += project_total - per_project_reserved_bytes_;
  }
}
} // namespace cobalt::local_aggregation