blob: 8da595fc42b94731ac590db671a97d6d61f28196 [file] [log] [blame]
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/aggregation_procedures/at_least_once_string_aggregation_procedure.h"
#include <map>
#include <optional>
#include <set>
#include <tuple>
#include "src/lib/util/hash.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/status_macros.h"
namespace cobalt::local_aggregation {
using google::protobuf::Map;
// Constructs the aggregation procedure for an AT_LEAST_ONCE_STRING report.
//
// Caches two settings from the report definition:
//   - expedited_sending(): when true, cleanup after observation commit spares the
//     current day's data so it can be reused (see ObservationsCommitted).
//   - string_buffer_max(): upper bound on the number of distinct string hashes
//     kept per aggregation period bucket and per generated observation.
AtLeastOnceStringAggregationProcedure::AtLeastOnceStringAggregationProcedure(
    const MetricDefinition &metric, const ReportDefinition &report)
    : AggregationProcedure(metric, report),
      is_expedited_(report.expedited_sending()),
      string_buffer_max_(report.string_buffer_max()) {}
// Records one string event into the aggregate: hashes the string value, and either
// marks the existing hash's index as seen or appends a new hash (subject to the
// string buffer limit).
void AtLeastOnceStringAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket &bucket) {
  // TODO(https://fxbug.dev/322409910): Delete usage of legacy hash after clients no longer store
  // them. Keep using legacy hashes when the current aggregation period bucket already holds
  // some; use Farmhash Fingerprint 64 hashes otherwise.
  const bool use_legacy_hash = !bucket.string_hashes().empty();
  const std::string &string_value = event_record.event()->string_event().string_value();
  std::string hashed_value = use_legacy_hash ? util::FarmhashFingerprint(string_value)
                                             : util::FarmhashFingerprint64(string_value);
  const google::protobuf::RepeatedPtrField<std::string> &stored_hashes =
      use_legacy_hash ? bucket.string_hashes() : bucket.string_hashes_ff64();
  Map<uint32_t, UniqueString> *unique_strings =
      aggregate_data.mutable_unique_strings()->mutable_unique_strings();

  // If this hash already appears in the current period bucket, just make sure a
  // UniqueString entry exists at its index and stop.
  int hash_index = 0;
  for (const std::string &existing_hash : stored_hashes) {
    if (existing_hash == hashed_value) {
      if (!unique_strings->contains(hash_index)) {
        (*unique_strings)[hash_index] = UniqueString();
      }
      return;
    }
    ++hash_index;
  }

  // New hash: only record it while the string buffer still has room.
  if (stored_hashes.size() < string_buffer_max_) {
    (*unique_strings)[stored_hashes.size()] = UniqueString();
    if (use_legacy_hash) {
      bucket.add_string_hashes(hashed_value);
    } else {
      bucket.add_string_hashes_ff64(hashed_value);
    }
  }
}
// Merges `aggregate_data`'s string_index -> UniqueString map into
// `merged_aggregate_data`. For indexes present in both, the larger
// last_day_index wins; indexes only present in `aggregate_data` are copied.
//
// This only works correctly if the AggregateData objects are both part of the same
// AggregationPeriodBucket, such that their string_index values both refer to the same repeated
// string_hashes field in the bucket.
void AtLeastOnceStringAggregationProcedure::MergeAggregateData(
    AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
  // Hoist the destination map once: the original re-resolved the accessor chain and
  // performed up to four separate map lookups (contains/at/operator[]) per entry.
  Map<uint32_t, UniqueString> *merged_strings =
      merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings();
  for (const auto &[string_index, unique_string] :
       aggregate_data.unique_strings().unique_strings()) {
    auto it = merged_strings->find(string_index);
    if (it == merged_strings->end()) {
      // Index not present yet: copy the entry wholesale.
      (*merged_strings)[string_index] = unique_string;
    } else if (unique_string.last_day_index() > it->second.last_day_index()) {
      // Keep the most recent day on which this string was reported.
      it->second.set_last_day_index(unique_string.last_day_index());
    }
  }
}
// Human-readable name of this aggregation procedure, used for debugging/logging.
std::string AtLeastOnceStringAggregationProcedure::DebugString() const {
  static constexpr char kProcedureName[] = "AT_LEAST_ONCE_STRING";
  return kProcedureName;
}
// Builds one string-histogram observation from the given buckets, deduplicating
// strings across days and event vectors. Returns a null observation (not an
// error) when there is nothing new to send.
lib::statusor::StatusOr<std::unique_ptr<Observation>>
AtLeastOnceStringAggregationProcedure::GenerateSingleObservation(
    const std::vector<AggregateDataToGenerate> &buckets,
    const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
  // Different days/buckets may have the same event vectors that reported different and/or repeated
  // unique strings. The observation being generated should aggregate the data so that there are
  // unique event vectors and each event vector should have unique strings even across multiple
  // days.
  std::map<std::vector<uint32_t>, std::set<uint32_t>> data_to_send;
  // Deduplicated string hashes, in the order they will be sent in the observation.
  std::vector<std::string> hashes;
  // seen hashes is a mapping from a string hash to its hash index, which correlates to the index
  // of string hashes in the hashes vector above.
  std::map<std::string, uint32_t> seen_hashes;
  // Observation generation should use Farmhash Fingerprint 64 if a multi-day report has a mix of
  // legacy and FF64 across multiple days. Use legacy hashes only if buckets is not empty and all
  // buckets stores legacy hashes.
  //
  // TODO(https://fxbug.dev/322409910): Delete usage of legacy hash after clients no longer
  // store them.
  const bool generate_observation_use_legacy_hash =
      !buckets.empty() && std::all_of(buckets.begin(), buckets.end(), [](const auto &b) {
        return static_cast<bool>(b.use_legacy_hash);
      });
  for (const AggregateDataToGenerate &bucket : buckets) {
    // Drop aggregated data for any bucket that doesn't match the correct string hash.
    // Note, buckets using FF64 string hashes are never expected to be dropped because the function
    // takes precedence over the legacy function when determining
    // `generate_observation_use_legacy_hash`.
    if (generate_observation_use_legacy_hash != bucket.use_legacy_hash) {
      continue;
    }
    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
                                         aggregate_data.event_codes().end());
      // Only event vectors selected for this observation contribute data.
      if (!(event_vectors.count(event_vector) > 0)) {
        continue;
      }
      // Create or add on to a unique event vector's string histogram indices.
      for (const auto &[index, unique_string] :
           aggregate_data.data().unique_strings().unique_strings()) {
        // Skip strings whose last_day_index is already at/past the current day — presumably
        // these were covered by a previously committed observation (ObservationsCommitted
        // advances last_day_index to the commit day); verify against the base class contract.
        if (unique_string.last_day_index() >= time_info.day_index) {
          continue;
        }
        const std::string hash = bucket.string_hashes.at(static_cast<int>(index));
        if (seen_hashes.count(hash) > 0) {
          data_to_send[event_vector].insert(seen_hashes[hash]);
        } else if (hashes.size() < string_buffer_max_) {
          // If the string hash hasn't been observed yet, check that the string buffer max has not
          // been reached before adding the unique string hash.
          uint32_t hash_index = hashes.size();
          data_to_send[event_vector].insert(hash_index);
          hashes.push_back(hash);
          seen_hashes[hash] = hash_index;
        }
      }
    }
  }
  // Flatten the per-event-vector index sets into (index, count) histograms.
  std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
  data.reserve(data_to_send.size());
  for (const auto &[event_vector, unique_strings_indices] : data_to_send) {
    std::vector<std::tuple<uint32_t, int64_t>> histogram;
    // Each unique string index should have a count of 1 for the histogram.
    for (const uint32_t &string_index : unique_strings_indices) {
      histogram.emplace_back(string_index, 1);
    }
    data.emplace_back(event_vector, std::move(histogram));
  }
  if (data.empty()) {
    // Nothing new to report: return a null observation rather than an error.
    return {nullptr};
  }
  return logger::encoder::EncodeStringHistogramObservation(hashes, data,
                                                           generate_observation_use_legacy_hash);
}
// Marks the data covered by just-committed observations as sent, then delegates
// cleanup to the base class.
//
// When a SystemProfile hash is provided, every UniqueString that could have been
// included in the committed observation (last_day_index still before
// info.day_index, event vector selected for the observation) has its
// last_day_index advanced to info.day_index so it is not reported again.
// For expedited reports, cleanup spares the current day's data.
void AtLeastOnceStringAggregationProcedure::ObservationsCommitted(
    ReportAggregate &aggregate, util::TimeInfo info,
    std::optional<uint64_t> system_profile_hash) const {
  if (system_profile_hash.has_value()) {
    std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
        GetAggregateDataToGenerate(info, aggregate);
    auto data_to_generate_it = data_to_generate.find(*system_profile_hash);
    if (data_to_generate_it == data_to_generate.end()) {
      // This shouldn't happen, since the storage is locked during observation generation, so the
      // return value of GetAggregateDataToGenerate should not have changed from the call in
      // GenerateObservations.
      LOG(ERROR) << "Failed to find the aggregate data for observations that were committed with a "
                    "SystemProfile hash of: "
                 << *system_profile_hash;
    } else {
      std::vector<AggregateDataToGenerate> buckets = std::move(data_to_generate_it->second);
      const std::set<std::vector<uint32_t>> &event_vectors =
          SelectEventVectorsForObservation(buckets);
      for (AggregateDataToGenerate &bucket : buckets) {
        for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
          std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
                                             aggregate_data.event_codes().end());
          // Only event vectors that were part of the committed observation are updated.
          if (!(event_vectors.count(event_vector) > 0)) {
            continue;
          }
          for (auto &[index, unique_string] :
               *aggregate_data.mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
            // Entries already at or past info.day_index were not part of this observation;
            // leave them untouched.
            if (unique_string.last_day_index() >= info.day_index) {
              continue;
            }
            // Record that this string has now been reported as of this day.
            unique_string.set_last_day_index(info.day_index);
          }
        }
      }
    }
  }
  util::TimeInfo clean_up_time_info = info;
  if (is_expedited_) {
    // Only cleanup data from before the current day, which can be reused for expedited metrics.
    clean_up_time_info = util::TimeInfo::FromDayIndex(info.day_index - 1);
  }
  AggregationProcedure::ObservationsCommitted(aggregate, clean_up_time_info, system_profile_hash);
}
} // namespace cobalt::local_aggregation