| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation/aggregation_procedures/at_least_once_string_aggregation_procedure.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <map> |
| #include <memory> |
| #include <optional> |
| #include <set> |
| #include <string> |
| #include <tuple> |
| #include <vector> |
| |
| #include "src/lib/util/hash.h" |
| #include "src/logger/encoder.h" |
| #include "src/pb/observation.pb.h" |
| #include "src/public/lib/statusor/status_macros.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| using google::protobuf::Map; |
| |
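| // `string_buffer_max_` bounds how many distinct string hashes are stored per |
| // aggregation period bucket (see UpdateAggregateData) and how many are |
| // included in a generated observation (see GenerateSingleObservation). |
| // `is_expedited_` controls how much data is cleaned up once observations are |
| // committed (see ObservationsCommitted). |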
| AtLeastOnceStringAggregationProcedure::AtLeastOnceStringAggregationProcedure( |
| const MetricDefinition &metric, const ReportDefinition &report) |
| : AggregationProcedure(metric, report), |
| is_expedited_(report.expedited_sending()), |
| string_buffer_max_(report.string_buffer_max()) {} |
| |
| void AtLeastOnceStringAggregationProcedure::UpdateAggregateData( |
| const logger::EventRecord &event_record, AggregateData &aggregate_data, |
| AggregationPeriodBucket &bucket) { |
| Map<uint32_t, UniqueString> *unique_strings = |
| aggregate_data.mutable_unique_strings()->mutable_unique_strings(); |
| |
| // TODO(https://fxbug.dev/322409910): Delete usage of legacy hashes after clients no longer |
| // store them. Continue to use legacy hashes if they're already stored in the current |
| // aggregation period bucket. Use Farmhash Fingerprint 64 hashes otherwise. |
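| // For example (hypothetical bucket states): a bucket whose `string_hashes` |
| // field is non-empty was populated with the legacy hash, so new events in it |
| // are hashed with util::FarmhashFingerprint; a fresh bucket has an empty |
| // `string_hashes` field, so its events are hashed with |
| // util::FarmhashFingerprint64 and stored in `string_hashes_ff64`. |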
| bool use_legacy_hash = !bucket.string_hashes().empty(); |
| std::string bytes = |
| use_legacy_hash |
| ? util::FarmhashFingerprint(event_record.event()->string_event().string_value()) |
| : util::FarmhashFingerprint64(event_record.event()->string_event().string_value()); |
| const google::protobuf::RepeatedPtrField<std::string> &string_hashes = |
| use_legacy_hash ? bucket.string_hashes() : bucket.string_hashes_ff64(); |
| |
| // Check whether the byte representation of the current string event value |
| // already appears in the string hashes of the current period bucket. If it |
| // does, make sure a UniqueString message exists for that hash's index in the |
| // UniqueString mapping, then return. |
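| // For example (hypothetical values): if the bucket's stored hashes are |
| // [hash("a"), hash("b")] and the event's value hashes to hash("b"), index 1 |
| // is ensured in `unique_strings` and the hash buffer is left unchanged. |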
| for (int i = 0; i < string_hashes.size(); i++) { |
| if (string_hashes.at(i) == bytes) { |
| if (!unique_strings->contains(i)) { |
| (*unique_strings)[i] = UniqueString(); |
| } |
| return; |
| } |
| } |
| |
| if (string_hashes.size() < string_buffer_max_) { |
| // The hash buffer still has room; record a new entry for this string hash. |
| (*unique_strings)[string_hashes.size()] = UniqueString(); |
| if (use_legacy_hash) { |
| bucket.add_string_hashes(bytes); |
| } else { |
| bucket.add_string_hashes_ff64(bytes); |
| } |
| } |
| } |
| |
| void AtLeastOnceStringAggregationProcedure::MergeAggregateData( |
| AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) { |
| // Merge in the aggregate data's mapping of indexes to their count. |
| // This only works correctly if the AggregateData objects are both part of the same |
| // AggregationPeriodBucket, such that their string_index values both refer to the same repeated |
| // string_hashes field in the bucket. |
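| // For example (hypothetical values): merging {3: last_day_index=10, |
| // 7: last_day_index=11} into {3: last_day_index=12, 5: last_day_index=9} |
| // yields {3: last_day_index=12, 5: last_day_index=9, 7: last_day_index=11}: |
| // index 3 keeps the larger day index and index 7 is copied over verbatim. |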
| for (const auto &[string_index, unique_string] : |
| aggregate_data.unique_strings().unique_strings()) { |
| if (merged_aggregate_data.unique_strings().unique_strings().contains(string_index)) { |
| if (unique_string.last_day_index() > merged_aggregate_data.unique_strings() |
| .unique_strings() |
| .at(string_index) |
| .last_day_index()) { |
| (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index] |
| .set_last_day_index(unique_string.last_day_index()); |
| } |
| } else { |
| (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index] = |
| unique_string; |
| } |
| } |
| } |
| |
| std::string AtLeastOnceStringAggregationProcedure::DebugString() const { |
| return "AT_LEAST_ONCE_STRING"; |
| } |
| |
| lib::statusor::StatusOr<std::unique_ptr<Observation>> |
| AtLeastOnceStringAggregationProcedure::GenerateSingleObservation( |
| const std::vector<AggregateDataToGenerate> &buckets, |
| const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) { |
| // Different days/buckets may contain the same event vectors, each reporting |
| // different and/or repeated unique strings. The generated observation must |
| // aggregate the data so that event vectors are unique and each event vector |
| // lists each string at most once, even across multiple days. |
| std::map<std::vector<uint32_t>, std::set<uint32_t>> data_to_send; |
| std::vector<std::string> hashes; |
| // `seen_hashes` maps a string hash to its hash index, which is the hash's |
| // position in the `hashes` vector above. |
| std::map<std::string, uint32_t> seen_hashes; |
| |
| // Observation generation should use Farmhash Fingerprint 64 if a multi-day |
| // report has a mix of legacy and FF64 hashes across its days. Use legacy |
| // hashes only if `buckets` is non-empty and every bucket stores legacy |
| // hashes. |
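| // For example (hypothetical flags): buckets with use_legacy_hash values |
| // {true, true} select the legacy hash, while {true, false}, {false}, or an |
| // empty bucket list all select FF64. |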
| // |
| // TODO(https://fxbug.dev/322409910): Delete usage of legacy hash after clients no longer |
| // store them. |
| const bool generate_observation_use_legacy_hash = |
| !buckets.empty() && std::all_of(buckets.begin(), buckets.end(), [](const auto &b) { |
| return static_cast<bool>(b.use_legacy_hash); |
| }); |
| |
| for (const AggregateDataToGenerate &bucket : buckets) { |
| // Drop aggregated data for any bucket that doesn't use the selected string |
| // hash function. Note: buckets using FF64 string hashes are never expected |
| // to be dropped, because FF64 takes precedence over the legacy hash when |
| // determining `generate_observation_use_legacy_hash`. |
| if (generate_observation_use_legacy_hash != bucket.use_legacy_hash) { |
| continue; |
| } |
| |
| for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) { |
| std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(), |
| aggregate_data.event_codes().end()); |
| if (event_vectors.count(event_vector) == 0) { |
| continue; |
| } |
| |
| // Create or add to a unique event vector's string histogram indices. |
| for (const auto &[index, unique_string] : |
| aggregate_data.data().unique_strings().unique_strings()) { |
| if (unique_string.last_day_index() >= time_info.day_index) { |
| continue; |
| } |
| |
| const std::string hash = bucket.string_hashes.at(static_cast<int>(index)); |
| if (seen_hashes.count(hash) > 0) { |
| data_to_send[event_vector].insert(seen_hashes[hash]); |
| } else if (hashes.size() < string_buffer_max_) { |
| // The string hash hasn't been observed yet and the string buffer max has |
| // not been reached, so record the new unique string hash. |
| auto hash_index = static_cast<uint32_t>(hashes.size()); |
| data_to_send[event_vector].insert(hash_index); |
| hashes.push_back(hash); |
| seen_hashes[hash] = hash_index; |
| } |
| } |
| } |
| } |
| |
| std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data; |
| data.reserve(data_to_send.size()); |
| |
| for (const auto &[event_vector, unique_strings_indices] : data_to_send) { |
| std::vector<std::tuple<uint32_t, int64_t>> histogram; |
| // Each unique string index should have a count of 1 for the histogram. |
| for (const uint32_t &string_index : unique_strings_indices) { |
| histogram.emplace_back(string_index, 1); |
| } |
| |
| data.emplace_back(event_vector, std::move(histogram)); |
| } |
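| // For example (hypothetical values): an event vector {1, 2} that accumulated |
| // unique string indices {0, 3} becomes the histogram entry |
| // ({1, 2}, {(0, 1), (3, 1)}). |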
| |
| if (data.empty()) { |
| return {nullptr}; |
| } |
| |
| return logger::encoder::EncodeStringHistogramObservation(hashes, data, |
| generate_observation_use_legacy_hash); |
| } |
| |
| void AtLeastOnceStringAggregationProcedure::ObservationsCommitted( |
| ReportAggregate &aggregate, util::TimeInfo info, |
| std::optional<uint64_t> system_profile_hash) const { |
| if (system_profile_hash.has_value()) { |
| std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate = |
| GetAggregateDataToGenerate(info, aggregate); |
| auto data_to_generate_it = data_to_generate.find(*system_profile_hash); |
| if (data_to_generate_it == data_to_generate.end()) { |
| // This shouldn't happen, since the storage is locked during observation generation, so the |
| // return value of GetAggregateDataToGenerate should not have changed from the call in |
| // GenerateObservations. |
| LOG(ERROR) << "Failed to find the aggregate data for observations that were committed with a " |
| "SystemProfile hash of: " |
| << *system_profile_hash; |
| } else { |
| std::vector<AggregateDataToGenerate> buckets = std::move(data_to_generate_it->second); |
| const std::set<std::vector<uint32_t>> &event_vectors = |
| SelectEventVectorsForObservation(buckets); |
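| // Stamp every unique string that was included in the committed observation |
| // with the committed day index, so that later observation generation in the |
| // same aggregation period skips it (see the last_day_index checks in |
| // GenerateSingleObservation). |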
| for (AggregateDataToGenerate &bucket : buckets) { |
| for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) { |
| std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(), |
| aggregate_data.event_codes().end()); |
| if (event_vectors.count(event_vector) == 0) { |
| continue; |
| } |
| |
| for (auto &[index, unique_string] : |
| *aggregate_data.mutable_data()->mutable_unique_strings()->mutable_unique_strings()) { |
| if (unique_string.last_day_index() >= info.day_index) { |
| continue; |
| } |
| |
| unique_string.set_last_day_index(info.day_index); |
| } |
| } |
| } |
| } |
| } |
| util::TimeInfo clean_up_time_info = info; |
| if (is_expedited_) { |
| // Only clean up data from before the current day; the current day's data |
| // can be reused for expedited metrics. |
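| // For example (hypothetical day indices): if info.day_index is 100, an |
| // expedited report passes day 99 to the base class cleanup so that day |
| // 100's data is retained; a non-expedited report passes day 100. |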
| clean_up_time_info = util::TimeInfo::FromDayIndex(info.day_index - 1); |
| } |
| AggregationProcedure::ObservationsCommitted(aggregate, clean_up_time_info, system_profile_hash); |
| } |
| |
| } // namespace cobalt::local_aggregation |