blob: 89e4678b059c32f06a90b18d86d6c7e124243740 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef COBALT_SRC_LOCAL_AGGREGATION_OBSERVATION_GENERATOR_H_
#define COBALT_SRC_LOCAL_AGGREGATION_OBSERVATION_GENERATOR_H_
#include <condition_variable>
#include <thread>
#include "src/lib/util/clock.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation/backfill_manager.h"
#include "src/local_aggregation/civil_time_manager.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/observation_writer.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/clock_interfaces.h"
#include "src/public/lib/status.h"
#include "src/registry/report_definition.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Default value of the |backfill_manager_days| argument of the ObservationGenerator constructor.
//
// Declared `inline constexpr` so that every translation unit including this header refers to the
// same single definition instead of getting its own internal-linkage copy.
inline constexpr uint32_t kDefaultBackfillManagerDays = 3;
// ObservationGenerator manages the background task that is responsible for generating observations
// and writing them to |observation_writer| on a regular schedule (~once per hour).
//
// There should be only one ObservationGenerator per system.
//
// Once the system clock has become accurate, the Start() method should be called with the
// SystemClockInterface exactly once. After that point, the ObservationGenerator will generate
// observations on a regular schedule until ShutDown() is called.
class ObservationGenerator {
 public:
  // Constructor for ObservationGenerator
  //
  // |aggregate_storage|: The file-backed storage for MetricAggregate objects.
  // |global_project_context_factory|: The current global registry.
  // |system_data|: Used to retrieve global SystemProfile data.
  // |observation_writer|: Used for writing generated observations to observation storage.
  // |privacy_encoder|: Used for encoding generated observations using privacy algorithms.
  // |civil_time_converter|: Converts a `time_point` to a civil time for a given time zone.
  // |generate_observations_with_current_system_profile|: If set, any missing system profile fields
  //                                                      should be taken from the current system
  //                                                      profile.
  // |test_dont_backfill_empty_reports|: If set, reports that have never had any events will be
  //                                     skipped. Only enable this in tests.
  // |backfill_manager_days|: Defaults to kDefaultBackfillManagerDays.
  //                          NOTE(review): presumably the number of days handed to the
  //                          BackfillManager member; confirm against the constructor definition.
  ObservationGenerator(LocalAggregateStorage& aggregate_storage,
                       const logger::ProjectContextFactory& global_project_context_factory,
                       system_data::SystemDataInterface& system_data,
                       const logger::ObservationWriter& observation_writer,
                       std::unique_ptr<logger::PrivacyEncoder> privacy_encoder,
                       util::CivilTimeConverterInterface& civil_time_converter,
                       bool generate_observations_with_current_system_profile,
                       bool test_dont_backfill_empty_reports,
                       uint32_t backfill_manager_days = kDefaultBackfillManagerDays);

  // Start begins the background thread to generate observations on a fixed schedule. This method
  // should only be called once when the system clock has become accurate.
  void Start(util::SystemClockInterface* clock);

  // ShutDown halts the background thread, allowing the ObservationGenerator to be destroyed.
  void ShutDown();

  // GenerateObservationsOnce iterates through all of the metrics/reports in the global registry and
  // attempts to generate observations for them. Any generated observations will be written to the
  // |observation_writer_|.
  //
  // |system_time|: The most recent time that should be considered for any report. In general,
  //                GenerateObservationsOnce generates observations for time periods that have
  //                ended before `system_time`. For expedited reports, observations are also
  //                generated for the time period containing `system_time`.
  //
  // This method is called automatically in the background thread by 'GenerateObservations()', but
  // can be called manually while testing to avoid having to wait.
  Status GenerateObservationsOnce(std::chrono::system_clock::time_point system_time);

  // Replaces the object held by |internal_metrics_| by calling reset() with |internal_metrics|.
  //
  // NOTE(review): ownership and null handling are determined by logger::InternalMetricsPtr, which
  // is not visible from this header; confirm before passing a pointer owned elsewhere.
  void ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
    internal_metrics_.reset(internal_metrics);
  }

 private:
  // NOTE(review): presumably the body of the background thread held in |worker_thread_|; the
  // definition is not in this header.
  void Run(util::SystemClockInterface* clock);

  // GenerateObservations calculates TimeInfos for UTC and LOCAL timezones, and passes them in to
  // InnerGenerateObservations().
  //
  // NOTE(review): no InnerGenerateObservations() is declared in this class, so the sentence above
  // may be stale.
  Status GenerateObservations(std::chrono::system_clock::time_point system_time,
                              std::chrono::steady_clock::time_point steady_time);

  // GenerateObservationsForReportAggregate generates all needed observations for a given
  // ReportAggregate, returning a Status::OK if all observations were generated.
  Status GenerateObservationsForReportAggregate(
      const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
      AggregationProcedure& procedure, CivilTimeManager* civil_time_mgr,
      const MetricDefinition& metric, const logger::MetricRef& metric_ref,
      const ReportDefinition& report);

  // NOTE(review): presumably hands |private_observations| for |report| to |observation_writer_|;
  // the definition is not in this header, so the exact contract must be confirmed there.
  Status WriteObservations(ReportAggregate& report_aggregate, AggregationProcedure& procedure,
                           util::TimeInfo time_info, const logger::MetricRef& metric_ref,
                           const ReportDefinition& report,
                           const std::vector<std::unique_ptr<Observation>>& private_observations,
                           const SystemProfile& system_profile,
                           std::optional<uint64_t> commit_system_profile_hash);

  // NOTE(review): presumably resolves |system_profile_hash| to a SystemProfile, using
  // |system_profile_cache| to avoid repeated lookups; confirm against the definition, including
  // whether the returned pointer may be null.
  const SystemProfile* GetSystemProfile(uint64_t system_profile_hash,
                                        std::map<uint64_t, SystemProfile>* system_profile_cache,
                                        const LocalAggregateStorage::MetricAggregateRef& aggregate,
                                        const SystemProfile& current_filtered_system_profile) const;

  // Merge any unset fields in |to| with the values in |from|.
  static void MergeSystemProfiles(SystemProfile* to, const SystemProfile& from);

  // Collaborators held by reference. They are not owned by this object.
  LocalAggregateStorage& aggregate_storage_;
  const logger::ProjectContextFactory& global_project_context_factory_;
  system_data::SystemDataInterface& system_data_;
  const logger::ObservationWriter& observation_writer_;

  // Replaced through ResetInternalMetrics().
  logger::InternalMetricsPtr internal_metrics_;

  // Scheduling state. NOTE(review): presumably used by Run() to decide when the next generation
  // pass is due; confirm against the definition.
  std::unique_ptr<util::SteadyClockInterface> steady_clock_;
  std::chrono::steady_clock::time_point next_generate_obs_;
  std::chrono::seconds generate_obs_interval_;

  BackfillManager backfill_manager_;
  std::unique_ptr<logger::PrivacyEncoder> privacy_encoder_;
  util::CivilTimeConverterInterface& civil_time_converter_;

  // Copies of the identically named constructor flags (see the constructor documentation).
  bool generate_observations_with_current_system_profile_;
  bool test_dont_backfill_empty_reports_;

  // Background thread; see Start() and ShutDown().
  std::thread worker_thread_;

  // State wrapped in util::ProtectedFields.
  struct ProtectedFields {
    // Given a default value so the flag is never read while indeterminate (for example if
    // ShutDown() were to run before Start()).
    bool shut_down = false;
    // NOTE(review): presumably notified to wake the background thread early; confirm in Run().
    std::condition_variable_any wakeup_notifier;
  };
  util::ProtectedFields<ProtectedFields> protected_fields_;
};
} // namespace cobalt::local_aggregation
#endif // COBALT_SRC_LOCAL_AGGREGATION_OBSERVATION_GENERATOR_H_