[NoCheck] Use references in AggregationProcedure
Plain references should be used instead of pointers when the parameter is required
- Removes 1 CHECK call
- Makes AggregateDataToGenerate move only
Change-Id: I9a19b1ab44e53aae0b34a1bd42a8d5fe67035560
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/658845
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/lib/util/status_builder.h b/src/lib/util/status_builder.h
index cf08df8..e59e712 100644
--- a/src/lib/util/status_builder.h
+++ b/src/lib/util/status_builder.h
@@ -239,6 +239,7 @@
PB_ENUM_FORMATTER(MetricDefinition::MetricType);
PB_ENUM_FORMATTER(ReportDefinition::OnDeviceAggregationType);
PB_ENUM_FORMATTER(ReportDefinition::ReportType);
+PB_ENUM_FORMATTER(SystemProfileSelectionPolicy);
#undef PB_ENUM_FORMATTER
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
index 6e2f8b9..ee94f35 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
@@ -4,6 +4,7 @@
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
+#include <functional>
#include <memory>
#include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h"
@@ -102,12 +103,12 @@
window_.set_days(static_cast<AggregationDays>(report.local_aggregation_period()));
}
-AggregationPeriodBucket *AggregationProcedure::GetAggregationPeriodBucket(
- uint32_t time, ReportAggregate *aggregate) const {
+AggregationPeriodBucket &AggregationProcedure::GetAggregationPeriodBucket(
+ const util::TimeInfo &time, ReportAggregate &aggregate) const {
if (IsDaily()) {
- return &(*aggregate->mutable_daily()->mutable_by_day_index())[time];
+ return (*aggregate.mutable_daily()->mutable_by_day_index())[time.day_index];
}
- return &(*aggregate->mutable_hourly()->mutable_by_hour_id())[time];
+ return (*aggregate.mutable_hourly()->mutable_by_hour_id())[time.hour_id];
}
bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
@@ -127,9 +128,11 @@
}
}
-AggregateData *AggregationProcedure::GetAggregateData(
- const logger::EventRecord &event_record, AggregationPeriodBucket *bucket,
- uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const {
+lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
+AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record,
+ AggregationPeriodBucket &bucket,
+ uint64_t system_profile_hash,
+ std::chrono::system_clock::time_point system_time) const {
google::protobuf::RepeatedField<uint32_t> event_codes;
switch (metric_type_) {
@@ -146,9 +149,13 @@
event_codes = event_record.event()->string_event().event_code();
break;
default:
- LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
- << " does not appear to be a Cobalt 1.1 metric";
- return nullptr;
+ return util::StatusBuilder(StatusCode::INVALID_ARGUMENT)
+ .AppendMsg(DebugString())
+ .AppendMsg(": Metric of type")
+ .AppendMsg(metric_type_)
+ .AppendMsg(" does not appear to be a Cobalt 1.1 metric")
+ .LogError()
+ .Build();
}
int64_t system_timestamp = util::ToUnixSeconds(system_time);
@@ -157,14 +164,14 @@
system_profile_selection_policy_ == SELECT_LAST ||
system_profile_selection_policy_ == SELECT_DEFAULT) {
SystemProfileAggregate *system_profile_aggregate;
- if (bucket->system_profile_aggregates_size() >= 1) {
- if (bucket->system_profile_aggregates_size() > 1) {
- LOG(ERROR) << "There are " << bucket->system_profile_aggregates_size()
+ if (bucket.system_profile_aggregates_size() >= 1) {
+ if (bucket.system_profile_aggregates_size() > 1) {
+ LOG(ERROR) << "There are " << bucket.system_profile_aggregates_size()
<< " system profile aggregates for a report with system_profile_selection "
<< system_profile_selection_policy_ << ". There should be only one.";
// Use the first aggregate anyway.
}
- system_profile_aggregate = bucket->mutable_system_profile_aggregates(0);
+ system_profile_aggregate = bucket.mutable_system_profile_aggregates(0);
if ((system_profile_selection_policy_ == SELECT_LAST ||
system_profile_selection_policy_ == SELECT_DEFAULT) &&
system_profile_aggregate->system_profile_hash() != system_profile_hash) {
@@ -172,59 +179,65 @@
}
system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
} else {
- system_profile_aggregate = bucket->add_system_profile_aggregates();
+ system_profile_aggregate = bucket.add_system_profile_aggregates();
system_profile_aggregate->set_system_profile_hash(system_profile_hash);
system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
}
- return GetAggregateData(system_profile_aggregate, event_codes);
+ return GetAggregateData(*system_profile_aggregate, event_codes);
}
if (system_profile_selection_policy_ == REPORT_ALL) {
SystemProfileAggregate *system_profile_aggregate = nullptr;
- for (SystemProfileAggregate &aggregate : *(bucket->mutable_system_profile_aggregates())) {
+ for (SystemProfileAggregate &aggregate : *(bucket.mutable_system_profile_aggregates())) {
if (aggregate.system_profile_hash() == system_profile_hash) {
system_profile_aggregate = &aggregate;
}
}
if (system_profile_aggregate == nullptr) {
- system_profile_aggregate = bucket->add_system_profile_aggregates();
+ system_profile_aggregate = bucket.add_system_profile_aggregates();
system_profile_aggregate->set_system_profile_hash(system_profile_hash);
system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
}
system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
- return GetAggregateData(system_profile_aggregate, event_codes);
+ return GetAggregateData(*system_profile_aggregate, event_codes);
}
- return nullptr;
+ return util::StatusBuilder(StatusCode::NOT_FOUND, "Unknown system_profile_selection_policy ")
+ .AppendMsg(system_profile_selection_policy_)
+ .LogError()
+ .Build();
}
-AggregateData *AggregationProcedure::GetAggregateData(
- SystemProfileAggregate *system_profile_aggregate,
+lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
+AggregationProcedure::GetAggregateData(
+ SystemProfileAggregate &system_profile_aggregate,
google::protobuf::RepeatedField<uint32_t> event_codes) const {
- CHECK(system_profile_aggregate != nullptr)
- << "Null SystemProfileAggregate passed to GetAggregateData";
for (EventCodesAggregateData &aggregate_data :
- *system_profile_aggregate->mutable_by_event_code()) {
+ *system_profile_aggregate.mutable_by_event_code()) {
// Find the event codes that match the event's.
if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
event_codes.begin(), event_codes.end())) {
- return aggregate_data.mutable_data();
+ return std::ref(*aggregate_data.mutable_data());
}
}
// Event codes were not found, so add them as a new entry.
- if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
- EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
+ if (system_profile_aggregate.by_event_code_size() < event_vector_buffer_max_) {
+ EventCodesAggregateData *aggregate_data = system_profile_aggregate.add_by_event_code();
aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
- return aggregate_data->mutable_data();
+ return std::ref(*aggregate_data->mutable_data());
}
- return nullptr;
+
+ return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
+ "SystemProfileAggregate has reached event_vector_buffer_max")
+ .WithContext("event_vector_buffer_max", event_vector_buffer_max_)
+ .Build();
}
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
- ReportAggregate *aggregate, uint64_t system_profile_hash,
+ ReportAggregate &aggregate, uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time) {
if (!IsValidEventType(event_record.event()->type_case())) {
LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case()
@@ -232,46 +245,44 @@
return;
}
- AggregationPeriodBucket *bucket;
- if (IsDaily()) {
- bucket = GetAggregationPeriodBucket(event_record.event()->day_index(), aggregate);
- } else {
- bucket = GetAggregationPeriodBucket(event_record.event()->hour_id(), aggregate);
- }
+ std::reference_wrapper<AggregationPeriodBucket> bucket =
+ GetAggregationPeriodBucket(event_record.GetTimeInfo(), aggregate);
- if (AggregateData *aggregate_data =
+ if (lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> aggregate_data =
GetAggregateData(event_record, bucket, system_profile_hash, system_time);
- aggregate_data != nullptr) {
- UpdateAggregateData(event_record, aggregate_data, bucket);
+ aggregate_data.ok()) {
+ UpdateAggregateData(event_record, aggregate_data.ConsumeValueOrDie(), bucket);
}
}
-void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
const SystemProfileAggregate &aggregate) {
if (system_profile_selection_policy_ == SELECT_FIRST) {
- if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
- merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
- merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+ if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) {
+ merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
+ merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
}
- if (aggregate.last_seen_timestamp() > merged_aggregate->last_seen_timestamp()) {
- merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+ if (aggregate.last_seen_timestamp() > merged_aggregate.last_seen_timestamp()) {
+ merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
}
} else { // SELECT_LAST or SELECT_DEFAULT
- if (aggregate.last_seen_timestamp() >= merged_aggregate->last_seen_timestamp()) {
- merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
- merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+ if (aggregate.last_seen_timestamp() >= merged_aggregate.last_seen_timestamp()) {
+ merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
+ merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
}
- if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
- merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+ if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) {
+ merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
}
}
for (const EventCodesAggregateData &aggregate_data : aggregate.by_event_code()) {
// Find or create the corresponding AggregateData in the merged system profile aggregate.
- AggregateData *data = GetAggregateData(merged_aggregate, aggregate_data.event_codes());
- // nullptr means there is no room in event_vector_buffer_max_ for more event codes.
- if (data != nullptr) {
- MergeAggregateData(data, aggregate_data.data());
+ lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> data =
+ GetAggregateData(merged_aggregate, aggregate_data.event_codes());
+ // A non-OK status here means that there is no room in event_vector_buffer_max_ for the event
+ // codes.
+ if (data.ok()) {
+ MergeAggregateData(data.ValueOrDie(), aggregate_data.data());
}
}
}
@@ -291,7 +302,7 @@
std::map<uint64_t, std::vector<AggregateDataToGenerate>>
AggregationProcedure::GetAggregateDataToGenerate(const util::TimeInfo &time_info,
- ReportAggregate *aggregate) const {
+ ReportAggregate &aggregate) const {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate;
util::TimeInfo start_time_info = GetStartTimeInfo(time_info);
@@ -304,10 +315,10 @@
uint32_t first_seen_timestamp = UINT32_MAX;
std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile;
for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
- if (!aggregate->daily().by_day_index().contains(i)) {
+ if (!aggregate.daily().by_day_index().contains(i)) {
continue;
}
- AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+ AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i];
AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
for (SystemProfileAggregate &system_profile_aggregate :
*agg->mutable_system_profile_aggregates()) {
@@ -325,7 +336,7 @@
first_seen_timestamp = system_profile_aggregate.first_seen_timestamp();
}
for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
- agg_to_generate.aggregate_data.push_back(&data);
+ agg_to_generate.aggregate_data.push_back(data);
}
}
if (!agg_to_generate.aggregate_data.empty()) {
@@ -337,15 +348,15 @@
}
} else if (system_profile_selection_policy_ == REPORT_ALL) {
for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
- if (!aggregate->daily().by_day_index().contains(i)) {
+ if (!aggregate.daily().by_day_index().contains(i)) {
continue;
}
- AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+ AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i];
for (SystemProfileAggregate &system_profile_aggregate :
*agg->mutable_system_profile_aggregates()) {
AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
- agg_to_generate.aggregate_data.push_back(&data);
+ agg_to_generate.aggregate_data.push_back(data);
}
data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
std::move(agg_to_generate));
@@ -353,14 +364,14 @@
}
}
} else {
- if (aggregate->hourly().by_hour_id().contains(start_time_info.hour_id)) {
+ if (aggregate.hourly().by_hour_id().contains(start_time_info.hour_id)) {
AggregationPeriodBucket *agg =
- &(*aggregate->mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id];
+ &(*aggregate.mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id];
for (SystemProfileAggregate &system_profile_aggregate :
*agg->mutable_system_profile_aggregates()) {
AggregateDataToGenerate agg_to_generate{.string_hashes = agg->string_hashes()};
for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
- agg_to_generate.aggregate_data.push_back(&data);
+ agg_to_generate.aggregate_data.push_back(data);
}
data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
std::move(agg_to_generate));
@@ -372,7 +383,7 @@
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>>
AggregationProcedure::GenerateObservations(const util::TimeInfo &time_info,
- ReportAggregate *aggregate) {
+ ReportAggregate &aggregate) {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
GetAggregateDataToGenerate(time_info, aggregate);
@@ -388,7 +399,7 @@
return observations;
}
-void AggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void AggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
util::TimeInfo time_info,
uint64_t system_profile_hash) const {
util::TimeInfo delete_before = GetStartTimeInfo(time_info);
@@ -396,32 +407,32 @@
// Clean up aggregates that will never be used again.
if (IsDaily()) {
std::vector<uint32_t> days_to_delete;
- for (const auto &day : aggregate->daily().by_day_index()) {
+ for (const auto &day : aggregate.daily().by_day_index()) {
if (day.first <= delete_before.day_index) {
days_to_delete.push_back(day.first);
}
}
for (auto day : days_to_delete) {
- aggregate->mutable_daily()->mutable_by_day_index()->erase(day);
+ aggregate.mutable_daily()->mutable_by_day_index()->erase(day);
}
} else {
std::vector<uint32_t> hours_to_delete;
- for (const auto &hour : aggregate->hourly().by_hour_id()) {
+ for (const auto &hour : aggregate.hourly().by_hour_id()) {
if (hour.first <= delete_before.hour_id) {
hours_to_delete.push_back(hour.first);
}
}
for (auto hour : hours_to_delete) {
- aggregate->mutable_hourly()->mutable_by_hour_id()->erase(hour);
+ aggregate.mutable_hourly()->mutable_by_hour_id()->erase(hour);
}
}
if (IsDaily()) {
- aggregate->mutable_daily()->set_last_day_index(time_info.day_index);
+ aggregate.mutable_daily()->set_last_day_index(time_info.day_index);
} else {
- aggregate->mutable_hourly()->set_last_hour_id(time_info.hour_id);
+ aggregate.mutable_hourly()->set_last_hour_id(time_info.hour_id);
}
}
@@ -429,12 +440,12 @@
const std::vector<AggregateDataToGenerate> &buckets) const {
std::set<std::vector<uint32_t>> event_vectors;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
if (event_vectors.size() == event_vector_buffer_max_) {
return event_vectors;
}
event_vectors.insert(
- {aggregate_data->event_codes().begin(), aggregate_data->event_codes().end()});
+ {aggregate_data.event_codes().begin(), aggregate_data.event_codes().end()});
}
}
return event_vectors;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
index ce1ca12..cc7bf0d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
@@ -31,8 +31,13 @@
// generating observations, all AggregateDataToGenerate and all the aggregate_data they contain must
// be for the same system profile.
struct AggregateDataToGenerate {
- std::vector<EventCodesAggregateData *> aggregate_data;
+ std::vector<std::reference_wrapper<EventCodesAggregateData>> aggregate_data;
const google::protobuf::RepeatedPtrField<std::string> &string_hashes;
+
+ // Make the struct move only
+ AggregateDataToGenerate(AggregateDataToGenerate const &) = delete;
+ AggregateDataToGenerate &operator=(AggregateDataToGenerate const &) = delete;
+ AggregateDataToGenerate(AggregateDataToGenerate &&) = default;
};
// A container for the observation, with the hash of the system profile that it is for.
@@ -77,7 +82,7 @@
// |aggregate|: A mutable ReportAggregate object.
// |system_profile_hash|: The hash of the filtered system profile this event is for.
// |system_time|: The system time that the event occurred at.
- void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate,
+ void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate &aggregate,
uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time);
@@ -87,7 +92,7 @@
// |merged_aggregate|. Both system profile aggregates must be included in the same
// AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
// merging the AggregateData in each bucket.
- void MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+ void MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
const SystemProfileAggregate &aggregate);
// GenerateObservations is the public interface for generating observations. It handles
@@ -101,7 +106,7 @@
// and the system profile hash to use when generating their ObservationMetadata.
//
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations(
- const util::TimeInfo &time_info, ReportAggregate *aggregate);
+ const util::TimeInfo &time_info, ReportAggregate &aggregate);
[[nodiscard]] virtual std::string DebugString() const = 0;
@@ -132,7 +137,7 @@
// observations, they should be deleted. If an hour_id or a day_index is passed to this function,
// an earlier hour_id or day_index will never be passed and the data associated with that can be
// deleted.
- virtual void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo time_info,
+ virtual void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo time_info,
uint64_t system_profile_hash) const;
protected:
@@ -144,15 +149,15 @@
// |aggregate_data|: A mutable AggregateData object.
// |bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|.
virtual void UpdateAggregateData(const logger::EventRecord &event_record,
- AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) = 0;
+ AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) = 0;
// Merge two instances of the aggregate data for this procedure.
//
// The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
// AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
// bucket to be preserved accurately.
- virtual void MergeAggregateData(AggregateData *merged_aggregate_data,
+ virtual void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) = 0;
// GenerateSingleObservation generates an observation for the given report at the given time.
@@ -181,7 +186,7 @@
// The return value is a map from the system_profile_hash, to the data to use to generate the
// observation for that system profile.
std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate(
- const util::TimeInfo &time_info, ReportAggregate *aggregate) const;
+ const util::TimeInfo &time_info, ReportAggregate &aggregate) const;
// Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged
// for |aggregates|. The |buckets| should be ordered from earliest to latest aggregation period.
@@ -196,27 +201,28 @@
// Returns a mutable reference to the AggregationPeriodBucket associated with the given time
// specifier.
//
- // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics)
+ // |time|: util::TimeInfo to fetch the bucket for.
// |aggregate|: A mutable reference to a ReportAggregate object.
- AggregationPeriodBucket *GetAggregationPeriodBucket(uint32_t time,
- ReportAggregate *aggregate) const;
+ AggregationPeriodBucket &GetAggregationPeriodBucket(const util::TimeInfo &time,
+ ReportAggregate &aggregate) const;
// Returns a pointer to the AggregateData of |bucket| that |event_record| should be added to,
// if |bucket| has capacity for the event vector of |event_record|. In addition, adds
// the event vector of |event_record| to |bucket|'s event_vectors buffer if it has capacity and
- // the event vector is not yet present. Returns a null pointer if |bucket| does not have
+ // the event vector is not yet present. Returns a non-OK status if |bucket| does not have
// capacity.
- AggregateData *GetAggregateData(const logger::EventRecord &event_record,
- AggregationPeriodBucket *bucket, uint64_t system_profile_hash,
- std::chrono::system_clock::time_point system_time) const;
+ lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
+ const logger::EventRecord &event_record, AggregationPeriodBucket &bucket,
+ uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const;
// Returns a pointer to the AggregateData of |system_profile_aggregate| that data for
// |event_codes| should be added to, if |system_profile_aggregate| has capacity for the event
// vector of |event_codes|. If it is not present, adds the event vector of |event_codes| to
- // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a null pointer
+ // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a non-OK status
// if |system_profile_aggregate| does not have capacity for new event codes.
- AggregateData *GetAggregateData(SystemProfileAggregate *system_profile_aggregate,
- google::protobuf::RepeatedField<uint32_t> event_codes) const;
+ lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
+ SystemProfileAggregate &system_profile_aggregate,
+ google::protobuf::RepeatedField<uint32_t> event_codes) const;
// Returns the starting TimeInfo to use aggregate data from for this report, when processing the
// current_time_info.
@@ -236,12 +242,12 @@
~UnimplementedAggregationProcedure() override = default;
void UpdateAggregateData(const logger::EventRecord & /* event_record */,
- AggregateData * /*aggregate_data*/,
- AggregationPeriodBucket * /* bucket */) override {
+ AggregateData & /*aggregate_data*/,
+ AggregationPeriodBucket & /* bucket */) override {
LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
}
- void MergeAggregateData(AggregateData * /*merged_aggregate_data*/,
+ void MergeAggregateData(AggregateData & /*merged_aggregate_data*/,
const AggregateData & /*aggregate_data*/) override {
LOG(ERROR) << "MergeAggregateData is UNIMPLEMENTED for " << DebugString();
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
index fc858b4..54ad1fc 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
@@ -60,8 +60,8 @@
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
- AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
- count_aggregation_procedure.get(), &aggregate);
+ AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
+ aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -89,13 +89,13 @@
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -126,13 +126,13 @@
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -163,13 +163,13 @@
uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
- count_aggregation_procedure.get(), &aggregate,
+ *count_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -217,7 +217,7 @@
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate);
+ *at_least_once_aggregation_procedure, aggregate);
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -245,13 +245,13 @@
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate,
+ *at_least_once_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate,
+ *at_least_once_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -284,13 +284,13 @@
uint64_t first_system_profile_hash = uint64_t{213};
// Add events for 2 different event vectors: {1} and {2}.
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate,
+ *select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate,
+ *select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -320,13 +320,13 @@
uint64_t first_system_profile_hash = uint64_t{213};
// Add events for 2 different event vectors: {1} and {2}.
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate,
+ *select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate,
+ *select_first_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -374,7 +374,7 @@
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, first_system_profile_hash,
- report_all_aggregation_procedure.get(), &aggregate,
+ *report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
AggregationPeriodBucket& bucket =
aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
@@ -383,7 +383,7 @@
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, second_system_profile_hash,
- report_all_aggregation_procedure.get(), &aggregate,
+ *report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
// Merge the second system profile aggregates into the first one.
@@ -395,7 +395,7 @@
SystemProfileAggregate* merged_system_profile_aggregate =
bucket.mutable_system_profile_aggregates(0);
at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
- merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+ *merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
@@ -424,7 +424,7 @@
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, first_system_profile_hash,
- report_all_aggregation_procedure.get(), &aggregate,
+ *report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(first_event_time));
AggregationPeriodBucket& bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
ASSERT_EQ(bucket.system_profile_aggregates_size(), 1u);
@@ -433,7 +433,7 @@
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, second_system_profile_hash,
- report_all_aggregation_procedure.get(), &aggregate,
+ *report_all_aggregation_procedure, aggregate,
util::FromUnixSeconds(second_event_time));
ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
ASSERT_EQ(bucket.system_profile_aggregates(1).by_event_code_size(), event_vector_buffer_max);
@@ -447,7 +447,7 @@
SystemProfileAggregate* merged_system_profile_aggregate =
bucket.mutable_system_profile_aggregates(0);
at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
- merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+ *merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
@@ -473,13 +473,13 @@
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId - 1, system_profile_hash,
- count_aggregation_procedure.get(), &aggregate);
- AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
- count_aggregation_procedure.get(), &aggregate);
+ *count_aggregation_procedure, aggregate);
+ AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
+ aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
count_aggregation_procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId),
- &aggregate);
+ aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -507,15 +507,15 @@
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate);
+ *at_least_once_aggregation_procedure, aggregate);
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate);
+ *at_least_once_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
at_least_once_aggregation_procedure->GenerateObservations(
- util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+ util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -543,16 +543,16 @@
uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate);
+ *at_least_once_aggregation_procedure, aggregate);
uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
- at_least_once_aggregation_procedure.get(), &aggregate);
+ *at_least_once_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
at_least_once_aggregation_procedure->GenerateObservations(
- util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+ util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -581,16 +581,16 @@
uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate);
+ *select_first_aggregation_procedure, aggregate);
uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
- select_first_aggregation_procedure.get(), &aggregate);
+ *select_first_aggregation_procedure, aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
select_first_aggregation_procedure->GenerateObservations(
- util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+ util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
index 8472172..de17231 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
@@ -15,19 +15,19 @@
: AggregationProcedure(metric, report), is_expedited_(report.expedited_sending()) {}
void AtLeastOnceAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
+ const logger::EventRecord & /*event_record*/, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
// TODO(fxbug.dev/53775): Handle the case where event_record is malformed.
- aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ aggregate_data.mutable_at_least_once()->set_at_least_once(true);
}
-void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
if (aggregate_data.at_least_once().at_least_once()) {
- merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ merged_aggregate_data.mutable_at_least_once()->set_at_least_once(true);
if (aggregate_data.at_least_once().last_day_index() >
- merged_aggregate_data->at_least_once().last_day_index()) {
- merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+ merged_aggregate_data.at_least_once().last_day_index()) {
+ merged_aggregate_data.mutable_at_least_once()->set_last_day_index(
aggregate_data.at_least_once().last_day_index());
}
}
@@ -41,13 +41,13 @@
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
std::set<std::vector<uint32_t>> event_vectors_to_send;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) {
+ if (aggregate_data.data().at_least_once().last_day_index() >= time_info.day_index) {
continue;
}
@@ -69,7 +69,7 @@
return logger::Encoder::EncodeIntegerObservation(data);
}
-void AtLeastOnceAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void AtLeastOnceAggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
util::TimeInfo info,
uint64_t system_profile_hash) const {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
@@ -87,19 +87,19 @@
const std::set<std::vector<uint32_t>> &event_vectors =
SelectEventVectorsForObservation(buckets);
for (AggregateDataToGenerate &bucket : buckets) {
- for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- if (!aggregate_data->data().at_least_once().at_least_once()) {
+ if (!aggregate_data.data().at_least_once().at_least_once()) {
continue;
}
- if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) {
+ if (aggregate_data.data().at_least_once().last_day_index() >= info.day_index) {
continue;
}
- aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
+ aggregate_data.mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
}
}
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
index 5acf645..0e63c20 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
@@ -19,17 +19,17 @@
const ReportDefinition &report);
void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
- AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
// Call observation generation more frequently so that data can be sent as soon as possible.
[[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
- void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+ void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
uint64_t system_profile_hash) const override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
index 77bc7a1..4c2819f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
@@ -45,8 +45,7 @@
ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
ReportAggregate aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -69,7 +68,7 @@
std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -81,7 +80,7 @@
std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_at_least_once());
EXPECT_FALSE(merged_data.at_least_once().at_least_once());
@@ -96,7 +95,7 @@
std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -110,7 +109,7 @@
std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
@@ -130,11 +129,11 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
ReportAggregate report_aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -156,12 +155,12 @@
ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -181,11 +180,11 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
ReportAggregate report_aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -208,7 +207,7 @@
// Check that aggregates get marked as sent and don't get cleaned up while they are still needed
// that day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
1u);
@@ -221,7 +220,7 @@
}
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -230,13 +229,13 @@
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
// Check that obsolete aggregates get cleaned up the next day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
}
@@ -254,14 +253,14 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
ReportAggregate report_aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
// Check that the observation is generated for the next 7 days.
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -283,7 +282,7 @@
ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
// Days in the 7-day window that had events continue to contain data until the last day when it
@@ -311,7 +310,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -331,14 +330,14 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
ReportAggregate report_aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
// Check that the observation is generated for the next 7 days.
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -360,7 +359,7 @@
ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
// Days in the 7-day window that had events continue to contain data.
@@ -380,7 +379,7 @@
}
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -391,7 +390,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -411,11 +410,11 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), num_events);
ReportAggregate report_aggregate;
- AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -447,7 +446,7 @@
ReportAggregate report_aggregate;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
index eb02a78..224ecff 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
@@ -24,10 +24,10 @@
string_buffer_max_(metric.string_buffer_max()) {}
void AtLeastOnceStringAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) {
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) {
Map<uint32_t, UniqueString> *unique_strings =
- aggregate_data->mutable_unique_strings()->mutable_unique_strings();
+ aggregate_data.mutable_unique_strings()->mutable_unique_strings();
std::string bytes =
util::FarmhashFingerprint(event_record.event()->string_event().string_value());
@@ -35,8 +35,8 @@
// Check if the current string event value's byte representation has appeared before in
// the string hashes of the current period bucket, if so, then initialize a UniqueString message
// if the index of the string hash doesn't exist in the current UniqueString mapping.
- for (int i = 0; i < bucket->string_hashes_size(); i++) {
- if (bucket->string_hashes(i) == bytes) {
+ for (int i = 0; i < bucket.string_hashes_size(); i++) {
+ if (bucket.string_hashes(i) == bytes) {
if (!unique_strings->contains(i)) {
(*unique_strings)[i] = UniqueString();
}
@@ -44,31 +44,31 @@
}
}
- if (bucket->string_hashes_size() < string_buffer_max_) {
+ if (bucket.string_hashes_size() < string_buffer_max_) {
// Add new entry
- (*unique_strings)[bucket->string_hashes_size()] = UniqueString();
- bucket->add_string_hashes(bytes);
+ (*unique_strings)[bucket.string_hashes_size()] = UniqueString();
+ bucket.add_string_hashes(bytes);
}
}
void AtLeastOnceStringAggregationProcedure::MergeAggregateData(
- AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
+ AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
// Merge in the aggregate data's mapping of indexes to their count.
// This only works correctly if the AggregateData objects are both part of the same
// AggregationPeriodBucket, such that their string_index values both refer to the same repeated
// string_hashes field in the bucket.
for (const auto &[string_index, unique_string] :
aggregate_data.unique_strings().unique_strings()) {
- if (merged_aggregate_data->unique_strings().unique_strings().contains(string_index)) {
- if (unique_string.last_day_index() > merged_aggregate_data->unique_strings()
+ if (merged_aggregate_data.unique_strings().unique_strings().contains(string_index)) {
+ if (unique_string.last_day_index() > merged_aggregate_data.unique_strings()
.unique_strings()
.at(string_index)
.last_day_index()) {
- (*merged_aggregate_data->mutable_unique_strings()->mutable_unique_strings())[string_index]
+ (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index]
.set_last_day_index(unique_string.last_day_index());
}
} else {
- (*merged_aggregate_data->mutable_unique_strings()->mutable_unique_strings())[string_index] =
+ (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index] =
unique_string;
}
}
@@ -92,16 +92,16 @@
// of string hashes in the hashes vector above.
std::map<std::string, uint32_t> seen_hashes;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!(event_vectors.count(event_vector) > 0)) {
continue;
}
// Create or add on to a unique event vector's string historgram indices.
for (const auto &[index, unique_string] :
- aggregate_data->data().unique_strings().unique_strings()) {
+ aggregate_data.data().unique_strings().unique_strings()) {
if (unique_string.last_day_index() >= time_info.day_index) {
continue;
}
@@ -142,7 +142,7 @@
}
void AtLeastOnceStringAggregationProcedure::ObservationsCommitted(
- ReportAggregate *aggregate, util::TimeInfo info, uint64_t system_profile_hash) const {
+ ReportAggregate &aggregate, util::TimeInfo info, uint64_t system_profile_hash) const {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
GetAggregateDataToGenerate(info, aggregate);
auto data_to_generate_it = data_to_generate.find(system_profile_hash);
@@ -158,15 +158,15 @@
const std::set<std::vector<uint32_t>> &event_vectors =
SelectEventVectorsForObservation(buckets);
for (AggregateDataToGenerate &bucket : buckets) {
- for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!(event_vectors.count(event_vector) > 0)) {
continue;
}
for (auto &[index, unique_string] :
- *aggregate_data->mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
+ *aggregate_data.mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
if (unique_string.last_day_index() >= info.day_index) {
continue;
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
index d63a13d..05453d5 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
@@ -19,17 +19,17 @@
explicit AtLeastOnceStringAggregationProcedure(const MetricDefinition &metric,
const ReportDefinition &report);
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
// Call observation generation more frequently so that data can be sent as soon as possible.
[[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
- void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+ void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
uint64_t system_profile_hash) const override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
index 13e1365..2388006 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
@@ -23,8 +23,8 @@
protected:
void AddStringEventsForDay(uint32_t day_index,
const std::map<uint32_t, std::vector<std::string>>& events_to_strings,
- uint64_t system_profile_hash, AggregationProcedure* procedure,
- ReportAggregate* aggregate) {
+ uint64_t system_profile_hash, AggregationProcedure& procedure,
+ ReportAggregate& aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
@@ -35,8 +35,8 @@
event->set_event_code(0, event_code);
for (const std::string& str : strings) {
event->set_string_value(str);
- procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
- util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index)));
+ procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+ util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index)));
}
}
}
@@ -74,8 +74,8 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
events_to_strings.size());
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
std::vector<std::string> expected_hashes;
expected_hashes.reserve(kTestStrings.size());
@@ -116,7 +116,7 @@
std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 3);
ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(0));
@@ -133,7 +133,7 @@
std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_unique_strings());
EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 0);
@@ -149,7 +149,7 @@
std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(0));
@@ -168,7 +168,7 @@
std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(1));
@@ -226,11 +226,11 @@
{kTestStrings2, kTestHashes2},
{kTestStrings3, kTestHashes3},
};
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -267,13 +267,13 @@
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
ASSERT_EQ(report_aggregate.daily().by_day_index_size(), 0);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -319,11 +319,11 @@
{kTestStrings1, kTestHashes1},
{kTestStrings2, kTestHashes2},
};
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -361,7 +361,7 @@
// Check that aggregates get marked as sent and don't get cleaned up while they are still needed
// that day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
ASSERT_NE(report_aggregate.daily().by_day_index_size(), 0);
EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
EXPECT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
@@ -377,7 +377,7 @@
}
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -386,13 +386,13 @@
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
// Check that obsolete aggregates get cleaned up the next day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
}
@@ -435,8 +435,8 @@
{kTestStrings1, kTestHashes1},
{kTestStrings2, kTestHashes2},
};
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
std::vector<std::string> expected_hashes;
expected_hashes.reserve(kTestStrings.size());
@@ -447,7 +447,7 @@
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -478,7 +478,7 @@
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
// Days in the 7-day window that had events continue to contain data until the last day when it
@@ -506,7 +506,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -571,16 +571,16 @@
{2, kTestStrings2},
{5, kTestStrings3},
};
- AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash,
- procedure.get(), &report_aggregate);
+ AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
+ report_aggregate);
const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
{2, kTestStrings4},
{7, kTestStrings5},
};
- AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash,
- procedure.get(), &report_aggregate);
+ AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
+ report_aggregate);
// This a vector string hashes that is a combination of hashes across 2 days for a single event
// vector.
@@ -605,7 +605,7 @@
util::TimeInfo time_info;
time_info.day_index = kDayIndexDay2;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -653,11 +653,11 @@
}
// Commit observation
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndexDay2 + 7;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -702,8 +702,8 @@
{kTestStrings1, kTestHashes1},
{kTestStrings2, kTestHashes2},
};
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
std::vector<std::string> expected_hashes;
expected_hashes.reserve(kTestStrings.size());
@@ -714,7 +714,7 @@
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -746,7 +746,7 @@
// Check that aggregates get marked as sent and don't get cleaned up while they are still needed
// that day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
// Days in the 7-day window that had events continue to contain data.
@@ -766,7 +766,7 @@
}
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -777,7 +777,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
EXPECT_EQ(observations.size(), 0u);
@@ -821,8 +821,8 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
events_to_strings.size());
- AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+ report_aggregate);
EXPECT_LT(string_buffer_max, kTestStrings.size());
EXPECT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).string_hashes_size(),
@@ -880,15 +880,15 @@
const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_1 = {
{0, kTestStrings1},
};
- AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash,
- procedure.get(), &report_aggregate);
+ AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
+ report_aggregate);
const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
{3, kTestStrings2},
};
- AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash,
- procedure.get(), &report_aggregate);
+ AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
+ report_aggregate);
const std::map<uint32_t, std::vector<std::string>> events_to_hashes = {
{0, kTestHashes1},
@@ -907,7 +907,7 @@
time_info.day_index = kDayIndexDay2;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -940,11 +940,11 @@
ASSERT_THAT(actualHashes, IsSubsetOf(test_hashes));
}
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// Check that generating an observation again for the same day will result in no observation, even
// though a string has not been observed due to the string buffer max being reached.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
index 961ade1..84faa4d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
@@ -33,15 +33,15 @@
}
void CountAggregationProcedure::UpdateAggregateData(const logger::EventRecord &event_record,
- AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
+ AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
uint64_t occurrence_count = event_record.event()->occurrence_event().count();
- aggregate_data->set_count(aggregate_data->count() + occurrence_count);
+ aggregate_data.set_count(aggregate_data.count() + occurrence_count);
}
-void CountAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void CountAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
- merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+ merged_aggregate_data.set_count(merged_aggregate_data.count() + aggregate_data.count());
}
lib::statusor::StatusOr<std::unique_ptr<Observation>>
@@ -50,13 +50,13 @@
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- aggregates_by_event_code[event_vector].push_back(&aggregate_data->data());
+ aggregates_by_event_code[event_vector].push_back(&aggregate_data.data());
}
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
index 155b201..8182ab4 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
@@ -19,10 +19,10 @@
[[nodiscard]] bool IsDaily() const override;
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
index 7f7d17d..8a9723a 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
@@ -31,8 +31,7 @@
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{56789};
const uint32_t kNumEventCodes = 100;
- AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, *procedure, aggregate);
ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -55,8 +54,8 @@
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{56789};
const uint32_t kNumEventCodes = 100;
- AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash,
- procedure.get(), &aggregate);
+ AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash, *procedure,
+ aggregate);
ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -78,7 +77,7 @@
std::unique_ptr<CountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 30);
}
@@ -89,7 +88,7 @@
std::unique_ptr<CountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 0);
}
@@ -101,7 +100,7 @@
std::unique_ptr<CountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 10);
}
@@ -113,7 +112,7 @@
std::unique_ptr<CountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 20);
}
@@ -130,14 +129,13 @@
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{56789};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, *procedure, aggregate);
}
util::TimeInfo info;
info.hour_id = kEndHourId;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(info, &aggregate);
+ procedure->GenerateObservations(info, aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -151,7 +149,7 @@
EXPECT_EQ(value.event_codes(0), value.value());
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+ procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
@@ -167,14 +165,13 @@
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{56789};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, *procedure, aggregate);
}
util::TimeInfo info;
info.hour_id = kEndHourId;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(info, &aggregate);
+ procedure->GenerateObservations(info, aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -188,7 +185,7 @@
EXPECT_EQ(value.event_codes(0), value.value());
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+ procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
index d1ea447..604eb3c 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
@@ -25,14 +25,14 @@
(metric.has_int_buckets()) ? metric.int_buckets() : report.int_buckets())) {}
void IntegerHistogramAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
switch (metric_type()) {
case MetricDefinition::INTEGER: {
const IntegerEvent &int_event = event_record.event()->integer_event();
::google::protobuf::Map<uint32_t, int64_t> *histogram =
- aggregate_data->mutable_integer_histogram()->mutable_histogram();
+ aggregate_data.mutable_integer_histogram()->mutable_histogram();
uint32_t index = integer_buckets_->BucketIndex(int_event.value());
(*histogram)[index] += 1;
@@ -42,7 +42,7 @@
event_record.event()->integer_histogram_event();
::google::protobuf::Map<uint32_t, int64_t> *histogram =
- aggregate_data->mutable_integer_histogram()->mutable_histogram();
+ aggregate_data.mutable_integer_histogram()->mutable_histogram();
for (const HistogramBucket &bucket : int_histogram_event.buckets()) {
(*histogram)[bucket.index()] += static_cast<int64_t>(bucket.count());
}
@@ -53,10 +53,10 @@
}
}
-void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
for (const auto &[index, count] : aggregate_data.integer_histogram().histogram()) {
- (*merged_aggregate_data->mutable_integer_histogram()->mutable_histogram())[index] += count;
+ (*merged_aggregate_data.mutable_integer_histogram()->mutable_histogram())[index] += count;
}
}
@@ -66,13 +66,13 @@
std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
data.reserve(bucket.aggregate_data.size());
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
std::vector<std::tuple<uint32_t, int64_t>> histogram;
- for (const auto &[key, value] : aggregate_data->data().integer_histogram().histogram()) {
+ for (const auto &[key, value] : aggregate_data.data().integer_histogram().histogram()) {
histogram.emplace_back(std::make_tuple(key, value));
}
- std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
data.emplace_back(std::make_tuple(event_codes, histogram));
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
index 6112ded..dde3cee 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
@@ -18,10 +18,10 @@
explicit IntegerHistogramAggregationProcedure(const MetricDefinition &metric,
const ReportDefinition &report);
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
index 581e08b..3e7eac3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
@@ -17,7 +17,7 @@
class IntegerHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
public:
void LogIntegerEvents(uint32_t hour_id, uint32_t num_event_codes, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate) {
+ AggregationProcedure& procedure, ReportAggregate& aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
IntegerEvent* event = record->event()->mutable_integer_event();
@@ -25,15 +25,15 @@
for (int i = 0; i < num_event_codes; i++) {
event->set_event_code(0, i);
event->set_value(i);
- procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
- util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+ procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+ util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
void LogIntegerHistogramEvents(uint32_t hour_id, uint32_t num_event_codes,
const std::map<uint32_t, uint64_t>& histogram,
- uint64_t system_profile_hash, AggregationProcedure* procedure,
- ReportAggregate* aggregate) {
+ uint64_t system_profile_hash, AggregationProcedure& procedure,
+ ReportAggregate& aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
IntegerHistogramEvent* event = record->event()->mutable_integer_histogram_event();
@@ -45,8 +45,8 @@
event->add_event_code(0);
for (int i = 0; i < num_event_codes; i++) {
event->set_event_code(0, i);
- procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
- util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+ procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+ util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
@@ -68,7 +68,7 @@
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{1867};
- LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
+ LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, *procedure, aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -90,7 +90,7 @@
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{1867};
LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}},
- system_profile_hash, procedure.get(), &aggregate);
+ system_profile_hash, *procedure, aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -110,7 +110,7 @@
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
@@ -127,7 +127,7 @@
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_integer_histogram());
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0);
@@ -141,7 +141,7 @@
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
@@ -158,7 +158,7 @@
std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
@@ -179,11 +179,11 @@
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{1867};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
+ LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, *procedure, aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -199,7 +199,7 @@
EXPECT_EQ(value.bucket_counts(0), 1);
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+ procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
@@ -218,11 +218,11 @@
const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash,
- procedure.get(), &aggregate);
+ *procedure, aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -240,7 +240,7 @@
}
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+ procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
index 30ef332..a7b4ba7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
@@ -62,16 +62,17 @@
NumericStatAggregationProcedure::GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> &buckets,
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
- std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
+ std::map<std::vector<uint32_t>, std::vector<std::reference_wrapper<const AggregateData>>>
+ aggregates_by_event_code;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- aggregates_by_event_code[event_vector].push_back(&aggregate_data->data());
+ aggregates_by_event_code[event_vector].push_back(aggregate_data.data());
}
}
@@ -89,135 +90,135 @@
}
void SumNumericStatAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- aggregate_data->set_integer_value(aggregate_data->integer_value() +
- event_record.event()->integer_event().value());
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ aggregate_data.set_integer_value(aggregate_data.integer_value() +
+ event_record.event()->integer_event().value());
}
-void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
- merged_aggregate_data->set_integer_value(merged_aggregate_data->integer_value() +
- aggregate_data.integer_value());
+ merged_aggregate_data.set_integer_value(merged_aggregate_data.integer_value() +
+ aggregate_data.integer_value());
}
int64_t SumNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
int64_t value = 0;
- for (const auto *aggregate : aggregates) {
- value += aggregate->integer_value();
+ for (const AggregateData &aggregate : aggregates) {
+ value += aggregate.integer_value();
}
return value;
}
void MinNumericStatAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- if (aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
- aggregate_data->set_integer_value(
- std::min(aggregate_data->integer_value(), event_record.event()->integer_event().value()));
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ aggregate_data.set_integer_value(
+ std::min(aggregate_data.integer_value(), event_record.event()->integer_event().value()));
} else {
- aggregate_data->set_integer_value(event_record.event()->integer_event().value());
+ aggregate_data.set_integer_value(event_record.event()->integer_event().value());
}
}
-void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
- if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
- merged_aggregate_data->set_integer_value(
- std::min(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+ if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ merged_aggregate_data.set_integer_value(
+ std::min(aggregate_data.integer_value(), merged_aggregate_data.integer_value()));
} else {
- merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+ merged_aggregate_data.set_integer_value(aggregate_data.integer_value());
}
}
}
int64_t MinNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
int64_t value = std::numeric_limits<int64_t>::max();
- for (const auto *aggregate : aggregates) {
- value = std::min(value, aggregate->integer_value());
+ for (const AggregateData &aggregate : aggregates) {
+ value = std::min(value, aggregate.integer_value());
}
return value;
}
void MaxNumericStatAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- if (aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
- aggregate_data->set_integer_value(
- std::max(aggregate_data->integer_value(), event_record.event()->integer_event().value()));
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ aggregate_data.set_integer_value(
+ std::max(aggregate_data.integer_value(), event_record.event()->integer_event().value()));
} else {
- aggregate_data->set_integer_value(event_record.event()->integer_event().value());
+ aggregate_data.set_integer_value(event_record.event()->integer_event().value());
}
}
-void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
- if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
- merged_aggregate_data->set_integer_value(
- std::max(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+ if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ merged_aggregate_data.set_integer_value(
+ std::max(aggregate_data.integer_value(), merged_aggregate_data.integer_value()));
} else {
- merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+ merged_aggregate_data.set_integer_value(aggregate_data.integer_value());
}
}
}
int64_t MaxNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
int64_t value = std::numeric_limits<int64_t>::min();
- for (const auto *aggregate : aggregates) {
- value = std::max(value, aggregate->integer_value());
+ for (const AggregateData &aggregate : aggregates) {
+ value = std::max(value, aggregate.integer_value());
}
return value;
}
void MeanNumericStatAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- SumAndCount *sum_and_count = aggregate_data->mutable_sum_and_count();
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ SumAndCount *sum_and_count = aggregate_data.mutable_sum_and_count();
sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
sum_and_count->set_count(sum_and_count->count() + 1);
}
-void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
- SumAndCount *sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+ SumAndCount *sum_and_count = merged_aggregate_data.mutable_sum_and_count();
sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
}
int64_t MeanNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
int64_t sum = 0;
uint32_t count = 0;
- for (const auto *aggregate : aggregates) {
- sum += aggregate->sum_and_count().sum();
- count += aggregate->sum_and_count().count();
+ for (const AggregateData &aggregate : aggregates) {
+ sum += aggregate.sum_and_count().sum();
+ count += aggregate.sum_and_count().count();
}
return sum / count;
}
void MedianNumericStatAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- aggregate_data->mutable_integer_values()->add_value(
- event_record.event()->integer_event().value());
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ aggregate_data.mutable_integer_values()->add_value(event_record.event()->integer_event().value());
}
void MedianNumericStatAggregationProcedure::MergeAggregateData(
- AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
- merged_aggregate_data->mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
+ AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
+ merged_aggregate_data.mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
}
namespace {
-std::vector<int64_t> CollectValues(const std::vector<const AggregateData *> &aggregates) {
+std::vector<int64_t> CollectValues(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
std::vector<int64_t> values;
- for (const auto *aggregate : aggregates) {
- values.insert(values.end(), aggregate->integer_values().value().begin(),
- aggregate->integer_values().value().end());
+ for (const AggregateData &aggregate : aggregates) {
+ values.insert(values.end(), aggregate.integer_values().value().begin(),
+ aggregate.integer_values().value().end());
}
sort(values.begin(), values.end());
@@ -227,7 +228,7 @@
} // namespace
int64_t MedianNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
std::vector<int64_t> values = CollectValues(aggregates);
if (values.size() % 2 == 0) {
@@ -237,7 +238,7 @@
}
int64_t PercentileNNumericStatAggregationProcedure::CollectValue(
- const std::vector<const AggregateData *> &aggregates) {
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
std::vector<int64_t> values = CollectValues(aggregates);
auto index = static_cast<uint32_t>((static_cast<double>(percentile_n_) / 100.0) *
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
index 9ebcd06..eb19c5b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
@@ -33,7 +33,8 @@
const util::TimeInfo & /*time_info*/) override;
protected:
- virtual int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) = 0;
+ virtual int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) = 0;
};
class SumNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -42,16 +43,17 @@
const ReportDefinition &report)
: NumericStatAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override { return "SUM_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
};
class MinNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -60,16 +62,17 @@
const ReportDefinition &report)
: NumericStatAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override { return "MIN_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
};
class MaxNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -78,16 +81,17 @@
const ReportDefinition &report)
: NumericStatAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override { return "MAX_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
};
class MeanNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -96,16 +100,17 @@
const ReportDefinition &report)
: NumericStatAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override { return "MEAN_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
};
class MedianNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -114,16 +119,17 @@
const ReportDefinition &report)
: NumericStatAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override { return "MEDIAN_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
};
class PercentileNNumericStatAggregationProcedure : public MedianNumericStatAggregationProcedure {
@@ -136,7 +142,8 @@
[[nodiscard]] std::string DebugString() const override { return "PERCENTILE_N_NUMERIC_STAT"; }
private:
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+ int64_t CollectValue(
+ const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
uint32_t percentile_n_;
};
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
index 9e096ef..efd01f7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
@@ -34,20 +34,20 @@
int64_t /* expected value */>> {
public:
void LogIntegerSequence(const std::vector<int64_t> &values, util::TimeInfo time_info,
- uint64_t system_profile_hash, AggregationProcedure *procedure,
- ReportAggregate *aggregate) {
+ uint64_t system_profile_hash, AggregationProcedure &procedure,
+ ReportAggregate &aggregate) {
std::unique_ptr<logger::EventRecord> record = MakeEventRecord(time_info);
IntegerEvent *event = record->event()->mutable_integer_event();
for (int64_t value : values) {
event->set_value(value);
- procedure->UpdateAggregate(
+ procedure.UpdateAggregate(
*record, aggregate, system_profile_hash,
util::FromUnixSeconds(util::HourIdToUnixSeconds(time_info.hour_id)));
}
event->add_event_code(1);
for (int64_t value : values) {
event->set_value(value);
- procedure->UpdateAggregate(
+ procedure.UpdateAggregate(
*record, aggregate, system_profile_hash,
util::FromUnixSeconds(util::HourIdToUnixSeconds(time_info.hour_id)));
}
@@ -130,26 +130,26 @@
return name;
}
-void CheckDebugString(int64_t daily_report_type, AggregationProcedure *procedure) {
+void CheckDebugString(int64_t daily_report_type, const AggregationProcedure &procedure) {
switch (daily_report_type) {
case kIntegerMetricUniqueDeviceNumericStatsReport7DaySumReportIndex:
- ASSERT_EQ(procedure->DebugString(), "SUM_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "SUM_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMinReportIndex:
- ASSERT_EQ(procedure->DebugString(), "MIN_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "MIN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMaxReportIndex:
- ASSERT_EQ(procedure->DebugString(), "MAX_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "MAX_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMeanReportIndex:
- ASSERT_EQ(procedure->DebugString(), "MEAN_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "MEAN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMedianReportIndex:
- ASSERT_EQ(procedure->DebugString(), "MEDIAN_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "MEDIAN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7Day75thPercentileReportIndex:
case kIntegerMetricUniqueDeviceNumericStatsReport7Day99thPercentileReportIndex:
- ASSERT_EQ(procedure->DebugString(), "PERCENTILE_N_NUMERIC_STAT");
+ ASSERT_EQ(procedure.DebugString(), "PERCENTILE_N_NUMERIC_STAT");
break;
}
}
@@ -162,7 +162,7 @@
uint32_t metric_id = kIntegerMetricMetricId;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, daily_report_type);
- CheckDebugString(daily_report_type, procedure.get());
+ CheckDebugString(daily_report_type, *procedure);
ReportAggregate report_aggregate;
const uint32_t kDayIndex = 7;
@@ -177,12 +177,12 @@
} else {
window.assign(integer_sequence.begin() + (window_size * day), integer_sequence.end());
}
- LogIntegerSequence(window, TimeInfo::FromDayIndex(day + 1), system_profile_hash,
- procedure.get(), &report_aggregate);
+ LogIntegerSequence(window, TimeInfo::FromDayIndex(day + 1), system_profile_hash, *procedure,
+ report_aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromDayIndex(kDayIndex), &report_aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromDayIndex(kDayIndex), report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -197,7 +197,7 @@
ASSERT_EQ(integer_obs.values(1).value(), expected_value);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, util::TimeInfo::FromDayIndex(kDayIndex),
+ procedure->ObservationsCommitted(report_aggregate, util::TimeInfo::FromDayIndex(kDayIndex),
system_profile_hash);
EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex - 6), 0u);
}
@@ -208,7 +208,7 @@
uint32_t metric_id = kIntegerMetricMetricId;
std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, hourly_report_type);
- CheckDebugString(daily_report_type, procedure.get());
+ CheckDebugString(daily_report_type, *procedure);
ReportAggregate report_aggregate;
const uint32_t kHourId = 20;
@@ -216,11 +216,11 @@
for (uint32_t hour = 0; hour <= kHourId; hour += 2) {
LogIntegerSequence(integer_sequence, TimeInfo::FromHourId(hour), system_profile_hash,
- procedure.get(), &report_aggregate);
+ *procedure, report_aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId), &report_aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId), report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -235,7 +235,7 @@
ASSERT_EQ(integer_obs.values(1).value(), expected_value);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, util::TimeInfo::FromHourId(kHourId),
+ procedure->ObservationsCommitted(report_aggregate, util::TimeInfo::FromHourId(kHourId),
system_profile_hash);
EXPECT_EQ(report_aggregate.hourly().by_hour_id().count(kHourId), 0u);
}
@@ -304,7 +304,7 @@
std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_value(), 300);
}
@@ -315,7 +315,7 @@
std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_value(), 0);
}
@@ -327,7 +327,7 @@
std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_value(), 200);
}
@@ -339,7 +339,7 @@
std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.integer_value(), 100);
}
@@ -361,7 +361,7 @@
std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), -15);
@@ -373,7 +373,7 @@
std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), 0);
@@ -386,7 +386,7 @@
std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), -15);
@@ -399,7 +399,7 @@
std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), 100);
@@ -422,7 +422,7 @@
std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), 100);
@@ -434,7 +434,7 @@
std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), 0);
@@ -447,7 +447,7 @@
std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), -15);
@@ -460,7 +460,7 @@
std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.has_integer_value());
EXPECT_EQ(merged_data.integer_value(), 100);
@@ -485,7 +485,7 @@
std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 30);
EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
@@ -497,7 +497,7 @@
std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 0);
EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
@@ -511,7 +511,7 @@
std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 10);
EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
@@ -525,7 +525,7 @@
std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 20);
EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
@@ -550,7 +550,7 @@
std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
}
@@ -561,7 +561,7 @@
std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
}
@@ -574,7 +574,7 @@
std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
}
@@ -587,7 +587,7 @@
std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
}
@@ -611,7 +611,7 @@
std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
}
@@ -622,7 +622,7 @@
std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
}
@@ -635,7 +635,7 @@
std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
}
@@ -648,7 +648,7 @@
std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
index 23138ea..615b74f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
@@ -18,18 +18,18 @@
}
void SelectFirstAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
- aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ const logger::EventRecord & /*event_record*/, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ aggregate_data.mutable_at_least_once()->set_at_least_once(true);
}
-void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
if (aggregate_data.at_least_once().at_least_once()) {
- merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ merged_aggregate_data.mutable_at_least_once()->set_at_least_once(true);
if (aggregate_data.at_least_once().last_day_index() >
- merged_aggregate_data->at_least_once().last_day_index()) {
- merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+ merged_aggregate_data.at_least_once().last_day_index()) {
+ merged_aggregate_data.mutable_at_least_once()->set_last_day_index(
aggregate_data.at_least_once().last_day_index());
}
}
@@ -43,13 +43,13 @@
const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
std::set<std::vector<uint32_t>> event_vectors_to_send;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) {
+ if (aggregate_data.data().at_least_once().last_day_index() >= time_info.day_index) {
continue;
}
@@ -71,7 +71,7 @@
return logger::Encoder::EncodeIntegerObservation(data);
}
-void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
util::TimeInfo info,
uint64_t system_profile_hash) const {
std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
@@ -89,19 +89,19 @@
const std::set<std::vector<uint32_t>> &event_vectors =
SelectEventVectorsForObservation(buckets);
for (AggregateDataToGenerate &bucket : buckets) {
- for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
if (!event_vectors.count(event_vector)) {
continue;
}
- if (!aggregate_data->data().at_least_once().at_least_once()) {
+ if (!aggregate_data.data().at_least_once().at_least_once()) {
continue;
}
- if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) {
+ if (aggregate_data.data().at_least_once().last_day_index() >= info.day_index) {
continue;
}
- aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
+ aggregate_data.mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
}
}
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
index 0a540a7..3a3e648 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
@@ -16,17 +16,17 @@
const ReportDefinition &report);
void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
- AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
// Call observation generation more frequently so that data can be sent as soon as possible.
[[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
- void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+ void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
uint64_t system_profile_hash) const override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
index 1f6ee10..6e36562 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
@@ -45,8 +45,7 @@
ReportAggregate aggregate;
// Add events for 2 different event vectors: {1} and {2}.
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
// Check that |aggregate| was updated for the first event vector but not the second.
ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -69,7 +68,7 @@
std::unique_ptr<SelectFirstAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -82,7 +81,7 @@
std::unique_ptr<SelectFirstAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_at_least_once());
EXPECT_FALSE(merged_data.at_least_once().at_least_once());
@@ -98,7 +97,7 @@
std::unique_ptr<SelectFirstAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -113,7 +112,7 @@
std::unique_ptr<SelectFirstAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_TRUE(merged_data.at_least_once().at_least_once());
EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
@@ -133,11 +132,11 @@
ReportAggregate report_aggregate;
// Add events for 2 different event vectors: {1} and {2}.
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -151,12 +150,12 @@
ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 0u);
@@ -176,11 +175,11 @@
ReportAggregate report_aggregate;
// Add events for 2 different event vectors: {1} and {2}.
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -195,7 +194,7 @@
// Check that aggregates get marked as sent and don't get cleaned up while they are still needed
// that day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
1u);
@@ -205,7 +204,7 @@
EXPECT_EQ(system_profile_agg.by_event_code(0).data().at_least_once().last_day_index(), kDayIndex);
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -214,13 +213,13 @@
// Check that calling observation generation the next day generates no observation.
time_info.day_index++;
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 0u);
// Check that obsolete aggregates get cleaned up the next day.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
}
@@ -240,15 +239,15 @@
// Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
// ending on |kDayIndex|.
for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
- AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, *procedure,
+ report_aggregate);
}
// Check that the observation is generated for the next 7 days.
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -262,7 +261,7 @@
ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
// Days in the 7-day window that had events continue to contain data.
@@ -285,7 +284,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 0u);
@@ -307,15 +306,15 @@
// Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
// ending on |kDayIndex|.
for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
- AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, *procedure,
+ report_aggregate);
}
// Check that the observation is generated for the next 7 days.
for (int i = 0; i < 7; i++) {
time_info.day_index = kDayIndex + i;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -329,7 +328,7 @@
ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// No data for days beyond the 7-day window.
EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
// Days in the 7-day window that had events continue to contain data.
@@ -349,7 +348,7 @@
}
// Check that calling observation generation again for the same day generates no observation.
- observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+ observations_or = procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 1u);
@@ -360,7 +359,7 @@
// After 7 days the observation is no longer generated.
time_info.day_index = kDayIndex + 7;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
ASSERT_EQ(observations.size(), 0u);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
index 75d7e4d..964c70d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
@@ -15,16 +15,16 @@
: AggregationProcedure(metric, report) {}
void SelectMostCommonAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) {
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
// TODO(fxbug.dev/53775): Handle the case where event_record is malformed.
- aggregate_data->set_count(aggregate_data->count() +
- event_record.event()->occurrence_event().count());
+ aggregate_data.set_count(aggregate_data.count() +
+ event_record.event()->occurrence_event().count());
}
-void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
- merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+ merged_aggregate_data.set_count(merged_aggregate_data.count() + aggregate_data.count());
}
std::string SelectMostCommonAggregationProcedure::DebugString() const {
@@ -48,10 +48,10 @@
uint32_t event_vector_count = 0;
for (const AggregateDataToGenerate &bucket : buckets) {
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- if (std::equal(aggregate_data->event_codes().begin(), aggregate_data->event_codes().end(),
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
event_vector.begin(), event_vector.end())) {
- event_vector_count += aggregate_data->data().count();
+ event_vector_count += aggregate_data.data().count();
continue;
}
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
index e2430ff..7a9bda5 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
@@ -18,10 +18,10 @@
explicit SelectMostCommonAggregationProcedure(const MetricDefinition &metric,
const ReportDefinition &report);
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket * /*bucket*/) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
[[nodiscard]] std::string DebugString() const override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
index 67ae97e..759b837 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
@@ -43,8 +43,7 @@
ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
ReportAggregate aggregate;
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
ASSERT_EQ(aggregate.daily().by_day_index().count(kDayIndex), 1u);
ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -66,7 +65,7 @@
std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 221);
}
@@ -78,7 +77,7 @@
std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 0);
}
@@ -91,7 +90,7 @@
std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 100);
}
@@ -104,7 +103,7 @@
std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId,
kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.count(), 121);
}
@@ -121,7 +120,7 @@
ReportAggregate report_aggregate;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -144,11 +143,11 @@
ReportAggregate report_aggregate;
// Log |event_code| OccurrenceEvents, each with a count of 1, for each |event_code| in the range
// [1, kNumEventCodes]. The most common event code will be |kNumEventCodes|.
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -163,7 +162,7 @@
EXPECT_EQ(integer_obs.values(0).value(), 1);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex), 0u);
}
@@ -184,18 +183,18 @@
const uint32_t kMostCommonEventCode = 1;
// Log |event_code| OccurrenceEvents, each with a count of 1, for each |event_code| in the
// range [1, kNumEventCodes].
- AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex - 1, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex - 1, system_profile_hash, *procedure,
+ report_aggregate);
// For another day in the aggregation period, log |kNumEventCodes| + 1 OccurrenceEvents, each with
// a count of 1, for event code |kMostCommonEventCode|. The most common event code over the
// aggregation period is now |kMostCommonEventCode|.
for (int i = 0; i < kNumEventCodes + 1; ++i) {
- AddOccurrenceEventsForDay(kMostCommonEventCode, kDayIndex, system_profile_hash, procedure.get(),
- &report_aggregate);
+ AddOccurrenceEventsForDay(kMostCommonEventCode, kDayIndex, system_profile_hash, *procedure,
+ report_aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(time_info, &report_aggregate);
+ procedure->GenerateObservations(time_info, report_aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -210,7 +209,7 @@
EXPECT_EQ(integer_obs.values(0).value(), 1);
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+ procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
// Days in the 7-day window that had events continue to contain data.
ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
index 6f232e3..db8e02d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
@@ -16,36 +16,35 @@
namespace cobalt::local_aggregation {
void StringHistogramAggregationProcedure::UpdateAggregateData(
- const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) {
- StringHistogram *histogram = aggregate_data->mutable_string_histogram();
+ const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) {
+ StringHistogram *histogram = aggregate_data.mutable_string_histogram();
std::string bytes =
util::FarmhashFingerprint(event_record.event()->string_event().string_value());
- for (int i = 0; i < bucket->string_hashes_size(); i++) {
- if (bucket->string_hashes(i) == bytes) {
+ for (int i = 0; i < bucket.string_hashes_size(); i++) {
+ if (bucket.string_hashes(i) == bytes) {
(*histogram->mutable_histogram())[i] += 1;
return;
}
}
- if (bucket->string_hashes_size() < string_buffer_max_) {
+ if (bucket.string_hashes_size() < string_buffer_max_) {
// Add new entry
- (*histogram->mutable_histogram())[bucket->string_hashes_size()] += 1;
- bucket->add_string_hashes(bytes);
+ (*histogram->mutable_histogram())[bucket.string_hashes_size()] += 1;
+ bucket.add_string_hashes(bytes);
}
}
-void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) {
// Merge in the aggregate data's mapping of indexes to their count.
// This only works correctly if the AggregateData objects are both part of the same
// AggregationPeriodBucket, such that their string_index values both refer to the same repeated
// string_hashes field in the bucket.
for (const auto &[string_index, count] : aggregate_data.string_histogram().histogram()) {
- (*merged_aggregate_data->mutable_string_histogram()->mutable_histogram())[string_index] +=
- count;
+ (*merged_aggregate_data.mutable_string_histogram()->mutable_histogram())[string_index] += count;
}
}
@@ -55,10 +54,10 @@
std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
data.reserve(bucket.aggregate_data.size());
- for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
- const StringHistogram &histogram = aggregate_data->data().string_histogram();
- std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ const StringHistogram &histogram = aggregate_data.data().string_histogram();
+ std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
std::vector<std::tuple<uint32_t, int64_t>> event_code_histogram;
for (const auto &[index, count] : histogram.histogram()) {
event_code_histogram.emplace_back(index, count);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
index aa67ed8..b53f303 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
@@ -18,10 +18,10 @@
: HourlyAggregationProcedure(metric, report),
string_buffer_max_(metric.string_buffer_max()) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
index 6f358e6..7b9b0bc 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
@@ -22,7 +22,7 @@
protected:
void LogStringEvents(uint32_t hour_id, uint32_t num_event_codes,
const std::vector<std::string>& strings, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate) {
+ AggregationProcedure& procedure, ReportAggregate& aggregate) {
std::unique_ptr<logger::EventRecord> record =
MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
StringEvent* event = record->event()->mutable_string_event();
@@ -31,8 +31,8 @@
event->set_event_code(0, i);
for (const std::string& str : strings) {
event->set_string_value(str);
- procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
- util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+ procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+ util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
}
@@ -59,8 +59,8 @@
"Integer a ullamcorper dolor.",
"Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
};
- LogStringEvents(kHourId, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
- &aggregate);
+ LogStringEvents(kHourId, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+ aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -80,7 +80,7 @@
std::unique_ptr<StringHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.string_histogram().histogram_size(), 3);
ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
@@ -97,7 +97,7 @@
std::unique_ptr<StringHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_FALSE(merged_data.has_string_histogram());
EXPECT_EQ(merged_data.string_histogram().histogram_size(), 0);
@@ -111,7 +111,7 @@
std::unique_ptr<StringHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
@@ -128,7 +128,7 @@
std::unique_ptr<StringHistogramAggregationProcedure> procedure =
GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
@@ -153,12 +153,12 @@
"Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
- &aggregate);
+ LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+ aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -185,7 +185,7 @@
}
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+ procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
@@ -213,12 +213,12 @@
"Pellentesque dictum quam nec lectus sagittis interdum.",
};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
- &aggregate);
+ LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+ aggregate);
}
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+ procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -234,7 +234,7 @@
ASSERT_EQ(value.bucket_counts_size(), 5);
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+ procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
system_profile_hash);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
index a7572dd..365b325 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
@@ -14,31 +14,31 @@
namespace cobalt::local_aggregation {
-void SumAndCountAggregationProcedure::UpdateAggregateData(const logger::EventRecord& event_record,
- AggregateData* aggregate_data,
- AggregationPeriodBucket* /*bucket*/) {
- SumAndCount* sum_and_count = aggregate_data->mutable_sum_and_count();
+void SumAndCountAggregationProcedure::UpdateAggregateData(const logger::EventRecord &event_record,
+ AggregateData &aggregate_data,
+ AggregationPeriodBucket & /*bucket*/) {
+ SumAndCount *sum_and_count = aggregate_data.mutable_sum_and_count();
sum_and_count->set_count(sum_and_count->count() + 1);
sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
}
-void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData* merged_aggregate_data,
- const AggregateData& aggregate_data) {
- SumAndCount* sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ SumAndCount *sum_and_count = merged_aggregate_data.mutable_sum_and_count();
sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
}
lib::statusor::StatusOr<std::unique_ptr<Observation>>
-SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate& bucket) {
+SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate &bucket) {
std::vector<std::tuple<std::vector<uint32_t>, int64_t, uint32_t>> data;
data.reserve(bucket.aggregate_data.size());
- for (const EventCodesAggregateData* aggregate_data : bucket.aggregate_data) {
- std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
- aggregate_data->event_codes().end());
- data.emplace_back(std::make_tuple(event_codes, aggregate_data->data().sum_and_count().sum(),
- aggregate_data->data().sum_and_count().count()));
+ for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+ std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+ aggregate_data.event_codes().end());
+ data.emplace_back(std::make_tuple(event_codes, aggregate_data.data().sum_and_count().sum(),
+ aggregate_data.data().sum_and_count().count()));
}
if (data.empty()) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
index fbc28b3..88e2f9f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
@@ -17,10 +17,10 @@
SumAndCountAggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report)
: HourlyAggregationProcedure(metric, report) {}
- void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
- AggregationPeriodBucket *bucket) override;
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+ AggregationPeriodBucket &bucket) override;
- void MergeAggregateData(AggregateData *merged_aggregate_data,
+ void MergeAggregateData(AggregateData &merged_aggregate_data,
const AggregateData &aggregate_data) override;
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
index 0bfed7b..dc769c3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
@@ -34,8 +34,7 @@
const int64_t kValue = 42;
const uint32_t kHourId = 1;
const uint64_t system_profile_hash = uint64_t{634354};
- AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, procedure.get(),
- &aggregate);
+ AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, *procedure, aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -60,7 +59,7 @@
std::unique_ptr<SumAndCountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 30);
EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
@@ -72,7 +71,7 @@
std::unique_ptr<SumAndCountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 0);
EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
@@ -86,7 +85,7 @@
std::unique_ptr<SumAndCountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 10);
EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
@@ -100,7 +99,7 @@
std::unique_ptr<SumAndCountAggregationProcedure> procedure =
GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
- procedure->MergeAggregateData(&merged_data, data);
+ procedure->MergeAggregateData(merged_data, data);
EXPECT_EQ(merged_data.sum_and_count().count(), 20);
EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
@@ -119,14 +118,13 @@
const uint32_t kEndHourId = 11;
const uint64_t system_profile_hash = uint64_t{634354};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
- AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, procedure.get(),
- &aggregate);
+ AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, *procedure, aggregate);
}
util::TimeInfo info;
info.hour_id = kEndHourId;
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
- procedure->GenerateObservations(info, &aggregate);
+ procedure->GenerateObservations(info, aggregate);
ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -142,7 +140,7 @@
EXPECT_EQ(sum_and_count.event_codes(0) * kValue, sum_and_count.sum());
}
// Check that obsolete aggregates get cleaned up.
- procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+ procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
index 4de47b5..75ca062 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
@@ -73,7 +73,7 @@
void AddOccurrenceEventsForDay(
uint32_t num_events, uint32_t day_index, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate,
+ AggregationProcedure& procedure, ReportAggregate& aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
AddOccurrenceEventsForDayWithCount(num_events, 1, day_index, system_profile_hash, procedure,
aggregate, event_time);
@@ -81,7 +81,7 @@
void AddOccurrenceEventsForDayWithCount(
uint32_t num_events, uint64_t count, uint32_t day_index, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate,
+ AggregationProcedure& procedure, ReportAggregate& aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
auto record = MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
OccurrenceEvent* event = record->event()->mutable_occurrence_event();
@@ -91,7 +91,7 @@
for (uint32_t i = 1; i <= num_events; i++) {
event->set_event_code(0, i);
for (uint32_t j = 0; j < i; j++) {
- procedure->UpdateAggregate(
+ procedure.UpdateAggregate(
*record, aggregate, system_profile_hash,
event_time.value_or(util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index))));
}
@@ -100,7 +100,7 @@
void AddOccurrenceEventsForHour(
uint32_t num_events, uint32_t hour_id, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate,
+ AggregationProcedure& procedure, ReportAggregate& aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
AddOccurrenceEventsForHourWithCount(num_events, 1, hour_id, system_profile_hash, procedure,
aggregate, event_time);
@@ -108,7 +108,7 @@
void AddOccurrenceEventsForHourWithCount(
uint32_t num_events, uint64_t count, uint32_t hour_id, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate,
+ AggregationProcedure& procedure, ReportAggregate& aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
OccurrenceEvent* event = record->event()->mutable_occurrence_event();
@@ -118,7 +118,7 @@
for (uint32_t i = 1; i <= num_events; i++) {
event->set_event_code(0, i);
for (uint32_t j = 0; j < i; j++) {
- procedure->UpdateAggregate(
+ procedure.UpdateAggregate(
*record, aggregate, system_profile_hash,
event_time.value_or(util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))));
}
@@ -127,7 +127,7 @@
void AddIntegerEvents(
uint32_t num_events, int64_t value, uint32_t hour_id, uint64_t system_profile_hash,
- AggregationProcedure* procedure, ReportAggregate* aggregate,
+ AggregationProcedure& procedure, ReportAggregate& aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
IntegerEvent* event = record->event()->mutable_integer_event();
@@ -137,7 +137,7 @@
for (uint32_t i = 1; i <= num_events; i++) {
event->set_event_code(0, i);
for (uint32_t j = 0; j < i; j++) {
- procedure->UpdateAggregate(
+ procedure.UpdateAggregate(
*record, aggregate, system_profile_hash,
event_time.value_or(util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))));
}
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
index 71927dc..27f062f 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
@@ -103,7 +103,7 @@
++system_profile_aggregates_it;
while (system_profile_aggregates_it != system_profile_aggregates->end()) {
- procedure->MergeSystemProfileAggregates(&merged_system_profile_aggregate,
+ procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate,
*system_profile_aggregates_it);
// After merging, remove the old aggregate from the repeated field.
diff --git a/src/local_aggregation_1_1/local_aggregation.cc b/src/local_aggregation_1_1/local_aggregation.cc
index f395318..e110def 100644
--- a/src/local_aggregation_1_1/local_aggregation.cc
+++ b/src/local_aggregation_1_1/local_aggregation.cc
@@ -101,7 +101,7 @@
// Calculate the hash of the final filtered system profile.
uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString());
- procedure->UpdateAggregate(event_record, &report_aggregate, system_profile_hash,
+ procedure->UpdateAggregate(event_record, report_aggregate, system_profile_hash,
event_timestamp);
// Make sure the final filtered system profile is stored.
diff --git a/src/local_aggregation_1_1/observation_generator.cc b/src/local_aggregation_1_1/observation_generator.cc
index b750daf..3233670 100644
--- a/src/local_aggregation_1_1/observation_generator.cc
+++ b/src/local_aggregation_1_1/observation_generator.cc
@@ -136,12 +136,11 @@
if (procedure) {
if (aggregate.aggregate()->mutable_by_report_id()->count(report.id())) {
- ReportAggregate* report_aggregate =
- &aggregate.aggregate()->mutable_by_report_id()->at(report.id());
+ ReportAggregate& report_aggregate =
+ aggregate.aggregate()->mutable_by_report_id()->at(report.id());
- Status status =
- GenerateObservationsForReportAggregate(aggregate, report_aggregate, procedure.get(),
- system_time, metric, metric_ref, report);
+ Status status = GenerateObservationsForReportAggregate(
+ aggregate, report_aggregate, *procedure, system_time, *metric, metric_ref, report);
if (!status.ok()) {
generation_errors.push_back(status);
@@ -172,9 +171,9 @@
}
Status ObservationGenerator::GenerateObservationsForReportAggregate(
- const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate,
- AggregationProcedure* procedure, std::chrono::system_clock::time_point end_time,
- const MetricDefinition* metric, const logger::MetricRef& metric_ref,
+ const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
+ AggregationProcedure& procedure, std::chrono::system_clock::time_point end_time,
+ const MetricDefinition& metric, const logger::MetricRef& metric_ref,
const ReportDefinition& report) {
SystemProfile current_filtered_system_profile;
MetadataBuilder::FilteredSystemProfile(report, system_data_->system_profile(),
@@ -183,23 +182,23 @@
CB_ASSIGN_OR_RETURN(
std::vector<util::TimeInfo> to_generate,
- backfill_manager_.CalculateBackfill(procedure->GetLastTimeInfo(*report_aggregate), end_time,
- *civil_time_converter_, *metric, procedure->IsDaily()));
+ backfill_manager_.CalculateBackfill(procedure.GetLastTimeInfo(report_aggregate), end_time,
+ *civil_time_converter_, metric, procedure.IsDaily()));
// In general, don't generate observations for the hour ID (if hourly) or the day index (if daily)
// corresponding to the current time. However, if `report` is a daily expedited report, then do
// generate observations for the current day.
//
// TODO(fxbug.dev/92131): Move this conditional into BackfillManager.
- if (procedure->IsDaily() && procedure->IsExpedited()) {
+ if (procedure.IsDaily() && procedure.IsExpedited()) {
CB_ASSIGN_OR_RETURN(util::TimeInfo end_time_info,
- util::TimeInfo::FromTimePoint(end_time, *civil_time_converter_, *metric));
+ util::TimeInfo::FromTimePoint(end_time, *civil_time_converter_, metric));
to_generate.push_back(end_time_info);
}
for (util::TimeInfo time_info : to_generate) {
CB_ASSIGN_OR_RETURN(std::vector<ObservationAndSystemProfile> observations,
- procedure->GenerateObservations(time_info, report_aggregate));
+ procedure.GenerateObservations(time_info, report_aggregate));
if (!observations.empty()) {
for (ObservationAndSystemProfile& observation : observations) {
@@ -209,7 +208,7 @@
CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations,
privacy_encoder_->MaybeMakePrivateObservations(
- std::move(observation.observation), *metric, report));
+ std::move(observation.observation), metric, report));
CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, time_info, metric_ref,
report, private_observations, *system_profile,
@@ -218,7 +217,7 @@
} else {
// Allow the privacy encoder a chance to create a fabricated observation.
CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations,
- privacy_encoder_->MaybeMakePrivateObservations(nullptr, *metric, report));
+ privacy_encoder_->MaybeMakePrivateObservations(nullptr, metric, report));
// Use the current system profile if any fabricated private observations are created.
// TODO(fxbug.dev/92955): choose plausible SystemProfiles for fabricated observations.
@@ -231,7 +230,7 @@
return Status::OkStatus();
}
Status ObservationGenerator::WriteObservations(
- ReportAggregate* report_aggregate, AggregationProcedure* procedure, util::TimeInfo time_info,
+ ReportAggregate& report_aggregate, AggregationProcedure& procedure, util::TimeInfo time_info,
const logger::MetricRef& metric_ref, const ReportDefinition& report,
const std::vector<std::unique_ptr<Observation>>& private_observations,
const SystemProfile& system_profile, std::optional<uint64_t> commit_system_profile_hash) {
@@ -248,7 +247,7 @@
}
if (commit_system_profile_hash != std::nullopt) {
- procedure->ObservationsCommitted(report_aggregate, time_info, *commit_system_profile_hash);
+ procedure.ObservationsCommitted(report_aggregate, time_info, *commit_system_profile_hash);
}
return Status::OkStatus();
}
diff --git a/src/local_aggregation_1_1/observation_generator.h b/src/local_aggregation_1_1/observation_generator.h
index 5516648..61add03 100644
--- a/src/local_aggregation_1_1/observation_generator.h
+++ b/src/local_aggregation_1_1/observation_generator.h
@@ -85,12 +85,12 @@
// GenerateObservationsForReportAggregate generates all needed observations for a given
// ReportAggregate, returning a Status::OK if all observations were generated.
Status GenerateObservationsForReportAggregate(
- const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate,
- AggregationProcedure* procedure, std::chrono::system_clock::time_point end_time,
- const MetricDefinition* metric, const logger::MetricRef& metric_ref,
+ const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
+ AggregationProcedure& procedure, std::chrono::system_clock::time_point end_time,
+ const MetricDefinition& metric, const logger::MetricRef& metric_ref,
const ReportDefinition& report);
- Status WriteObservations(ReportAggregate* report_aggregate, AggregationProcedure* procedure,
+ Status WriteObservations(ReportAggregate& report_aggregate, AggregationProcedure& procedure,
util::TimeInfo time_info, const logger::MetricRef& metric_ref,
const ReportDefinition& report,
const std::vector<std::unique_ptr<Observation>>& private_observations,
diff --git a/src/logger/BUILD.gn b/src/logger/BUILD.gn
index 87b3b05..5b4c8a0 100644
--- a/src/logger/BUILD.gn
+++ b/src/logger/BUILD.gn
@@ -32,6 +32,7 @@
]
public_deps = [
":project_context",
+ "$cobalt_root/src/lib/util:datetime_util",
"$cobalt_root/src/pb",
"$cobalt_root/src/registry:cobalt_registry_proto",
]
diff --git a/src/logger/event_record.h b/src/logger/event_record.h
index 350d070..2822e8d 100644
--- a/src/logger/event_record.h
+++ b/src/logger/event_record.h
@@ -9,6 +9,7 @@
#include <third_party/abseil-cpp/absl/strings/str_cat.h>
+#include "src/lib/util/datetime_util.h"
#include "src/lib/util/status_builder.h"
#include "src/logger/project_context.h"
#include "src/logging.h"
@@ -47,6 +48,13 @@
// Get the Event that is to be logged.
[[nodiscard]] Event* event() const { return event_.get(); }
+ [[nodiscard]] util::TimeInfo GetTimeInfo() const {
+ return {
+ .day_index = event_->day_index(),
+ .hour_id = event_->hour_id(),
+ };
+ }
+
// Get the SystemProfile that is to be logged with the event. Only used for Cobalt 1.1 metrics.
[[nodiscard]] SystemProfile* system_profile() const { return system_profile_.get(); }