| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_1_1_OBSERVATION_GENERATOR_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_1_1_OBSERVATION_GENERATOR_H_ |
| |
| #include <condition_variable> |
| #include <thread> |
| |
| #include "src/lib/util/clock.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/backfill_manager.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/logger/privacy_encoder.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/status.h" |
| #include "src/registry/report_definition.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
// Default value of the |backfill_manager_days| argument of the ObservationGenerator constructor.
// NOTE(review): presumably the number of past days the BackfillManager covers -- confirm against
// backfill_manager.h.
//
// Declared `inline constexpr` so that every translation unit including this header shares a single
// compile-time constant instead of receiving its own internal-linkage copy.
inline constexpr uint32_t kDefaultBackfillManagerDays = 3;
| |
| // ObservationGenerator manages the background task that is responsible for generating observations |
| // and writing them to |observation_writer| on a regular schedule (~once per hour). |
| // |
| // There should be only one ObservationGenerator per system. |
| // |
| // Once the system clock has become accurate, the Start() method should be called with the |
| // SystemClockInterface exactly once. After that point, the ObservationGenerator will generate |
| // observations on a regular schedule until ShutDown() is called. |
| class ObservationGenerator { |
| public: |
| // Constructor for ObservationGenerator |
| // |
| // |aggregate_storage|: The file-backed storage for MetricAggregate objects. |
| // |global_project_context_factory|: The current global registry. |
| // |system_data|: Used to retrieve global SystemProfile data. |
| // |observation_writer|: Used for writing generated observations to observation storage. |
| // |privacy_encoder|: Used for encoding generated observations using privacy algorithms. |
| // |generate_observations_with_current_system_profile|: If set, any missing system profile fields |
| // should be taken from the current system profile. |
| ObservationGenerator(LocalAggregateStorage* aggregate_storage, |
| const logger::ProjectContextFactory* global_project_context_factory, |
| system_data::SystemDataInterface* system_data, |
| const logger::ObservationWriter* observation_writer, |
| std::unique_ptr<logger::PrivacyEncoder> privacy_encoder, |
| bool generate_observations_with_current_system_profile, |
| uint32_t backfill_manager_days = kDefaultBackfillManagerDays); |
| |
| // Start begins the background thread to generate observations on a fixed schedule. This method |
| // should only be called once when the system clock has become accurate. |
| void Start(util::SystemClockInterface* clock); |
| |
| // ShutDown halts the background thread, allowing the ObservationGenerator to be destroyed. |
| void ShutDown(); |
| |
| // GenerateObservationsOnce iterates through all of the metrics/reports in the global registry and |
| // attempts to generate observations for them. Any generated observations will be written to the |
| // |observation_writer_|. |
| // |
| // This method is called automatically in the background thread by 'GenerateObservations()', but |
| // can be called manually while testing to avoid having to wait. |
| Status GenerateObservationsOnce(util::TimeInfo utc, util::TimeInfo local); |
| |
| private: |
| void Run(util::SystemClockInterface* clock); |
| |
| // GenerateObservations calculates TimeInfos for UTC and LOCAL timezones, and passes them in to |
| // InnerGenerateObservations(). |
| Status GenerateObservations(std::chrono::system_clock::time_point system_time, |
| std::chrono::steady_clock::time_point steady_time); |
| |
| // GenerateObservationsForReportAggregate generates all needed observations for a given |
| // ReportAggregate, returning a Status::OK if all observations were generated. |
| Status GenerateObservationsForReportAggregate( |
| const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate, |
| AggregationProcedure* procedure, util::TimeInfo end_time_info, const MetricDefinition* metric, |
| const logger::MetricRef& metric_ref, const ReportDefinition& report); |
| |
| Status WriteObservations(ReportAggregate* report_aggregate, AggregationProcedure* procedure, |
| util::TimeInfo time_info, const logger::MetricRef& metric_ref, |
| const ReportDefinition& report, |
| const std::vector<std::unique_ptr<Observation>>& private_observations, |
| const SystemProfile& system_profile, |
| std::optional<uint64_t> commit_system_profile_hash); |
| |
| const SystemProfile* GetSystemProfile(uint64_t system_profile_hash, |
| std::map<uint64_t, SystemProfile>* system_profile_cache, |
| const LocalAggregateStorage::MetricAggregateRef& aggregate, |
| const SystemProfile& current_filtered_system_profile) const; |
| |
| // Merge any unset fields in |to| with the values in |from|. |
| static void MergeSystemProfiles(SystemProfile* to, const SystemProfile& from); |
| |
| LocalAggregateStorage* aggregate_storage_; |
| const logger::ProjectContextFactory* global_project_context_factory_; |
| system_data::SystemDataInterface* system_data_; |
| const logger::ObservationWriter* observation_writer_; |
| std::unique_ptr<util::SteadyClockInterface> steady_clock_; |
| std::chrono::steady_clock::time_point next_generate_obs_; |
| std::chrono::seconds generate_obs_interval_; |
| BackfillManager backfill_manager_; |
| std::unique_ptr<logger::PrivacyEncoder> privacy_encoder_; |
| bool generate_observations_with_current_system_profile_; |
| |
| std::thread worker_thread_; |
| struct ProtectedFields { |
| bool shut_down; |
| |
| std::condition_variable_any wakeup_notifier; |
| }; |
| util::ProtectedFields<ProtectedFields> protected_fields_; |
| }; |
| |
| } // namespace cobalt::local_aggregation |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_1_1_OBSERVATION_GENERATOR_H_ |