| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_ |
| #define COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_ |
| |
| #include <condition_variable> |
| #include <cstddef> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/consistent_proto_store.h" |
| #include "src/lib/util/protected_fields.h" |
| #include "src/local_aggregation/aggregate_store.h" |
| #include "src/local_aggregation/event_aggregator.h" |
| #include "src/local_aggregation/local_aggregation.pb.h" |
| #include "src/logger/encoder.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/logger/status.h" |
| #include "src/public/cobalt_config.h" |
| |
| namespace cobalt { |
| |
| constexpr int64_t kHoursInADay = 24; |
| |
| // Forward declaration used for friend tests. These will be removed once a better solution is |
| // designed. |
| // TODO(ninai): remove this |
| namespace internal { |
| |
| class RealLoggerFactory; |
| |
| } // namespace internal |
| |
| namespace local_aggregation { |
| |
| // Class responsible for managing memory, threading, locks, time and the main loop. |
| // |
| // A worker thread calls on AggregateStore methods to do the following tasks at intervals |
| // specified in the EventAggregator's constructor: |
| // (1) Calls BackUp*() to back up the EventAggregator's state to the file system. |
| // (2) Calls GenerateObservations() with the previous day's day index to generate all Observations |
| // for rolling windows ending on that day index, as well as any missing Observations for a specified |
| // number of days in the past. |
| // (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute aggregates |
| // for any windows of interest in the future. |
| class EventAggregatorManager { |
| public: |
| // Constructs a class to manage local aggregation and provide EventAggregators. |
| // |
| // An EventAggregator maintains daily aggregates of Events in a LocalAggregateStore, uses an |
| // Encoder to generate Observations for rolling windows ending on a specified day index, and |
| // writes the Observations to an ObservationStore using an ObservationWriter. |
| // |
| // cfg: The CobaltConfig with information for constructing the EventAggregatorManager. |
| // |
| // fs: An instance of util::FileSystem used for interacting with the filesystem. |
| // |
| // encoder: The singleton instance of an Encoder on the system. |
| // |
| // observation_writer: The interface EventAggregator can use to write aggregated observations. |
| EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem* fs, |
| const logger::Encoder* encoder, |
| const logger::ObservationWriter* observation_writer); |
| |
| // Shut down the worker thread before destructing the EventAggregatorManager. |
| ~EventAggregatorManager() { ShutDown(); } |
| |
| // Starts the worker thread. |
| // |
| // |clock| The clock that should be used by the worker thread for scheduling tasks and determining |
| // the current day and hour. On systems on which the clock may be initially inaccurate, |
| // the caller should wait to invoke this method until after it is known that the clock is |
| // accurate. |
| void Start(std::unique_ptr<util::SystemClockInterface> clock); |
| |
| // Resets the state of the local aggregator. |
| // |
| // TODO(pesk): also clear the contents of the ConsistentProtoStores if we |
| // implement a mode which uses them. |
| void Reset(); |
| |
| // Returns a pointer to an EventAggregator to be used for logging. |
| EventAggregator* GetEventAggregator() { return event_aggregator_.get(); } |
| |
| // Checks that the worker thread is shut down, and if so, triggers an out of schedule |
| // AggregateStore::GenerateObservations() and returns its result. Returns kOther if the |
| // worker thread is joinable. See the documentation on AggregateStore::GenerateObservations() |
| // for a description of the parameters. |
| // |
| // This method is intended for use in the Cobalt testapps which require a single thread to |
| // both log events to and generate Observations from an EventAggregator. |
| logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc, |
| uint32_t final_day_index_local = 0u); |
| |
| // Returns a count of the number of times the loop in the Run function has run. This is likely |
| // only useful in testing to verify that the worker thread is not running too frequently. |
| uint64_t num_runs() const { return num_runs_; } |
| |
| void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); } |
| void DeleteData() { |
| aggregate_store_->DeleteData(); |
| TriggerBackups(); |
| } |
| |
| private: |
| friend class TestEventAggregatorManager; |
| friend class EventAggregatorManagerTest; |
| friend class CobaltControllerImpl; |
| friend class AggregateStoreTest; |
| friend class AggregateStoreWorkerTest; |
| friend class EventAggregatorTest; |
| friend class EventAggregatorManagerTest; |
| friend class EventAggregatorWorkerTest; |
| friend class internal::RealLoggerFactory; |
| |
| // Request that the worker thread shut down and wait for it to exit. The |
| // worker thread backs up the LocalAggregateStore before exiting. |
| void ShutDown(); |
| |
| // Main loop executed by the worker thread. The thread sleeps for |
| // |aggregate_backup_interval_| seconds or until notified of shutdown, then |
| // calls BackUpLocalAggregateStore(). If not notified of shutdown, calls |
| // DoScheduledTasks() and schedules the next occurrence of any completed |
| // tasks. |
| void Run(std::unique_ptr<util::SystemClockInterface> system_clock); |
| |
| // Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|, |
| // calls AggregateStore::GenerateObservations() with the day index of the previous day from |
| // |system_time| in each of UTC and local time, and then backs up the history of generated |
| // Observations. If |next_gc_| is less than or equal to |steady_time|, calls |
| // AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in |
| // each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is |
| // logged and execution continues if the operation fails. |
| void DoScheduledTasks(std::chrono::system_clock::time_point system_time, |
| std::chrono::steady_clock::time_point steady_time); |
| |
| // Triggers the work thread to wake up and back up the LocalAggregateStore and the |
| // ObservationHistory. |
| // |
| // TODO(zmbush): Rename "backup" nomenclature to "checkpoint". |
| void TriggerBackups(); |
| |
| struct WorkerThreadController { |
| // Setting this value to true requests that the worker thread stop. |
| bool shut_down = true; |
| |
| // Setting this value to true requests that the worker thread immediately perform its work |
| // rather than waiting for the next scheduled time to run. After the worker thread has completed |
| // its work, it will reset this value to false. |
| bool immediate_run_trigger = false; |
| |
| // Setting this value to true requests that the worker thread wake up and back up the aggregate |
| // store and the observation history, before going back to sleep. |
| bool back_up_now = false; |
| |
| // Used to wait on to execute periodic EventAggregator tasks. |
| std::condition_variable_any wakeup_notifier; |
| }; |
| |
| const logger::Encoder* encoder_; |
| const logger::ObservationWriter* observation_writer_; |
| size_t backfill_days_ = 0; |
| std::chrono::seconds aggregate_backup_interval_; |
| std::chrono::seconds generate_obs_interval_; |
| std::chrono::seconds gc_interval_; |
| |
| std::chrono::steady_clock::time_point next_generate_obs_; |
| std::chrono::steady_clock::time_point next_gc_; |
| std::unique_ptr<util::SteadyClockInterface> steady_clock_; |
| |
| std::unique_ptr<AggregateStore> aggregate_store_; |
| std::unique_ptr<util::ConsistentProtoStore> owned_local_aggregate_proto_store_; |
| std::unique_ptr<util::ConsistentProtoStore> owned_obs_history_proto_store_; |
| std::unique_ptr<EventAggregator> event_aggregator_; |
| |
| static const std::chrono::seconds kDefaultAggregateBackupInterval; |
| static const std::chrono::seconds kDefaultGenerateObsInterval; |
| static const std::chrono::seconds kDefaultGCInterval; |
| |
| std::thread worker_thread_; |
| util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_; |
| |
| uint64_t num_runs_ = 0; |
| }; |
| |
| } // namespace local_aggregation |
| } // namespace cobalt |
| |
| #endif // COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_ |