// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <condition_variable>
#include <memory>
#include <string>
#include <thread>
#include "./event.pb.h"
#include "./logging.h"
#include "./observation2.pb.h"
#include "config/metric_definition.pb.h"
#include "config/report_definition.pb.h"
#include "logger/encoder.h"
#include "logger/event_record.h"
#include "logger/local_aggregation.pb.h"
#include "logger/observation_writer.h"
#include "logger/status.h"
#include "util/clock.h"
#include "util/consistent_proto_store.h"
#include "util/protected_fields.h"
namespace cobalt {
namespace logger {
// Maximum value of |backfill_days| allowed by the constructor of
// EventAggregator.
static const size_t kEventAggregatorMaxAllowedBackfillDays = 1000;
// EventAggregator::GenerateObservations() ignores all aggregation window
// sizes larger than this value.
static const size_t kMaxAllowedAggregationWindowSize = 365;
// The number of seconds in an hour.
static const size_t kHour = 3600;
// The number of seconds in a day.
static const size_t kDay = kHour * 24;
// The EventAggregator class manages an in-memory store of aggregated values
// of Events logged for locally aggregated report types. For each day, this
// LocalAggregateStore contains an aggregate of the values of logged Events of
// a given event code over that day. These daily aggregates are used to
// produce Observations of values aggregated over a rolling window of size
// specified in the ReportDefinition.
// Each Logger interacts with the EventAggregator in the following way:
// (1) When the Logger is created, it calls UpdateAggregationConfigs() to
// provide the EventAggregator with the configurations of its locally
// aggregated reports.
// (2) When logging an Event for a locally aggregated report, a Logger calls an
// Update*Aggregation() method with the Event and the ReportAggregationKey of
// the report.
// A worker thread does the following tasks at intervals specified in the
// EventAggregator's constructor:
// (1) Backs up the EventAggregator's state to the file system.
// (2) Calls GenerateObservations() with the previous day's day index to
// generate all Observations for rolling windows ending on that day index,
// as well as any missing Observations for a specified number of days in the
// past.
// (3) Calls GarbageCollect() to delete daily aggregates which are not
// needed to compute aggregates for any windows of interest in the future.
class EventAggregator {
// Constructs an EventAggregator.
// An EventAggregator maintains daily aggregates of Events in a
// LocalAggregateStore, uses an Encoder to generate Observations for rolling
// windows ending on a specified day index, and writes the Observations to
// an ObservationStore using an ObservationWriter.
// encoder: the singleton instance of an Encoder on the system.
// local_aggregate_proto_store: A ConsistentProtoStore to be used by the
// EventAggregator to store snapshots of its in-memory store of event
// aggregates.
// obs_history_proto_store: A ConsistentProtoStore to be used by the
// EventAggregator to store snapshots of its in-memory history of generated
// Observations.
// backfill_days: the number of past days for which the EventAggregator
// generates and sends Observations, in addition to a requested day index.
// See the comment above EventAggregator::GenerateObservations for more
// detail. The constructor CHECK-fails if a value larger than
// |kEventAggregatorMaxAllowedBackfillDays| is passed.
// aggregate_backup_interval: the interval in seconds at which a snapshot of
// the in-memory store of event aggregates should be written to
// |local_aggregate_proto_store|.
// generate_obs_interval: the interval in seconds at which the
// EventAggregator should generate Observations.
// gc_interval: the interval in seconds at which the LocalAggregateStore
// should be garbage-collected.
// The constructor CHECK-fails if the value of |aggregate_backup_interval| is
// larger than either of |generate_obs_interval| or |gc_interval|. In
// practice, the value of |aggregate_backup_interval| should be small relative
// to the values of |generate_obs_interval| and |gc_interval|, since each of
// Observation generation and garbage collection will be done at the smallest
// multiple of |aggregate_backup_interval| which is greater than or equal to
// its specified interval.
const Encoder* encoder, const ObservationWriter* observation_writer,
util::ConsistentProtoStore* local_aggregate_proto_store,
util::ConsistentProtoStore* obs_history_proto_store,
const size_t backfill_days = 0,
const std::chrono::seconds aggregate_backup_interval =
const std::chrono::seconds generate_obs_interval =
const std::chrono::seconds gc_interval = std::chrono::seconds(kDay));
// Shut down the worker thread before destructing the EventAggregator.
~EventAggregator() { ShutDown(); }
// Start the worker thread.
void Start();
// Updates the EventAggregator's view of the set of locally aggregated
// report configurations.
// This method may be called multiple times during the EventAggregator's
// lifetime. If the EventAggregator is provided with a report whose tuple
// (customer ID, project ID, metric ID, report ID) matches that of a
// previously provided report, then the new report is ignored, even if other
// properties of the customer, Project, MetricDefinition, or
// ReportDefinition differ from those of the existing report.
Status UpdateAggregationConfigs(const ProjectContext& project_context);
// Logs an Event associated to a report of type UNIQUE_N_DAY_ACTIVES to the
// EventAggregator.
// report_id: the ID of the report associated to the logged Event.
// event_record: an EventRecord wrapping an Event of type OccurrenceEvent
// and the MetricDefinition for which the Event is to be logged.
// Returns kOK if the LocalAggregateStore was successfully updated, and
// kInvalidArguments if either a lookup key corresponding to |report_id| was
// not found in the LocalAggregateStore, or if the Event wrapped by
// EventRecord is not of type OccurrenceEvent.
// The EventAggregator does not explicitly validate the event code of
// the logged Event, and if the event code is larger than the associated
// metric's max_event_code then the EventAggregator will form and store an
// aggregate map for that event code just as it does for a valid event code.
// However, Observations will only be generated for valid event codes, and
// aggregates associated with invalid event codes will be garbage-collected
// together with valid aggregates when EventAggregator::GarbageCollect() is
// called.
Status LogUniqueActivesEvent(uint32_t report_id, EventRecord* event_record);
// Checks that the worker thread is shut down, and if so, calls the private
// method GenerateObservations() and returns its result. Returns kOther if the
// worker thread is joinable. See the documentation on GenerateObservations()
// for a description of the parameters.
// This method is intended for use in tests which require a single thread to
// both log events to and generate Observations from an EventAggregator.
Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
uint32_t final_day_index_local = 0u);
friend class LoggerTest;
friend class EventAggregatorTest;
friend class EventAggregatorWorkerTest;
// Request that the worker thread shut down and wait for it to exit. The
// worker thread backs up the LocalAggregateStore before exiting.
void ShutDown();
// Main loop executed by the worker thread. The thread sleeps for
// |aggregate_backup_interval_| seconds or until notified of shutdown, then
// calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
// DoScheduledTasks() and schedules the next occurrence of any completed
// tasks.
void Run();
// Helper method called by Run(). If |next_generate_obs_| is less than or
// equal to |current_time|, calls GenerateObservations() with the day index of
// the previous day in each of UTC and local time, and then backs up the
// history of generated Observations. If |next_gc_| is less than or equal to
// |current_time|, calls GarbageCollect() with the day index of the previous
// day in each of UTC and local time and then backs up the
// LocalAggregateStore. In each case, an error is logged and execution
// continues if the operation fails.
void DoScheduledTasks(std::chrono::system_clock::time_point current_time);
// Writes a snapshot of the LocalAggregateStore to
// |local_aggregate_proto_store_|. Does not assume that the caller holds the
// mutex of |protected_aggregate_store_|.
Status BackUpLocalAggregateStore();
// Writes a snapshot of |obs_history_|to |obs_history_proto_store_|.
Status BackUpObservationHistory();
// Generates one or more Observations for all of the registered locally
// aggregated reports known to this EventAggregator, for rolling windows
// ending on specified day indices.
// Each MetricDefinition specifies a time zone policy, which determines the
// day index for which an Event associated with that MetricDefinition is
// logged. For all MetricDefinitions whose Events are logged with respect to
// UTC, this method generates Observations for rolling windows ending on
// |final_day_index_utc|. For all MetricDefinitions whose Events are logged
// with respect to local time, this method generates Observations for rolling
// windows ending on |final_day_index_local|. If |final_day_index_local| is
// not specified, it is assumed that the local time zone is UTC.
// The generated Observations are written to the
// |observation_writer| passed to the constructor. Does not assume that the
// caller holds the mutex of |protected_aggregate_store_|.
// This class maintains a history of generated Observations and this method
// additionally performs backfill: Observations are also generated for
// rolling windows ending on any day in the interval [final_day_index -
// backfill_days, final_day_index] (where final_day_index is either
// final_day_index_utc or final_day_index_local, depending on the time zone
// policy of the associated MetricDefinition), if this history indicates they
// were not previously generated. Does not generate duplicate Observations
// when called multiple times with the same day index.
// Observations are not generated for aggregation windows larger than
// |kMaxAllowedAggregationWindowSize|.
Status GenerateObservations(uint32_t final_day_index_utc,
uint32_t final_day_index_local = 0u);
// Removes from the LocalAggregateStore all daily aggregates which are too
// old to contribute to their parent report's largest rolling window on the
// day which is |backfill_days| before |day_index_utc| (if the parent
// MetricDefinitions' time zone policy is UTC) or which is |backfill_days|
// before |day_index_local| (if the parent MetricDefinition's time zone policy
// is LOCAL). If |day_index_local| is not specified, then it is assumed that
// the local time zone is UTC.
// If |day_index| is the latest day index for which GarbageCollect() has
// been applied for a given report, then the LocalAggregateStore contains the
// data needed to generate Observations for that report for |day_index + k|
// for any k >= 0.
// Does not assume that the caller holds the mutex of
// |protected_aggregate_store_|.
Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u);
// Returns the most recent day index for which an Observation was generated
// for a given report, event code, and window size, according to
// |obs_history_|. Returns 0 if no Observation has been generated for the
// given arguments.
uint32_t LastGeneratedDayIndex(const std::string& report_key,
uint32_t event_code,
uint32_t window_size) const;
// For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
// for each event code of the parent metric, for each window size of the
// report, for the window of that size ending on |final_day_index|, unless
// an Observation with those parameters was generated in the past. Also
// generates Observations for days in the backfill period if needed.
// Writes the Observations to an ObservationStore via the ObservationWriter
// that was passed to the constructor.
// Observations are not generated for aggregation windows larger than
// |kMaxAllowedAggregationWindowSize|.
Status GenerateUniqueActivesObservations(
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t num_event_codes,
uint32_t final_day_index);
// Helper method called by GenerateUniqueActivesObservations() to generate
// and write a single Observation.
Status GenerateSingleUniqueActivesObservation(const MetricRef metric_ref,
const ReportDefinition* report,
uint32_t obs_day_index,
uint32_t event_code,
uint32_t window_size,
bool was_active) const;
// Returns a copy of the LocalAggregateStore. Does not assume that the
// caller holds the mutex of |protected_aggregate_store_|.
LocalAggregateStore CopyLocalAggregateStore() {
auto local_aggregate_store =
return local_aggregate_store;
// Sets the EventAggregator's ClockInterface. Only for use in tests.
void SetClock(util::ClockInterface* clock) { clock_.reset(clock); }
struct AggregateStoreFields {
LocalAggregateStore local_aggregate_store;
struct ShutDownFlag {
bool shut_down = true;
std::condition_variable shutdown_notifier;
const Encoder* encoder_;
const ObservationWriter* observation_writer_;
util::ConsistentProtoStore* local_aggregate_proto_store_;
util::ConsistentProtoStore* obs_history_proto_store_;
util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
// Not protected by a mutex. Should only be accessed by |worker_thread_|.
AggregatedObservationHistory obs_history_;
size_t backfill_days_ = 0;
std::thread worker_thread_;
util::ProtectedFields<ShutDownFlag> protected_shutdown_flag_;
std::chrono::seconds aggregate_backup_interval_;
std::chrono::seconds generate_obs_interval_;
std::chrono::seconds gc_interval_;
std::chrono::system_clock::time_point next_generate_obs_;
std::chrono::system_clock::time_point next_gc_;
std::unique_ptr<util::ClockInterface> clock_;
} // namespace logger
} // namespace cobalt