| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "logger/event_aggregator.h" |
| |
| #include <algorithm> |
| #include <string> |
| #include <utility> |
| |
| #include "algorithms/rappor/rappor_config_helper.h" |
| #include "config/metric_definition.pb.h" |
| #include "logger/project_context.h" |
| #include "util/datetime_util.h" |
| #include "util/proto_util.h" |
| #include "util/status.h" |
| |
| namespace cobalt { |
| |
| using rappor::RapporConfigHelper; |
| using util::ConsistentProtoStore; |
| using util::SerializeToBase64; |
| using util::StatusCode; |
| using util::SystemClock; |
| using util::TimeToDayIndex; |
| |
| namespace logger { |
| |
| EventAggregator::EventAggregator( |
| const Encoder* encoder, const ObservationWriter* observation_writer, |
| ConsistentProtoStore* local_aggregate_proto_store, |
| ConsistentProtoStore* obs_history_proto_store, const size_t backfill_days, |
| const std::chrono::seconds aggregate_backup_interval, |
| const std::chrono::seconds generate_obs_interval, |
| const std::chrono::seconds gc_interval) |
| : encoder_(encoder), |
| observation_writer_(observation_writer), |
| local_aggregate_proto_store_(local_aggregate_proto_store), |
| obs_history_proto_store_(obs_history_proto_store) { |
| CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count()) |
| << "aggregate_backup_interval must be less than or equal to " |
| "generate_obs_interval"; |
| CHECK_LE(aggregate_backup_interval.count(), gc_interval.count()) |
| << "aggregate_backup_interval must be less than or equal to gc_interval"; |
| CHECK_LE(backfill_days, kEventAggregatorMaxAllowedBackfillDays) |
| << "backfill_days must be less than or equal to " |
| << kEventAggregatorMaxAllowedBackfillDays; |
| aggregate_backup_interval_ = aggregate_backup_interval; |
| generate_obs_interval_ = generate_obs_interval; |
| gc_interval_ = gc_interval; |
| backfill_days_ = backfill_days; |
| auto locked = protected_aggregate_store_.lock(); |
| auto restore_aggregates_status = |
| local_aggregate_proto_store_->Read(&(locked->local_aggregate_store)); |
| switch (restore_aggregates_status.error_code()) { |
| case StatusCode::OK: { |
| VLOG(4) << "Read LocalAggregateStore from disk."; |
| break; |
| } |
| case StatusCode::NOT_FOUND: { |
| VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding " |
| "with empty LocalAggregateStore. File will be created on " |
| "first snapshot of the LocalAggregateStore."; |
| break; |
| } |
| default: { |
| LOG(ERROR) |
| << "Read to local_aggregate_proto_store failed with status code: " |
| << restore_aggregates_status.error_code() |
| << "\nError message: " << restore_aggregates_status.error_message() |
| << "\nError details: " << restore_aggregates_status.error_details() |
| << "\nProceeding with empty LocalAggregateStore."; |
| locked->local_aggregate_store = LocalAggregateStore(); |
| } |
| } |
| auto restore_history_status = obs_history_proto_store_->Read(&obs_history_); |
| switch (restore_history_status.error_code()) { |
| case StatusCode::OK: { |
| VLOG(4) << "Read AggregatedObservationHistory from disk."; |
| break; |
| } |
| case StatusCode::NOT_FOUND: { |
| VLOG(4) |
| << "No file found for obs_history_proto_store. Proceeding " |
| "with empty AggregatedObservationHistory. File will be created on " |
| "first snapshot of the AggregatedObservationHistory."; |
| break; |
| } |
| default: { |
| LOG(ERROR) << "Read to obs_history_proto_store failed with status code: " |
| << restore_history_status.error_code() << "\nError message: " |
| << restore_history_status.error_message() |
| << "\nError details: " |
| << restore_history_status.error_details() |
| << "\nProceeding with empty AggregatedObservationHistory."; |
| obs_history_ = AggregatedObservationHistory(); |
| } |
| } |
| clock_.reset(new SystemClock()); |
| } |
| |
| void EventAggregator::Start() { |
| auto locked = protected_shutdown_flag_.lock(); |
| locked->shut_down = false; |
| std::thread t([this] { this->Run(); }); |
| worker_thread_ = std::move(t); |
| } |
| |
| // TODO(pesk): Have the config parser verify that each locally |
| // aggregated report has at least one window size and that all window |
| // sizes are <= |kMaxAllowedAggregationWindowSize|. Additionally, have |
| // this method filter out any window sizes larger than |
| // |kMaxAllowedAggregationWindowSize|. |
| Status EventAggregator::UpdateAggregationConfigs( |
| const ProjectContext& project_context) { |
| auto locked = protected_aggregate_store_.lock(); |
| std::string key; |
| ReportAggregationKey key_data; |
| key_data.set_customer_id(project_context.project().customer_id()); |
| key_data.set_project_id(project_context.project().project_id()); |
| for (const auto& metric : project_context.metric_definitions()->metric()) { |
| switch (metric.metric_type()) { |
| case MetricDefinition::EVENT_OCCURRED: { |
| key_data.set_metric_id(metric.id()); |
| for (const auto& report : metric.reports()) { |
| switch (report.report_type()) { |
| case ReportDefinition::UNIQUE_N_DAY_ACTIVES: { |
| key_data.set_report_id(report.id()); |
| if (!SerializeToBase64(key_data, &key)) { |
| return kInvalidArguments; |
| } |
| // TODO(pesk): update the EventAggregator's view of a Metric |
| // or ReportDefinition when appropriate. |
| if (locked->local_aggregate_store.aggregates().count(key) == 0) { |
| AggregationConfig aggregation_config; |
| *aggregation_config.mutable_project() = |
| project_context.project(); |
| *aggregation_config.mutable_metric() = |
| *project_context.GetMetric(metric.id()); |
| *aggregation_config.mutable_report() = report; |
| ReportAggregates report_aggregates; |
| *report_aggregates.mutable_aggregation_config() = |
| aggregation_config; |
| (*locked->local_aggregate_store.mutable_aggregates())[key] = |
| report_aggregates; |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| return kOK; |
| } |
| |
| Status EventAggregator::LogUniqueActivesEvent(uint32_t report_id, |
| EventRecord* event_record) { |
| if (!event_record->event->has_occurrence_event()) { |
| LOG(ERROR) << "EventAggregator::LogUniqueActivesEvent can only " |
| "accept OccurrenceEvents."; |
| return kInvalidArguments; |
| } |
| ReportAggregationKey key_data; |
| key_data.set_customer_id(event_record->metric->customer_id()); |
| key_data.set_project_id(event_record->metric->project_id()); |
| key_data.set_metric_id(event_record->metric->id()); |
| key_data.set_report_id(report_id); |
| std::string key; |
| if (!SerializeToBase64(key_data, &key)) { |
| return kInvalidArguments; |
| } |
| auto locked = protected_aggregate_store_.lock(); |
| auto aggregates = |
| locked->local_aggregate_store.mutable_aggregates()->find(key); |
| if (aggregates == locked->local_aggregate_store.mutable_aggregates()->end()) { |
| LOG(ERROR) << "The Local Aggregate Store received an unexpected key."; |
| return kInvalidArguments; |
| } |
| (*(*aggregates->second.mutable_by_event_code()) |
| [event_record->event->occurrence_event().event_code()] |
| .mutable_by_day_index())[event_record->event->day_index()] |
| .mutable_activity_daily_aggregate() |
| ->set_activity_indicator(true); |
| return kOK; |
| } |
| |
| Status EventAggregator::GenerateObservationsNoWorker( |
| uint32_t final_day_index_utc, uint32_t final_day_index_local) { |
| if (worker_thread_.joinable()) { |
| LOG(ERROR) << "GenerateObservationsNoWorker() was called while " |
| "worker thread was running."; |
| return kOther; |
| } |
| return GenerateObservations(final_day_index_utc, final_day_index_local); |
| } |
| |
| Status EventAggregator::BackUpLocalAggregateStore() { |
| // Lock, copy the LocalAggregateStore, and release the lock. Write the copy |
| // to |local_aggregate_proto_store_|. |
| auto local_aggregate_store = CopyLocalAggregateStore(); |
| auto status = local_aggregate_proto_store_->Write(local_aggregate_store); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to back up the LocalAggregateStore with error code: " |
| << status.error_code() |
| << "\nError message: " << status.error_message() |
| << "\nError details: " << status.error_details(); |
| return kOther; |
| } |
| return kOK; |
| } |
| |
| Status EventAggregator::BackUpObservationHistory() { |
| auto status = obs_history_proto_store_->Write(obs_history_); |
| if (!status.ok()) { |
| LOG(ERROR) << "Failed to back up the AggregatedObservationHistory. " |
| "::cobalt::util::Status error code: " |
| << status.error_code() |
| << "\nError message: " << status.error_message() |
| << "\nError details: " << status.error_details(); |
| return kOther; |
| } |
| return kOK; |
| } |
| |
| void EventAggregator::ShutDown() { |
| if (worker_thread_.joinable()) { |
| { |
| auto locked = protected_shutdown_flag_.lock(); |
| locked->shut_down = true; |
| locked->shutdown_notifier.notify_all(); |
| } |
| worker_thread_.join(); |
| } else { |
| protected_shutdown_flag_.lock()->shut_down = true; |
| } |
| } |
| |
| void EventAggregator::Run() { |
| auto current_time = clock_->now(); |
| // Schedule Observation generation to happen in the first cycle. |
| next_generate_obs_ = current_time; |
| // Schedule garbage collection to happen |gc_interval_| seconds from now. |
| next_gc_ = current_time + gc_interval_; |
| // Acquire the mutex protecting the shutdown flag and condition variable. |
| auto locked = protected_shutdown_flag_.lock(); |
| while (true) { |
| // If shutdown has been requested, back up the LocalAggregateStore and exit. |
| if (locked->shut_down) { |
| BackUpLocalAggregateStore(); |
| return; |
| } |
| // Sleep until the next scheduled backup of the LocalAggregateStore or until |
| // notified of shutdown. Back up the LocalAggregateStore after waking. |
| auto shutdown_requested = locked.wait_for_with( |
| &(locked->shutdown_notifier), aggregate_backup_interval_, |
| [&locked]() { return locked->shut_down; }); |
| BackUpLocalAggregateStore(); |
| // If the worker thread was woken up by a shutdown request, exit. Otherwise, |
| // complete any scheduled Observation generation and garbage collection. |
| if (shutdown_requested) { |
| return; |
| } |
| // Check whether it is time to generate Observations or to garbage-collect |
| // the LocalAggregate store. If so, do that task and schedule the next |
| // occurrence. |
| DoScheduledTasks(clock_->now()); |
| } |
| } |
| |
| void EventAggregator::DoScheduledTasks( |
| std::chrono::system_clock::time_point current_time) { |
| auto current_time_t = std::chrono::system_clock::to_time_t(current_time); |
| auto current_day_index_utc = |
| TimeToDayIndex(current_time_t, MetricDefinition::UTC); |
| auto current_day_index_local = |
| TimeToDayIndex(current_time_t, MetricDefinition::LOCAL); |
| if (current_time >= next_generate_obs_) { |
| auto obs_status = GenerateObservations(current_day_index_utc - 1, |
| current_day_index_local - 1); |
| if (obs_status == kOK) { |
| BackUpObservationHistory(); |
| } else { |
| LOG(ERROR) << "GenerateObservations failed with status: " << obs_status; |
| } |
| next_generate_obs_ += generate_obs_interval_; |
| } |
| if (current_time >= next_gc_) { |
| auto gc_status = |
| GarbageCollect(current_day_index_utc - 1, current_day_index_local - 1); |
| if (gc_status == kOK) { |
| BackUpLocalAggregateStore(); |
| } else { |
| LOG(ERROR) << "GarbageCollect failed with status: " << gc_status; |
| } |
| next_gc_ += gc_interval_; |
| } |
| } |
| |
| Status EventAggregator::GenerateObservations(uint32_t final_day_index_utc, |
| uint32_t final_day_index_local) { |
| if (final_day_index_local == 0u) { |
| final_day_index_local = final_day_index_utc; |
| } |
| // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to |
| // generate observations. |
| auto local_aggregate_store = CopyLocalAggregateStore(); |
| for (auto pair : local_aggregate_store.aggregates()) { |
| const auto& config = pair.second.aggregation_config(); |
| |
| const auto& metric = config.metric(); |
| auto metric_ref = MetricRef(&config.project(), &metric); |
| uint32_t final_day_index; |
| switch (metric.time_zone_policy()) { |
| case MetricDefinition::UTC: { |
| final_day_index = final_day_index_utc; |
| break; |
| } |
| case MetricDefinition::LOCAL: { |
| final_day_index = final_day_index_local; |
| break; |
| } |
| default: |
| LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid."; |
| return kInvalidConfig; |
| } |
| |
| const auto& report = config.report(); |
| auto max_window_size = 0u; |
| for (uint32_t window_size : report.window_size()) { |
| if (window_size > kMaxAllowedAggregationWindowSize) { |
| LOG(WARNING) << "Window size exceeding " |
| "kMaxAllowedAggregationWindowSize will be " |
| "ignored by GenerateObservations"; |
| } else if (window_size > max_window_size) { |
| max_window_size = window_size; |
| } |
| } |
| if (max_window_size == 0) { |
| LOG(ERROR) << "Each locally aggregated report must specify a positive " |
| "window size."; |
| return kInvalidConfig; |
| } |
| if (final_day_index < max_window_size) { |
| LOG(ERROR) << "final_day_index must be >= max_window_size."; |
| return kInvalidArguments; |
| } |
| |
| switch (metric.metric_type()) { |
| case MetricDefinition::EVENT_OCCURRED: { |
| auto num_event_codes = |
| RapporConfigHelper::BasicRapporNumCategories(metric); |
| |
| switch (report.report_type()) { |
| case ReportDefinition::UNIQUE_N_DAY_ACTIVES: { |
| auto status = GenerateUniqueActivesObservations( |
| metric_ref, pair.first, pair.second, num_event_codes, |
| final_day_index); |
| if (status != kOK) { |
| return status; |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| return kOK; |
| } |
| |
| Status EventAggregator::GarbageCollect(uint32_t day_index_utc, |
| uint32_t day_index_local) { |
| if (day_index_local == 0u) { |
| day_index_local = day_index_utc; |
| } |
| auto locked = protected_aggregate_store_.lock(); |
| for (auto pair : locked->local_aggregate_store.aggregates()) { |
| uint32_t day_index; |
| switch (pair.second.aggregation_config().metric().time_zone_policy()) { |
| case MetricDefinition::UTC: { |
| day_index = day_index_utc; |
| break; |
| } |
| case MetricDefinition::LOCAL: { |
| day_index = day_index_local; |
| break; |
| } |
| default: |
| LOG(ERROR) << "The TimeZonePolicy of this MetricDefinition is invalid."; |
| return kInvalidConfig; |
| } |
| // Determine the largest window size in the report associated to this |
| // key-value pair. |
| uint32_t max_window_size = 1; |
| for (uint32_t window_size : |
| pair.second.aggregation_config().report().window_size()) { |
| if (window_size > max_window_size && |
| window_size <= kMaxAllowedAggregationWindowSize) { |
| max_window_size = window_size; |
| } |
| } |
| if (max_window_size == 0) { |
| LOG(ERROR) << "Each locally aggregated report must specify a positive " |
| "window size."; |
| return kInvalidConfig; |
| } |
| if (day_index < backfill_days_ + max_window_size) { |
| LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size."; |
| return kInvalidArguments; |
| } |
| // For each event code, iterate over the sub-map of local aggregates |
| // keyed by day index. Keep buckets with day indices greater than |
| // |day_index| - |backfill_days_| - |max_window_size|, and remove |
| // all buckets with smaller day indices. |
| for (auto event_code_aggregates : pair.second.by_event_code()) { |
| for (auto day_aggregates : event_code_aggregates.second.by_day_index()) { |
| if (day_aggregates.first <= |
| day_index - backfill_days_ - max_window_size) { |
| locked->local_aggregate_store.mutable_aggregates() |
| ->at(pair.first) |
| .mutable_by_event_code() |
| ->at(event_code_aggregates.first) |
| .mutable_by_day_index() |
| ->erase(day_aggregates.first); |
| } |
| } |
| // If the day index map under this event code is empty, remove the event |
| // code from the event code map under this ReportAggregationKey. |
| if (locked->local_aggregate_store.aggregates() |
| .at(pair.first) |
| .by_event_code() |
| .at(event_code_aggregates.first) |
| .by_day_index() |
| .empty()) { |
| locked->local_aggregate_store.mutable_aggregates() |
| ->at(pair.first) |
| .mutable_by_event_code() |
| ->erase(event_code_aggregates.first); |
| } |
| } |
| } |
| return kOK; |
| } |
| |
| ////////// GenerateUniqueActivesObservations and helper methods //////////////// |
| |
| // Given the set of daily aggregates for a fixed event code, and the size and |
| // end date of an aggregation window, returns the first day index within that |
| // window on which the event code occurred. Returns 0 if the event code did |
| // not occur within the window. |
| uint32_t FirstActiveDayIndexInWindow(const DailyAggregates& daily_aggregates, |
| uint32_t obs_day_index, |
| uint32_t window_size) { |
| for (uint32_t day_index = obs_day_index - window_size + 1; |
| day_index <= obs_day_index; day_index++) { |
| auto day_aggregate = daily_aggregates.by_day_index().find(day_index); |
| if (day_aggregate != daily_aggregates.by_day_index().end() && |
| day_aggregate->second.activity_daily_aggregate().activity_indicator() == |
| true) { |
| return day_index; |
| } |
| } |
| return 0u; |
| } |
| |
// Reports whether |active_day_index| lies inside the aggregation window of
// |window_size| days that ends on |obs_day_index|, i.e. whether
// obs_day_index - window_size < active_day_index <= obs_day_index.
// The lower bound is computed in unsigned 32-bit arithmetic.
bool IsActivityInWindow(uint32_t active_day_index, uint32_t obs_day_index,
                        uint32_t window_size) {
  if (active_day_index > obs_day_index) {
    return false;
  }
  const uint32_t day_before_window = obs_day_index - window_size;
  return day_before_window < active_day_index;
}
| |
| uint32_t EventAggregator::LastGeneratedDayIndex(const std::string& report_key, |
| uint32_t event_code, |
| uint32_t window_size) const { |
| auto report_history = obs_history_.by_report_key().find(report_key); |
| if (report_history == obs_history_.by_report_key().end()) { |
| return 0u; |
| } |
| auto event_code_history = |
| report_history->second.by_event_code().find(event_code); |
| if (event_code_history == report_history->second.by_event_code().end()) { |
| return 0u; |
| } |
| auto window_size_history = |
| event_code_history->second.by_window_size().find(window_size); |
| if (window_size_history == |
| event_code_history->second.by_window_size().end()) { |
| return 0u; |
| } |
| return window_size_history->second; |
| } |
| |
| Status EventAggregator::GenerateSingleUniqueActivesObservation( |
| const MetricRef metric_ref, const ReportDefinition* report, |
| uint32_t obs_day_index, uint32_t event_code, uint32_t window_size, |
| bool was_active) const { |
| auto encoder_result = encoder_->EncodeUniqueActivesObservation( |
| metric_ref, report, obs_day_index, event_code, was_active, window_size); |
| if (encoder_result.status != kOK) { |
| return encoder_result.status; |
| } |
| if (encoder_result.observation == nullptr || |
| encoder_result.metadata == nullptr) { |
| LOG(ERROR) << "Failed to encode UniqueActivesObservation"; |
| return kOther; |
| } |
| |
| auto writer_status = observation_writer_->WriteObservation( |
| *encoder_result.observation, std::move(encoder_result.metadata)); |
| if (writer_status != kOK) { |
| return writer_status; |
| } |
| return kOK; |
| } |
| |
| Status EventAggregator::GenerateUniqueActivesObservations( |
| const MetricRef metric_ref, const std::string& report_key, |
| const ReportAggregates& report_aggregates, uint32_t num_event_codes, |
| uint32_t final_day_index) { |
| for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) { |
| auto daily_aggregates = report_aggregates.by_event_code().find(event_code); |
| // Have any events ever been logged for this report and event code? |
| bool found_event_code = |
| (daily_aggregates != report_aggregates.by_event_code().end()); |
| for (uint32_t window_size : |
| report_aggregates.aggregation_config().report().window_size()) { |
| // Skip any window size larger than |
| // kMaxAllowedAggregationWindowSize. |
| if (window_size > kMaxAllowedAggregationWindowSize) { |
| LOG(WARNING) << "GenerateUniqueActivesObservations ignoring a window " |
| "size exceeding the maximum allowed value"; |
| continue; |
| } |
| // Find the earliest day index for which an Observation has not yet |
| // been generated for this report, event code, and window size. If |
| // that day index is later than |final_day_index|, no Observation is |
| // generated on this invocation. |
| auto last_gen = |
| LastGeneratedDayIndex(report_key, event_code, window_size); |
| auto first_day_index = |
| std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_)); |
| // The latest day index on which |event_type| is known to have |
| // occurred, so far. This value will be updated as we search |
| // forward from the earliest day index belonging to a window of |
| // interest. |
| uint32_t active_day_index = 0u; |
| // Iterate over the day indices |obs_day_index| for which we need |
| // to generate Observations. On each iteration, generate an |
| // Observation for the window of size |window_size| ending on |
| // |obs_day_index|. |
| for (uint32_t obs_day_index = first_day_index; |
| obs_day_index <= final_day_index; obs_day_index++) { |
| bool was_active = false; |
| if (found_event_code) { |
| // If the current value of |active_day_index| falls within the |
| // window, generate an Observation of activity. If not, search |
| // forward in the window, update |active_day_index|, and generate an |
| // Observation of activity or inactivity depending on the result of |
| // the search. |
| if (IsActivityInWindow(active_day_index, obs_day_index, |
| window_size)) { |
| was_active = true; |
| } else { |
| active_day_index = FirstActiveDayIndexInWindow( |
| daily_aggregates->second, obs_day_index, window_size); |
| was_active = IsActivityInWindow(active_day_index, obs_day_index, |
| window_size); |
| } |
| } |
| auto status = GenerateSingleUniqueActivesObservation( |
| metric_ref, &report_aggregates.aggregation_config().report(), |
| obs_day_index, event_code, window_size, was_active); |
| if (status != kOK) { |
| return status; |
| } |
| // Update |obs_history_| with the latest date of Observation |
| // generation for this report, event code, and window size. |
| (*(*(*obs_history_.mutable_by_report_key())[report_key] |
| .mutable_by_event_code())[event_code] |
| .mutable_by_window_size())[window_size] = obs_day_index; |
| } |
| } |
| } |
| return kOK; |
| } |
| |
| } // namespace logger |
| } // namespace cobalt |