blob: 240e2a365eebb744d2d2d37b67b8854b6dc7f9e9 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/observation_generator.h"
#include <thread>
#include "absl/strings/str_cat.h"
#include "src/lib/statusor/status_macros.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/backfill_manager.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/observation_writer.h"
#include "src/pb/metadata_builder.h"
#include "src/registry/window_size.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Default spacing between two scheduled observation-generation passes. It seeds
// |generate_obs_interval_|, which is added to |next_generate_obs_| after each pass.
constexpr std::chrono::seconds kDefaultGenerateObsInterval{std::chrono::hours{1}};

// Number of days handed to the BackfillManager. It also contributes to the duration passed to
// MetadataBuilder::CleanupBefore().
constexpr uint32_t kBackfillManagerDays{3};

// Duration of a single day; used as the unit when building multi-day durations.
constexpr std::chrono::hours kOneDay{24};
// Constructs an ObservationGenerator from its collaborators.
//
// |aggregate_storage|, |global_project_context_factory|, |metadata_builder| and
// |observation_writer| are copied into the corresponding members, and |privacy_encoder| is moved
// into |privacy_encoder_|.
// NOTE(review): the pointer arguments presumably must outlive this object -- the member types
// are declared in the header, so confirm ownership there.
//
// The constructor additionally:
//   - allocates a util::SteadyClock for |steady_clock_|,
//   - sets the generation interval to kDefaultGenerateObsInterval,
//   - constructs |backfill_manager_| with kBackfillManagerDays.
//
// No worker thread is started here; see Start().
ObservationGenerator::ObservationGenerator(
LocalAggregateStorage* aggregate_storage,
const logger::ProjectContextFactory* global_project_context_factory,
MetadataBuilder* metadata_builder, const logger::ObservationWriter* observation_writer,
std::unique_ptr<logger::PrivacyEncoder> privacy_encoder)
: aggregate_storage_(aggregate_storage),
global_project_context_factory_(global_project_context_factory),
metadata_builder_(metadata_builder),
observation_writer_(observation_writer),
steady_clock_(new util::SteadyClock()),
generate_obs_interval_(kDefaultGenerateObsInterval),
backfill_manager_(kBackfillManagerDays),
privacy_encoder_(std::move(privacy_encoder)) {
// Only the aggregate storage is validated here; the other pointers are not checked.
CHECK(aggregate_storage_);
}
// Clears the shutdown flag and launches the worker thread, which executes Run() with |clock| as
// its source of system time.
void ObservationGenerator::Start(util::SystemClockInterface* clock) {
  // |fields| stays in scope until the end of this function, so (assuming lock() returns a scoped
  // guard) the protected fields remain locked while the thread is being created.
  auto fields = protected_fields_.lock();
  fields->shut_down = false;

  LOG(INFO) << "Starting ObservationGenerator Worker Thread";
  worker_thread_ = std::thread([this, clock] { Run(clock); });
}
// Requests a shutdown. The shutdown flag is always set; if a worker thread is joinable it is
// additionally woken up and joined before this function returns.
void ObservationGenerator::ShutDown() {
  const bool has_worker = worker_thread_.joinable();

  {
    // Set the flag (and, when there is a worker to wake, notify it) under the lock. The lock is
    // released at the end of this scope, i.e. before joining, so the worker can re-acquire it.
    auto fields = protected_fields_.lock();
    fields->shut_down = true;
    if (has_worker) {
      fields->wakeup_notifier.notify_all();
    }
  }

  if (has_worker) {
    worker_thread_.join();
  }
}
// Body of the worker thread. Repeatedly waits until the next scheduled generation time (or a
// shutdown notification) and then invokes GenerateObservations(), logging any error it reports.
// Returns once a shutdown has been requested.
void ObservationGenerator::Run(util::SystemClockInterface* clock) {
  LOG(INFO) << "ObservationGenerator Worker Thread running";

  // The first generation pass is due immediately.
  next_generate_obs_ = steady_clock_->now();

  auto fields = protected_fields_.lock();
  const auto shutdown_requested = [&fields]() { return fields->shut_down; };

  // The flag is re-examined before every wait, so a shutdown requested during a pass ends the
  // loop as soon as that pass completes.
  while (!shutdown_requested()) {
    // Block until generation is due again or until a shutdown is signalled.
    fields->wakeup_notifier.wait_until(fields, next_generate_obs_, shutdown_requested);

    // GenerateObservations() decides for itself whether a pass is actually due, so waking up
    // early because of a shutdown does not by itself trigger a pass.
    const util::Status status = GenerateObservations(clock->now(), steady_clock_->now());
    if (!status.ok()) {
      LOG(ERROR) << "Error occurred while generating observations: " << status.error_message();
    }
  }
}
// Runs one observation-generation pass if one is due.
//
// |system_time| is converted into the UTC and LOCAL TimeInfo values handed to
// GenerateObservationsOnce(). |steady_time| is compared against |next_generate_obs_| to decide
// whether a pass is due.
//
// Returns util::Status::OK without generating anything when |steady_time| is earlier than
// |next_generate_obs_|. Otherwise advances |next_generate_obs_| by |generate_obs_interval_| and
// returns the result of GenerateObservationsOnce().
util::Status ObservationGenerator::GenerateObservations(
    std::chrono::system_clock::time_point system_time,
    std::chrono::steady_clock::time_point steady_time) {
  VLOG(5) << "ObservationGenerator::GenerateObservations";

  const util::TimeInfo utc_info =
      util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC);
  const util::TimeInfo local_info =
      util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL);

  // Not due yet: leave the schedule untouched.
  if (steady_time < next_generate_obs_) {
    return util::Status::OK;
  }

  // Advance the schedule before generating, so the next due time is one interval after the
  // previous due time rather than after the end of this pass.
  next_generate_obs_ += generate_obs_interval_;
  return GenerateObservationsOnce(utc_info, local_info);
}
// Performs a single observation-generation pass over every metric of every project listed by the
// project context factory.
//
// For each report of a metric that has both an AggregationProcedure and a stored ReportAggregate,
// the backfill manager supplies the TimeInfo values to generate for. For each of them an
// observation is generated, passed through the privacy encoder, and every resulting non-null
// observation is written. After all observations for a TimeInfo have been written, it is
// recorded on the report aggregate as the last generated time.
//
// |utc| and |local| are the end times used for metrics with a UTC and a LOCAL time zone policy
// respectively.
//
// Returns util::Status::OK when the whole pass completes. A failed observation write returns an
// INTERNAL status immediately, i.e. before Save() is called for the metric in progress and before
// the metadata cleanup. NOTE(review): CB_ASSIGN_OR_RETURN is defined elsewhere; it presumably
// returns the failing status in the same early-exit fashion -- confirm.
util::Status ObservationGenerator::GenerateObservationsOnce(util::TimeInfo utc,
                                                            util::TimeInfo local) {
  VLOG(5) << "ObservationGenerator::GenerateObservationsOnce utc:" << utc << " local:" << local;

  for (auto& [customer_id, project_id] : global_project_context_factory_->ListProjects()) {
    std::unique_ptr<logger::ProjectContext> project_context =
        global_project_context_factory_->NewProjectContext(customer_id, project_id);

    for (uint32_t metric_id : project_context->ListMetrics()) {
      const MetricDefinition* metric = project_context->GetMetric(metric_id);
      CB_ASSIGN_OR_RETURN(
          LocalAggregateStorage::MetricAggregateRef aggregate,
          aggregate_storage_->GetMetricAggregate(customer_id, project_id, metric_id));
      logger::MetricRef metric_ref = project_context->RefMetric(metric);

      // Metrics with a LOCAL time zone policy end at the local time; every other policy uses UTC.
      util::TimeInfo end_time_info =
          (metric->time_zone_policy() == MetricDefinition::LOCAL) ? local : utc;

      for (const ReportDefinition& report : metric->reports()) {
        std::unique_ptr<AggregationProcedure> procedure =
            AggregationProcedure::Get(*metric, report);
        if (!procedure) {
          continue;  // No aggregation procedure is available for this report.
        }

        auto* aggregates_by_report = aggregate.aggregate()->mutable_by_report_id();
        if (aggregates_by_report->count(report.id()) == 0) {
          continue;  // Nothing has been stored for this report.
        }
        ReportAggregate* report_aggregate = &aggregates_by_report->at(report.id());

        for (util::TimeInfo time_info : backfill_manager_.CalculateBackfill(
                 procedure->GetLastTimeInfo(*report_aggregate), end_time_info,
                 metric->time_zone_policy(), procedure->IsDaily())) {
          CB_ASSIGN_OR_RETURN(std::unique_ptr<Observation> raw_observation,
                              procedure->GenerateObservation(time_info, report_aggregate));
          CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> encoded_observations,
                              privacy_encoder_->MaybeMakePrivateObservations(
                                  std::move(raw_observation), *metric, report));

          // Only the first observation actually written for this |time_info| is flagged as a
          // contribution; null entries are skipped and do not consume the flag.
          bool is_contribution = true;
          for (const std::unique_ptr<Observation>& encoded : encoded_observations) {
            if (encoded == nullptr) {
              continue;
            }

            logger::Status write_status = observation_writer_->WriteObservation(
                encoded,
                metadata_builder_->Build(metric_ref, report, time_info.day_index,
                                         time_info.hour_id),
                is_contribution);
            if (write_status != logger::kOK) {
              return util::Status(util::StatusCode::INTERNAL,
                                  absl::StrCat("Unable to write observation: ", write_status));
            }
            is_contribution = false;
          }

          // Record progress only after every observation for |time_info| has been written.
          procedure->SetLastTimeInfo(report_aggregate, time_info);
        }
      }

      // Save the metric's aggregate once all of its reports have been processed.
      aggregate.Save();
    }
  }

  // The duration passed is (kBackfillManagerDays + WindowSize_MAX + 1) days.
  metadata_builder_->CleanupBefore(kOneDay * kBackfillManagerDays + kOneDay * WindowSize_MAX +
                                   kOneDay);
  return util::Status::OK;
}
} // namespace cobalt::local_aggregation