// blob: 3b9e2c292d36c287b21785411db213021e98759f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregation.h"
#include <memory>
#include "src/lib/statusor/status_macros.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Builds the local aggregation pipeline from its configuration and collaborators.
//
// |cfg| supplies the local aggregate store strategy and directory used to create
// the aggregate storage. |global_project_context_factory| is handed to both the
// aggregate storage and the observation generator. |metadata_builder| is stored
// in |metadata_builder_| and passed on to the observation generator. |fs| is
// passed to the aggregate storage and |observation_writer| to the observation
// generator.
//
// NOTE(review): the raw-pointer arguments look like non-owning borrows that must
// outlive this object -- confirm against the header.
LocalAggregation::LocalAggregation(
const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory,
MetadataBuilder *metadata_builder, util::FileSystem *fs,
const logger::ObservationWriter *observation_writer)
// The storage implementation is chosen by LocalAggregateStorage::New() from the
// configured strategy.
: aggregate_storage_(LocalAggregateStorage::New(cfg.local_aggregate_store_strategy,
cfg.local_aggregate_store_dir, fs,
global_project_context_factory)),
metadata_builder_(metadata_builder),
// The generator is given |aggregate_storage_.get()| and |metadata_builder_|,
// i.e. the members initialized above rather than the constructor arguments.
// Members are initialized in declaration order, so this relies on both being
// declared before |observation_generator_| in the class -- confirm in the header.
observation_generator_(aggregate_storage_.get(), global_project_context_factory,
metadata_builder_, observation_writer,
logger::PrivacyEncoder::MakeSecurePrivacyEncoder()) {}
// Folds a single logged event into the locally stored aggregate of its metric.
//
// Returns OK without touching storage while local aggregation is disabled.
// Otherwise the metric's aggregate is looked up in |aggregate_storage_| by
// (customer id, project id, metric id); a failed lookup is returned to the
// caller by CB_ASSIGN_OR_RETURN. Every report of the metric for which
// AggregationProcedure::Get() yields a procedure has its ReportAggregate
// updated with |event_record|. The result of saving the aggregate is returned.
util::Status LocalAggregation::AddEvent(const logger::EventRecord &event_record) {
  if (IsDisabled()) {
    return util::Status::OK;
  }

  const auto &project = event_record.project_context()->project();
  const auto &metric = *event_record.metric();

  CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef metric_aggregate,
                      aggregate_storage_->GetMetricAggregate(project.customer_id(),
                                                             project.project_id(), metric.id()));

  for (const auto &report : metric.reports()) {
    auto procedure = AggregationProcedure::Get(metric, report);
    if (!procedure) {
      // Get() produced no procedure for this report, so there is nothing to update.
      continue;
    }

    // operator[] on the map creates the report's entry if it is not there yet.
    auto &aggregates_by_report = *metric_aggregate.aggregate()->mutable_by_report_id();
    procedure->UpdateAggregate(event_record, &aggregates_by_report[report.id()]);
  }

  return metric_aggregate.Save();
}
// Takes ownership of |clock| by moving it into |system_clock_|, then starts the
// observation generator with a raw pointer to that stored clock.
void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) {
  system_clock_ = std::move(clock);

  // The generator only receives a raw pointer; the clock itself stays in
  // |system_clock_|.
  auto *clock_ptr = system_clock_.get();
  observation_generator_.Start(clock_ptr);
}
// Shuts down local aggregation by forwarding to the observation generator.
void LocalAggregation::ShutDown() {
  observation_generator_.ShutDown();
}
// Runs one observation-generation pass for the given |utc| and |local| time
// information and returns the status reported by the observation generator.
util::Status LocalAggregation::GenerateAggregatedObservations(util::TimeInfo utc,
                                                              util::TimeInfo local) {
  util::Status result = observation_generator_.GenerateObservationsOnce(utc, local);
  return result;
}
// Records whether local aggregation is disabled and logs the transition.
// Passing true disables it; passing false enables it.
void LocalAggregation::Disable(bool is_disabled) {
  const char *action = "Enabling";
  if (is_disabled) {
    action = "Disabling";
  }
  LOG(INFO) << "LocalAggregation: " << action << " local aggregation.";

  is_disabled_ = is_disabled;
}
} // namespace cobalt::local_aggregation