| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h" |
| |
| #include <algorithm> |
| #include <limits> |
| #include <memory> |
| |
| #include "src/local_aggregation_1_1/local_aggregation.pb.h" |
| #include "src/logger/encoder.h" |
| #include "src/pb/observation.pb.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| #include "src/registry/metric_definition.pb.h" |
| #include "src/registry/report_definition.pb.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| NumericStatAggregationProcedure::NumericStatAggregationProcedure(const MetricDefinition &metric, |
| const ReportDefinition &report) |
| : AggregationProcedure(metric, report) {} |
| |
| std::unique_ptr<NumericStatAggregationProcedure> NumericStatAggregationProcedure::New( |
| const MetricDefinition &metric, const ReportDefinition &report) { |
| switch (report.local_aggregation_procedure()) { |
| case ReportDefinition::SUM_PROCEDURE: |
| return std::make_unique<SumNumericStatAggregationProcedure>(metric, report); |
| case ReportDefinition::MIN_PROCEDURE: |
| return std::make_unique<MinNumericStatAggregationProcedure>(metric, report); |
| case ReportDefinition::MAX_PROCEDURE: |
| return std::make_unique<MaxNumericStatAggregationProcedure>(metric, report); |
| case ReportDefinition::MEAN: |
| return std::make_unique<MeanNumericStatAggregationProcedure>(metric, report); |
| case ReportDefinition::MEDIAN: |
| return std::make_unique<MedianNumericStatAggregationProcedure>(metric, report); |
| case ReportDefinition::PERCENTILE_N: |
| return std::make_unique<PercentileNNumericStatAggregationProcedure>(metric, report); |
| default: |
| LOG(ERROR) << "Unexpected aggregation procedure " << report.local_aggregation_procedure() |
| << ". Nothing will be collected " << report.report_name(); |
| return nullptr; |
| } |
| } |
| |
| bool NumericStatAggregationProcedure::IsDaily() const { |
| switch (report_type()) { |
| case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS: |
| case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS: |
| return true; |
| case ReportDefinition::HOURLY_VALUE_HISTOGRAMS: |
| case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS: |
| return false; |
| default: |
| LOG(ERROR) << "Unexpected report_type for NumericStatAggregationProcedure: " << report_type() |
| << ". Defaulting to IsDaily=true."; |
| return true; |
| } |
| } |
| |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> |
| NumericStatAggregationProcedure::GenerateSingleObservation( |
| const std::vector<AggregateDataToGenerate> &buckets, |
| const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) { |
| std::map<std::vector<uint32_t>, std::vector<std::reference_wrapper<const AggregateData>>> |
| aggregates_by_event_code; |
| |
| for (const AggregateDataToGenerate &bucket : buckets) { |
| for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) { |
| std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(), |
| aggregate_data.event_codes().end()); |
| if (!event_vectors.count(event_vector)) { |
| continue; |
| } |
| aggregates_by_event_code[event_vector].push_back(aggregate_data.data()); |
| } |
| } |
| |
| std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data; |
| data.reserve(aggregates_by_event_code.size()); |
| for (auto [event_codes, aggregates] : aggregates_by_event_code) { |
| data.emplace_back(std::make_tuple(event_codes, CollectValue(aggregates))); |
| } |
| |
| if (data.empty()) { |
| return {nullptr}; |
| } |
| |
| return logger::Encoder::EncodeIntegerObservation(data); |
| } |
| |
| void SumNumericStatAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket & /*bucket*/) { |
| aggregate_data.set_integer_value(aggregate_data.integer_value() + |
| event_record.event()->integer_event().value()); |
| } |
| |
| void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data, |
| const AggregateData &aggregate_data) { |
| merged_aggregate_data.set_integer_value(merged_aggregate_data.integer_value() + |
| aggregate_data.integer_value()); |
| } |
| |
| int64_t SumNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| int64_t value = 0; |
| for (const AggregateData &aggregate : aggregates) { |
| value += aggregate.integer_value(); |
| } |
| return value; |
| } |
| |
| void MinNumericStatAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket & /*bucket*/) { |
| if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| aggregate_data.set_integer_value( |
| std::min(aggregate_data.integer_value(), event_record.event()->integer_event().value())); |
| } else { |
| aggregate_data.set_integer_value(event_record.event()->integer_event().value()); |
| } |
| } |
| |
| void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data, |
| const AggregateData &aggregate_data) { |
| if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| merged_aggregate_data.set_integer_value( |
| std::min(aggregate_data.integer_value(), merged_aggregate_data.integer_value())); |
| } else { |
| merged_aggregate_data.set_integer_value(aggregate_data.integer_value()); |
| } |
| } |
| } |
| |
| int64_t MinNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| int64_t value = std::numeric_limits<int64_t>::max(); |
| for (const AggregateData &aggregate : aggregates) { |
| value = std::min(value, aggregate.integer_value()); |
| } |
| return value; |
| } |
| |
| void MaxNumericStatAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket & /*bucket*/) { |
| if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| aggregate_data.set_integer_value( |
| std::max(aggregate_data.integer_value(), event_record.event()->integer_event().value())); |
| } else { |
| aggregate_data.set_integer_value(event_record.event()->integer_event().value()); |
| } |
| } |
| |
| void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data, |
| const AggregateData &aggregate_data) { |
| if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) { |
| merged_aggregate_data.set_integer_value( |
| std::max(aggregate_data.integer_value(), merged_aggregate_data.integer_value())); |
| } else { |
| merged_aggregate_data.set_integer_value(aggregate_data.integer_value()); |
| } |
| } |
| } |
| |
| int64_t MaxNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| int64_t value = std::numeric_limits<int64_t>::min(); |
| for (const AggregateData &aggregate : aggregates) { |
| value = std::max(value, aggregate.integer_value()); |
| } |
| return value; |
| } |
| |
| void MeanNumericStatAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket & /*bucket*/) { |
| SumAndCount *sum_and_count = aggregate_data.mutable_sum_and_count(); |
| sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value()); |
| sum_and_count->set_count(sum_and_count->count() + 1); |
| } |
| |
| void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data, |
| const AggregateData &aggregate_data) { |
| SumAndCount *sum_and_count = merged_aggregate_data.mutable_sum_and_count(); |
| sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum()); |
| sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count()); |
| } |
| |
| int64_t MeanNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| int64_t sum = 0; |
| uint32_t count = 0; |
| for (const AggregateData &aggregate : aggregates) { |
| sum += aggregate.sum_and_count().sum(); |
| count += aggregate.sum_and_count().count(); |
| } |
| return sum / count; |
| } |
| |
| void MedianNumericStatAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket & /*bucket*/) { |
| aggregate_data.mutable_integer_values()->add_value(event_record.event()->integer_event().value()); |
| } |
| |
| void MedianNumericStatAggregationProcedure::MergeAggregateData( |
| AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) { |
| merged_aggregate_data.mutable_integer_values()->MergeFrom(aggregate_data.integer_values()); |
| } |
| |
| namespace { |
| std::vector<int64_t> CollectValues( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| std::vector<int64_t> values; |
| for (const AggregateData &aggregate : aggregates) { |
| values.insert(values.end(), aggregate.integer_values().value().begin(), |
| aggregate.integer_values().value().end()); |
| } |
| |
| sort(values.begin(), values.end()); |
| |
| return values; |
| } |
| } // namespace |
| |
| int64_t MedianNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| std::vector<int64_t> values = CollectValues(aggregates); |
| |
| if (values.size() % 2 == 0) { |
| return (values[values.size() / 2 - 1] + values[values.size() / 2]) / 2; |
| } |
| return values[values.size() / 2]; |
| } |
| |
| int64_t PercentileNNumericStatAggregationProcedure::CollectValue( |
| const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) { |
| std::vector<int64_t> values = CollectValues(aggregates); |
| |
| auto index = static_cast<uint32_t>((static_cast<double>(percentile_n_) / 100.0) * |
| static_cast<double>(values.size())); |
| return values[index]; |
| } |
| |
| } // namespace cobalt::local_aggregation |