blob: 79f61aad8cd1dc8ba4db0c67128c3ea2a4b856a8 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1.1/aggregation_procedures/aggregation_procedure.h"
#include <cstdint>
#include <memory>
#include "src/lib/statusor/status_macros.h"
#include "src/lib/statusor/statusor.h"
#include "src/local_aggregation_1.1/aggregation_procedures/at_least_once_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/count_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/numeric_stat_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/select_first_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/select_most_common_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/string_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1.1/aggregation_procedures/sum_and_count_aggregation_procedure.h"
#include "src/local_aggregation_1.1/local_aggregation.pb.h"
#include "src/logger/event_vector_index.h"
#include "src/logging.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/packed_event_codes.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
using cobalt::config::PackEventCodes;
// Factory: returns the AggregationProcedure that performs local aggregation of |metric| for
// |report|, chosen by the report type (and, depending on the report type, by the report's
// local_aggregation_procedure or the metric type). Returns nullptr when the combination is not
// handled here; unsupported combinations inside the UNIQUE_DEVICE_COUNTS and the
// histogram/numeric-stats branches are additionally logged as errors.
std::unique_ptr<AggregationProcedure> AggregationProcedure::Get(const MetricDefinition &metric,
                                                                const ReportDefinition &report) {
  VLOG(5) << "Getting aggregation procedure for report type " << report.report_type();
  switch (report.report_type()) {
    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
      return std::make_unique<CountAggregationProcedure>(metric, report);
    case ReportDefinition::UNIQUE_DEVICE_COUNTS:
      switch (report.local_aggregation_procedure()) {
        case ReportDefinition::AT_LEAST_ONCE:
          return std::make_unique<AtLeastOnceAggregationProcedure>(metric, report);
        case ReportDefinition::SELECT_FIRST:
          return std::make_unique<SelectFirstAggregationProcedure>(metric, report);
        case ReportDefinition::SELECT_MOST_COMMON:
          return std::make_unique<SelectMostCommonAggregationProcedure>(metric, report);
        default:
          LOG(ERROR) << "Report of type UNIQUE_DEVICE_COUNTS does not support selected aggregation "
                        "procedure: "
                     << report.local_aggregation_procedure();
          return nullptr;
      }
      break;
    // These four report types share the same metric-type based selection.
    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
      switch (metric.metric_type()) {
        case MetricDefinition::OCCURRENCE:
          return std::make_unique<CountAggregationProcedure>(metric, report);
        case MetricDefinition::INTEGER:
          return NumericStatAggregationProcedure::New(metric, report);
        default:
          // Log the report type actually being configured: this branch serves four report types,
          // so hard-coding one of them here would misattribute the error.
          LOG(ERROR) << "Report of type " << report.report_type()
                     << " is not valid for metric of type: " << metric.metric_type();
          return nullptr;
      }
      break;
    case ReportDefinition::FLEETWIDE_HISTOGRAMS:
      return std::make_unique<IntegerHistogramAggregationProcedure>(metric, report);
    case ReportDefinition::FLEETWIDE_MEANS:
      return std::make_unique<SumAndCountAggregationProcedure>(metric, report);
    case ReportDefinition::STRING_COUNTS:
      return std::make_unique<StringHistogramAggregationProcedure>(metric, report);
    default:
      // Report types without a local aggregation procedure here.
      return nullptr;
  }
}
// Captures the metric and report types, the limit on how many distinct event vectors may be
// buffered per aggregation period, and the local aggregation window taken from |report|.
AggregationProcedure::AggregationProcedure(const MetricDefinition &metric,
                                           const ReportDefinition &report)
    : metric_type_(metric.metric_type()), report_type_(report.report_type()) {
  // A positive event_code_buffer_max on the metric is used as-is. Otherwise fall back to
  // logger::GetNumEventVectors over the metric's dimensions (presumably the number of distinct
  // event vectors those dimensions can produce — confirm in event_vector_index.h).
  const bool metric_sets_buffer_max = metric.event_code_buffer_max() > 0;
  if (!metric_sets_buffer_max) {
    event_code_buffer_max_ = logger::GetNumEventVectors(metric.metric_dimensions());
  } else {
    event_code_buffer_max_ = metric.event_code_buffer_max();
  }
  // TODO(fxbug.dev/53691): Switch when local_aggregation_period has been switched to
  // OnDeviceAggregationWindow.
  window_.set_days(static_cast<AggregationDays>(report.local_aggregation_period()));
}
// Returns the per-period EventCodeAggregate that |event_record| falls into. The period key is the
// event's day_index for daily procedures and its hour_id otherwise; the lookup itself is delegated
// to the time-keyed overload.
EventCodeAggregate *AggregationProcedure::GetEventCodeAggregate(
    const logger::EventRecord &event_record, ReportAggregate *aggregate) const {
  const uint32_t period_key =
      IsDaily() ? event_record.event()->day_index() : event_record.event()->hour_id();
  return GetEventCodeAggregate(period_key, aggregate);
}
// Returns a pointer to the EventCodeAggregate stored under |time| in |aggregate|. The map's
// operator[] inserts a default entry when none exists yet. |time| is interpreted as a day index
// for daily procedures and as an hour id otherwise.
EventCodeAggregate *AggregationProcedure::GetEventCodeAggregate(uint32_t time,
                                                                ReportAggregate *aggregate) const {
  if (!IsDaily()) {
    auto &aggregates_by_hour = *aggregate->mutable_hourly()->mutable_by_hour_id();
    return &aggregates_by_hour[time];
  }
  auto &aggregates_by_day = *aggregate->mutable_daily()->mutable_by_day_index();
  return &aggregates_by_day[time];
}
// Returns true iff |type| is the Event variant that corresponds to this procedure's metric type.
// For a metric type not listed below an error is logged and false is returned.
bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
  if (metric_type_ == MetricDefinition::OCCURRENCE) {
    return type == Event::TypeCase::kOccurrenceEvent;
  }
  if (metric_type_ == MetricDefinition::INTEGER) {
    return type == Event::TypeCase::kIntegerEvent;
  }
  if (metric_type_ == MetricDefinition::INTEGER_HISTOGRAM) {
    return type == Event::TypeCase::kIntegerHistogramEvent;
  }
  if (metric_type_ == MetricDefinition::STRING) {
    return type == Event::TypeCase::kStringEvent;
  }
  LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
             << " does not appear to be a cobalt 1.1 metric";
  return false;
}
// Returns the AggregateData in |aggregate| for the packed event codes of |event_record|.
//
// An existing entry is always returned. A new entry is created only while by_event_code holds
// fewer than event_code_buffer_max_ entries; its packed event vector is also appended to
// event_vectors. When the buffer is full, nullptr is returned and the event is not aggregated.
// nullptr is also returned, after logging, for a metric type not listed below.
AggregateData *AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record,
                                                      EventCodeAggregate *aggregate) const {
  uint32_t event_vector_key;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      event_vector_key = PackEventCodes(event_record.event()->occurrence_event().event_code());
      break;
    case MetricDefinition::INTEGER:
      event_vector_key = PackEventCodes(event_record.event()->integer_event().event_code());
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      event_vector_key =
          PackEventCodes(event_record.event()->integer_histogram_event().event_code());
      break;
    case MetricDefinition::STRING:
      event_vector_key = PackEventCodes(event_record.event()->string_event().event_code());
      break;
    default:
      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
                 << " does not appear to be a Cobalt 1.1 metric";
      return nullptr;
  }

  auto *by_event_code = aggregate->mutable_by_event_code();
  if (auto existing = by_event_code->find(event_vector_key); existing != by_event_code->end()) {
    return &existing->second;
  }
  // Buffer full: event vectors not already tracked in this period are dropped.
  if (aggregate->by_event_code_size() >= event_code_buffer_max_) {
    return nullptr;
  }
  aggregate->add_event_vectors(event_vector_key);
  return &(*by_event_code)[event_vector_key];
}
// Folds |event_record| into |aggregate|.
//
// Events whose type does not match this procedure's metric type are logged and ignored. If
// GetAggregateData returns nullptr (for example because the per-period event vector buffer is
// full), the event is dropped.
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
                                           ReportAggregate *aggregate) {
  if (!IsValidEventType(event_record.event()->type_case())) {
    LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case()
               << " with aggregation procedure of type " << DebugString() << ".";
    return;
  }
  // Reuse the EventRecord overload rather than re-implementing its daily/hourly dispatch here,
  // so the choice of period key lives in a single place.
  EventCodeAggregate *event_code_aggregate = GetEventCodeAggregate(event_record, aggregate);
  if (AggregateData *aggregate_data = GetAggregateData(event_record, event_code_aggregate);
      aggregate_data != nullptr) {
    UpdateAggregateData(event_record, aggregate_data, event_code_aggregate);
  }
}
// Generates the observation for the aggregation period identified by |time_info|, then removes
// stored per-period aggregates that no later call will read.
//
// For daily procedures, the aggregates of every day in the window ending at time_info.day_index
// are gathered, newest first. Days before day 0 are never visited. For hourly procedures, only
// time_info.hour_id is gathered. If GenerateSingleObservation fails, CB_ASSIGN_OR_RETURN
// propagates the error before any cleanup happens.
lib::statusor::StatusOr<std::unique_ptr<Observation>> AggregationProcedure::GenerateObservation(
    const util::TimeInfo &time_info, const logger::Encoder *encoder, ReportAggregate *aggregate) {
  std::vector<EventCodeAggregate *> aggregates;
  // Stored periods with an index <= delete_through are removed after the observation is built.
  // Signed 64-bit arithmetic is used throughout so that a window reaching back before day 0
  // yields a negative bound (nothing to delete) instead of wrapping around to a huge uint32_t.
  // The wrapped value would have deleted every stored day, including ones that later windows
  // still need.
  int64_t delete_through;
  if (IsDaily()) {
    const int64_t end_day_index = time_info.day_index;
    const int64_t start_day_index = end_day_index - static_cast<int64_t>(window_.days()) + 1;
    // The oldest day of this window is not part of the next window.
    delete_through = start_day_index;
    // Clamp at day 0. With the previous uint32_t loop (`i >= start_day_index; i--`), a start of 0
    // made the condition always true, so the loop never terminated.
    const int64_t first_day_index = start_day_index < 0 ? 0 : start_day_index;
    for (int64_t day_index = end_day_index; day_index >= first_day_index; --day_index) {
      aggregates.push_back(GetEventCodeAggregate(static_cast<uint32_t>(day_index), aggregate));
    }
  } else {
    delete_through = time_info.hour_id;
    aggregates.push_back(GetEventCodeAggregate(time_info.hour_id, aggregate));
  }
  CB_ASSIGN_OR_RETURN(
      auto observation,
      GenerateSingleObservation(encoder, aggregates, SelectEventVectorsForObservation(aggregates)));

  // Clean up aggregates that will never be used again. Keys are collected first and erased
  // afterwards so the map is not modified while it is being iterated.
  if (IsDaily()) {
    std::vector<uint32_t> days_to_delete;
    for (const auto &day : aggregate->daily().by_day_index()) {
      if (day.first <= delete_through) {
        days_to_delete.push_back(day.first);
      }
    }
    for (auto day : days_to_delete) {
      aggregate->mutable_daily()->mutable_by_day_index()->erase(day);
    }
  } else {
    std::vector<uint32_t> hours_to_delete;
    for (const auto &hour : aggregate->hourly().by_hour_id()) {
      if (hour.first <= delete_through) {
        hours_to_delete.push_back(hour.first);
      }
    }
    for (auto hour : hours_to_delete) {
      aggregate->mutable_hourly()->mutable_by_hour_id()->erase(hour);
    }
  }
  return observation;
}
// Collects up to event_code_buffer_max_ distinct packed event vectors from |aggregates|. The
// aggregates are scanned in the given order, and each one's event_vectors() in stored order; the
// first vectors encountered are kept.
std::set<uint32_t> AggregationProcedure::SelectEventVectorsForObservation(
    const std::vector<EventCodeAggregate *> &aggregates) const {
  std::set<uint32_t> selected;
  for (const EventCodeAggregate *period_aggregate : aggregates) {
    for (const uint32_t event_vector : period_aggregate->event_vectors()) {
      // Once the limit is reached the set can no longer change, so stop scanning altogether.
      if (selected.size() == event_code_buffer_max_) {
        return selected;
      }
      selected.insert(event_vector);
    }
  }
  return selected;
}
} // namespace cobalt::local_aggregation