blob: 1ccecff0d6dd8e31961a02b8b949728405c24242 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/observation_generator.h"
#include <map>
#include <optional>
#include <thread>
#include "absl/strings/str_cat.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/backfill_manager.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/logger/observation_writer.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/status_codes.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/window_size.pb.h"
#include "src/system_data/system_data.h"
namespace cobalt::local_aggregation {
// Initial value of |generate_obs_interval_|: how far apart scheduled generation passes are.
const std::chrono::seconds kDefaultGenerateObsInterval{std::chrono::hours(1)};
// Constructs an ObservationGenerator. No worker thread is created here; that happens in Start().
//
// |aggregate_storage| must be non-null (CHECK-enforced below). |global_project_context_factory|,
// |system_data|, |observation_writer| and |privacy_encoder| are not checked here, but they are
// dereferenced without null checks during generation, so callers must pass non-null values.
// The raw pointers are stored as-is (presumably non-owning and required to outlive this
// object — TODO confirm against the header); |privacy_encoder| is owned by the generator.
//
// |generate_observations_with_current_system_profile| makes GetSystemProfile() merge each stored
// profile on top of the device's current filtered profile. |backfill_manager_days| is forwarded
// to |backfill_manager_|.
ObservationGenerator::ObservationGenerator(
LocalAggregateStorage* aggregate_storage,
const logger::ProjectContextFactory* global_project_context_factory,
system_data::SystemDataInterface* system_data,
const logger::ObservationWriter* observation_writer,
std::unique_ptr<logger::PrivacyEncoder> privacy_encoder,
bool generate_observations_with_current_system_profile, uint32_t backfill_manager_days)
// NOTE: keep these initializers in the header's member-declaration order (-Wreorder).
: aggregate_storage_(aggregate_storage),
global_project_context_factory_(global_project_context_factory),
system_data_(system_data),
observation_writer_(observation_writer),
// Steady clock used by Run() to schedule generation passes.
steady_clock_(new util::SteadyClock()),
// Spacing between scheduled passes; starts at kDefaultGenerateObsInterval.
generate_obs_interval_(kDefaultGenerateObsInterval),
backfill_manager_(backfill_manager_days),
privacy_encoder_(std::move(privacy_encoder)),
generate_observations_with_current_system_profile_(
generate_observations_with_current_system_profile) {
CHECK(aggregate_storage_);
}
// Clears any previous shutdown request and launches the worker thread that runs Run(clock).
//
// The protected-fields lock is held until this function returns, i.e. until after the thread
// has been created, so Run() cannot read the protected fields before Start() has finished.
// NOTE(review): assigning to |worker_thread_| while it is still joinable calls std::terminate,
// so Start() must not be called again without an intervening ShutDown().
void ObservationGenerator::Start(util::SystemClockInterface* clock) {
  auto fields = protected_fields_.lock();
  fields->shut_down = false;
  LOG(INFO) << "Starting ObservationGenerator Worker Thread";
  worker_thread_ = std::thread([this, clock] { Run(clock); });
}
// Requests that the worker thread stop and, if one is running, blocks until it has exited.
// When no joinable worker thread exists, only the |shut_down| flag is recorded.
void ObservationGenerator::ShutDown() {
  if (!worker_thread_.joinable()) {
    protected_fields_.lock()->shut_down = true;
    return;
  }
  {
    // Set the flag and wake the worker while holding the lock; release it before joining so the
    // worker can reacquire it and observe the request.
    auto fields = protected_fields_.lock();
    fields->shut_down = true;
    fields->wakeup_notifier.notify_all();
  }
  worker_thread_.join();
}
// Body of the worker thread launched by Start().
//
// Schedules an immediate first pass, then repeatedly sleeps until |next_generate_obs_| (or until
// ShutDown() signals |wakeup_notifier|) and calls GenerateObservations(). Errors from a pass are
// logged and do not stop the loop. The protected-fields lock lives for the whole function and is
// only released while blocked in wait_until() (as with std::condition_variable), so a ShutDown()
// request is observed between passes, never in the middle of one.
void ObservationGenerator::Run(util::SystemClockInterface* clock) {
  LOG(INFO) << "ObservationGenerator Worker Thread running";
  // Schedule Observation generation to happen now.
  next_generate_obs_ = steady_clock_->now();
  auto locked = protected_fields_.lock();
  while (!locked->shut_down) {
    // Sleep until the next scheduled observation generation or until notified of a shutdown.
    locked->wakeup_notifier.wait_until(locked, next_generate_obs_,
                                       [&locked]() { return locked->shut_down; });
    // Re-check right after waking: if ShutDown() is what woke us, exit instead of starting one
    // more (potentially long) generation pass that would delay ShutDown()'s join().
    if (locked->shut_down) {
      return;
    }
    const Status status = GenerateObservations(clock->now(), steady_clock_->now());
    if (!status.ok()) {
      LOG(ERROR) << "Error occurred while generating observations: " << status;
    }
  }
}
// Runs one generation pass if |steady_time| has reached |next_generate_obs_|, advancing the
// schedule by |generate_obs_interval_| first. |system_time| is converted to UTC and LOCAL
// TimeInfo values that are handed to GenerateObservationsOnce(). When it is not yet time, this
// returns OK without doing any work (including the time conversions).
//
// NOTE(review): the schedule advances by exactly one interval per pass, so if |steady_time| is
// several intervals past the schedule, successive calls each run a pass until it catches up.
Status ObservationGenerator::GenerateObservations(
    std::chrono::system_clock::time_point system_time,
    std::chrono::steady_clock::time_point steady_time) {
  VLOG(5) << "ObservationGenerator::GenerateObservations";
  if (steady_time < next_generate_obs_) {
    return Status::OkStatus();
  }
  next_generate_obs_ += generate_obs_interval_;
  util::TimeInfo utc = util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC);
  util::TimeInfo local =
      util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL);
  return GenerateObservationsOnce(utc, local);
}
// Runs a single observation-generation pass over every metric of every project listed by
// |global_project_context_factory_|.
//
// For each report that has an AggregationProcedure and a stored ReportAggregate, observations
// are generated and written up to |utc| or |local| (chosen by the metric's time zone policy).
// The metric's aggregate is then saved, and storage garbage collection runs at the end.
//
// Failures are collected instead of aborting the pass. One bad metric or report therefore cannot
// stop the other metrics from being processed, or keep already-updated aggregates from being
// saved. Returns OK, or an INTERNAL status listing every collected error.
Status ObservationGenerator::GenerateObservationsOnce(util::TimeInfo utc, util::TimeInfo local) {
  LOG(INFO) << "Generating aggregated observations for time, utc:" << utc << " local:" << local;
  std::vector<Status> generation_errors;
  for (lib::ProjectIdentifier project_identifier :
       global_project_context_factory_->ListProjects()) {
    std::unique_ptr<logger::ProjectContext> project =
        global_project_context_factory_->NewProjectContext(project_identifier);
    for (lib::MetricIdentifier metric_identifier : project->ListMetrics()) {
      const MetricDefinition* metric = project->GetMetric(metric_identifier);
      lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> aggregate_or =
          aggregate_storage_->GetMetricAggregate(metric_identifier);
      if (!aggregate_or.ok()) {
        generation_errors.push_back(aggregate_or.status());
        continue;
      }
      LocalAggregateStorage::MetricAggregateRef aggregate = std::move(aggregate_or.ValueOrDie());
      logger::MetricRef metric_ref = project->RefMetric(metric);
      const util::TimeInfo end_time_info =
          metric->time_zone_policy() == MetricDefinition::LOCAL ? local : utc;
      for (const ReportDefinition& report : metric->reports()) {
        auto procedure_or = AggregationProcedure::Get(*metric, report);
        if (!procedure_or.ok()) {
          // Record and keep going: returning here would skip aggregate.Save() for reports of
          // this metric that were already processed, plus every remaining metric.
          generation_errors.push_back(procedure_or.status());
          continue;
        }
        std::unique_ptr<AggregationProcedure> procedure = std::move(procedure_or.ValueOrDie());
        if (!procedure) {
          continue;
        }
        auto* by_report_id = aggregate.aggregate()->mutable_by_report_id();
        auto report_aggregate_it = by_report_id->find(report.id());
        if (report_aggregate_it == by_report_id->end()) {
          continue;
        }
        Status status = GenerateObservationsForReportAggregate(
            aggregate, &report_aggregate_it->second, procedure.get(), end_time_info, metric,
            metric_ref, report);
        if (!status.ok()) {
          generation_errors.push_back(status);
        }
      }
      // A failed save loses the commit state recorded above, so surface it like any other error.
      Status save_status = aggregate.Save();
      if (!save_status.ok()) {
        generation_errors.push_back(save_status);
      }
    }
  }
  Status status = aggregate_storage_->GarbageCollection();
  if (!status.ok()) {
    generation_errors.push_back(status);
  }
  if (!generation_errors.empty()) {
    util::StatusBuilder sb(StatusCode::INTERNAL, "Encountered ");
    sb.AppendMsg(generation_errors.size()).AppendMsg(" errors while generating observations");
    for (const Status& error : generation_errors) {
      sb.WithContext("error", error);
    }
    return sb.Build();
  }
  return Status::OkStatus();
}
// Generates, privacy-encodes and writes observations for one report's aggregate. It covers each
// period returned by backfill_manager_.CalculateBackfill(), starting from the procedure's
// GetLastTimeInfo() and running up to |end_time_info|. For daily expedited procedures, the end
// is pushed forward one day so the current day is also attempted.
//
// If a period yields no aggregated observations, the privacy encoder may still fabricate some.
// Those are written with the current filtered system profile and are not committed to the
// procedure. Returns the first error encountered.
Status ObservationGenerator::GenerateObservationsForReportAggregate(
    const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate,
    AggregationProcedure* procedure, util::TimeInfo end_time_info, const MetricDefinition* metric,
    const logger::MetricRef& metric_ref, const ReportDefinition& report) {
  if (procedure->IsDaily() && procedure->IsExpedited()) {
    // Expedited daily reports may also emit observations for the day still in progress.
    ++end_time_info.day_index;
  }
  SystemProfile current_profile;
  MetadataBuilder::FilteredSystemProfile(report, system_data_->system_profile(),
                                         &current_profile);
  // Per-call memo of hash -> SystemProfile, so each stored profile is retrieved only once.
  std::map<uint64_t, SystemProfile> profile_cache;
  const auto periods = backfill_manager_.CalculateBackfill(
      procedure->GetLastTimeInfo(*report_aggregate), end_time_info, metric->time_zone_policy(),
      procedure->IsDaily());
  for (util::TimeInfo period : periods) {
    CB_ASSIGN_OR_RETURN(std::vector<ObservationAndSystemProfile> observations,
                        procedure->GenerateObservations(period, report_aggregate));
    if (observations.empty()) {
      // Nothing was aggregated; give the privacy encoder a chance to fabricate observations.
      // They use the current system profile and are not committed.
      // TODO(fxbug.dev/92955): choose plausible SystemProfiles for fabricated observations.
      CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> fabricated,
                          privacy_encoder_->MaybeMakePrivateObservations(nullptr, *metric, report));
      CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, period, metric_ref,
                                           report, fabricated, current_profile, std::nullopt));
      continue;
    }
    for (ObservationAndSystemProfile& generated : observations) {
      CB_ASSIGN_OR_RETURN(const SystemProfile* profile,
                          GetSystemProfile(generated.system_profile_hash, &profile_cache,
                                           aggregate, report));
      CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> encoded,
                          privacy_encoder_->MaybeMakePrivateObservations(
                              std::move(generated.observation), *metric, report));
      CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, period, metric_ref,
                                           report, encoded, *profile,
                                           generated.system_profile_hash));
    }
  }
  return Status::OkStatus();
}
// Writes every non-null entry of |private_observations| through |observation_writer_|. Each one
// gets metadata built from |metric_ref|, |report|, |time_info.day_index| and |system_profile|.
// Only the first observation actually written is flagged as a contribution.
//
// If |commit_system_profile_hash| is set and every write succeeded, the procedure is told that
// observations for |time_info| and that hash were committed, even if nothing was written.
// Returns the first write error.
Status ObservationGenerator::WriteObservations(
    ReportAggregate* report_aggregate, AggregationProcedure* procedure, util::TimeInfo time_info,
    const logger::MetricRef& metric_ref, const ReportDefinition& report,
    const std::vector<std::unique_ptr<Observation>>& private_observations,
    const SystemProfile& system_profile, std::optional<uint64_t> commit_system_profile_hash) {
  bool mark_as_contribution = true;
  for (const auto& observation : private_observations) {
    if (!observation) {
      continue;
    }
    CB_RETURN_IF_ERROR(observation_writer_->WriteObservation(
        observation,
        MetadataBuilder::BuildFromProfile(metric_ref, report, time_info.day_index, system_profile),
        mark_as_contribution));
    mark_as_contribution = false;
  }
  if (commit_system_profile_hash) {
    procedure->ObservationsCommitted(report_aggregate, time_info, *commit_system_profile_hash);
  }
  return Status::OkStatus();
}
// Returns the SystemProfile to attach to observations carrying |system_profile_hash|, memoized
// in |system_profile_cache|.
//
// On a cache miss, the filtered profile stored with |aggregate| is retrieved. When
// |generate_observations_with_current_system_profile_| is set, that stored profile is merged on
// top of the device's current profile, filtered for |report|, via MergeSystemProfiles(). The
// returned pointer refers to the map entry in |system_profile_cache|. Fails if the stored profile
// cannot be retrieved.
lib::statusor::StatusOr<const SystemProfile*> ObservationGenerator::GetSystemProfile(
    uint64_t system_profile_hash, std::map<uint64_t, SystemProfile>* system_profile_cache,
    const LocalAggregateStorage::MetricAggregateRef& aggregate, const ReportDefinition& report) {
  if (auto cached = system_profile_cache->find(system_profile_hash);
      cached != system_profile_cache->end()) {
    return &cached->second;
  }
  CB_ASSIGN_OR_RETURN(SystemProfile stored_profile,
                      aggregate.RetrieveFilteredSystemProfile(system_profile_hash));
  SystemProfile profile_to_cache;
  if (generate_observations_with_current_system_profile_) {
    MetadataBuilder::FilteredSystemProfile(report, system_data_->system_profile(),
                                           &profile_to_cache);
    MergeSystemProfiles(&profile_to_cache, stored_profile);
  } else {
    profile_to_cache = std::move(stored_profile);
  }
  // The key is known to be absent (checked above), so emplace always inserts here.
  auto emplaced = system_profile_cache->emplace(system_profile_hash, std::move(profile_to_cache));
  return &emplaced.first->second;
}
// Merges |from| into |to| with protobuf MergeFrom() semantics: singular fields set in |from|
// overwrite |to|, and embedded messages are merged. The exception is the repeated
// experiment_ids / experiment_tokens fields. Whenever |from| has entries for one of them, that
// field replaces |to|'s entries instead of being appended to them.
//
// MergeFrom() reference:
// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.message#Message.MergeFrom.details
void ObservationGenerator::MergeSystemProfiles(SystemProfile* to, const SystemProfile& from) {
  // MergeFrom() concatenates repeated fields; empty the destination first so |from| wins.
  if (from.experiment_ids_size() != 0) {
    to->clear_experiment_ids();
  }
  if (from.experiment_tokens_size() != 0) {
    to->clear_experiment_tokens();
  }
  to->MergeFrom(from);
}
} // namespace cobalt::local_aggregation