| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation/event_aggregator.h" |
| |
| #include <algorithm> |
| #include <map> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "src/lib/util/datetime_util.h" |
| #include "src/lib/util/proto_util.h" |
| #include "src/registry/packed_event_codes.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| using logger::Encoder; |
| using logger::EventRecord; |
| using logger::kInvalidArguments; |
| using logger::kOK; |
| using logger::kOther; |
| using logger::ObservationWriter; |
| using logger::ProjectContext; |
| using logger::Status; |
| using util::ConsistentProtoStore; |
| using util::SerializeToBase64; |
| using util::SteadyClock; |
| using util::TimeToDayIndex; |
| |
| namespace { |
| |
| // Populates a ReportAggregationKey proto message and then populates a string |
| // with the base64 encoding of the serialized proto. |
| bool PopulateReportKey(uint32_t customer_id, uint32_t project_id, uint32_t metric_id, |
| uint32_t report_id, std::string* key) { |
| ReportAggregationKey key_data; |
| key_data.set_customer_id(customer_id); |
| key_data.set_project_id(project_id); |
| key_data.set_metric_id(metric_id); |
| key_data.set_report_id(report_id); |
| return SerializeToBase64(key_data, key); |
| } |
| |
| } // namespace |
| |
| EventAggregator::EventAggregator(const Encoder* encoder, |
| const ObservationWriter* observation_writer, |
| ConsistentProtoStore* local_aggregate_proto_store, |
| ConsistentProtoStore* obs_history_proto_store, |
| const size_t backfill_days, |
| const std::chrono::seconds aggregate_backup_interval, |
| const std::chrono::seconds generate_obs_interval, |
| const std::chrono::seconds gc_interval) { |
| CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count()) |
| << "aggregate_backup_interval must be less than or equal to " |
| "generate_obs_interval"; |
| CHECK_LE(aggregate_backup_interval.count(), gc_interval.count()) |
| << "aggregate_backup_interval must be less than or equal to gc_interval"; |
| aggregate_backup_interval_ = aggregate_backup_interval; |
| generate_obs_interval_ = generate_obs_interval; |
| gc_interval_ = gc_interval; |
| |
| aggregate_store_ = |
| std::make_unique<AggregateStore>(encoder, observation_writer, local_aggregate_proto_store, |
| obs_history_proto_store, backfill_days); |
| |
| steady_clock_ = std::make_unique<SteadyClock>(); |
| } |
| |
| void EventAggregator::Start(std::unique_ptr<util::SystemClockInterface> clock) { |
| auto locked = protected_worker_thread_controller_.lock(); |
| locked->shut_down = false; |
| std::thread t(std::bind( |
| [this](std::unique_ptr<util::SystemClockInterface>& clock) { this->Run(std::move(clock)); }, |
| std::move(clock))); |
| worker_thread_ = std::move(t); |
| } |
| |
| // TODO(pesk): update the EventAggregator's view of a Metric |
| // or ReportDefinition when appropriate. |
| Status EventAggregator::UpdateAggregationConfigs(const ProjectContext& project_context) { |
| auto locked = aggregate_store_->protected_aggregate_store_.lock(); |
| Status status; |
| for (const auto& metric : project_context.metrics()) { |
| switch (metric.metric_type()) { |
| case MetricDefinition::EVENT_OCCURRED: { |
| for (const auto& report : metric.reports()) { |
| switch (report.report_type()) { |
| case ReportDefinition::UNIQUE_N_DAY_ACTIVES: { |
| status = aggregate_store_->MaybeInsertReportConfigLocked( |
| project_context, metric, report, &(locked->local_aggregate_store)); |
| if (status != kOK) { |
| return status; |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| } |
| case MetricDefinition::EVENT_COUNT: |
| case MetricDefinition::ELAPSED_TIME: |
| case MetricDefinition::FRAME_RATE: |
| case MetricDefinition::MEMORY_USAGE: { |
| for (const auto& report : metric.reports()) { |
| switch (report.report_type()) { |
| case ReportDefinition::PER_DEVICE_NUMERIC_STATS: |
| case ReportDefinition::PER_DEVICE_HISTOGRAM: { |
| status = aggregate_store_->MaybeInsertReportConfigLocked( |
| project_context, metric, report, &(locked->local_aggregate_store)); |
| if (status != kOK) { |
| return status; |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| } |
| default: |
| continue; |
| } |
| } |
| return kOK; |
| } |
| |
| // Helper functions for the Log*Event() methods. |
| namespace { |
| |
| // Checks that an Event has type |expected_event_type|. |
| bool ValidateEventType(Event::TypeCase expected_event_type, const Event& event) { |
| Event::TypeCase event_type = event.type_case(); |
| if (event_type != expected_event_type) { |
| LOG(ERROR) << "Expected Event type is " << expected_event_type << "; found " << event_type |
| << "."; |
| return false; |
| } |
| return true; |
| } |
| |
| } // namespace |
| |
| Status EventAggregator::LogUniqueActivesEvent(uint32_t report_id, const EventRecord& event_record) { |
| auto* event = event_record.event(); |
| if (!ValidateEventType(Event::kOccurrenceEvent, *event)) { |
| return kInvalidArguments; |
| } |
| auto* metric = event_record.metric(); |
| std::string key; |
| if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id, |
| &key)) { |
| return kInvalidArguments; |
| } |
| auto locked = aggregate_store_->protected_aggregate_store_.lock(); |
| auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(key); |
| if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) { |
| LOG(ERROR) << "The Local Aggregate Store received an unexpected key."; |
| return kInvalidArguments; |
| } |
| if (!aggregates->second.has_unique_actives_aggregates()) { |
| LOG(ERROR) << "The local aggregates for this report key are not of type " |
| "UniqueActivesReportAggregates."; |
| return kInvalidArguments; |
| } |
| (*(*aggregates->second.mutable_unique_actives_aggregates() |
| ->mutable_by_event_code())[event->occurrence_event().event_code()] |
| .mutable_by_day_index())[event->day_index()] |
| .mutable_activity_daily_aggregate() |
| ->set_activity_indicator(true); |
| return kOK; |
| } |
| |
| Status EventAggregator::LogCountEvent(uint32_t report_id, const EventRecord& event_record) { |
| auto* event = event_record.event(); |
| if (!ValidateEventType(Event::kCountEvent, *event)) { |
| return kInvalidArguments; |
| } |
| auto* metric = event_record.metric(); |
| std::string key; |
| if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id, |
| &key)) { |
| return kInvalidArguments; |
| } |
| const CountEvent& count_event = event->count_event(); |
| return LogNumericEvent(key, event->day_index(), count_event.component(), |
| config::PackEventCodes(count_event.event_code()), count_event.count()); |
| } |
| |
| Status EventAggregator::LogElapsedTimeEvent(uint32_t report_id, const EventRecord& event_record) { |
| auto* event = event_record.event(); |
| if (!ValidateEventType(Event::kElapsedTimeEvent, *event)) { |
| return kInvalidArguments; |
| } |
| std::string key; |
| auto* metric = event_record.metric(); |
| if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id, |
| &key)) { |
| return kInvalidArguments; |
| } |
| const ElapsedTimeEvent& elapsed_time_event = event->elapsed_time_event(); |
| return LogNumericEvent(key, event->day_index(), elapsed_time_event.component(), |
| config::PackEventCodes(elapsed_time_event.event_code()), |
| elapsed_time_event.elapsed_micros()); |
| } |
| |
| Status EventAggregator::LogFrameRateEvent(uint32_t report_id, const EventRecord& event_record) { |
| auto* event = event_record.event(); |
| if (!ValidateEventType(Event::kFrameRateEvent, *event)) { |
| return kInvalidArguments; |
| } |
| std::string key; |
| auto* metric = event_record.metric(); |
| if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id, |
| &key)) { |
| return kInvalidArguments; |
| } |
| const FrameRateEvent& frame_rate_event = event->frame_rate_event(); |
| return LogNumericEvent(key, event->day_index(), frame_rate_event.component(), |
| config::PackEventCodes(frame_rate_event.event_code()), |
| frame_rate_event.frames_per_1000_seconds()); |
| } |
| |
| Status EventAggregator::LogMemoryUsageEvent(uint32_t report_id, const EventRecord& event_record) { |
| auto* event = event_record.event(); |
| if (!ValidateEventType(Event::kMemoryUsageEvent, *event)) { |
| return kInvalidArguments; |
| } |
| std::string key; |
| auto* metric = event_record.metric(); |
| if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id, |
| &key)) { |
| return kInvalidArguments; |
| } |
| const MemoryUsageEvent& memory_usage_event = event->memory_usage_event(); |
| return LogNumericEvent(key, event->day_index(), memory_usage_event.component(), |
| config::PackEventCodes(memory_usage_event.event_code()), |
| memory_usage_event.bytes()); |
| } |
| |
| Status EventAggregator::LogNumericEvent(const std::string& report_key, uint32_t day_index, |
| const std::string& component, uint64_t event_code, |
| int64_t value) { |
| auto locked = aggregate_store_->protected_aggregate_store_.lock(); |
| auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(report_key); |
| if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) { |
| LOG(ERROR) << "The Local Aggregate Store received an unexpected key."; |
| return kInvalidArguments; |
| } |
| if (!aggregates->second.has_numeric_aggregates()) { |
| LOG(ERROR) << "The local aggregates for this report key are not of a " |
| "compatible type."; |
| return kInvalidArguments; |
| } |
| auto aggregates_by_day = |
| (*(*aggregates->second.mutable_numeric_aggregates()->mutable_by_component())[component] |
| .mutable_by_event_code())[event_code] |
| .mutable_by_day_index(); |
| bool first_event_today = ((*aggregates_by_day).find(day_index) == aggregates_by_day->end()); |
| auto day_aggregate = (*aggregates_by_day)[day_index].mutable_numeric_daily_aggregate(); |
| const auto& aggregation_type = |
| aggregates->second.aggregation_config().report().aggregation_type(); |
| switch (aggregation_type) { |
| case ReportDefinition::SUM: |
| day_aggregate->set_value(value + day_aggregate->value()); |
| return kOK; |
| case ReportDefinition::MAX: |
| day_aggregate->set_value(std::max(value, day_aggregate->value())); |
| return kOK; |
| case ReportDefinition::MIN: |
| if (first_event_today) { |
| day_aggregate->set_value(value); |
| } else { |
| day_aggregate->set_value(std::min(value, day_aggregate->value())); |
| } |
| return kOK; |
| default: |
| LOG(ERROR) << "Unexpected aggregation type " << aggregation_type; |
| return kInvalidArguments; |
| } |
| } |
| |
| Status EventAggregator::GenerateObservationsNoWorker(uint32_t final_day_index_utc, |
| uint32_t final_day_index_local) { |
| if (worker_thread_.joinable()) { |
| LOG(ERROR) << "GenerateObservationsNoWorker() was called while " |
| "worker thread was running."; |
| return kOther; |
| } |
| return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local); |
| } |
| |
| void EventAggregator::ShutDown() { |
| if (worker_thread_.joinable()) { |
| { |
| auto locked = protected_worker_thread_controller_.lock(); |
| locked->shut_down = true; |
| locked->shutdown_notifier.notify_all(); |
| } |
| worker_thread_.join(); |
| } else { |
| protected_worker_thread_controller_.lock()->shut_down = true; |
| } |
| } |
| |
| void EventAggregator::Run(std::unique_ptr<util::SystemClockInterface> system_clock) { |
| std::chrono::steady_clock::time_point steady_time = steady_clock_->now(); |
| // Schedule Observation generation to happen in the first cycle. |
| next_generate_obs_ = steady_time; |
| // Schedule garbage collection to happen |gc_interval_| seconds from now. |
| next_gc_ = steady_time + gc_interval_; |
| // Acquire the mutex protecting the shutdown flag and condition variable. |
| auto locked = protected_worker_thread_controller_.lock(); |
| while (true) { |
| // If shutdown has been requested, back up the LocalAggregateStore and |
| // exit. |
| if (locked->shut_down) { |
| aggregate_store_->BackUpLocalAggregateStore(); |
| return; |
| } |
| // Sleep until the next scheduled backup of the LocalAggregateStore or |
| // until notified of shutdown. Back up the LocalAggregateStore after |
| // waking. |
| locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() { |
| if (locked->immediate_run_trigger) { |
| locked->immediate_run_trigger = false; |
| return true; |
| } |
| return locked->shut_down; |
| }); |
| aggregate_store_->BackUpLocalAggregateStore(); |
| // If the worker thread was woken up by a shutdown request, exit. |
| // Otherwise, complete any scheduled Observation generation and garbage |
| // collection. |
| if (locked->shut_down) { |
| return; |
| } |
| // Check whether it is time to generate Observations or to garbage-collect |
| // the LocalAggregate store. If so, do that task and schedule the next |
| // occurrence. |
| DoScheduledTasks(system_clock->now(), steady_clock_->now()); |
| } |
| } |
| |
| void EventAggregator::DoScheduledTasks(std::chrono::system_clock::time_point system_time, |
| std::chrono::steady_clock::time_point steady_time) { |
| auto current_time_t = std::chrono::system_clock::to_time_t(system_time); |
| auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1; |
| auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1; |
| |
| // Skip the tasks (but do schedule a retry) if either day index is too small. |
| uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + aggregate_store_->backfill_days_; |
| bool skip_tasks = |
| (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index); |
| if (steady_time >= next_generate_obs_) { |
| next_generate_obs_ += generate_obs_interval_; |
| if (skip_tasks) { |
| LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the " |
| "current day index is too small."; |
| } else { |
| auto obs_status = aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time); |
| if (obs_status == kOK) { |
| aggregate_store_->BackUpObservationHistory(); |
| } else { |
| LOG(ERROR) << "GenerateObservations failed with status: " << obs_status; |
| } |
| } |
| } |
| if (steady_time >= next_gc_) { |
| next_gc_ += gc_interval_; |
| if (skip_tasks) { |
| LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the " |
| "current day index is too small."; |
| } else { |
| auto gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time); |
| if (gc_status == kOK) { |
| aggregate_store_->BackUpLocalAggregateStore(); |
| } else { |
| LOG(ERROR) << "GarbageCollect failed with status: " << gc_status; |
| } |
| } |
| } |
| } |
| |
| } // namespace cobalt::local_aggregation |