| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1.1/local_aggregation.h" |
| |
| #include <memory> |
| |
| #include "src/lib/statusor/status_macros.h" |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/local_aggregation_1.1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1.1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/privacy_encoder.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/pb/observation_batch.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| LocalAggregation::LocalAggregation( |
| const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory, |
| MetadataBuilder *metadata_builder, util::FileSystem *fs, const logger::Encoder *encoder, |
| const logger::ObservationWriter *observation_writer) |
| : aggregate_storage_(LocalAggregateStorage::New(cfg.local_aggregate_store_strategy, |
| cfg.local_aggregate_store_dir, fs, |
| global_project_context_factory)), |
| metadata_builder_(metadata_builder), |
| observation_generator_(aggregate_storage_.get(), encoder, global_project_context_factory, |
| metadata_builder_, observation_writer, |
| logger::PrivacyEncoder::MakeSecurePrivacyEncoder()) {} |
| |
| util::Status LocalAggregation::AddEvent(const logger::EventRecord &event_record) { |
| if (IsDisabled()) { |
| return util::Status::OK; |
| } |
| |
| CB_ASSIGN_OR_RETURN( |
| LocalAggregateStorage::MetricAggregateRef aggregate, |
| aggregate_storage_->GetMetricAggregate( |
| event_record.project_context()->project().customer_id(), |
| event_record.project_context()->project().project_id(), event_record.metric()->id())); |
| |
| for (const auto &report : event_record.metric()->reports()) { |
| auto procedure = AggregationProcedure::Get(*event_record.metric(), report); |
| if (procedure) { |
| google::protobuf::Map<uint32_t, ReportAggregate> *by_report_id = |
| aggregate.aggregate()->mutable_by_report_id(); |
| ReportAggregate &report_aggregate = (*by_report_id)[report.id()]; |
| procedure->UpdateAggregate(event_record, &report_aggregate); |
| } |
| } |
| |
| return aggregate.Save(); |
| } |
| |
| void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) { |
| system_clock_ = std::move(clock); |
| observation_generator_.Start(system_clock_.get()); |
| } |
| |
| void LocalAggregation::ShutDown() { observation_generator_.ShutDown(); } |
| |
| util::Status LocalAggregation::GenerateAggregatedObservations(util::TimeInfo utc, |
| util::TimeInfo local) { |
| return observation_generator_.GenerateObservationsOnce(utc, local); |
| } |
| |
| void LocalAggregation::Disable(bool is_disabled) { |
| LOG(INFO) << "LocalAggregation: " << (is_disabled ? "Disabling" : "Enabling") |
| << " local aggregation."; |
| is_disabled_ = is_disabled; |
| } |
| |
| } // namespace cobalt::local_aggregation |