blob: eb02a78982dc3c72883d91e93c774a11eef88f15 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h"
#include <map>
#include <set>
#include <tuple>
#include "src/lib/util/hash.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/status_macros.h"
namespace cobalt::local_aggregation {
using google::protobuf::Map;
// Constructor. Passes `metric` and `report` through to the AggregationProcedure base class and
// caches the metric's string_buffer_max() in string_buffer_max_.
// NOTE(review): the line carrying the qualified constructor name and the opening parenthesis is
// not present above this parameter list -- confirm against the original file.
const MetricDefinition &metric, const ReportDefinition &report)
: AggregationProcedure(metric, report),
string_buffer_max_(metric.string_buffer_max()) {}
// Updates the UniqueString map of an aggregate for a logged string event.
//
// Entries of the map are keyed by the position of the string's byte representation (`bytes`) in
// `bucket`'s repeated string_hashes field:
//   - When `bytes` matches string_hashes(i), a default UniqueString is stored under index i unless
//     the map already contains that index.
//   - A new entry is stored under index string_hashes_size() only while the bucket holds fewer
//     than string_buffer_max_ hashes.
//
// NOTE(review): the initializers of `unique_strings` and `bytes`, the closing braces, and any
// statements between the two steps above are not present in this text; presumably `bytes` is
// derived from `event_record` and `unique_strings` from `aggregate_data` -- confirm.
void AtLeastOnceStringAggregationProcedure::UpdateAggregateData(
const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket *bucket) {
Map<uint32_t, UniqueString> *unique_strings =
std::string bytes =
// Check whether the byte representation of the current string event value already appears among
// the string hashes of the current period bucket. If it does, initialize a UniqueString message
// for that index unless the current UniqueString mapping already has an entry for it.
for (int i = 0; i < bucket->string_hashes_size(); i++) {
if (bucket->string_hashes(i) == bytes) {
if (!unique_strings->contains(i)) {
(*unique_strings)[i] = UniqueString();
// Only add an entry while the bucket holds fewer than string_buffer_max_ string hashes.
if (bucket->string_hashes_size() < string_buffer_max_) {
// Add new entry, keyed by the current number of string hashes in the bucket.
(*unique_strings)[bucket->string_hashes_size()] = UniqueString();
// Merges the UniqueString entries of `aggregate_data` into `merged_aggregate_data`.
//
// For a string index present in both maps, the visible condition compares last_day_index values
// (testing whether the incoming one is greater). For an index that is only present in
// `aggregate_data`, a value is assigned into the merged map under that index.
// NOTE(review): the statements executed in each branch, and part of the comparison expression,
// are not fully visible in this text -- confirm against the original file.
void AtLeastOnceStringAggregationProcedure::MergeAggregateData(
AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
// Merge in the aggregate data's mapping of string indexes to UniqueString entries.
// This only works correctly if the AggregateData objects are both part of the same
// AggregationPeriodBucket, such that their string_index values both refer to the same repeated
// string_hashes field in the bucket.
for (const auto &[string_index, unique_string] :
aggregate_data.unique_strings().unique_strings()) {
// Index already present in the merged map: compare the last_day_index values.
if (merged_aggregate_data->unique_strings().unique_strings().contains(string_index)) {
if (unique_string.last_day_index() > merged_aggregate_data->unique_strings()
.last_day_index()) {
} else {
// Index not yet present in the merged map: store an entry under it.
(*merged_aggregate_data->mutable_unique_strings()->mutable_unique_strings())[string_index] =
// Returns a std::string describing this procedure.
// NOTE(review): the body is not visible in this text; presumably it returns a fixed,
// human-readable name for the procedure -- confirm.
std::string AtLeastOnceStringAggregationProcedure::DebugString() const {
// Builds one string-histogram observation from the aggregate data in `buckets`.
// NOTE(review): the line carrying this function's return type and qualified name is not present
// above this parameter list, and several statements in the body are missing -- confirm.
//
// Parameters:
//   buckets:       the aggregate data to read; every EventCodesAggregateData in every bucket is
//                  visited.
//   event_vectors: each aggregate's event codes are looked up in this set (the action taken when
//                  the event vector is absent is not visible here).
//   time_info:     only unique strings whose last_day_index() >= time_info.day_index are
//                  considered.
//
// Returns {nullptr} when no histogram data was collected; otherwise the result of
// logger::Encoder::EncodeStringHistogramObservation(hashes, data), where every string index in a
// histogram carries a count of 1.
const std::vector<AggregateDataToGenerate> &buckets,
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
// Different days/buckets may have the same event vectors that reported different and/or repeated
// unique strings. The observation being generated should aggregate the data so that there are
// unique event vectors and each event vector should have unique strings even across multiple
// days.
std::map<std::vector<uint32_t>, std::set<uint32_t>> data_to_send;
// String hashes to send with the observation; at most string_buffer_max_ entries are added.
std::vector<std::string> hashes;
// seen_hashes maps a string hash to its hash index, which is the index of that string hash in
// the hashes vector above.
std::map<std::string, uint32_t> seen_hashes;
for (const AggregateDataToGenerate &bucket : buckets) {
for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
if (!(event_vectors.count(event_vector) > 0)) {
// Create or add on to a unique event vector's string histogram indices.
for (const auto &[index, unique_string] :
aggregate_data->data().unique_strings().unique_strings()) {
if (unique_string.last_day_index() >= time_info.day_index) {
const std::string hash =<int>(index));
if (seen_hashes.count(hash) > 0) {
} else if (hashes.size() < string_buffer_max_) {
// If the string hash hasn't been observed yet, check that the string buffer max has not
// been reached before adding the unique string hash.
uint32_t hash_index = hashes.size();
seen_hashes[hash] = hash_index;
// Convert data_to_send into the (event vector, histogram) tuples passed to the encoder.
std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
for (const auto &[event_vector, unique_strings_indices] : data_to_send) {
std::vector<std::tuple<uint32_t, int64_t>> histogram;
// Each unique string index should have a count of 1 for the histogram.
for (const uint32_t &string_index : unique_strings_indices) {
histogram.emplace_back(string_index, 1);
data.emplace_back(event_vector, std::move(histogram));
// Nothing was collected, so there is no observation to encode.
if (data.empty()) {
return {nullptr};
return logger::Encoder::EncodeStringHistogramObservation(hashes, data);
// Handles the commit of the observations generated for `system_profile_hash`.
//
// Recomputes the aggregate data via GetAggregateDataToGenerate(info, aggregate) and looks up the
// entry for `system_profile_hash`. If there is none, an error is logged. Otherwise the entry's
// buckets are walked and the unique strings with last_day_index() >= info.day_index are selected.
// NOTE(review): what is done to each selected unique string, the initializer of `event_vectors`,
// and the closing braces are not visible in this text -- confirm against the original file.
//
// AggregationProcedure::ObservationsCommitted is called with `info`, or, when is_expedited_ is
// set, with a TimeInfo built from the previous day index (info.day_index - 1).
void AtLeastOnceStringAggregationProcedure::ObservationsCommitted(
ReportAggregate *aggregate, util::TimeInfo info, uint64_t system_profile_hash) const {
// Aggregate data to generate, keyed by a uint64_t that is looked up with system_profile_hash.
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
GetAggregateDataToGenerate(info, aggregate);
auto data_to_generate_it = data_to_generate.find(system_profile_hash);
if (data_to_generate_it == data_to_generate.end()) {
// This shouldn't happen, since the storage is locked during observation generation, so the
// return value of GetAggregateDataToGenerate should not have changed from the call in
// GenerateObservations.
LOG(ERROR) << "Failed to find the aggregate data for observations that were committed with a "
"SystemProfile hash of: "
<< system_profile_hash;
} else {
// Take ownership of the buckets stored for this system profile hash.
std::vector<AggregateDataToGenerate> buckets = std::move(data_to_generate_it->second);
const std::set<std::vector<uint32_t>> &event_vectors =
for (AggregateDataToGenerate &bucket : buckets) {
for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
if (!(event_vectors.count(event_vector) > 0)) {
for (auto &[index, unique_string] :
*aggregate_data->mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
if (unique_string.last_day_index() >= info.day_index) {
// By default the base class is given the same TimeInfo that was passed in.
util::TimeInfo clean_up_time_info = info;
if (is_expedited_) {
// Only clean up data from before the current day, which can be reused for expedited metrics.
clean_up_time_info = util::TimeInfo::FromDayIndex(info.day_index - 1);
AggregationProcedure::ObservationsCommitted(aggregate, clean_up_time_info, system_profile_hash);
} // namespace cobalt::local_aggregation