blob: cef942452aa93bb9126e150152811e6d9031d37f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/status_macros.h"
namespace cobalt::local_aggregation {
SelectFirstAggregationProcedure::SelectFirstAggregationProcedure(const MetricDefinition &metric,
const ReportDefinition &report)
: AggregationProcedure(metric, report), is_expedited_(report.expedited_sending()) {
// Record only the first event vector that is logged for each aggregation period.
SetEventVectorBufferMax(1);
}
void SelectFirstAggregationProcedure::UpdateAggregateData(
const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) {
aggregate_data->mutable_at_least_once()->set_at_least_once(true);
}
std::string SelectFirstAggregationProcedure::DebugString() const { return "SELECT_FIRST"; }
lib::statusor::StatusOr<std::unique_ptr<Observation>>
SelectFirstAggregationProcedure::GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> &buckets,
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
std::set<std::vector<uint32_t>> event_vectors_to_send;
for (const AggregateDataToGenerate &bucket : buckets) {
for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
aggregate_data->event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) {
continue;
}
event_vectors_to_send.emplace(std::move(event_vector));
}
}
std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data;
data.reserve(event_vectors_to_send.size());
for (const std::vector<uint32_t> &event_vector : event_vectors_to_send) {
data.emplace_back(std::make_tuple(event_vector, 1));
}
if (data.empty()) {
return {nullptr};
}
return logger::Encoder::EncodeIntegerObservation(data);
}
void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
util::TimeInfo info,
uint64_t system_profile_hash) const {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
GetAggregateDataToGenerate(info, aggregate);
auto data_to_generate_it = data_to_generate.find(system_profile_hash);
if (data_to_generate_it == data_to_generate.end()) {
// This shouldn't happen, since the storage is locked during observation generation, so the
// return value of GetAggregateDataToGenerate should not have changed from the call in
// GenerateObservations.
LOG(ERROR) << "Failed to find the aggregate data for observations that were committed with a "
"SystemProfile hash of: "
<< system_profile_hash;
} else {
std::vector<AggregateDataToGenerate> buckets = std::move(data_to_generate_it->second);
const std::set<std::vector<uint32_t>> &event_vectors =
SelectEventVectorsForObservation(buckets);
for (AggregateDataToGenerate &bucket : buckets) {
for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
aggregate_data->event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
if (!aggregate_data->data().at_least_once().at_least_once()) {
continue;
}
if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) {
continue;
}
aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
}
}
}
util::TimeInfo clean_up_time_info = info;
if (is_expedited_) {
// Only cleanup data from before the current day, which can be reused for expedited metrics.
clean_up_time_info = util::TimeInfo::FromDayIndex(info.day_index - 1);
}
AggregationProcedure::ObservationsCommitted(aggregate, clean_up_time_info, system_profile_hash);
}
} // namespace cobalt::local_aggregation