| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregation.h" |
| |
| #include <algorithm> |
| #include <memory> |
| |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/hash.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/privacy_encoder.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/pb/observation_batch.pb.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/registry/report_definition.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| LocalAggregation::LocalAggregation( |
| const CobaltConfig &cfg, const logger::ProjectContextFactory *global_project_context_factory, |
| system_data::SystemDataInterface *system_data, MetadataBuilder *metadata_builder, |
| util::FileSystem *fs, const logger::ObservationWriter *observation_writer, |
| util::CivilTimeConverterInterface *civil_time_converter) |
| : global_project_context_factory_(global_project_context_factory), |
| aggregate_storage_(LocalAggregateStorage::New( |
| cfg.local_aggregate_store_strategy, cfg.local_aggregate_store_dir, fs, |
| global_project_context_factory, metadata_builder, |
| cfg.storage_quotas.per_project_reserved_bytes)), |
| observation_generator_(aggregate_storage_.get(), global_project_context_factory, system_data, |
| observation_writer, logger::PrivacyEncoder::MakeSecurePrivacyEncoder(), |
| civil_time_converter, |
| cfg.generate_observations_with_current_system_profile, |
| cfg.local_aggregation_backfill_days + 1), |
| storage_quotas_(cfg.storage_quotas) { |
| CHECK(SlushSize() > 0) |
| << "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes " |
| "is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or " |
| "increase total_capacity_bytes."; |
| } |
| |
| int64_t LocalAggregation::SlushUsed() const { return aggregate_storage_->SlushUsed(); } |
| |
| int64_t LocalAggregation::SlushSize() const { |
| return storage_quotas_.total_capacity_bytes - |
| (storage_quotas_.per_project_reserved_bytes * |
| static_cast<int64_t>(global_project_context_factory_->ListProjects().size())); |
| } |
| |
| bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const { |
| size_t project_used = aggregate_storage_->AmountStored(project); |
| if (project_used < storage_quotas_.per_project_reserved_bytes) { |
| return true; |
| } |
| return SlushUsed() < SlushSize(); |
| } |
| |
| Status LocalAggregation::AddEvent(const logger::EventRecord &event_record, |
| const std::chrono::system_clock::time_point &event_timestamp) { |
| if (IsDisabled()) { |
| return Status::OkStatus(); |
| } |
| |
| lib::ProjectIdentifier proj = event_record.project_context()->Identifier(); |
| if (!CanStore(proj)) { |
| return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED, |
| "There is not enough space to add event: ") |
| .AppendMsg(event_record) |
| .WithContext("project_bytes", aggregate_storage_->AmountStored(proj)) |
| .WithContext("per_project_reserved_bytes", storage_quotas_.per_project_reserved_bytes) |
| .WithContext("SlushUsed", SlushUsed()) |
| .WithContext("SlushSize", SlushSize()) |
| .Build(); |
| } |
| |
| CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate, |
| aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier())); |
| |
| // Get the unfiltered global system profile to log with this event. |
| const SystemProfile &unfiltered_system_profile = *event_record.system_profile(); |
| |
| for (const auto &report : event_record.metric()->reports()) { |
| CB_ASSIGN_OR_RETURN(std::unique_ptr<AggregationProcedure> procedure, |
| AggregationProcedure::Get(*event_record.metric(), report)); |
| if (procedure) { |
| google::protobuf::Map<uint32_t, ReportAggregate> *by_report_id = |
| aggregate.aggregate()->mutable_by_report_id(); |
| ReportAggregate &report_aggregate = (*by_report_id)[report.id()]; |
| |
| // Get the filtered global system profile for this report. |
| SystemProfile filtered_system_profile; |
| MetadataBuilder::FilteredSystemProfile(report, unfiltered_system_profile, |
| &filtered_system_profile); |
| |
| // Calculate the hash of the final filtered system profile. |
| uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString()); |
| |
| procedure->UpdateAggregate(event_record, &report_aggregate, system_profile_hash, |
| event_timestamp); |
| |
| // Make sure the final filtered system profile is stored. |
| aggregate.StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile); |
| } |
| } |
| |
| return aggregate.Save(); |
| } |
| |
| void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) { |
| system_clock_ = std::move(clock); |
| observation_generator_.Start(system_clock_.get()); |
| } |
| |
| void LocalAggregation::ShutDown() { |
| observation_generator_.ShutDown(); |
| aggregate_storage_->ShutDown(); |
| } |
| |
| Status LocalAggregation::GenerateAggregatedObservations( |
| std::chrono::system_clock::time_point system_time) { |
| return observation_generator_.GenerateObservationsOnce(system_time); |
| } |
| |
| void LocalAggregation::Disable(bool is_disabled) { |
| LOG(INFO) << "LocalAggregation: " << (is_disabled ? "Disabling" : "Enabling") |
| << " local aggregation."; |
| is_disabled_ = is_disabled; |
| } |
| |
| } // namespace cobalt::local_aggregation |