| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation/observation_generator.h" |
| |
| #include <chrono> |
| #include <map> |
| #include <optional> |
| #include <thread> |
| |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation/backfill_manager.h" |
| #include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/pb/observation_batch.pb.h" |
| #include "src/public/lib/status_codes.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/registry/window_size.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
// Period between successive observation-generation passes once the worker thread is running.
const std::chrono::seconds kDefaultGenerateObsInterval{std::chrono::hours{1}};

// Delay (3 minutes) between construction and the first observation-generation pass. Keeping the
// first pass early lets expedited device-count metrics go out quickly on devices that will not
// stay up for long.
const std::chrono::seconds kDefaultInitialWaitToGenerate{std::chrono::minutes{3}};
| |
| ObservationGenerator::ObservationGenerator( |
| LocalAggregateStorage& aggregate_storage, |
| const logger::ProjectContextFactory& global_project_context_factory, |
| system_data::SystemDataInterface& system_data, |
| const logger::ObservationWriter& observation_writer, |
| std::unique_ptr<logger::PrivacyEncoder> privacy_encoder, |
| util::CivilTimeConverterInterface& civil_time_converter, |
| bool generate_observations_with_current_system_profile, bool test_dont_backfill_empty_reports, |
| uint32_t backfill_manager_days) |
| : aggregate_storage_(aggregate_storage), |
| global_project_context_factory_(global_project_context_factory), |
| system_data_(system_data), |
| observation_writer_(observation_writer), |
| steady_clock_(new util::SteadyClock()), |
| generate_obs_interval_(kDefaultGenerateObsInterval), |
| backfill_manager_(backfill_manager_days), |
| privacy_encoder_(std::move(privacy_encoder)), |
| civil_time_converter_(civil_time_converter), |
| generate_observations_with_current_system_profile_( |
| generate_observations_with_current_system_profile), |
| test_dont_backfill_empty_reports_(test_dont_backfill_empty_reports) { |
| next_generate_obs_ = steady_clock_->now() + kDefaultInitialWaitToGenerate; |
| } |
| |
| void ObservationGenerator::Start(util::SystemClockInterface* clock) { |
| auto locked = protected_fields_.lock(); |
| locked->shut_down = false; |
| LOG(INFO) << "Starting ObservationGenerator Worker Thread"; |
| worker_thread_ = std::thread([this, clock]() mutable { this->Run(clock); }); |
| } |
| |
| void ObservationGenerator::ShutDown() { |
| if (worker_thread_.joinable()) { |
| { |
| auto locked = protected_fields_.lock(); |
| locked->shut_down = true; |
| locked->wakeup_notifier.notify_all(); |
| } |
| worker_thread_.join(); |
| } else { |
| protected_fields_.lock()->shut_down = true; |
| } |
| } |
| |
| void ObservationGenerator::Run(util::SystemClockInterface* clock) { |
| LOG(INFO) << "ObservationGenerator Worker Thread running"; |
| auto locked = protected_fields_.lock(); |
| while (true) { |
| // Exit if a shutdown has been requested. |
| if (locked->shut_down) { |
| return; |
| } |
| |
| // Sleep until the next scheduled observation generation or until notified of a shutdown. |
| locked->wakeup_notifier.wait_until(locked, next_generate_obs_, |
| [&locked]() { return locked->shut_down; }); |
| |
| const Status status = GenerateObservations(clock->now(), steady_clock_->now()); |
| if (!status.ok()) { |
| LOG(ERROR) << "Error occurred while generating observations: " << status; |
| } |
| } |
| } |
| |
| Status ObservationGenerator::GenerateObservations( |
| std::chrono::system_clock::time_point system_time, |
| std::chrono::steady_clock::time_point steady_time) { |
| VLOG(5) << "ObservationGenerator::GenerateObservations"; |
| |
| if (steady_time >= next_generate_obs_) { |
| next_generate_obs_ += generate_obs_interval_; |
| return GenerateObservationsOnce(system_time); |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status ObservationGenerator::GenerateObservationsOnce( |
| std::chrono::system_clock::time_point system_time) { |
| logger::InternalMetrics::InternalMetricsFlusher flusher = internal_metrics_->Flusher(); |
| CivilTimeManager civil_time_mgr(system_time, civil_time_converter_); |
| util::TimeInfo utc_time_info = civil_time_mgr.GetInitialUtc(); |
| LOG(INFO) << "Generating aggregated observations for periods ending before system time: " |
| << std::chrono::system_clock::to_time_t(system_time) << " (day index " |
| << utc_time_info.day_index << ", hour ID " << utc_time_info.hour_id << " in UTC)"; |
| std::vector<Status> generation_errors; |
| for (lib::ProjectIdentifier project_identifier : global_project_context_factory_.ListProjects()) { |
| std::unique_ptr<logger::ProjectContext> project = |
| global_project_context_factory_.NewProjectContext(project_identifier); |
| |
| for (lib::MetricIdentifier metric_identifier : project->ListMetrics()) { |
| const MetricDefinition* metric = project->GetMetric(metric_identifier); |
| lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> aggregate_or = |
| aggregate_storage_.GetMetricAggregate(metric_identifier); |
| if (!aggregate_or.ok()) { |
| generation_errors.push_back(aggregate_or.status()); |
| continue; |
| } |
| LocalAggregateStorage::MetricAggregateRef aggregate = std::move(aggregate_or.value()); |
| logger::MetricRef metric_ref = project->RefMetric(metric); |
| |
| for (const ReportDefinition& report : metric->reports()) { |
| CB_ASSIGN_OR_RETURN(std::unique_ptr<AggregationProcedure> procedure, |
| AggregationProcedure::Get(*metric, report)); |
| |
| if (procedure) { |
| if (test_dont_backfill_empty_reports_ && |
| aggregate.aggregate()->mutable_by_report_id()->count(report.id()) == 0) { |
| continue; |
| } |
| |
| ReportAggregate& report_aggregate = |
| (*aggregate.aggregate()->mutable_by_report_id())[report.id()]; |
| |
| Status status = |
| GenerateObservationsForReportAggregate(aggregate, report_aggregate, *procedure, |
| &civil_time_mgr, *metric, metric_ref, report); |
| |
| if (!status.ok()) { |
| generation_errors.push_back(status); |
| } |
| } |
| } |
| |
| aggregate.Save(); |
| } |
| } |
| |
| Status status = aggregate_storage_.GarbageCollection(); |
| if (!status.ok()) { |
| generation_errors.push_back(status); |
| } |
| |
| if (!generation_errors.empty()) { |
| // Set the status code to the error that first occurred during the generation of observations. |
| util::StatusBuilder sb(generation_errors.at(0).error_code(), "Encountered "); |
| sb.AppendMsg(generation_errors.size()).AppendMsg(" errors while generating observations"); |
| for (const Status& error : generation_errors) { |
| sb.WithContext("error", error); |
| } |
| return sb.Build(); |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status ObservationGenerator::GenerateObservationsForReportAggregate( |
| const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate, |
| AggregationProcedure& procedure, CivilTimeManager* civil_time_mgr, |
| const MetricDefinition& metric, const logger::MetricRef& metric_ref, |
| const ReportDefinition& report) { |
| SystemProfile current_filtered_system_profile; |
| MetadataBuilder::FilteredSystemProfile(report, system_data_.system_profile(), |
| ¤t_filtered_system_profile); |
| std::map<uint64_t, SystemProfile> system_profile_cache; |
| |
| CB_ASSIGN_OR_RETURN(std::vector<util::TimeInfo> to_generate, |
| backfill_manager_.CalculateBackfill( |
| procedure.GetLastTimeInfo(report_aggregate), civil_time_mgr, metric, |
| procedure.IsDaily(), procedure.IsExpedited())); |
| |
| for (util::TimeInfo time_info : to_generate) { |
| CB_ASSIGN_OR_RETURN(std::vector<ObservationAndSystemProfile> observations, |
| procedure.GenerateObservations(time_info, report_aggregate)); |
| |
| if (!observations.empty()) { |
| for (ObservationAndSystemProfile& observation : observations) { |
| const SystemProfile* system_profile = |
| GetSystemProfile(observation.system_profile_hash, &system_profile_cache, aggregate, |
| current_filtered_system_profile); |
| |
| CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations, |
| privacy_encoder_->MaybeMakePrivateObservations( |
| std::move(observation.observation), metric, report)); |
| |
| CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, time_info, metric_ref, |
| report, private_observations, *system_profile, |
| observation.system_profile_hash)); |
| } |
| } else { |
| // Allow the privacy encoder a chance to create a fabricated observation. |
| CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations, |
| privacy_encoder_->MaybeMakePrivateObservations(nullptr, metric, report)); |
| |
| // Use the current system profile if any fabricated private observations are created. |
| // TODO(fxbug.dev/92955): choose plausible SystemProfiles for fabricated observations. |
| CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, time_info, metric_ref, |
| report, private_observations, |
| current_filtered_system_profile, std::nullopt)); |
| } |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status ObservationGenerator::WriteObservations( |
| ReportAggregate& report_aggregate, AggregationProcedure& procedure, util::TimeInfo time_info, |
| const logger::MetricRef& metric_ref, const ReportDefinition& report, |
| const std::vector<std::unique_ptr<Observation>>& private_observations, |
| const SystemProfile& system_profile, std::optional<uint64_t> commit_system_profile_hash) { |
| bool is_contribution = true; |
| for (const std::unique_ptr<Observation>& private_observation : private_observations) { |
| if (private_observation == nullptr) { |
| continue; |
| } |
| CB_RETURN_IF_ERROR(observation_writer_.WriteObservation( |
| private_observation, |
| MetadataBuilder::BuildFromProfile(metric_ref, report, time_info.day_index, system_profile), |
| is_contribution)); |
| is_contribution = false; |
| } |
| // Mark the aggregated data as having been sent. |
| procedure.ObservationsCommitted(report_aggregate, time_info, commit_system_profile_hash); |
| return Status::OkStatus(); |
| } |
| |
| const SystemProfile* ObservationGenerator::GetSystemProfile( |
| uint64_t system_profile_hash, std::map<uint64_t, SystemProfile>* system_profile_cache, |
| const LocalAggregateStorage::MetricAggregateRef& aggregate, |
| const SystemProfile& current_filtered_system_profile) const { |
| auto system_profile_cache_it = system_profile_cache->find(system_profile_hash); |
| if (system_profile_cache_it != system_profile_cache->end()) { |
| return &system_profile_cache_it->second; |
| } |
| lib::statusor::StatusOr<SystemProfile> aggregate_profile = |
| aggregate.RetrieveFilteredSystemProfile(system_profile_hash); |
| if (!aggregate_profile.ok()) { |
| LOG(ERROR) << "Failed to retrieve system profile from local aggregation, falling back to using " |
| "current system profile for metric: " |
| << aggregate.DebugString(); |
| } |
| SystemProfile system_profile = aggregate_profile.ValueOr(current_filtered_system_profile); |
| if (generate_observations_with_current_system_profile_) { |
| SystemProfile current_system_profile = current_filtered_system_profile; |
| MergeSystemProfiles(¤t_system_profile, system_profile); |
| |
| system_profile_cache->emplace(system_profile_hash, std::move(current_system_profile)); |
| } else { |
| system_profile_cache->emplace(system_profile_hash, std::move(system_profile)); |
| } |
| return &system_profile_cache->at(system_profile_hash); |
| } |
| |
| void ObservationGenerator::MergeSystemProfiles(SystemProfile* to, const SystemProfile& from) { |
| // From |
| // https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.message#Message.MergeFrom.details: |
| // Singular fields will be overwritten, if specified in from, except for embedded messages which |
| // will be merged. Repeated fields will be concatenated. |
| // |
| // Concatenating repeated fields is undesirable here. Instead, replace the repeated field data. |
| if (from.experiment_ids_size() > 0) { |
| to->clear_experiment_ids(); |
| } |
| to->MergeFrom(from); |
| } |
| |
| } // namespace cobalt::local_aggregation |