blob: 8c261308770dcda2694d64434055044f8d702b9b [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h"
#include <algorithm>
#include <limits>
#include <memory>
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
// Constructs the procedure by handing the metric and report definitions straight to the
// AggregationProcedure base class; the constructor body itself does nothing.
NumericStatAggregationProcedure::NumericStatAggregationProcedure(
    const MetricDefinition &metric, const ReportDefinition &report)
    : AggregationProcedure(metric, report) {
}
// Factory for the concrete numeric-stat procedures.
//
// Selects the subclass that corresponds to the report's local_aggregation_procedure and
// constructs it with the given metric and report definitions.
//
// Returns nullptr (after logging an error) when the procedure is not one of the values
// handled in the switch below.
std::unique_ptr<NumericStatAggregationProcedure> NumericStatAggregationProcedure::New(
    const MetricDefinition &metric, const ReportDefinition &report) {
  const auto procedure = report.local_aggregation_procedure();

  switch (procedure) {
    case ReportDefinition::SUM_PROCEDURE:
      return std::make_unique<SumNumericStatAggregationProcedure>(metric, report);
    case ReportDefinition::MIN_PROCEDURE:
      return std::make_unique<MinNumericStatAggregationProcedure>(metric, report);
    case ReportDefinition::MAX_PROCEDURE:
      return std::make_unique<MaxNumericStatAggregationProcedure>(metric, report);
    case ReportDefinition::MEAN:
      return std::make_unique<MeanNumericStatAggregationProcedure>(metric, report);
    case ReportDefinition::MEDIAN:
      return std::make_unique<MedianNumericStatAggregationProcedure>(metric, report);
    case ReportDefinition::PERCENTILE_N:
      return std::make_unique<PercentileNNumericStatAggregationProcedure>(metric, report);
    default:
      break;
  }

  // Only reached for a procedure that has no matching subclass.
  LOG(ERROR) << "Unexpected aggregation procedure " << procedure
             << ". Nothing will be collected " << report.report_name();
  return nullptr;
}
bool NumericStatAggregationProcedure::IsDaily() const {
switch (report_type()) {
case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
return true;
case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
return false;
default:
LOG(ERROR) << "Unexpected report_type for NumericStatAggregationProcedure: " << report_type()
<< ". Defaulting to IsDaily=true.";
return true;
}
}
// Builds a single integer observation from the aggregate data held in `buckets`.
//
// Steps:
//   1. Every EventCodesAggregateData in every bucket is grouped by its event-code vector.
//      Entries whose event-code vector is not a member of `event_vectors` are skipped.
//   2. For each remaining event-code vector, the grouped aggregates are reduced to one
//      int64_t by the subclass-provided CollectValue().
//   3. The (event codes, value) tuples are encoded with
//      logger::Encoder::EncodeIntegerObservation().
//
// Returns a null observation pointer when no event vector produced any data. The TimeInfo
// argument is not used by this procedure.
lib::statusor::StatusOr<std::unique_ptr<Observation>>
NumericStatAggregationProcedure::GenerateSingleObservation(
    const std::vector<AggregateDataToGenerate> &buckets,
    const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
  std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
  for (const AggregateDataToGenerate &bucket : buckets) {
    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
                                         aggregate_data->event_codes().end());
      if (event_vectors.count(event_vector) == 0) {
        continue;
      }
      aggregates_by_event_code[event_vector].push_back(&aggregate_data->data());
    }
  }

  std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data;
  data.reserve(aggregates_by_event_code.size());
  // Bind by const reference: iterating by value would deep-copy both the key vector and the
  // vector of aggregate pointers for every map entry.
  for (const auto &[event_codes, aggregates] : aggregates_by_event_code) {
    // Construct the tuple in place instead of building a temporary and moving it in.
    data.emplace_back(event_codes, CollectValue(aggregates));
  }

  if (data.empty()) {
    return {nullptr};
  }
  return logger::Encoder::EncodeIntegerObservation(data);
}
// Adds the integer value carried by the event to the running total stored in
// `aggregate_data`. The bucket argument is not used.
void SumNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData *aggregate_data,
    AggregationPeriodBucket * /*bucket*/) {
  const auto running_total = aggregate_data->integer_value();
  const auto increment = event_record.event()->integer_event().value();
  aggregate_data->set_integer_value(running_total + increment);
}
// Returns the sum of integer_value() over all `aggregates` (0 when the list is empty).
int64_t SumNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  int64_t total = 0;
  for (const AggregateData *entry : aggregates) {
    total += entry->integer_value();
  }
  return total;
}
// Folds the event's integer value into the stored minimum.
//
// If `aggregate_data` does not yet hold an integer value, the event's value is stored
// as-is; otherwise the smaller of the stored and incoming values is kept. The bucket
// argument is not used.
void MinNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData *aggregate_data,
    AggregationPeriodBucket * /*bucket*/) {
  const auto observed = event_record.event()->integer_event().value();

  // First value for this aggregate: nothing to compare against yet.
  if (aggregate_data->aggregate_data_case() != AggregateData::kIntegerValue) {
    aggregate_data->set_integer_value(observed);
    return;
  }

  aggregate_data->set_integer_value(std::min(aggregate_data->integer_value(), observed));
}
// Returns the smallest integer_value() among `aggregates`. With an empty list the result is
// std::numeric_limits<int64_t>::max(), the starting value of the scan.
int64_t MinNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  int64_t smallest = std::numeric_limits<int64_t>::max();
  for (const AggregateData *entry : aggregates) {
    if (entry->integer_value() < smallest) {
      smallest = entry->integer_value();
    }
  }
  return smallest;
}
// Folds the event's integer value into the stored maximum.
//
// If `aggregate_data` does not yet hold an integer value, the event's value is stored
// as-is; otherwise the larger of the stored and incoming values is kept. The bucket
// argument is not used.
void MaxNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData *aggregate_data,
    AggregationPeriodBucket * /*bucket*/) {
  const auto observed = event_record.event()->integer_event().value();

  // First value for this aggregate: nothing to compare against yet.
  if (aggregate_data->aggregate_data_case() != AggregateData::kIntegerValue) {
    aggregate_data->set_integer_value(observed);
    return;
  }

  aggregate_data->set_integer_value(std::max(aggregate_data->integer_value(), observed));
}
// Returns the largest integer_value() among `aggregates`. With an empty list the result is
// std::numeric_limits<int64_t>::min(), the starting value of the scan.
int64_t MaxNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  int64_t largest = std::numeric_limits<int64_t>::min();
  for (const AggregateData *entry : aggregates) {
    if (largest < entry->integer_value()) {
      largest = entry->integer_value();
    }
  }
  return largest;
}
// Records one more sample for the mean: adds the event's integer value to the stored sum
// and increments the stored count by one. The bucket argument is not used.
void MeanNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData *aggregate_data,
    AggregationPeriodBucket * /*bucket*/) {
  SumAndCount &totals = *aggregate_data->mutable_sum_and_count();
  const auto observed = event_record.event()->integer_event().value();

  totals.set_sum(totals.sum() + observed);
  totals.set_count(totals.count() + 1);
}
// Returns the integer mean over all `aggregates`: the total of every sum_and_count().sum()
// divided (with integer division) by the total of every sum_and_count().count().
//
// Returns 0 when the combined count is zero (no aggregates, or none that recorded a
// sample), since dividing by zero would be undefined behavior.
int64_t MeanNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  int64_t sum = 0;
  uint32_t count = 0;
  for (const auto *aggregate : aggregates) {
    sum += aggregate->sum_and_count().sum();
    count += aggregate->sum_and_count().count();
  }

  // Guard the division: there is no meaningful mean of zero samples.
  if (count == 0) {
    return 0;
  }
  return sum / count;
}
// Appends the event's integer value to the list of raw values kept in `aggregate_data`
// (its integer_values field). The bucket argument is not used.
void MedianNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData *aggregate_data,
    AggregationPeriodBucket * /*bucket*/) {
  auto *recorded_values = aggregate_data->mutable_integer_values();
  recorded_values->add_value(event_record.event()->integer_event().value());
}
namespace {

// Gathers every value stored in the integer_values field of each aggregate into a single
// vector and returns it sorted in ascending order.
std::vector<int64_t> CollectValues(const std::vector<const AggregateData *> &aggregates) {
  std::vector<int64_t> merged;
  for (const AggregateData *entry : aggregates) {
    const auto &recorded = entry->integer_values().value();
    merged.insert(merged.end(), recorded.begin(), recorded.end());
  }
  std::sort(merged.begin(), merged.end());
  return merged;
}

}  // namespace
// Returns the median of all values recorded in `aggregates`.
//
// The values are gathered and sorted by CollectValues(). For an odd number of values the
// middle element is returned; for an even number, the mean of the two middle elements,
// truncated toward zero (the same rounding as plain integer division).
//
// Returns 0 when there are no values at all, rather than indexing an empty vector.
int64_t MedianNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  const std::vector<int64_t> values = CollectValues(aggregates);
  if (values.empty()) {
    return 0;
  }

  const auto middle = values.size() / 2;
  if (values.size() % 2 != 0) {
    return values[middle];
  }

  // `values` is sorted, so lower <= upper.
  const int64_t lower = values[middle - 1];
  const int64_t upper = values[middle];

  // Average the two middle values without ever forming lower + upper when that sum could
  // overflow int64_t. Each branch yields the exact mean truncated toward zero.
  if (lower < 0 && upper >= 0) {
    // Opposite signs: the sum always fits.
    return (lower + upper) / 2;
  }
  if (lower >= 0) {
    // Both non-negative: step up from the smaller value.
    return lower + (upper - lower) / 2;
  }
  // Both negative: step down from the value closer to zero.
  return upper - (upper - lower) / 2;
}
// Returns the value at the configured percentile (percentile_n_, interpreted as a
// percentage) of all values recorded in `aggregates`.
//
// The values are gathered and sorted by CollectValues(); the element at index
// floor(percentile_n_ / 100 * count) is returned. The index is clamped to the last element
// so that a percentile of 100 (or more) selects the maximum instead of reading one past
// the end of the vector.
//
// Returns 0 when there are no values at all, rather than indexing an empty vector.
int64_t PercentileNNumericStatAggregationProcedure::CollectValue(
    const std::vector<const AggregateData *> &aggregates) {
  const std::vector<int64_t> values = CollectValues(aggregates);
  if (values.empty()) {
    return 0;
  }

  const double fraction = static_cast<double>(percentile_n_) / 100.0;
  const auto raw_index = static_cast<uint64_t>(fraction * static_cast<double>(values.size()));
  const uint64_t last_index = values.size() - 1;
  return values[std::min<uint64_t>(raw_index, last_index)];
}
} // namespace cobalt::local_aggregation