blob: b8d17a28d873983e783a6c7e2938cd854f49354f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <set>
#include <vector>
#include "src/lib/statusor/status_macros.h"
#include "src/lib/statusor/statusor.h"
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h"
#include "src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/event_vector_index.h"
#include "src/logging.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Factory: returns the AggregationProcedure that locally aggregates events for `report` of
// `metric`.
//
// The procedure is selected by the report type. For some report types it is further selected by
// the report's local_aggregation_procedure or by the metric type. Returns nullptr, after logging
// an error, when the combination is not supported here.
std::unique_ptr<AggregationProcedure> AggregationProcedure::Get(const MetricDefinition &metric,
                                                                const ReportDefinition &report) {
  VLOG(5) << "Getting aggregation procedure for report type " << report.report_type();
  switch (report.report_type()) {
    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
      return std::make_unique<CountAggregationProcedure>(metric, report);
    case ReportDefinition::UNIQUE_DEVICE_COUNTS:
      // Unique-device counts are further specialized by the report's local aggregation procedure.
      switch (report.local_aggregation_procedure()) {
        case ReportDefinition::AT_LEAST_ONCE:
          return std::make_unique<AtLeastOnceAggregationProcedure>(metric, report);
        case ReportDefinition::SELECT_FIRST:
          return std::make_unique<SelectFirstAggregationProcedure>(metric, report);
        case ReportDefinition::SELECT_MOST_COMMON:
          return std::make_unique<SelectMostCommonAggregationProcedure>(metric, report);
        default:
          LOG(ERROR) << "Report of type UNIQUE_DEVICE_COUNTS does not support selected aggregation "
                        "procedure: "
                     << report.local_aggregation_procedure();
          return nullptr;
      }
      break;  // Defensive: every case above returns.
    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
      // These report types share one branch and are specialized by the metric type.
      switch (metric.metric_type()) {
        case MetricDefinition::OCCURRENCE:
          return std::make_unique<CountAggregationProcedure>(metric, report);
        case MetricDefinition::INTEGER:
          return NumericStatAggregationProcedure::New(metric, report);
        default:
          // Several report types reach this branch, so log the actual report type.
          LOG(ERROR) << "Report of type " << report.report_type()
                     << " is not valid for metric of type: " << metric.metric_type();
          return nullptr;
      }
      break;  // Defensive: every case above returns.
    case ReportDefinition::FLEETWIDE_HISTOGRAMS:
      return std::make_unique<IntegerHistogramAggregationProcedure>(metric, report);
    case ReportDefinition::FLEETWIDE_MEANS:
      return std::make_unique<SumAndCountAggregationProcedure>(metric, report);
    case ReportDefinition::STRING_COUNTS:
      return std::make_unique<StringHistogramAggregationProcedure>(metric, report);
    default:
      // Log instead of returning nullptr silently, so unsupported reports are diagnosable.
      LOG(ERROR) << "Report of type " << report.report_type()
                 << " is not supported by local aggregation";
      return nullptr;
  }
}
// Captures the metric/report types, the per-bucket limit on distinct event vectors, and the
// local aggregation window for this procedure.
AggregationProcedure::AggregationProcedure(const MetricDefinition &metric,
                                           const ReportDefinition &report)
    : metric_type_(metric.metric_type()), report_type_(report.report_type()) {
  // A positive event_vector_buffer_max on the report takes precedence. Otherwise the limit comes
  // from logger::GetNumEventVectors() over the metric's dimensions.
  const auto configured_buffer_max = report.event_vector_buffer_max();
  event_vector_buffer_max_ = configured_buffer_max > 0
                                 ? configured_buffer_max
                                 : logger::GetNumEventVectors(metric.metric_dimensions());

  // TODO(fxbug.dev/53691): Switch when local_aggregation_period has been switched to
  // OnDeviceAggregationWindow.
  const auto window_days = static_cast<AggregationDays>(report.local_aggregation_period());
  window_.set_days(window_days);
}
// Returns the time bucket of `aggregate` that `event_record` belongs to. Daily procedures key
// buckets by the event's day index; all others key them by the event's hour id.
EventCodeAggregate *AggregationProcedure::GetEventCodeAggregate(
    const logger::EventRecord &event_record, ReportAggregate *aggregate) const {
  const auto *event = event_record.event();
  return GetEventCodeAggregate(IsDaily() ? event->day_index() : event->hour_id(), aggregate);
}
// Returns a pointer to the bucket stored under `time` in `aggregate`. `time` is a day index for
// daily procedures and an hour id otherwise. The bucket is created empty if it does not exist yet
// (protobuf Map::operator[] default-inserts missing keys).
EventCodeAggregate *AggregationProcedure::GetEventCodeAggregate(uint32_t time,
                                                                ReportAggregate *aggregate) const {
  if (!IsDaily()) {
    auto &buckets_by_hour = *aggregate->mutable_hourly()->mutable_by_hour_id();
    return &buckets_by_hour[time];
  }
  auto &buckets_by_day = *aggregate->mutable_daily()->mutable_by_day_index();
  return &buckets_by_day[time];
}
// Returns true iff an event whose payload case is `type` can be aggregated by a procedure for
// this metric type. Each supported metric type accepts exactly one payload case. Unknown metric
// types are logged and rejected.
bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
  Event::TypeCase expected_type = Event::TypeCase::TYPE_NOT_SET;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      expected_type = Event::TypeCase::kOccurrenceEvent;
      break;
    case MetricDefinition::INTEGER:
      expected_type = Event::TypeCase::kIntegerEvent;
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      expected_type = Event::TypeCase::kIntegerHistogramEvent;
      break;
    case MetricDefinition::STRING:
      expected_type = Event::TypeCase::kStringEvent;
      break;
    default:
      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
                 << " does not appear to be a cobalt 1.1 metric";
      return false;
  }
  return type == expected_type;
}
// Returns the AggregateData slot in `aggregate` for the event vector carried by `event_record`.
//
// If no slot exists yet, one is created, as long as `aggregate` holds fewer than
// event_vector_buffer_max_ event vectors. Returns nullptr when the buffer is full (the event is
// dropped) or when the metric type is not a Cobalt 1.1 type.
AggregateData *AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record,
                                                      EventCodeAggregate *aggregate) const {
  // Borrow the event codes from the event instead of copying the RepeatedField. They are only
  // copied when a new entry has to be added below.
  const google::protobuf::RepeatedField<uint32_t> *event_codes = nullptr;
  switch (metric_type_) {
    case MetricDefinition::OCCURRENCE:
      event_codes = &event_record.event()->occurrence_event().event_code();
      break;
    case MetricDefinition::INTEGER:
      event_codes = &event_record.event()->integer_event().event_code();
      break;
    case MetricDefinition::INTEGER_HISTOGRAM:
      event_codes = &event_record.event()->integer_histogram_event().event_code();
      break;
    case MetricDefinition::STRING:
      event_codes = &event_record.event()->string_event().event_code();
      break;
    default:
      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
                 << " does not appear to be a Cobalt 1.1 metric";
      return nullptr;
  }

  // Reuse the existing entry for this exact event vector, if any.
  for (EventCodesAggregateData &aggregate_data : *aggregate->mutable_by_event_code()) {
    if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
                   event_codes->begin(), event_codes->end())) {
      return aggregate_data.mutable_data();
    }
  }

  // Start tracking a new event vector only while the buffer has room.
  if (aggregate->by_event_code_size() < event_vector_buffer_max_) {
    EventCodesAggregateData *aggregate_data = aggregate->add_by_event_code();
    aggregate_data->mutable_event_codes()->CopyFrom(*event_codes);
    return aggregate_data->mutable_data();
  }
  return nullptr;
}
// Folds `event_record` into `aggregate`.
//
// Events whose payload type does not match the metric type are logged and ignored. Otherwise the
// event is routed to its day/hour bucket and to the entry for its event vector. If that entry
// cannot be obtained (event-vector buffer full), the event is silently dropped.
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
                                           ReportAggregate *aggregate) {
  if (!IsValidEventType(event_record.event()->type_case())) {
    LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case()
               << " with aggregation procedure of type " << DebugString() << ".";
    return;
  }
  // The event-record overload already selects the day-index or hour-id bucket. Reuse it rather
  // than repeating the IsDaily() dispatch here.
  EventCodeAggregate *event_code_aggregate = GetEventCodeAggregate(event_record, aggregate);
  if (AggregateData *aggregate_data = GetAggregateData(event_record, event_code_aggregate);
      aggregate_data != nullptr) {
    UpdateAggregateData(event_record, aggregate_data, event_code_aggregate);
  }
}
// Generates the observation for the period ending at `time_info` and then prunes buckets of
// `aggregate` that no later observation can use.
//
// Daily procedures read the buckets for the window_.days() days ending at time_info.day_index,
// newest first, clamped so the window never reaches before day 0. Hourly procedures read only
// the bucket for time_info.hour_id. Errors from GenerateSingleObservation are returned as-is,
// and in that case nothing is pruned.
lib::statusor::StatusOr<std::unique_ptr<Observation>> AggregationProcedure::GenerateObservation(
    const util::TimeInfo &time_info, ReportAggregate *aggregate) {
  std::vector<EventCodeAggregate *> aggregates;
  // Buckets keyed <= delete_through are erased after the observation is generated. Signed 64-bit
  // so a negative value means "nothing to delete".
  int64_t delete_through = -1;
  if (IsDaily()) {
    // Window bounds are computed in signed 64-bit arithmetic. With uint32_t, a window reaching
    // before day 0 would wrap the start around, and a start of exactly 0 would make a descending
    // `i >= start` loop never terminate.
    const int64_t end_day_index = time_info.day_index;
    const int64_t start_day_index = end_day_index - static_cast<int64_t>(window_.days()) + 1;
    // The next daily window starts one day later, so start_day_index and everything before it
    // will never be read again. When start_day_index < 0, every stored day may still be needed.
    delete_through = start_day_index;
    const int64_t first_day = start_day_index < 0 ? 0 : start_day_index;
    for (int64_t day = end_day_index; day >= first_day; --day) {
      aggregates.push_back(GetEventCodeAggregate(static_cast<uint32_t>(day), aggregate));
    }
  } else {
    delete_through = time_info.hour_id;
    aggregates.push_back(GetEventCodeAggregate(time_info.hour_id, aggregate));
  }
  CB_ASSIGN_OR_RETURN(
      auto observation,
      GenerateSingleObservation(aggregates, SelectEventVectorsForObservation(aggregates)));

  // Clean up aggregates that will never be used again. Keys are collected first so the map is
  // not mutated while it is being iterated.
  if (IsDaily()) {
    std::vector<uint32_t> days_to_delete;
    for (const auto &day : aggregate->daily().by_day_index()) {
      if (static_cast<int64_t>(day.first) <= delete_through) {
        days_to_delete.push_back(day.first);
      }
    }
    for (uint32_t day : days_to_delete) {
      aggregate->mutable_daily()->mutable_by_day_index()->erase(day);
    }
  } else {
    std::vector<uint32_t> hours_to_delete;
    for (const auto &hour : aggregate->hourly().by_hour_id()) {
      if (static_cast<int64_t>(hour.first) <= delete_through) {
        hours_to_delete.push_back(hour.first);
      }
    }
    for (uint32_t hour : hours_to_delete) {
      aggregate->mutable_hourly()->mutable_by_hour_id()->erase(hour);
    }
  }
  return observation;
}
// Collects the distinct event vectors present in `aggregates`, visiting buckets and their entries
// in order. Collection stops once event_vector_buffer_max_ distinct vectors have been gathered.
std::set<std::vector<uint32_t>> AggregationProcedure::SelectEventVectorsForObservation(
    const std::vector<EventCodeAggregate *> &aggregates) const {
  std::set<std::vector<uint32_t>> selected;
  for (EventCodeAggregate *bucket : aggregates) {
    for (const EventCodesAggregateData &entry : bucket->by_event_code()) {
      // Once the limit is reached, no further vector can be added anywhere, so stop scanning.
      if (selected.size() == event_vector_buffer_max_) {
        return selected;
      }
      selected.emplace(entry.event_codes().begin(), entry.event_codes().end());
    }
  }
  return selected;
}
} // namespace cobalt::local_aggregation