// blob: 8b4af42a4f412d869fab35dfd1d272ba253037e4 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregation.h"
#include <algorithm>
#include <memory>
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/report_definition.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Builds the local aggregation subsystem from |cfg|.
//
// Keeps the project context factory, the storage quotas and the internal
// metrics pointer, creates the aggregate storage through
// LocalAggregateStorage::New, and constructs the observation generator on top
// of that storage. CHECK-fails if the quotas leave no slush space (see the
// message below).
LocalAggregation::LocalAggregation(
const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory,
system_data::SystemDataInterface &system_data, MetadataBuilder &metadata_builder,
util::FileSystem &fs, const logger::ObservationWriter &observation_writer,
util::CivilTimeConverterInterface *civil_time_converter,
logger::InternalMetrics *internal_metrics)
: global_project_context_factory_(global_project_context_factory),
// The storage is created with the per-project reservation from the quotas.
aggregate_storage_(LocalAggregateStorage::New(
cfg.local_aggregate_store_strategy, cfg.local_aggregate_store_dir, fs,
global_project_context_factory, metadata_builder,
cfg.storage_quotas.per_project_reserved_bytes)),
// The generator holds a reference to the storage created above.
// NOTE(review): the last argument is backfill days + 1; presumably a window
// size that includes the current day -- confirm against the generator.
observation_generator_(*aggregate_storage_, global_project_context_factory, system_data,
observation_writer, logger::PrivacyEncoder::MakeSecurePrivacyEncoder(),
civil_time_converter,
cfg.generate_observations_with_current_system_profile,
cfg.local_aggregation_backfill_days + 1),
storage_quotas_(cfg.storage_quotas),
internal_metrics_(internal_metrics) {
// SlushSize() dereferences global_project_context_factory_, so the factory
// must be non-null by the time this runs.
CHECK(SlushSize() > 0)
<< "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes "
"is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or "
"increase total_capacity_bytes.";
}
// Returns the slush usage as reported by the aggregate storage.
int64_t LocalAggregation::SlushUsed() const {
  return aggregate_storage_->SlushUsed();
}
// Returns the size of the slush: the total capacity minus the bytes reserved
// for all projects (one per-project reservation for each project listed by the
// global project context factory).
int64_t LocalAggregation::SlushSize() const {
  const auto project_count =
      static_cast<int64_t>(global_project_context_factory_->ListProjects().size());
  const auto reserved_for_projects = storage_quotas_.per_project_reserved_bytes * project_count;
  return storage_quotas_.total_capacity_bytes - reserved_for_projects;
}
// Returns true if |project| may store more data: either it is still under its
// own quota, or the slush has not been used up yet. The slush is only consulted
// when the project is not under quota.
bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const {
  return IsUnderQuota(project) || SlushUsed() < SlushSize();
}
// Returns true when the amount stored for |project| is strictly below the
// per-project reservation.
//
// The comparison is done in signed 64-bit arithmetic. Comparing an unsigned
// byte count directly against a signed reservation would convert a negative
// reservation to a huge unsigned value and report every project as under
// quota.
// NOTE(review): per_project_reserved_bytes is assumed to be a signed quantity
// (SlushSize() does signed arithmetic with it) -- confirm against
// StorageQuotas.
bool LocalAggregation::IsUnderQuota(lib::ProjectIdentifier project) const {
  const auto project_used = static_cast<int64_t>(aggregate_storage_->AmountStored(project));
  const auto reserved = static_cast<int64_t>(storage_quotas_.per_project_reserved_bytes);
  return project_used < reserved;
}
// Folds |event_record| into the stored aggregate of each report of its metric.
//
// Returns OK without doing anything while local aggregation is disabled.
// Returns RESOURCE_EXHAUSTED, after reporting a Rejected quota event, when the
// project cannot store more data. Otherwise reports whether the project is
// under or over its quota, updates the aggregate of every report that has an
// aggregation procedure, and returns the result of saving the aggregate.
// NOTE(review): CB_ASSIGN_OR_RETURN presumably returns early on error, in which
// case nothing is saved -- confirm against status_macros.h.
Status LocalAggregation::AddEvent(const logger::EventRecord &event_record,
                                  const std::chrono::system_clock::time_point &event_timestamp) {
  using EventType = logger::LocalAggregationQuotaMetricDimensionEventType;

  if (IsDisabled()) {
    return Status::OkStatus();
  }

  lib::ProjectIdentifier project_id = event_record.project_context()->Identifier();

  if (!CanStore(project_id)) {
    internal_metrics_->LocalAggregationQuotaEvent(project_id, EventType::Rejected);
    return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
                               "There is not enough space to add event: ")
        .AppendMsg(event_record)
        .WithContext("project_bytes", aggregate_storage_->AmountStored(project_id))
        .WithContext("per_project_reserved_bytes", storage_quotas_.per_project_reserved_bytes)
        .WithContext("SlushUsed", SlushUsed())
        .WithContext("SlushSize", SlushSize())
        .Build();
  }

  // The event is accepted; record which side of its quota the project is on.
  const EventType quota_state = IsUnderQuota(project_id) ? EventType::Under : EventType::Over;
  internal_metrics_->LocalAggregationQuotaEvent(project_id, quota_state);

  CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate,
                      aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier()));

  // The unfiltered system profile attached to the event; each report filters
  // it down to the fields it asks for.
  const SystemProfile &unfiltered_profile = *event_record.system_profile();
  const auto &metric_def = *event_record.metric();

  for (const auto &report : metric_def.reports()) {
    CB_ASSIGN_OR_RETURN(std::unique_ptr<AggregationProcedure> aggregation_procedure,
                        AggregationProcedure::Get(metric_def, report));
    if (!aggregation_procedure) {
      // No procedure for this report: nothing to aggregate.
      continue;
    }

    // Filter the system profile for this report and hash the serialized result.
    SystemProfile filtered_profile;
    MetadataBuilder::FilteredSystemProfile(report, unfiltered_profile, &filtered_profile);
    uint64_t profile_hash = util::Farmhash64(filtered_profile.SerializeAsString());

    // Look up (creating if absent) the aggregate stored under this report id.
    ReportAggregate &report_aggregate =
        (*aggregate.aggregate()->mutable_by_report_id())[report.id()];
    aggregation_procedure->UpdateAggregate(event_record, report_aggregate, profile_hash,
                                           event_timestamp);

    // Keep the filtered profile stored under the hash used above.
    aggregate.StoreFilteredSystemProfile(profile_hash, filtered_profile);
  }

  return aggregate.Save();
}
// Takes ownership of |clock| and starts the observation generator with a
// non-owning pointer to it.
void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) {
  system_clock_ = std::move(clock);
  auto *const owned_clock = system_clock_.get();
  observation_generator_.Start(owned_clock);
}
// Shuts down the observation generator and then the aggregate storage.
// The generator was constructed with a reference to the storage, so it is
// shut down first.
// NOTE(review): presumably this ordering keeps the generator from touching
// storage that is already shut down -- confirm.
void LocalAggregation::ShutDown() {
observation_generator_.ShutDown();
aggregate_storage_->ShutDown();
}
// Runs one round of observation generation for |system_time| and returns the
// status reported by the observation generator.
Status LocalAggregation::GenerateAggregatedObservations(
    std::chrono::system_clock::time_point system_time) {
  Status generation_status = observation_generator_.GenerateObservationsOnce(system_time);
  return generation_status;
}
// Logs the requested transition and records whether local aggregation is
// disabled (true) or enabled (false).
void LocalAggregation::Disable(bool is_disabled) {
  const char *const action = is_disabled ? "Disabling" : "Enabling";
  LOG(INFO) << "LocalAggregation: " << action << " local aggregation.";
  is_disabled_ = is_disabled;
}
} // namespace cobalt::local_aggregation