blob: 31d3b0593208d6edafef8a046592965582aca0cd [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/local_aggregation.h"
#include <algorithm>
#include <memory>
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/not_null.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation/civil_time_manager.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/clock_interfaces.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/report_definition.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
LocalAggregation::LocalAggregation(
const CobaltConfig &cfg, const logger::ProjectContextFactory &global_project_context_factory,
system_data::SystemDataInterface &system_data, util::FileSystem &fs,
const logger::ObservationWriter &observation_writer,
util::CivilTimeConverterInterface &civil_time_converter,
logger::InternalMetrics *internal_metrics)
: global_project_context_factory_(global_project_context_factory),
system_data_(system_data),
aggregate_storage_(LocalAggregateStorage::New(cfg.local_aggregate_store_strategy,
cfg.local_aggregate_store_dir, fs,
global_project_context_factory, system_data,
cfg.storage_quotas.per_project_reserved_bytes)),
civil_time_converter_(civil_time_converter),
observation_generator_(
*aggregate_storage_, global_project_context_factory, system_data, observation_writer,
logger::PrivacyEncoder::MakeSecurePrivacyEncoder(), civil_time_converter,
cfg.generate_observations_with_current_system_profile,
cfg.test_dont_backfill_empty_reports, cfg.local_aggregation_backfill_days + 1),
storage_quotas_(cfg.storage_quotas),
internal_metrics_(internal_metrics) {
CHECK(SlushSize() > 0)
<< "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes "
"is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or "
"increase total_capacity_bytes.";
}
int64_t LocalAggregation::SlushUsed() const { return aggregate_storage_->SlushUsed(); }
int64_t LocalAggregation::SlushSize() const {
return storage_quotas_.total_capacity_bytes -
(storage_quotas_.per_project_reserved_bytes *
static_cast<int64_t>(global_project_context_factory_.ListProjects().size()));
}
bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const {
if (IsUnderQuota(project)) {
return true;
}
return SlushUsed() < SlushSize();
}
bool LocalAggregation::IsUnderQuota(lib::ProjectIdentifier project) const {
size_t project_used = aggregate_storage_->AmountStored(project);
return project_used < storage_quotas_.per_project_reserved_bytes;
}
Status LocalAggregation::AddEvent(const logger::EventRecord &event_record,
const std::chrono::system_clock::time_point &event_timestamp) {
if (IsDisabled()) {
return Status::OkStatus();
}
lib::ProjectIdentifier proj = event_record.project_context()->Identifier();
if (!CanStore(proj)) {
internal_metrics_->LocalAggregationQuotaEvent(proj, 3);
return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
"There is not enough space to add event: ")
.AppendMsg(event_record)
.WithContext("project_bytes", aggregate_storage_->AmountStored(proj))
.WithContext("per_project_reserved_bytes", storage_quotas_.per_project_reserved_bytes)
.WithContext("SlushUsed", SlushUsed())
.WithContext("SlushSize", SlushSize())
.Build();
}
internal_metrics_->LocalAggregationQuotaEvent(proj, IsUnderQuota(proj) ? 1 : 2);
CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate,
aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier()));
CivilTimeManager civil_time_mgr(event_timestamp, civil_time_converter_);
// Get the unfiltered global system profile to log with this event.
const SystemProfile &unfiltered_system_profile = *event_record.system_profile();
for (const auto &report : event_record.metric()->reports()) {
if (report.max_release_stage() != ReleaseStage::RELEASE_STAGE_NOT_SET &&
system_data_.release_stage() > report.max_release_stage()) {
// Quietly ignore this report.
LOG_FIRST_N(INFO, 10) << "Not logging to report `"
<< event_record.project_context()->FullMetricName(
*event_record.metric())
<< "." << report.report_name() << "` because its max_release_stage ("
<< report.max_release_stage()
<< ") is lower than the device's current release_stage: "
<< system_data_.release_stage();
continue;
}
lib::StatusOr<util::NotNullUniquePtr<AggregationProcedure>> not_null_procedure =
AggregationProcedure::Get(event_record.project_context()->project().customer_name(),
event_record.project_context()->project().project_name(),
*event_record.metric(), report);
if (!not_null_procedure.ok()) {
continue;
}
util::PinnedUniquePtr<AggregationProcedure> procedure(std::move(not_null_procedure).value());
CB_ASSIGN_OR_RETURN(
ReportAggregate * report_aggregate,
procedure->GetReportAggregate(aggregate.aggregate(), report.id(), &civil_time_mgr));
// Get the filtered global system profile for this report.
SystemProfile filtered_system_profile;
encoder::SystemData::FilteredSystemProfile(report, unfiltered_system_profile,
&filtered_system_profile);
// Calculate the hash of the final filtered system profile.
uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString());
procedure->UpdateAggregate(event_record, *report_aggregate, system_profile_hash,
event_timestamp);
// Make sure the final filtered system profile is stored.
aggregate.StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile);
}
return aggregate.Save();
}
void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) {
system_clock_ = std::move(clock);
observation_generator_.Start(system_clock_.get());
}
void LocalAggregation::ShutDown() {
observation_generator_.ShutDown();
aggregate_storage_->ShutDown();
}
Status LocalAggregation::GenerateAggregatedObservations(
std::chrono::system_clock::time_point system_time) {
return observation_generator_.GenerateObservationsOnce(system_time);
}
void LocalAggregation::Disable(bool is_disabled) {
LOG(INFO) << "LocalAggregation: " << (is_disabled ? "Disabling" : "Enabling")
<< " local aggregation.";
is_disabled_ = is_disabled;
}
} // namespace cobalt::local_aggregation