// blob: c9593b9ea46f98a2e55a58270ee1c716f90ca46d [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/event_aggregator_mgr.h"
#include <memory>
#include "src/lib/util/consistent_proto_store.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/file_system.h"
#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/window_size.pb.h"
namespace cobalt::local_aggregation {
using util::TimeToDayIndex;
// Default scheduling intervals used to initialize a new EventAggregatorManager.
// All three are stored as a count of seconds.

// Default for |aggregate_backup_interval_|: the longest the worker thread waits
// between backups of the LocalAggregateStore.
const std::chrono::seconds EventAggregatorManager::kDefaultAggregateBackupInterval{
    std::chrono::minutes{1}};

// Default for |generate_obs_interval_|: the spacing between scheduled
// Observation generation passes.
const std::chrono::seconds EventAggregatorManager::kDefaultGenerateObsInterval{
    std::chrono::hours{1}};

// Default for |gc_interval_|: the spacing between scheduled garbage collection
// passes.
const std::chrono::seconds EventAggregatorManager::kDefaultGCInterval{std::chrono::hours{24}};
// Constructs an EventAggregatorManager. The constructor only initializes
// members; the worker thread is not started until Start() is called.
//
// cfg: supplies the backfill day count and the paths of the two proto stores.
// fs: file system handed to both proto stores.
// encoder, observation_writer: forwarded to the AggregateStore.
// metadata_builder: kept by reference; used after Observation generation to
//   clean up old metadata.
EventAggregatorManager::EventAggregatorManager(const CobaltConfig& cfg, util::FileSystem& fs,
const logger::Encoder& encoder,
const logger::ObservationWriter* observation_writer,
MetadataBuilder& metadata_builder)
: encoder_(encoder),
observation_writer_(observation_writer),
metadata_builder_(metadata_builder),
backfill_days_(cfg.local_aggregation_backfill_days),
// Scheduling intervals start at the class-level defaults.
aggregate_backup_interval_(kDefaultAggregateBackupInterval),
generate_obs_interval_(kDefaultGenerateObsInterval),
gc_interval_(kDefaultGCInterval),
steady_clock_(util::MakeNotNullUniquePtr<util::SteadyClock>()),
// The proto stores are owned by this object and passed to |aggregate_store_|
// below.
// NOTE(review): this relies on the stores being declared before
// |aggregate_store_| (and |aggregate_store_| before |event_aggregator_|) in
// the class definition -- confirm against the header.
owned_local_aggregate_proto_store_(cfg.local_aggregate_proto_store_path, fs),
owned_obs_history_proto_store_(cfg.obs_history_proto_store_path, fs),
aggregate_store_(encoder_, observation_writer, owned_local_aggregate_proto_store_,
owned_obs_history_proto_store_, backfill_days_),
event_aggregator_(aggregate_store_) {}
// Starts the worker thread, which executes Run() using |clock| as its system
// clock.
//
// The shutdown flag is cleared before the thread is launched. The controller
// lock is held for the duration of this call.
//
// If a worker thread is already running (|worker_thread_| is joinable), the
// call is logged and ignored, and |clock| is discarded. Without this guard the
// move-assignment below would target a joinable std::thread, which calls
// std::terminate().
void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
  auto locked = protected_worker_thread_controller_.lock();
  if (worker_thread_.joinable()) {
    LOG(ERROR) << "EventAggregatorManager::Start() was called while the worker thread was "
                  "already running; ignoring the call.";
    return;
  }
  locked->shut_down = false;
  worker_thread_ =
      std::thread([this, clock = std::move(clock)]() mutable { this->Run(std::move(clock)); });
}
// Requests shutdown and, if a worker thread exists, waits for it to exit.
//
// The shutdown flag is always set. When there is a joinable worker thread it
// is also notified through |wakeup_notifier| and then joined.
void EventAggregatorManager::ShutDown() {
  // No worker thread to stop: just record the shutdown request.
  if (!worker_thread_.joinable()) {
    protected_worker_thread_controller_.lock()->shut_down = true;
    return;
  }

  {
    // Scoped so the lock is released before join(); Run() holds this lock
    // while it is active.
    auto controller = protected_worker_thread_controller_.lock();
    controller->shut_down = true;
    controller->wakeup_notifier.notify_all();
  }
  worker_thread_.join();
}
// Main loop of the worker thread.
//
// Each iteration waits on |wakeup_notifier| for up to
// |aggregate_backup_interval_|, backs up the LocalAggregateStore, handles any
// pending TriggerBackups() request, and then runs DoScheduledTasks(). The loop
// ends when the shutdown flag is observed.
//
// system_clock: provides the wall-clock time passed to DoScheduledTasks().
void EventAggregatorManager::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
  const std::chrono::steady_clock::time_point start_time = steady_clock_->now();
  // Observation generation is due immediately (first cycle); garbage
  // collection is first due |gc_interval_| from now.
  next_generate_obs_ = start_time;
  next_gc_ = start_time + gc_interval_;

  // Take the lock guarding the shutdown flag and the condition variable.
  auto controller = protected_worker_thread_controller_.lock();

  // Wake-up condition for the timed wait: an immediate-run trigger (consumed
  // here), a shutdown request, or a backup request.
  auto should_wake = [&controller]() {
    if (controller->immediate_run_trigger) {
      controller->immediate_run_trigger = false;
      return true;
    }
    return controller->shut_down || controller->back_up_now;
  };

  for (;;) {
    ++num_runs_;

    // Shutdown requested before waiting: back up the LocalAggregateStore and
    // stop.
    if (controller->shut_down) {
      aggregate_store_.BackUpLocalAggregateStore();
      return;
    }

    // Wait until the next scheduled backup or until woken early, then back up
    // the LocalAggregateStore.
    controller->wakeup_notifier.wait_for(controller, aggregate_backup_interval_, should_wake);
    aggregate_store_.BackUpLocalAggregateStore();

    // An explicit backup request also backs up the observation history.
    if (controller->back_up_now) {
      controller->back_up_now = false;
      aggregate_store_.BackUpObservationHistory();
    }

    // Woken by a shutdown request: exit without running scheduled tasks.
    if (controller->shut_down) {
      return;
    }

    // Generate Observations and/or garbage-collect if either is due, and
    // schedule the next occurrence.
    DoScheduledTasks(system_clock->now(), steady_clock_->now());
  }
}
// Runs Observation generation and garbage collection if they are due.
//
// system_time: wall-clock time used to derive yesterday's UTC and local day
//   indices.
// steady_time: compared against |next_generate_obs_| and |next_gc_| to decide
//   whether each task is due.
//
// A task that is due always has its next occurrence scheduled one interval
// later, even when the task itself is skipped or fails.
void EventAggregatorManager::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
                                              std::chrono::steady_clock::time_point steady_time) {
  auto now_time_t = std::chrono::system_clock::to_time_t(system_time);
  auto final_day_utc = TimeToDayIndex(now_time_t, MetricDefinition::UTC) - 1;
  auto final_day_local = TimeToDayIndex(now_time_t, MetricDefinition::LOCAL) - 1;

  // Both day indices must be at least this large; otherwise the tasks are
  // skipped (but still rescheduled).
  uint32_t earliest_usable_day = kMaxAllowedAggregationDays + backfill_days_;
  bool day_indices_usable =
      (final_day_utc >= earliest_usable_day && final_day_local >= earliest_usable_day);

  if (steady_time >= next_generate_obs_) {
    next_generate_obs_ += generate_obs_interval_;
    if (day_indices_usable) {
      Status obs_status = aggregate_store_.GenerateObservations(final_day_utc, final_day_local);
      if (!obs_status.ok()) {
        LOG(ERROR) << "GenerateObservations failed with status: " << obs_status.error_message();
      } else {
        aggregate_store_.BackUpObservationHistory();
        // Drop metadata that can no longer be needed. The horizon is
        // (backfill_days_ + 1) days, plus WindowSize_MAX days, plus one more
        // day as a safety margin.
        metadata_builder_.CleanupBefore(kOneDay * (backfill_days_ + 1) + kOneDay * WindowSize_MAX +
                                        kOneDay);
      }
    } else {
      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
                                "current day index is too small.";
    }
  }

  if (steady_time >= next_gc_) {
    next_gc_ += gc_interval_;
    if (day_indices_usable) {
      Status gc_status = aggregate_store_.GarbageCollect(final_day_utc, final_day_local);
      if (!gc_status.ok()) {
        LOG(ERROR) << "GarbageCollect failed with status: " << gc_status.error_message();
      } else {
        // Persist the garbage-collected store.
        aggregate_store_.BackUpLocalAggregateStore();
      }
    } else {
      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
                                "current day index is too small.";
    }
  }
}
// Generates Observations synchronously on the calling thread, for use when no
// worker thread is running.
//
// final_day_index_utc, final_day_index_local: day indices forwarded to
//   AggregateStore::GenerateObservations().
//
// Returns ABORTED (and logs an error) if the worker thread is joinable, the
// error from GenerateObservations() if it fails, and OK otherwise. On success
// old metadata is cleaned up.
Status EventAggregatorManager::GenerateObservationsNoWorker(uint32_t final_day_index_utc,
                                                            uint32_t final_day_index_local) {
  if (worker_thread_.joinable()) {
    return util::StatusBuilder(
               StatusCode::ABORTED,
               "GenerateObservationsNoWorker() was called while worker thread was running.")
        .LogError()
        .Build();
  }

  CB_RETURN_IF_ERROR(
      aggregate_store_.GenerateObservations(final_day_index_utc, final_day_index_local));

  // Metadata older than this can no longer be needed: (backfill_days_ + 1)
  // days, plus WindowSize_MAX days, plus one more day as a safety margin.
  const auto metadata_horizon =
      kOneDay * (backfill_days_ + 1) + kOneDay * WindowSize_MAX + kOneDay;
  metadata_builder_.CleanupBefore(metadata_horizon);

  return Status::OkStatus();
}
// Asks the worker thread to perform a backup now.
//
// Sets the |back_up_now| flag and wakes any waiter on |wakeup_notifier|. Run()
// responds by backing up the LocalAggregateStore and the observation history.
void EventAggregatorManager::TriggerBackups() {
  auto controller = protected_worker_thread_controller_.lock();
  controller->back_up_now = true;
  controller->wakeup_notifier.notify_all();
}
// Re-creates the manager's internal components in place.
//
// Installs a fresh SteadyClock, rebuilds |aggregate_store_| on top of the
// existing proto stores, re-applies |internal_metrics_| to it, and rebuilds
// |event_aggregator_| around the new store.
void EventAggregatorManager::Reset() {
  // Independent of the store and aggregator below.
  steady_clock_ = util::MakeNotNullUniquePtr<util::SteadyClock>();

  // The store is replaced before the EventAggregator is rebuilt from it.
  aggregate_store_ = AggregateStore(encoder_, observation_writer_,
                                    owned_local_aggregate_proto_store_,
                                    owned_obs_history_proto_store_, backfill_days_);
  aggregate_store_.ResetInternalMetrics(internal_metrics_);

  event_aggregator_ = EventAggregator(aggregate_store_);
}
} // namespace cobalt::local_aggregation