| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/observation_generator.h" |
| |
| #include <optional> |
| #include <thread> |
| |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/statusor/status_macros.h" |
| #include "src/lib/statusor/statusor.h" |
| #include "src/lib/util/clock.h" |
| #include "src/lib/util/datetime_util.h" |
| #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h" |
| #include "src/local_aggregation_1_1/backfill_manager.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/logger/observation_writer.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/pb/observation_batch.pb.h" |
| #include "src/public/lib/status_codes.h" |
| #include "src/registry/window_size.pb.h" |
| #include "src/system_data/system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| const std::chrono::seconds kDefaultGenerateObsInterval = std::chrono::hours(1); |
| const std::chrono::hours kOneDay = std::chrono::hours(24); |
| |
| ObservationGenerator::ObservationGenerator( |
| LocalAggregateStorage* aggregate_storage, |
| const logger::ProjectContextFactory* global_project_context_factory, |
| MetadataBuilder* metadata_builder, const logger::ObservationWriter* observation_writer, |
| std::unique_ptr<logger::PrivacyEncoder> privacy_encoder, |
| bool /*generate_observations_with_current_system_profile*/, uint32_t backfill_manager_days) |
| : aggregate_storage_(aggregate_storage), |
| global_project_context_factory_(global_project_context_factory), |
| metadata_builder_(metadata_builder), |
| observation_writer_(observation_writer), |
| steady_clock_(new util::SteadyClock()), |
| generate_obs_interval_(kDefaultGenerateObsInterval), |
| backfill_manager_(backfill_manager_days), |
| privacy_encoder_(std::move(privacy_encoder)), |
| backfill_manager_days_(backfill_manager_days) { |
| CHECK(aggregate_storage_); |
| } |
| |
| void ObservationGenerator::Start(util::SystemClockInterface* clock) { |
| auto locked = protected_fields_.lock(); |
| locked->shut_down = false; |
| LOG(INFO) << "Starting ObservationGenerator Worker Thread"; |
| worker_thread_ = std::thread([this, clock]() mutable { this->Run(clock); }); |
| } |
| |
| void ObservationGenerator::ShutDown() { |
| if (worker_thread_.joinable()) { |
| { |
| auto locked = protected_fields_.lock(); |
| locked->shut_down = true; |
| locked->wakeup_notifier.notify_all(); |
| } |
| worker_thread_.join(); |
| } else { |
| protected_fields_.lock()->shut_down = true; |
| } |
| } |
| |
| void ObservationGenerator::Run(util::SystemClockInterface* clock) { |
| LOG(INFO) << "ObservationGenerator Worker Thread running"; |
| auto start_time = steady_clock_->now(); |
| // Schedule Observation generation to happen now. |
| next_generate_obs_ = start_time; |
| auto locked = protected_fields_.lock(); |
| while (true) { |
| // Exit if a shutdown has been requested. |
| if (locked->shut_down) { |
| return; |
| } |
| |
| // Sleep until the next scheduled observation generation or until notified of a shutdown. |
| locked->wakeup_notifier.wait_until(locked, next_generate_obs_, |
| [&locked]() { return locked->shut_down; }); |
| |
| const Status status = GenerateObservations(clock->now(), steady_clock_->now()); |
| if (!status.ok()) { |
| LOG(ERROR) << "Error occurred while generating observations: " << status; |
| } |
| } |
| } |
| |
| Status ObservationGenerator::GenerateObservations( |
| std::chrono::system_clock::time_point system_time, |
| std::chrono::steady_clock::time_point steady_time) { |
| VLOG(5) << "ObservationGenerator::GenerateObservations"; |
| util::TimeInfo utc = util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC); |
| util::TimeInfo local = |
| util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL); |
| |
| if (steady_time >= next_generate_obs_) { |
| next_generate_obs_ += generate_obs_interval_; |
| return GenerateObservationsOnce(utc, local); |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status ObservationGenerator::GenerateObservationsOnce(util::TimeInfo utc, util::TimeInfo local) { |
| LOG(INFO) << "Generating aggregated observations for time, utc:" << utc << " local:" << local; |
| std::vector<Status> generation_errors; |
| for (lib::ProjectIdentifier project_identifier : |
| global_project_context_factory_->ListProjects()) { |
| std::unique_ptr<logger::ProjectContext> project = |
| global_project_context_factory_->NewProjectContext(project_identifier); |
| |
| for (lib::MetricIdentifier metric_identifier : project->ListMetrics()) { |
| const MetricDefinition* metric = project->GetMetric(metric_identifier); |
| lib::statusor::StatusOr<LocalAggregateStorage::MetricAggregateRef> aggregate_or = |
| aggregate_storage_->GetMetricAggregate(metric_identifier); |
| if (!aggregate_or.ok()) { |
| generation_errors.push_back(aggregate_or.status()); |
| continue; |
| } |
| LocalAggregateStorage::MetricAggregateRef aggregate = std::move(aggregate_or.ValueOrDie()); |
| logger::MetricRef metric_ref = project->RefMetric(metric); |
| |
| util::TimeInfo end_time_info = utc; |
| if (metric->time_zone_policy() == MetricDefinition::LOCAL) { |
| end_time_info = local; |
| } |
| |
| for (const ReportDefinition& report : metric->reports()) { |
| CB_ASSIGN_OR_RETURN(std::unique_ptr<AggregationProcedure> procedure, |
| AggregationProcedure::Get(*metric, report)); |
| |
| if (procedure) { |
| if (aggregate.aggregate()->mutable_by_report_id()->count(report.id())) { |
| ReportAggregate* report_aggregate = |
| &aggregate.aggregate()->mutable_by_report_id()->at(report.id()); |
| |
| Status status = GenerateObservationsForReportAggregate( |
| report_aggregate, procedure.get(), end_time_info, metric, metric_ref, report); |
| if (!status.ok()) { |
| generation_errors.push_back(status); |
| } |
| } |
| } |
| } |
| |
| aggregate.Save(); |
| } |
| } |
| |
| Status status = aggregate_storage_->GarbageCollection(); |
| if (!status.ok()) { |
| generation_errors.push_back(status); |
| } |
| |
| // We can guarantee that we will never need a metadata if it is older than: |
| // BackfillManagerDays (currently 3 days) + WindowSize_MAX (currently 30 days). |
| // We add an extra day just to be safe. |
| metadata_builder_->CleanupBefore(kOneDay * backfill_manager_days_ + kOneDay * WindowSize_MAX + |
| kOneDay); |
| |
| if (!generation_errors.empty()) { |
| util::StatusBuilder sb(StatusCode::INTERNAL, "Encountered "); |
| sb.AppendMsg(generation_errors.size()).AppendMsg(" errors while generating observations"); |
| for (const Status& error : generation_errors) { |
| sb.WithContext("error", error); |
| } |
| return sb.Build(); |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| Status ObservationGenerator::GenerateObservationsForReportAggregate( |
| ReportAggregate* report_aggregate, AggregationProcedure* procedure, |
| util::TimeInfo end_time_info, const MetricDefinition* metric, |
| const logger::MetricRef& metric_ref, const ReportDefinition& report, |
| std::optional<const SystemProfile*> profile) { |
| if (procedure->IsDaily() && procedure->IsExpedited()) { |
| // Also try to generate expedited observations for the current day. |
| end_time_info.day_index++; |
| } |
| for (util::TimeInfo time_info : backfill_manager_.CalculateBackfill( |
| procedure->GetLastTimeInfo(*report_aggregate), end_time_info, metric->time_zone_policy(), |
| procedure->IsDaily())) { |
| CB_ASSIGN_OR_RETURN(std::unique_ptr<Observation> observation, |
| procedure->GenerateObservation(time_info, report_aggregate)); |
| |
| CB_ASSIGN_OR_RETURN( |
| std::vector<std::unique_ptr<Observation>> observations, |
| privacy_encoder_->MaybeMakePrivateObservations(std::move(observation), *metric, report)); |
| |
| bool is_contribution = true; |
| for (const std::unique_ptr<Observation>& observation : observations) { |
| if (observation == nullptr) { |
| continue; |
| } |
| CB_RETURN_IF_ERROR(observation_writer_->WriteObservation( |
| observation, |
| profile ? MetadataBuilder::BuildFromProfile(metric_ref, report, time_info.day_index, |
| *profile.value()) |
| : metadata_builder_->Build(metric_ref, report, time_info), |
| is_contribution)); |
| is_contribution = false; |
| } |
| |
| procedure->ObservationsCommitted(report_aggregate, time_info); |
| } |
| |
| return Status::OkStatus(); |
| } |
| |
| } // namespace cobalt::local_aggregation |