// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation/event_aggregator_mgr.h"

#include <memory>

#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/file_system.h"
#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/window_size.pb.h"

namespace cobalt::local_aggregation {

using util::TimeToDayIndex;

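// Default scheduling intervals for the worker thread: back up the local
// aggregates every minute, generate Observations every hour, and
// garbage-collect the LocalAggregateStore once per day.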
const std::chrono::seconds EventAggregatorManager::kDefaultAggregateBackupInterval =
    std::chrono::minutes(1);
const std::chrono::seconds EventAggregatorManager::kDefaultGenerateObsInterval =
    std::chrono::hours(1);
const std::chrono::seconds EventAggregatorManager::kDefaultGCInterval = std::chrono::hours(24);

EventAggregatorManager::EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem& fs,
                                               const logger::Encoder& encoder,
                                               const logger::ObservationWriter& observation_writer,
                                               MetadataBuilder& metadata_builder)
    : encoder_(encoder),
      observation_writer_(observation_writer),
      metadata_builder_(metadata_builder),
      backfill_days_(cfg.local_aggregation_backfill_days),
      aggregate_backup_interval_(kDefaultAggregateBackupInterval),
      generate_obs_interval_(kDefaultGenerateObsInterval),
      gc_interval_(kDefaultGCInterval),
      steady_clock_(util::MakeNotNullUniquePtr<util::SteadyClock>()),
      owned_local_aggregate_proto_store_(cfg.local_aggregate_proto_store_path, fs),
      owned_obs_history_proto_store_(cfg.obs_history_proto_store_path, fs),
      aggregate_store_(encoder_, observation_writer, owned_local_aggregate_proto_store_,
                       owned_obs_history_proto_store_, backfill_days_),
      event_aggregator_(aggregate_store_) {}

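// Starts the worker thread, which repeatedly backs up the LocalAggregateStore
// and performs any scheduled Observation generation and garbage collection
// until ShutDown() is called. Illustrative usage, assuming util::SystemClock
// is a concrete implementation of SystemClockInterface:
//
//   EventAggregatorManager manager(cfg, fs, encoder, observation_writer,
//                                  metadata_builder);
//   manager.Start(std::make_unique<util::SystemClock>());
//   ...
//   manager.ShutDown();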
void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
  auto locked = protected_worker_thread_controller_.lock();
  locked->shut_down = false;
  worker_thread_ =
      std::thread([this, clock = std::move(clock)]() mutable { this->Run(std::move(clock)); });
}

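// Signals the worker thread to shut down, wakes it from its wait, and joins
// it. Safe to call even if the worker thread was never started.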
void EventAggregatorManager::ShutDown() {
  if (worker_thread_.joinable()) {
    {
      auto locked = protected_worker_thread_controller_.lock();
      locked->shut_down = true;
      locked->wakeup_notifier.notify_all();
    }
    worker_thread_.join();
  } else {
    protected_worker_thread_controller_.lock()->shut_down = true;
  }
}

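// Body of the worker thread. Wakes up at least once every
// |aggregate_backup_interval_| (or earlier when notified), backs up the
// LocalAggregateStore, runs any due scheduled tasks, and exits when shutdown
// is requested.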
void EventAggregatorManager::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
  std::chrono::steady_clock::time_point steady_time = steady_clock_->now();
  // Schedule Observation generation to happen in the first cycle.
  next_generate_obs_ = steady_time;
  // Schedule garbage collection to happen |gc_interval_| seconds from now.
  next_gc_ = steady_time + gc_interval_;
  // Acquire the mutex protecting the shutdown flag and condition variable.
  auto locked = protected_worker_thread_controller_.lock();
  while (true) {
    num_runs_++;

    // If shutdown has been requested, back up the LocalAggregateStore and
    // exit.
    if (locked->shut_down) {
      aggregate_store_.BackUpLocalAggregateStore();
      return;
    }
    // Sleep until the next scheduled backup of the LocalAggregateStore, or
    // until notified of shutdown, an immediate run, or an on-demand backup
    // requested via TriggerBackups(). Back up the LocalAggregateStore after
    // waking.
    locked->wakeup_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
      if (locked->immediate_run_trigger) {
        locked->immediate_run_trigger = false;
        return true;
      }
      return locked->shut_down || locked->back_up_now;
    });
    aggregate_store_.BackUpLocalAggregateStore();
    if (locked->back_up_now) {
      locked->back_up_now = false;
      aggregate_store_.BackUpObservationHistory();
    }
    // If the worker thread was woken up by a shutdown request, exit.
    // Otherwise, complete any scheduled Observation generation and garbage
    // collection.
    if (locked->shut_down) {
      return;
    }
    // Check whether it is time to generate Observations or to garbage-collect
    // the LocalAggregateStore. If so, do that task and schedule its next
    // occurrence.
    DoScheduledTasks(system_clock->now(), steady_clock_->now());
  }
}

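// Performs any Observation generation and garbage collection whose scheduled
// time has arrived by |steady_time|, and schedules the next occurrence of
// each task. Both tasks operate on yesterday's day index, computed from
// |system_time| in both UTC and local time.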
void EventAggregatorManager::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
                                              std::chrono::steady_clock::time_point steady_time) {
  auto current_time_t = std::chrono::system_clock::to_time_t(system_time);
  auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1;
  auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1;

  // Skip the tasks (but do schedule a retry) if either day index is too small.
  uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + backfill_days_;
  bool skip_tasks =
      (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index);
  if (steady_time >= next_generate_obs_) {
    next_generate_obs_ += generate_obs_interval_;
    if (skip_tasks) {
      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
                                "current day index is too small.";
    } else {
      Status obs_status =
          aggregate_store_.GenerateObservations(yesterday_utc, yesterday_local_time);
      if (obs_status.ok()) {
        aggregate_store_.BackUpObservationHistory();

        // Metadata is never needed once it is older than backfill_days_
        // (currently 3 days) + WindowSize_MAX (currently 30 days). The two
        // extra days below are slack, added just to be safe.
        metadata_builder_.CleanupBefore(kOneDay * (backfill_days_ + 1) + kOneDay * WindowSize_MAX +
                                        kOneDay);
      } else {
        LOG(ERROR) << "GenerateObservations failed with status: " << obs_status.error_message();
      }
    }
  }
  if (steady_time >= next_gc_) {
    next_gc_ += gc_interval_;
    if (skip_tasks) {
      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
                                "current day index is too small.";
    } else {
      Status gc_status = aggregate_store_.GarbageCollect(yesterday_utc, yesterday_local_time);
      if (gc_status.ok()) {
        aggregate_store_.BackUpLocalAggregateStore();
      } else {
        LOG(ERROR) << "GarbageCollect failed with status: " << gc_status.error_message();
      }
    }
  }
}

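// Synchronously generates Observations for the given final day indices and
// cleans up stale metadata. Must not be called while the worker thread is
// running; returns an ABORTED status in that case.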
Status EventAggregatorManager::GenerateObservationsNoWorker(uint32_t final_day_index_utc,
                                                            uint32_t final_day_index_local) {
  if (worker_thread_.joinable()) {
    return util::StatusBuilder(
               StatusCode::ABORTED,
               "GenerateObservationsNoWorker() was called while worker thread was running.")
        .LogError()
        .Build();
  }

  CB_RETURN_IF_ERROR(
      aggregate_store_.GenerateObservations(final_day_index_utc, final_day_index_local));

  // Metadata is never needed once it is older than backfill_days_
  // (currently 3 days) + WindowSize_MAX (currently 30 days). The two extra
  // days below are slack, added just to be safe.
  metadata_builder_.CleanupBefore(kOneDay * (backfill_days_ + 1) + kOneDay * WindowSize_MAX +
                                  kOneDay);

  return Status::OkStatus();
}

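// Asks the worker thread to immediately back up both the LocalAggregateStore
// and the Observation history.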
void EventAggregatorManager::TriggerBackups() {
  auto locked = protected_worker_thread_controller_.lock();
  locked->back_up_now = true;
  locked->wakeup_notifier.notify_all();
}

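// Discards all in-memory aggregation state by reconstructing the
// AggregateStore and EventAggregator over the same backing proto stores, and
// replaces the steady clock.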
void EventAggregatorManager::Reset() {
  aggregate_store_ =
      AggregateStore(encoder_, observation_writer_, owned_local_aggregate_proto_store_,
                     owned_obs_history_proto_store_, backfill_days_);
  aggregate_store_.ResetInternalMetrics(internal_metrics_);

  event_aggregator_ = EventAggregator(aggregate_store_);
  steady_clock_ = util::MakeNotNullUniquePtr<util::SteadyClock>();
}

}  // namespace cobalt::local_aggregation