| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation/local_aggregation.h" |
| |
| #include <algorithm> |
| #include <memory> |
| |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/hash.h" |
| #include "src/lib/util/not_null.h" |
| #include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation/civil_time_manager.h" |
| #include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/privacy_encoder.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/observation_batch.pb.h" |
| #include "src/public/lib/clock_interfaces.h" |
| #include "src/public/lib/registry_identifiers.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/registry/report_definition.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| LocalAggregation::LocalAggregation( |
| const CobaltConfig &cfg, const logger::ProjectContextFactory &global_project_context_factory, |
| system_data::SystemDataInterface &system_data, util::FileSystem &fs, |
| const logger::ObservationWriter &observation_writer, |
| util::CivilTimeConverterInterface &civil_time_converter, |
| logger::InternalMetrics *internal_metrics) |
| : global_project_context_factory_(global_project_context_factory), |
| system_data_(system_data), |
| aggregate_storage_(LocalAggregateStorage::New(cfg.local_aggregate_store_strategy, |
| cfg.local_aggregate_store_dir, fs, |
| global_project_context_factory, system_data, |
| cfg.storage_quotas.per_project_reserved_bytes)), |
| civil_time_converter_(civil_time_converter), |
| observation_generator_( |
| *aggregate_storage_, global_project_context_factory, system_data, observation_writer, |
| logger::PrivacyEncoder::MakeSecurePrivacyEncoder(), civil_time_converter, |
| cfg.generate_observations_with_current_system_profile, |
| cfg.test_dont_backfill_empty_reports, cfg.local_aggregation_backfill_days + 1), |
| storage_quotas_(cfg.storage_quotas), |
| internal_metrics_(internal_metrics) { |
| CHECK(SlushSize() > 0) |
| << "There is no space in slush! The number of cobalt customers * per_project_reserved_bytes " |
| "is greater than total_capacity_bytes. Please reduce per_project_reserved_bytes or " |
| "increase total_capacity_bytes."; |
| } |
| |
| int64_t LocalAggregation::SlushUsed() const { return aggregate_storage_->SlushUsed(); } |
| |
| int64_t LocalAggregation::SlushSize() const { |
| return storage_quotas_.total_capacity_bytes - |
| (storage_quotas_.per_project_reserved_bytes * |
| static_cast<int64_t>(global_project_context_factory_.ListProjects().size())); |
| } |
| |
| bool LocalAggregation::CanStore(lib::ProjectIdentifier project) const { |
| if (IsUnderQuota(project)) { |
| return true; |
| } |
| return SlushUsed() < SlushSize(); |
| } |
| |
| bool LocalAggregation::IsUnderQuota(lib::ProjectIdentifier project) const { |
| size_t project_used = aggregate_storage_->AmountStored(project); |
| return project_used < storage_quotas_.per_project_reserved_bytes; |
| } |
| |
| Status LocalAggregation::AddEvent(const logger::EventRecord &event_record, |
| const std::chrono::system_clock::time_point &event_timestamp) { |
| if (IsDisabled()) { |
| return Status::OkStatus(); |
| } |
| |
| lib::ProjectIdentifier proj = event_record.project_context()->Identifier(); |
| |
| if (!CanStore(proj)) { |
| internal_metrics_->LocalAggregationQuotaEvent(proj, 3); |
| return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED, |
| "There is not enough space to add event: ") |
| .AppendMsg(event_record) |
| .WithContext("project_bytes", aggregate_storage_->AmountStored(proj)) |
| .WithContext("per_project_reserved_bytes", storage_quotas_.per_project_reserved_bytes) |
| .WithContext("SlushUsed", SlushUsed()) |
| .WithContext("SlushSize", SlushSize()) |
| .Build(); |
| } |
| |
| internal_metrics_->LocalAggregationQuotaEvent(proj, IsUnderQuota(proj) ? 1 : 2); |
| |
| CB_ASSIGN_OR_RETURN(LocalAggregateStorage::MetricAggregateRef aggregate, |
| aggregate_storage_->GetMetricAggregate(event_record.MetricIdentifier())); |
| |
| CivilTimeManager civil_time_mgr(event_timestamp, civil_time_converter_); |
| |
| // Get the unfiltered global system profile to log with this event. |
| const SystemProfile &unfiltered_system_profile = *event_record.system_profile(); |
| |
| for (const auto &report : event_record.metric()->reports()) { |
| if (report.max_release_stage() != ReleaseStage::RELEASE_STAGE_NOT_SET && |
| system_data_.release_stage() > report.max_release_stage()) { |
| // Quietly ignore this report. |
| LOG_FIRST_N(INFO, 10) << "Not logging to report `" |
| << event_record.project_context()->FullMetricName( |
| *event_record.metric()) |
| << "." << report.report_name() << "` because its max_release_stage (" |
| << report.max_release_stage() |
| << ") is lower than the device's current release_stage: " |
| << system_data_.release_stage(); |
| continue; |
| } |
| |
| lib::StatusOr<util::NotNullUniquePtr<AggregationProcedure>> not_null_procedure = |
| AggregationProcedure::Get(event_record.project_context()->project().customer_name(), |
| event_record.project_context()->project().project_name(), |
| *event_record.metric(), report); |
| if (!not_null_procedure.ok()) { |
| continue; |
| } |
| |
| util::PinnedUniquePtr<AggregationProcedure> procedure(std::move(not_null_procedure).value()); |
| CB_ASSIGN_OR_RETURN( |
| ReportAggregate * report_aggregate, |
| procedure->GetReportAggregate(aggregate.aggregate(), report.id(), &civil_time_mgr)); |
| |
| // Get the filtered global system profile for this report. |
| SystemProfile filtered_system_profile; |
| encoder::SystemData::FilteredSystemProfile(report, unfiltered_system_profile, |
| &filtered_system_profile); |
| |
| // Calculate the hash of the final filtered system profile. |
| uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString()); |
| |
| procedure->UpdateAggregate(event_record, *report_aggregate, system_profile_hash, |
| event_timestamp); |
| |
| // Make sure the final filtered system profile is stored. |
| aggregate.StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile); |
| } |
| |
| return aggregate.Save(); |
| } |
| |
| void LocalAggregation::Start(std::unique_ptr<util::SystemClockInterface> clock) { |
| system_clock_ = std::move(clock); |
| observation_generator_.Start(system_clock_.get()); |
| } |
| |
| void LocalAggregation::ShutDown() { |
| observation_generator_.ShutDown(); |
| aggregate_storage_->ShutDown(); |
| } |
| |
| Status LocalAggregation::GenerateAggregatedObservations( |
| std::chrono::system_clock::time_point system_time) { |
| return observation_generator_.GenerateObservationsOnce(system_time); |
| } |
| |
| void LocalAggregation::Disable(bool is_disabled) { |
| LOG(INFO) << "LocalAggregation: " << (is_disabled ? "Disabling" : "Enabling") |
| << " local aggregation."; |
| is_disabled_ = is_disabled; |
| } |
| |
| } // namespace cobalt::local_aggregation |