// blob: 8e26df351bf80077f3633ce1c42b6589c51824cc [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <string>
#include <thread>
#include "src/lib/util/clock.h"
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/protected_fields.h"
#include "src/local_aggregation/aggregate_store.h"
#include "src/local_aggregation/event_aggregator.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/encoder.h"
#include "src/logger/observation_writer.h"
#include "src/logger/status.h"
#include "src/public/cobalt_config.h"
namespace cobalt {
// Number of hours in one day.
constexpr int64_t kHoursInADay = 24;
// Forward declaration of internal::RealLoggerFactory so that EventAggregatorManager can name it
// in a friend declaration. It will be removed once a better solution is designed.
// TODO(ninai): remove this
namespace internal {
class RealLoggerFactory;
} // namespace internal
namespace local_aggregation {
// Class responsible for managing memory, threading, locks, time and the main loop of local
// aggregation.
//
// A worker thread calls AggregateStore methods to do the following tasks at intervals
// specified in the EventAggregator's constructor:
// (1) Calls BackUp*() to back up the EventAggregator's state to the file system.
// (2) Calls GenerateObservations() with the previous day's day index to generate all Observations
// for rolling windows ending on that day index, as well as any missing Observations for a
// specified number of days in the past.
// (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute
// aggregates for any windows of interest in the future.
//
// NOTE(review): the interval members (|aggregate_backup_interval_|, |generate_obs_interval_|,
// |gc_interval_|) are declared on this class, so "the EventAggregator's constructor" above may
// really mean the EventAggregatorManager's constructor — confirm.
//
// NOTE(review): no access specifier (public:/private:) is visible anywhere in this class as
// shown, so every member defaults to private. Presumably the labels were lost from this copy of
// the file; restore them from the authoritative source.
class EventAggregatorManager {
// Constructs a class to manage local aggregation and provide EventAggregators.
//
// An EventAggregator maintains daily aggregates of Events in a LocalAggregateStore, uses an
// Encoder to generate Observations for rolling windows ending on a specified day index, and
// writes the Observations to an ObservationStore using an ObservationWriter.
//
// |cfg| The CobaltConfig with information for constructing the EventAggregatorManager.
//
// |fs| An instance of util::FileSystem used for interacting with the filesystem.
//
// |encoder| The singleton instance of an Encoder on the system.
//
// |observation_writer| The interface EventAggregator can use to write aggregated observations.
EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem* fs,
const logger::Encoder* encoder,
const logger::ObservationWriter* observation_writer);
// Calls ShutDown() so that the worker thread is shut down before the EventAggregatorManager is
// destroyed.
~EventAggregatorManager() { ShutDown(); }
// Starts the worker thread.
//
// |clock| The clock that should be used by the worker thread for scheduling tasks and determining
// the current day and hour. On systems on which the clock may be initially inaccurate,
// the caller should wait to invoke this method until after it is known that the clock is
// accurate. Ownership of |clock| is transferred to this call.
void Start(std::unique_ptr<util::SystemClockInterface> clock);
// Resets the state of the local aggregator.
// TODO(pesk): also clear the contents of the ConsistentProtoStores if we
// implement a mode which uses them.
void Reset();
// Returns a pointer to an EventAggregator to be used for logging. The pointer is obtained from
// |event_aggregator_|, a std::unique_ptr member, so ownership stays with this object.
EventAggregator* GetEventAggregator() { return event_aggregator_.get(); }
// Checks that the worker thread is shut down, and if so, triggers an out-of-schedule
// AggregateStore::GenerateObservations() and returns its result. Returns kOther if the
// worker thread is joinable. See the documentation on AggregateStore::GenerateObservations()
// for a description of the parameters.
//
// This method is intended for use in the Cobalt testapps which require a single thread to
// both log events to and generate Observations from an EventAggregator.
logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
uint32_t final_day_index_local = 0u);
// Returns a count of the number of times the loop in the Run function has run. This is likely
// only useful in testing to verify that the worker thread is not running too frequently.
//
// NOTE(review): |num_runs_| is a plain uint64_t. If Run() updates it on the worker thread while
// another thread calls this accessor, the read is unsynchronized — confirm how it is used.
uint64_t num_runs() const { return num_runs_; }
// Forwards |is_disabled| to AggregateStore::Disable() on |aggregate_store_|.
void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); }
// NOTE(review): the body and closing brace of DeleteData() are missing from this copy of the
// file; as shown, the next declaration would be parsed inside this function. Restore the body
// from the authoritative source.
void DeleteData() {
// Stores |internal_metrics| in |internal_metrics_|.
//
// NOTE(review): the closing brace of this function (and possibly further statements) is missing
// from this copy of the file — restore from the authoritative source.
void ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
internal_metrics_ = internal_metrics;
// Classes granted access to the non-public members of EventAggregatorManager.
//
// NOTE(review): EventAggregatorManagerTest is befriended twice in the list below; the second
// declaration is redundant.
friend class TestEventAggregatorManager;
friend class EventAggregatorManagerTest;
friend class CobaltControllerImpl;
friend class AggregateStoreTest;
friend class AggregateStoreWorkerTest;
friend class EventAggregatorTest;
friend class EventAggregatorManagerTest;
friend class EventAggregatorWorkerTest;
friend class internal::RealLoggerFactory;
// Requests that the worker thread shut down and waits for it to exit. The
// worker thread backs up the LocalAggregateStore before exiting.
void ShutDown();
// Main loop executed by the worker thread. The thread sleeps for
// |aggregate_backup_interval_| seconds or until notified of shutdown, then
// calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
// DoScheduledTasks() and schedules the next occurrence of any completed
// tasks.
void Run(std::unique_ptr<util::SystemClockInterface> system_clock);
// Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|,
// calls AggregateStore::GenerateObservations() with the day index of the previous day from
// |system_time| in each of UTC and local time, and then backs up the history of generated
// Observations. If |next_gc_| is less than or equal to |steady_time|, calls
// AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in
// each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is
// logged and execution continues if the operation fails.
void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
std::chrono::steady_clock::time_point steady_time);
// Triggers the worker thread to wake up and back up the LocalAggregateStore and the
// ObservationHistory.
// TODO(zmbush): Rename "backup" nomenclature to "checkpoint".
void TriggerBackups();
// Flags and notifier used to communicate with the worker thread. An instance is held in
// |protected_worker_thread_controller_|.
//
// NOTE(review): the closing "};" of this struct is missing from this copy of the file, so the
// member declarations that follow |wakeup_notifier| textually fall inside the struct. They are
// presumably members of EventAggregatorManager — restore the brace from the authoritative source.
struct WorkerThreadController {
// Setting this value to true requests that the worker thread stop. Defaults to true.
bool shut_down = true;
// Setting this value to true requests that the worker thread immediately perform its work
// rather than waiting for the next scheduled time to run. After the worker thread has completed
// its work, it will reset this value to false.
bool immediate_run_trigger = false;
// Setting this value to true requests that the worker thread wake up and back up the aggregate
// store and the observation history, before going back to sleep.
bool back_up_now = false;
// Condition variable the worker thread waits on between executions of the periodic
// EventAggregator tasks.
std::condition_variable_any wakeup_notifier;
// Encoder and ObservationWriter pointers; presumably the ones passed to the constructor — confirm.
const logger::Encoder* encoder_;
const logger::ObservationWriter* observation_writer_;
// Set by ResetInternalMetrics(); nullptr until then.
logger::InternalMetrics* internal_metrics_ = nullptr;
// NOTE(review): presumably the number of past days for which missing Observations are
// generated — confirm against the constructor.
size_t backfill_days_ = 0;
// Interval the worker thread sleeps for in Run() between backups of the LocalAggregateStore.
std::chrono::seconds aggregate_backup_interval_;
// Intervals used to schedule the next observation generation and garbage collection; presumably
// added to the steady-clock time after each completed task — confirm in Run().
std::chrono::seconds generate_obs_interval_;
std::chrono::seconds gc_interval_;
// Steady-clock times at or after which DoScheduledTasks() generates Observations and garbage
// collects, respectively.
std::chrono::steady_clock::time_point next_generate_obs_;
std::chrono::steady_clock::time_point next_gc_;
// Owned collaborators. How they are constructed and wired together is not visible in this
// header.
std::unique_ptr<util::SteadyClockInterface> steady_clock_;
std::unique_ptr<AggregateStore> aggregate_store_;
std::unique_ptr<util::ConsistentProtoStore> owned_local_aggregate_proto_store_;
std::unique_ptr<util::ConsistentProtoStore> owned_obs_history_proto_store_;
std::unique_ptr<EventAggregator> event_aggregator_;
// Default values for the three intervals above. Only declared here; their values are defined
// outside this header.
static const std::chrono::seconds kDefaultAggregateBackupInterval;
static const std::chrono::seconds kDefaultGenerateObsInterval;
static const std::chrono::seconds kDefaultGCInterval;
// The worker thread started by Start() and stopped by ShutDown().
std::thread worker_thread_;
// The WorkerThreadController, wrapped in util::ProtectedFields (presumably to guard access with
// a lock — see src/lib/util/protected_fields.h).
util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
// Value returned by num_runs().
uint64_t num_runs_ = 0;
} // namespace local_aggregation
} // namespace cobalt