// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation/aggregation_procedures/string_histogram_aggregation_procedure.h"

#include <tuple>

#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/logger/encoder.h"
#include "src/pb/observation.pb.h"
#include "src/public/lib/statusor/status_macros.h"

namespace cobalt::local_aggregation {
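
// Records a single string event in `aggregate_data`'s StringHistogram. Each
// distinct string value is identified by its hash, which is interned in
// `bucket`; the histogram maps the hash's index within the bucket to an
// occurrence count.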
void StringHistogramAggregationProcedure::UpdateAggregateData(
    const logger::EventRecord &event_record, AggregateData &aggregate_data,
    AggregationPeriodBucket &bucket) {
  StringHistogram *histogram = aggregate_data.mutable_string_histogram();

  // TODO(https://fxbug.dev/322409910): Delete usage of legacy hash after clients no longer store
  // them. Continue to use legacy hashes if they're already stored in the current aggregation period
  // bucket. Use Farmhash Fingerprint 64 hashes otherwise.
  bool use_legacy_hash = !bucket.string_hashes().empty();
  std::string bytes =
      use_legacy_hash
          ? util::FarmhashFingerprint(event_record.event()->string_event().string_value())
          : util::FarmhashFingerprint64(event_record.event()->string_event().string_value());
  const google::protobuf::RepeatedPtrField<std::string> &string_hashes =
      use_legacy_hash ? bucket.string_hashes() : bucket.string_hashes_ff64();
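
  // If this string was already seen in the current aggregation period,
  // increment the count at its existing index.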
  for (int i = 0; i < string_hashes.size(); i++) {
    if (string_hashes.at(i) == bytes) {
      (*histogram->mutable_histogram())[i] += 1;
      return;
    }
  }
  if (string_hashes.size() < string_buffer_max_) {
    // Add a new entry: its histogram index is the position at which the hash
    // is appended below. Once the buffer is full, new strings are silently
    // dropped for the remainder of the period.
    (*histogram->mutable_histogram())[string_hashes.size()] += 1;
    if (use_legacy_hash) {
      bucket.add_string_hashes(bytes);
    } else {
      bucket.add_string_hashes_ff64(bytes);
    }
  }
}
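
// Merges `aggregate_data`'s per-string-index counts into
// `merged_aggregate_data`.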
void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                             const AggregateData &aggregate_data) {
  // Merge in the aggregate data's mapping of indexes to their count.
  // This only works correctly if the AggregateData objects are both part of the same
  // AggregationPeriodBucket, such that their string_index values both refer to the same repeated
  // string_hashes field in the bucket.
  for (const auto &[string_index, count] : aggregate_data.string_histogram().histogram()) {
    (*merged_aggregate_data.mutable_string_histogram()->mutable_histogram())[string_index] += count;
  }
}
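
// Builds the hourly Observation for this report from all of the bucket's
// aggregate data. Returns a null pointer if no events were logged this period,
// in which case no observation is generated.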
lib::statusor::StatusOr<std::unique_ptr<Observation>>
StringHistogramAggregationProcedure::GenerateHourlyObservation(
    const AggregateDataToGenerate &bucket) {
  std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
  data.reserve(bucket.aggregate_data.size());
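
  // Flatten each aggregate into an (event codes, [(string index, count)])
  // tuple, which is the shape the encoder expects.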
  for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
    const StringHistogram &histogram = aggregate_data.data().string_histogram();
    std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
                                      aggregate_data.event_codes().end());
    std::vector<std::tuple<uint32_t, int64_t>> event_code_histogram;
    for (const auto &[index, count] : histogram.histogram()) {
      event_code_histogram.emplace_back(index, count);
    }
    data.emplace_back(event_codes, event_code_histogram);
  }
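
  // No string events were recorded for this period, so there is no observation
  // to generate.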
  if (data.empty()) {
    return {nullptr};
  }
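
  // Copy the bucket's interned string hashes, preserving their order, so the
  // encoder can map each string index in `data` back to its hash.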
  std::vector<std::string> hashes;
  for (const std::string &hash : bucket.string_hashes) {
    hashes.push_back(hash);
  }

  return logger::encoder::EncodeStringHistogramObservation(hashes, data, bucket.use_legacy_hash);
}

std::string StringHistogramAggregationProcedure::DebugString() const { return "STRING_HISTOGRAM"; }

}  // namespace cobalt::local_aggregation