blob: 41c4ebe5ab8dc3bac992af24474bb36326996b90 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/aggregation_procedures/numeric_stat_aggregation_procedure.h"

#include <algorithm>
#include <limits>
#include <memory>
#include <utility>

#include "src/lib/util/not_null.h"
#include "src/lib/util/status_builder.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/registry/metric_definition.pb.h"
#include "src/registry/report_definition.pb.h"
namespace cobalt::local_aggregation {
NumericStatAggregationProcedure::NumericStatAggregationProcedure(const MetricDefinition &metric,
const ReportDefinition &report)
: AggregationProcedure(metric, report) {}
// Factory for the numeric-stat procedures.
//
// Returns the concrete subclass selected by `report.local_aggregation_procedure()`. Any
// procedure other than SUM, MIN, MAX, MEAN, MEDIAN or PERCENTILE_N yields a FAILED_PRECONDITION
// status, logged as an error, whose message names the procedure and whose context carries the
// customer, project, metric and report.
lib::statusor::StatusOr<util::NotNullUniquePtr<NumericStatAggregationProcedure>>
NumericStatAggregationProcedure::New(const std::string &customer_name,
                                     const std::string &project_name,
                                     const MetricDefinition &metric,
                                     const ReportDefinition &report) {
  switch (report.local_aggregation_procedure()) {
    // Simple running statistics.
    case ReportDefinition::SUM_PROCEDURE:
      return {util::MakeNotNullUniquePtr<SumNumericStatAggregationProcedure>(metric, report)};
    case ReportDefinition::MIN_PROCEDURE:
      return {util::MakeNotNullUniquePtr<MinNumericStatAggregationProcedure>(metric, report)};
    case ReportDefinition::MAX_PROCEDURE:
      return {util::MakeNotNullUniquePtr<MaxNumericStatAggregationProcedure>(metric, report)};
    case ReportDefinition::MEAN:
      return {util::MakeNotNullUniquePtr<MeanNumericStatAggregationProcedure>(metric, report)};
    // Order statistics, computed from the full list of recorded values.
    case ReportDefinition::MEDIAN:
      return {util::MakeNotNullUniquePtr<MedianNumericStatAggregationProcedure>(metric, report)};
    case ReportDefinition::PERCENTILE_N:
      return {
          util::MakeNotNullUniquePtr<PercentileNNumericStatAggregationProcedure>(metric, report)};
    default:
      // Unsupported procedure: fall out of the switch to the error below.
      break;
  }

  return util::StatusBuilder(StatusCode::FAILED_PRECONDITION,
                             "Report has unexpected aggregation procedure: ")
      .AppendMsg(report.local_aggregation_procedure())
      .WithContext("Customer", customer_name)
      .WithContext("Project", project_name)
      .WithContexts(metric, report)
      .LogError()
      .Build();
}
bool NumericStatAggregationProcedure::IsDaily() const {
switch (report_type()) {
case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
return true;
case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
return false;
default:
LOG(ERROR) << "Unexpected report_type for NumericStatAggregationProcedure: " << report_type()
<< ". Defaulting to IsDaily=true.";
return true;
}
}
// Builds one integer Observation covering the requested event vectors.
//
// Aggregates from every bucket in `buckets` are grouped by their event codes. Event vectors that
// are not in `event_vectors` are skipped. Each group is then reduced to a single int64 by the
// subclass's CollectValue().
//
// Returns a null Observation when none of the buckets hold data for any requested event vector.
// Otherwise returns the result of logger::encoder::EncodeIntegerObservation.
lib::statusor::StatusOr<std::unique_ptr<Observation>>
NumericStatAggregationProcedure::GenerateSingleObservation(
    const std::vector<AggregateDataToGenerate> &buckets,
    const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
  // Holds references into `buckets`; the map is local, so it never outlives them.
  std::map<std::vector<uint32_t>, std::vector<std::reference_wrapper<const AggregateData>>>
      aggregates_by_event_code;
  for (const AggregateDataToGenerate &bucket : buckets) {
    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
                                         aggregate_data.event_codes().end());
      if (!event_vectors.count(event_vector)) {
        continue;
      }
      // The key is not used again locally, so move it into the map rather than copying it.
      aggregates_by_event_code[std::move(event_vector)].push_back(aggregate_data.data());
    }
  }

  std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data;
  data.reserve(aggregates_by_event_code.size());
  // Bind by const reference: a by-value structured binding would copy every key and its whole
  // vector of aggregates on each iteration.
  for (const auto &[event_codes, aggregates] : aggregates_by_event_code) {
    data.emplace_back(event_codes, CollectValue(aggregates));
  }
  if (data.empty()) {
    return {nullptr};
  }
  return logger::encoder::EncodeIntegerObservation(data);
}
// Adds the event's integer value to the running total stored in `aggregate_data`.
void SumNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket & /*bucket*/) {
  const auto increment = event_record.event()->integer_event().value();
  const auto running_total = aggregate_data.integer_value();
  aggregate_data.set_integer_value(running_total + increment);
}
// Adds the total held by `aggregate_data` into `merged_aggregate_data`.
void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                            const AggregateData &aggregate_data) {
  const auto combined = merged_aggregate_data.integer_value() + aggregate_data.integer_value();
  merged_aggregate_data.set_integer_value(combined);
}
// Returns the sum of the integer totals of all `aggregates` (0 when there are none).
int64_t SumNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  int64_t total = 0;
  for (const auto &entry : aggregates) {
    total += entry.get().integer_value();
  }
  return total;
}
// Folds one event's integer value into `aggregate_data`, keeping the smaller of the stored and
// incoming values. If no integer value is stored yet, the incoming value is stored as-is.
void MinNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket & /*bucket*/) {
  const auto incoming = event_record.event()->integer_event().value();
  const bool has_value = aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue;
  aggregate_data.set_integer_value(has_value ? std::min(aggregate_data.integer_value(), incoming)
                                             : incoming);
}
// Merges the minimum held by `aggregate_data` into `merged_aggregate_data`.
// Nothing happens when `aggregate_data` holds no integer value. When it does but the merged side
// holds none, the incoming value is copied over.
void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                            const AggregateData &aggregate_data) {
  if (aggregate_data.aggregate_data_case() != AggregateData::kIntegerValue) {
    return;
  }
  const bool merged_has_value =
      merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue;
  merged_aggregate_data.set_integer_value(
      merged_has_value
          ? std::min(aggregate_data.integer_value(), merged_aggregate_data.integer_value())
          : aggregate_data.integer_value());
}
// Returns the smallest integer value across `aggregates`, or the int64_t maximum when
// `aggregates` is empty.
int64_t MinNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  int64_t smallest = std::numeric_limits<int64_t>::max();
  for (const auto &entry : aggregates) {
    const int64_t candidate = entry.get().integer_value();
    if (candidate < smallest) {
      smallest = candidate;
    }
  }
  return smallest;
}
// Folds one event's integer value into `aggregate_data`, keeping the larger of the stored and
// incoming values. If no integer value is stored yet, the incoming value is stored as-is.
void MaxNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket & /*bucket*/) {
  const auto incoming = event_record.event()->integer_event().value();
  const bool has_value = aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue;
  aggregate_data.set_integer_value(has_value ? std::max(aggregate_data.integer_value(), incoming)
                                             : incoming);
}
// Merges the maximum held by `aggregate_data` into `merged_aggregate_data`.
// Nothing happens when `aggregate_data` holds no integer value. When it does but the merged side
// holds none, the incoming value is copied over.
void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                            const AggregateData &aggregate_data) {
  if (aggregate_data.aggregate_data_case() != AggregateData::kIntegerValue) {
    return;
  }
  const bool merged_has_value =
      merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue;
  merged_aggregate_data.set_integer_value(
      merged_has_value
          ? std::max(aggregate_data.integer_value(), merged_aggregate_data.integer_value())
          : aggregate_data.integer_value());
}
// Returns the largest integer value across `aggregates`, or the int64_t minimum when
// `aggregates` is empty.
int64_t MaxNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  int64_t largest = std::numeric_limits<int64_t>::min();
  for (const auto &entry : aggregates) {
    const int64_t candidate = entry.get().integer_value();
    if (candidate > largest) {
      largest = candidate;
    }
  }
  return largest;
}
// Records one event: adds its integer value to the running sum and bumps the event count.
void MeanNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket & /*bucket*/) {
  SumAndCount &running = *aggregate_data.mutable_sum_and_count();
  const auto value = event_record.event()->integer_event().value();
  running.set_sum(running.sum() + value);
  running.set_count(running.count() + 1);
}
// Adds the sum and count held by `aggregate_data` into those of `merged_aggregate_data`.
void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                             const AggregateData &aggregate_data) {
  SumAndCount &target = *merged_aggregate_data.mutable_sum_and_count();
  const SumAndCount &source = aggregate_data.sum_and_count();
  target.set_sum(target.sum() + source.sum());
  target.set_count(target.count() + source.count());
}
// Returns the integer mean (truncated toward zero) of all recorded events. The sums and counts
// of every entry in `aggregates` are combined before dividing.
//
// Returns 0 when the combined count is zero: an empty `aggregates` or entries with no recorded
// events would otherwise cause a division by zero.
int64_t MeanNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  int64_t sum = 0;
  uint32_t count = 0;
  for (const AggregateData &aggregate : aggregates) {
    sum += aggregate.sum_and_count().sum();
    count += aggregate.sum_and_count().count();
  }
  if (count == 0) {
    return 0;
  }
  // `count` promotes to int64_t, so this is a signed division and negative sums stay correct.
  return sum / count;
}
// Appends the event's integer value to the list of values recorded in `aggregate_data`.
void MedianNumericStatAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket & /*bucket*/) {
  auto *recorded_values = aggregate_data.mutable_integer_values();
  recorded_values->add_value(event_record.event()->integer_event().value());
}
// Appends every value recorded in `aggregate_data` to the values of `merged_aggregate_data`.
void MedianNumericStatAggregationProcedure::MergeAggregateData(
    AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
  // Obtain the mutable target first, matching the original evaluation order.
  auto *merged_values = merged_aggregate_data.mutable_integer_values();
  merged_values->MergeFrom(aggregate_data.integer_values());
}
namespace {
// Gathers every integer value recorded in `aggregates` into one vector, sorted in ascending
// order. Used by the order-statistic procedures (median, percentile).
std::vector<int64_t> CollectValues(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  // Size the output once so the inserts below never reallocate.
  size_t total_values = 0;
  for (const AggregateData &aggregate : aggregates) {
    total_values += static_cast<size_t>(aggregate.integer_values().value().size());
  }
  std::vector<int64_t> values;
  values.reserve(total_values);
  for (const AggregateData &aggregate : aggregates) {
    values.insert(values.end(), aggregate.integer_values().value().begin(),
                  aggregate.integer_values().value().end());
  }
  // Qualified on purpose: an unqualified `sort` only compiles when argument-dependent lookup
  // finds std::sort through the iterator type, which is not guaranteed (e.g. pointer iterators).
  std::sort(values.begin(), values.end());
  return values;
}
}  // namespace
// Returns the median of all integer values recorded in `aggregates`.
//
// For an even number of values this is the mean of the two middle values, truncated toward
// zero. Returns 0 when no values were recorded at all.
int64_t MedianNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  std::vector<int64_t> values = CollectValues(aggregates);
  if (values.empty()) {
    // Without this guard, values[size() / 2 - 1] wraps around and reads out of bounds.
    return 0;
  }
  const size_t middle = values.size() / 2;
  if (values.size() % 2 != 0) {
    return values[middle];
  }
  const int64_t lower = values[middle - 1];
  const int64_t upper = values[middle];  // lower <= upper because `values` is sorted.
  // (lower + upper) / 2 can overflow int64_t when both operands are large with the same sign.
  // Opposite-sign operands cannot overflow when added. For same-sign operands, step from the end
  // nearer zero so the result truncates toward zero exactly as (lower + upper) / 2 would.
  if ((lower < 0) != (upper < 0)) {
    return (lower + upper) / 2;
  }
  if (lower >= 0) {
    return lower + (upper - lower) / 2;
  }
  return upper - (upper - lower) / 2;
}
// Returns the `percentile_n_`-th percentile of all integer values recorded in `aggregates`. This
// is the sorted value at index floor(percentile_n_ / 100 * count), clamped to the last value.
//
// Returns 0 when no values were recorded at all.
int64_t PercentileNNumericStatAggregationProcedure::CollectValue(
    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
  std::vector<int64_t> values = CollectValues(aggregates);
  if (values.empty()) {
    // values[0] would be out of bounds.
    return 0;
  }
  // Clamp the fraction so an out-of-range percentile_n_ cannot yield a negative or oversized
  // double; converting such a value to an unsigned integer is undefined behavior.
  const double fraction = std::clamp(static_cast<double>(percentile_n_) / 100.0, 0.0, 1.0);
  auto index = static_cast<size_t>(fraction * static_cast<double>(values.size()));
  // percentile_n_ == 100 produces index == values.size(); select the largest value instead.
  index = std::min(index, values.size() - 1);
  return values[index];
}
} // namespace cobalt::local_aggregation