| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h" |
| |
| #include "src/logger/encoder.h" |
| #include "src/pb/observation.pb.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| SelectFirstAggregationProcedure::SelectFirstAggregationProcedure(const MetricDefinition &metric, |
| const ReportDefinition &report) |
| : AggregationProcedure(metric, report), is_expedited_(report.expedited_sending()) { |
| // Record only the first event vector that is logged for each aggregation period. |
| SetEventVectorBufferMax(1); |
| } |
| |
| void SelectFirstAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data, |
| AggregationPeriodBucket * /*bucket*/) { |
| aggregate_data->mutable_at_least_once()->set_at_least_once(true); |
| } |
| |
| std::string SelectFirstAggregationProcedure::DebugString() const { return "SELECT_FIRST"; } |
| |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> |
| SelectFirstAggregationProcedure::GenerateSingleObservation( |
| const std::vector<AggregateDataToGenerate> &buckets, |
| const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) { |
| std::set<std::vector<uint32_t>> event_vectors_to_send; |
| for (const AggregateDataToGenerate &bucket : buckets) { |
| for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) { |
| std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(), |
| aggregate_data->event_codes().end()); |
| if (!event_vectors.count(event_vector)) { |
| continue; |
| } |
| if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) { |
| continue; |
| } |
| |
| event_vectors_to_send.emplace(std::move(event_vector)); |
| } |
| } |
| |
| std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data; |
| data.reserve(event_vectors_to_send.size()); |
| |
| for (const std::vector<uint32_t> &event_vector : event_vectors_to_send) { |
| data.emplace_back(std::make_tuple(event_vector, 1)); |
| } |
| |
| if (data.empty()) { |
| return {nullptr}; |
| } |
| |
| return logger::Encoder::EncodeIntegerObservation(data); |
| } |
| |
| void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate, |
| util::TimeInfo info, |
| uint64_t system_profile_hash) const { |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate = |
| GetAggregateDataToGenerate(info, aggregate); |
| auto data_to_generate_it = data_to_generate.find(system_profile_hash); |
| if (data_to_generate_it == data_to_generate.end()) { |
| // This shouldn't happen, since the storage is locked during observation generation, so the |
| // return value of GetAggregateDataToGenerate should not have changed from the call in |
| // GenerateObservations. |
| LOG(ERROR) << "Failed to find the aggregate data for observations that were committed with a " |
| "SystemProfile hash of: " |
| << system_profile_hash; |
| } else { |
| std::vector<AggregateDataToGenerate> buckets = std::move(data_to_generate_it->second); |
| const std::set<std::vector<uint32_t>> &event_vectors = |
| SelectEventVectorsForObservation(buckets); |
| for (AggregateDataToGenerate &bucket : buckets) { |
| for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) { |
| std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(), |
| aggregate_data->event_codes().end()); |
| if (!event_vectors.count(event_vector)) { |
| continue; |
| } |
| if (!aggregate_data->data().at_least_once().at_least_once()) { |
| continue; |
| } |
| if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) { |
| continue; |
| } |
| aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index); |
| } |
| } |
| } |
| util::TimeInfo clean_up_time_info = info; |
| if (is_expedited_) { |
| // Only cleanup data from before the current day, which can be reused for expedited metrics. |
| clean_up_time_info = util::TimeInfo::FromDayIndex(info.day_index - 1); |
| } |
| AggregationProcedure::ObservationsCommitted(aggregate, clean_up_time_info, system_profile_hash); |
| } |
| |
| } // namespace cobalt::local_aggregation |