Migration of aggregate data when REPORT_ALL changes to SELECT_FIRST/LAST.
Implement the Merge methods for the local aggregation procedures.
Use them to migrate REPORT_ALL aggregate data to SELECT_FIRST/LAST.
Bug: 91520
Change-Id: I22fc9c8a2482d4213dac73a5ea4acd9959d4d089
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/644182
Reviewed-by: Zach Bush <zmbush@google.com>
Commit-Queue: Cameron Dale <camrdale@google.com>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
index 31c37d4..1739024 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
@@ -175,26 +175,33 @@
system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
}
- for (EventCodesAggregateData &aggregate_data :
- *system_profile_aggregate->mutable_by_event_code()) {
- // Find the event codes that match the event's.
- if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
- event_codes.begin(), event_codes.end())) {
- return aggregate_data.mutable_data();
- }
- }
- // Event codes were not found, so add them as a new entry.
- if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
- EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
- aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
- return aggregate_data->mutable_data();
- }
+ return GetAggregateData(system_profile_aggregate, event_codes);
}
// TODO(fxbug.dev/85440): add support for finding or adding the system profile for REPORT_ALL.
return nullptr;
}
+AggregateData *AggregationProcedure::GetAggregateData(
+ SystemProfileAggregate *system_profile_aggregate,
+ google::protobuf::RepeatedField<uint32_t> event_codes) const {
+ for (EventCodesAggregateData &aggregate_data :
+ *system_profile_aggregate->mutable_by_event_code()) {
+ // Find the event codes that match the event's.
+ if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
+ event_codes.begin(), event_codes.end())) {
+ return aggregate_data.mutable_data();
+ }
+ }
+ // Event codes were not found, so add them as a new entry.
+ if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
+ EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
+ aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
+ return aggregate_data->mutable_data();
+ }
+ return nullptr;
+}
+
void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
ReportAggregate *aggregate, uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time) {
@@ -218,6 +225,36 @@
}
}
+void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+ const SystemProfileAggregate &aggregate) {
+ if (system_profile_selection_policy_ == SELECT_FIRST) {
+ if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
+ merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
+ merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+ }
+ if (aggregate.last_seen_timestamp() > merged_aggregate->last_seen_timestamp()) {
+ merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+ }
+ } else { // SELECT_LAST or SELECT_DEFAULT
+ if (aggregate.last_seen_timestamp() >= merged_aggregate->last_seen_timestamp()) {
+ merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
+ merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+ }
+ if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
+ merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+ }
+ }
+
+ for (const EventCodesAggregateData &aggregate_data : aggregate.by_event_code()) {
+ // Find or create the corresponding AggregateData in the merged system profile aggregate.
+ AggregateData *data = GetAggregateData(merged_aggregate, aggregate_data.event_codes());
+ // nullptr means there is no room in event_vector_buffer_max_ for more event codes.
+ if (data != nullptr) {
+ MergeAggregateData(data, aggregate_data.data());
+ }
+ }
+}
+
util::TimeInfo AggregationProcedure::GetStartTimeInfo(
const util::TimeInfo ¤t_time_info) const {
if (IsDaily()) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
index 9267897..ce1ca12 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
@@ -81,6 +81,15 @@
uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time);
+ // Merge two instances of SystemProfileAggregate according to this aggregation procedure.
+ //
+ // The event codes and their data from |aggregate| are merged into the event codes and data in
+ // |merged_aggregate|. Both system profile aggregates must be included in the same
+ // AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
+ // merging the AggregateData in each bucket.
+ void MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+ const SystemProfileAggregate &aggregate);
+
// GenerateObservations is the public interface for generating observations. It handles
// reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
// passing all that information down to GenerateSingleObservation.
@@ -140,15 +149,11 @@
// Merge two instances of the aggregate data for this procedure.
//
- // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Any fields
- // in the |bucket| (which contains |aggregate_data|) are also merged into |merged_bucket| (which
- // contains |merged_aggregate_data|).
- //
- // TODO(fxbug.dev/91520): implement this in all subclasses and make it pure virtual.
+ // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
+ // AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
+ // bucket to be preserved accurately.
virtual void MergeAggregateData(AggregateData *merged_aggregate_data,
- AggregationPeriodBucket *merged_bucket,
- const AggregateData &aggregate_data,
- const AggregationPeriodBucket &bucket) {}
+ const AggregateData &aggregate_data) = 0;
// GenerateSingleObservation generates an observation for the given report at the given time.
//
@@ -205,6 +210,14 @@
AggregationPeriodBucket *bucket, uint64_t system_profile_hash,
std::chrono::system_clock::time_point system_time) const;
+ // Returns a pointer to the AggregateData of |system_profile_aggregate| that data for
+ // |event_codes| should be added to, if |system_profile_aggregate| has capacity for the event
+ // vector of |event_codes|. If it is not present, adds the event vector of |event_codes| to
+ // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a null pointer
+ // if |system_profile_aggregate| does not have capacity for new event codes.
+ AggregateData *GetAggregateData(SystemProfileAggregate *system_profile_aggregate,
+ google::protobuf::RepeatedField<uint32_t> event_codes) const;
+
// Returns the starting TimeInfo to use aggregate data from for this report, when processing the
// current_time_info.
util::TimeInfo GetStartTimeInfo(const util::TimeInfo ¤t_time_info) const;
@@ -228,6 +241,11 @@
LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
}
+ void MergeAggregateData(AggregateData * /*merged_aggregate_data*/,
+ const AggregateData & /*aggregate_data*/) override {
+ LOG(ERROR) << "MergeAggregateData is UNIMPLEMENTED for " << DebugString();
+ }
+
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> & /* aggregates */,
const std::set<std::vector<uint32_t>> & /*event_vectors*/,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
index 2815581..e675c1a 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
@@ -48,7 +48,7 @@
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
- uint64_t system_profile_hash = 213ULL;
+ uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
@@ -78,14 +78,14 @@
ReportAggregate aggregate;
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
count_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
- uint64_t second_system_profile_hash = 324ULL;
+ uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
count_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(second_event_time));
@@ -115,14 +115,14 @@
ReportAggregate aggregate;
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
uint64_t num_events = event_vector_buffer_max + 1;
AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
count_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 1000;
- uint64_t second_system_profile_hash = 324ULL;
+ uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
count_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(second_event_time));
@@ -149,7 +149,7 @@
ASSERT_GE(event_vector_buffer_max, 0u);
const uint32_t kDayIndex = 10000;
- uint64_t system_profile_hash = 213ULL;
+ uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
@@ -180,13 +180,13 @@
uint64_t num_events = event_vector_buffer_max + 1;
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
at_least_once_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
- uint64_t second_system_profile_hash = 324ULL;
+ uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
at_least_once_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(second_event_time));
@@ -218,14 +218,14 @@
uint32_t kNumEventCodes = 2;
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
// Add events for 2 different event vectors: {1} and {2}.
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
select_first_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(first_event_time));
int64_t second_event_time = first_event_time + 7200;
- uint64_t second_system_profile_hash = 324ULL;
+ uint64_t second_system_profile_hash = uint64_t{324};
AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
select_first_aggregation_procedure.get(), &aggregate,
util::FromUnixSeconds(second_event_time));
@@ -242,6 +242,130 @@
EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
+TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectLast) {
+ const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
+ ReportDefinition report =
+ metric.reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+
+ uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
+ ASSERT_GE(event_vector_buffer_max, 0u);
+
+ const uint32_t kDayIndex = 10000;
+ ReportAggregate aggregate;
+
+ // Manually create some events for the report as they would be with a REPORT_ALL policy.
+ // TODO(fxbug.dev/87403): use AddOccurrenceEventsForDay once REPORT_ALL is supported.
+
+ int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
+ uint64_t first_system_profile_hash = uint64_t{213};
+ AggregationPeriodBucket& bucket = (*aggregate.mutable_daily()->mutable_by_day_index())[kDayIndex];
+ SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
+ system_profile_aggregate->set_first_seen_timestamp(first_event_time);
+ system_profile_aggregate->set_last_seen_timestamp(first_event_time);
+ for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(i);
+ data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+ }
+
+ int64_t second_event_time = first_event_time + 7200;
+ uint64_t second_system_profile_hash = uint64_t{324};
+ system_profile_aggregate = bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
+ system_profile_aggregate->set_first_seen_timestamp(second_event_time);
+ system_profile_aggregate->set_last_seen_timestamp(second_event_time);
+ // Add more event_codes than can be supported in a single aggregate.
+ for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(i);
+ data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+ }
+
+ // Merge the second system profile aggregates into the first one.
+ std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
+ bucket = aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
+ SystemProfileAggregate* merged_system_profile_aggregate =
+ bucket.mutable_system_profile_aggregates(0);
+ at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
+ merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+
+ EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
+ EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
+ EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
+ ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
+ for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+ EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
+ EXPECT_TRUE(
+ merged_system_profile_aggregate->by_event_code(i).data().at_least_once().at_least_once());
+ }
+}
+
+TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectFirst) {
+ const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
+ ReportDefinition report = metric.reports(kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+
+ uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
+ ASSERT_GE(event_vector_buffer_max, 0u);
+
+ uint32_t kHourId = 10000;
+ ReportAggregate aggregate;
+
+ // Manually create some events for the report as they would be with a REPORT_ALL policy.
+ // TODO(fxbug.dev/87403): use AddOccurrenceEventsForHour once REPORT_ALL is supported.
+
+ int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
+ uint64_t first_system_profile_hash = uint64_t{213};
+ AggregationPeriodBucket& bucket = (*aggregate.mutable_hourly()->mutable_by_hour_id())[kHourId];
+ SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
+ system_profile_aggregate->set_first_seen_timestamp(first_event_time);
+ system_profile_aggregate->set_last_seen_timestamp(first_event_time);
+ for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(i);
+ data->mutable_data()->set_count(i);
+ }
+
+ int64_t second_event_time = first_event_time + 1000;
+ uint64_t second_system_profile_hash = uint64_t{324};
+ system_profile_aggregate = bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
+ system_profile_aggregate->set_first_seen_timestamp(second_event_time);
+ system_profile_aggregate->set_last_seen_timestamp(second_event_time);
+ // Add more event_codes than can be supported in a single aggregate.
+ for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(i);
+ data->mutable_data()->set_count(i);
+ }
+
+ // Merge the second system profile aggregates into the first one.
+ std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
+ bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
+ SystemProfileAggregate* merged_system_profile_aggregate =
+ bucket.mutable_system_profile_aggregates(0);
+ at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
+ merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+
+ EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
+ EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
+ EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
+ ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
+ for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+ EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
+ // Middle event code is double due to merge from both system profiles.
+ EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(),
+ i + 1 == event_vector_buffer_max / 2 ? 2 * (i + 1) : i + 1);
+ }
+}
+
TEST_F(AggregationProcedureTest, GenerateHourlyObservation) {
std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
@@ -250,7 +374,7 @@
ASSERT_GE(event_vector_buffer_max, 0u);
uint32_t kHourId = 10000;
- uint64_t system_profile_hash = 213ULL;
+ uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint64_t num_events = event_vector_buffer_max + 1;
@@ -284,7 +408,7 @@
uint32_t kFinalDayIndex = 10000;
uint32_t kFirstDayIndex = 10000 - 6;
- uint64_t system_profile_hash = 213ULL;
+ uint64_t system_profile_hash = uint64_t{213};
ReportAggregate aggregate;
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
@@ -322,12 +446,12 @@
uint32_t kFirstDayIndex = 10000 - 6;
ReportAggregate aggregate;
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
at_least_once_aggregation_procedure.get(), &aggregate);
- uint64_t second_system_profile_hash = 426ULL;
+ uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
at_least_once_aggregation_procedure.get(), &aggregate);
@@ -360,12 +484,12 @@
uint32_t kFirstDayIndex = 10000 - 6;
ReportAggregate aggregate;
- uint64_t first_system_profile_hash = 213ULL;
+ uint64_t first_system_profile_hash = uint64_t{213};
uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
select_first_aggregation_procedure.get(), &aggregate);
- uint64_t second_system_profile_hash = 426ULL;
+ uint64_t second_system_profile_hash = uint64_t{426};
uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
select_first_aggregation_procedure.get(), &aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
index ab2159b..8472172 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
@@ -21,6 +21,18 @@
aggregate_data->mutable_at_least_once()->set_at_least_once(true);
}
+void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ if (aggregate_data.at_least_once().at_least_once()) {
+ merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ if (aggregate_data.at_least_once().last_day_index() >
+ merged_aggregate_data->at_least_once().last_day_index()) {
+ merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+ aggregate_data.at_least_once().last_day_index());
+ }
+ }
+}
+
std::string AtLeastOnceAggregationProcedure::DebugString() const { return "AT_LEAST_ONCE"; }
lib::statusor::StatusOr<std::unique_ptr<Observation>>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
index 3168fe9..5acf645 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
@@ -21,6 +21,10 @@
void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
// Call observation generation more frequently so that data can be sent as soon as possible.
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
index 5e69bd7..77bc7a1 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
@@ -25,7 +25,8 @@
return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
}
- std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+ std::unique_ptr<AtLeastOnceAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
return std::make_unique<AtLeastOnceAggregationProcedure>(GetMetricDef(metric_id),
GetReportDef(metric_id, report_index));
}
@@ -39,7 +40,7 @@
std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
const uint32_t kDayIndex = 10000;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
uint32_t kNumEventCodes = 100;
ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
@@ -58,6 +59,63 @@
}
}
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_at_least_once()->set_at_least_once(true);
+ data.mutable_at_least_once()->set_last_day_index(101);
+ AggregateData merged_data;
+ merged_data.mutable_at_least_once()->set_at_least_once(true);
+ merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+ std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_at_least_once());
+ EXPECT_FALSE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 0);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_at_least_once()->set_at_least_once(true);
+ data.mutable_at_least_once()->set_last_day_index(101);
+ AggregateData merged_data;
+
+ std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_at_least_once()->set_at_least_once(true);
+ merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+ std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
+}
+
TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservation1DayReport) {
uint32_t metric_id = kOccurrenceMetricMetricId;
int report_index = kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex;
@@ -66,7 +124,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
const uint32_t kNumEventCodes = 100;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -117,7 +175,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
const uint32_t kNumEventCodes = 7;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -190,7 +248,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
const uint32_t kNumEventCodes = 7;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -267,7 +325,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
const uint32_t kNumEventCodes = 7;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -348,7 +406,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 777ULL;
+ const uint64_t system_profile_hash = uint64_t{777};
const uint32_t num_events = 1;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), num_events);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
index 301fa6f..961ade1 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
@@ -39,6 +39,11 @@
aggregate_data->set_count(aggregate_data->count() + occurrence_count);
}
+void CountAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+}
+
lib::statusor::StatusOr<std::unique_ptr<Observation>>
CountAggregationProcedure::GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> &buckets,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
index 6450932..155b201 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
@@ -22,6 +22,9 @@
void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) override;
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
const std::vector<AggregateDataToGenerate> &buckets,
const std::set<std::vector<uint32_t>> &event_vectors,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
index 2f26277..7f7d17d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h"
+
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
@@ -12,7 +14,13 @@
namespace cobalt::local_aggregation {
-class CountAggregationProcedureTest : public testing::TestAggregationProcedure {};
+class CountAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<CountAggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+ return std::make_unique<CountAggregationProcedure>(GetMetricDef(metric_id),
+ GetReportDef(metric_id, report_index));
+ }
+};
TEST_F(CountAggregationProcedureTest, UpdateAggregateWorks) {
uint32_t metric_id = kOccurrenceMetricMetricId;
@@ -21,7 +29,7 @@
ReportAggregate aggregate;
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 56789ULL;
+ const uint64_t system_profile_hash = uint64_t{56789};
const uint32_t kNumEventCodes = 100;
AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, procedure.get(),
&aggregate);
@@ -45,7 +53,7 @@
ReportAggregate aggregate;
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 56789ULL;
+ const uint64_t system_profile_hash = uint64_t{56789};
const uint32_t kNumEventCodes = 100;
AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash,
procedure.get(), &aggregate);
@@ -62,6 +70,54 @@
}
}
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.set_count(10);
+ AggregateData merged_data;
+ merged_data.set_count(20);
+
+ std::unique_ptr<CountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 30);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<CountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 0);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.set_count(10);
+ AggregateData merged_data;
+
+ std::unique_ptr<CountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 10);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.set_count(20);
+
+ std::unique_ptr<CountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 20);
+}
+
TEST_F(CountAggregationProcedureTest, GenerateObservationWorks) {
uint32_t metric_id = kOccurrenceMetricMetricId;
int report_index = kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex;
@@ -72,7 +128,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 56789ULL;
+ const uint64_t system_profile_hash = uint64_t{56789};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
&aggregate);
@@ -109,7 +165,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 56789ULL;
+ const uint64_t system_profile_hash = uint64_t{56789};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
&aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
index 754b139..d1ea447 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
@@ -53,6 +53,13 @@
}
}
+void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ for (const auto &[index, count] : aggregate_data.integer_histogram().histogram()) {
+ (*merged_aggregate_data->mutable_integer_histogram()->mutable_histogram())[index] += count;
+ }
+}
+
lib::statusor::StatusOr<std::unique_ptr<Observation>>
IntegerHistogramAggregationProcedure::GenerateHourlyObservation(
const AggregateDataToGenerate &bucket) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
index ce6f006..6112ded 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
@@ -21,6 +21,9 @@
void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) override;
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
const AggregateDataToGenerate &bucket) override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
index 5535308..581e08b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
+
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
@@ -47,6 +49,12 @@
util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
}
}
+
+ std::unique_ptr<IntegerHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<IntegerHistogramAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
};
TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksInteger) {
@@ -59,7 +67,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 1867ULL;
+ const uint64_t system_profile_hash = uint64_t{1867};
LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
@@ -80,7 +88,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 1867ULL;
+ const uint64_t system_profile_hash = uint64_t{1867};
LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}},
system_profile_hash, procedure.get(), &aggregate);
@@ -92,6 +100,73 @@
EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
}
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
+ data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
+ AggregateData merged_data;
+ merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
+ merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
+
+ std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 50);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_integer_histogram());
+ EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
+ data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
+ AggregateData merged_data;
+
+ std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 20);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
+ merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
+
+ std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 30);
+ ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
+ EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
+}
+
TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksInteger) {
uint32_t metric_id = kIntegerMetricMetricId;
int report_index = kIntegerMetricFleetwideHistogramsReportIndex;
@@ -102,7 +177,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 1867ULL;
+ const uint64_t system_profile_hash = uint64_t{1867};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
}
@@ -139,7 +214,7 @@
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 1867ULL;
+ const uint64_t system_profile_hash = uint64_t{1867};
const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
index 8c26130..30ef332 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
@@ -94,6 +94,13 @@
aggregate_data->set_integer_value(aggregate_data->integer_value() +
event_record.event()->integer_event().value());
}
+
+void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ merged_aggregate_data->set_integer_value(merged_aggregate_data->integer_value() +
+ aggregate_data.integer_value());
+}
+
int64_t SumNumericStatAggregationProcedure::CollectValue(
const std::vector<const AggregateData *> &aggregates) {
int64_t value = 0;
@@ -113,6 +120,19 @@
aggregate_data->set_integer_value(event_record.event()->integer_event().value());
}
}
+
+void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
+ merged_aggregate_data->set_integer_value(
+ std::min(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+ } else {
+ merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+ }
+ }
+}
+
int64_t MinNumericStatAggregationProcedure::CollectValue(
const std::vector<const AggregateData *> &aggregates) {
int64_t value = std::numeric_limits<int64_t>::max();
@@ -132,6 +152,19 @@
aggregate_data->set_integer_value(event_record.event()->integer_event().value());
}
}
+
+void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+ if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
+ merged_aggregate_data->set_integer_value(
+ std::max(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+ } else {
+ merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+ }
+ }
+}
+
int64_t MaxNumericStatAggregationProcedure::CollectValue(
const std::vector<const AggregateData *> &aggregates) {
int64_t value = std::numeric_limits<int64_t>::min();
@@ -148,6 +181,14 @@
sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
sum_and_count->set_count(sum_and_count->count() + 1);
}
+
+void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ SumAndCount *sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+ sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
+ sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
+}
+
int64_t MeanNumericStatAggregationProcedure::CollectValue(
const std::vector<const AggregateData *> &aggregates) {
int64_t sum = 0;
@@ -166,6 +207,11 @@
event_record.event()->integer_event().value());
}
+void MedianNumericStatAggregationProcedure::MergeAggregateData(
+ AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
+ merged_aggregate_data->mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
+}
+
namespace {
std::vector<int64_t> CollectValues(const std::vector<const AggregateData *> &aggregates) {
std::vector<int64_t> values;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
index 71a0fa0..9ebcd06 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
@@ -36,28 +36,95 @@
virtual int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) = 0;
};
-#define PROCEDURE(type) \
- class type##NumericStatAggregationProcedure : public NumericStatAggregationProcedure { \
- public: \
- explicit type##NumericStatAggregationProcedure(const MetricDefinition &metric, \
- const ReportDefinition &report) \
- : NumericStatAggregationProcedure(metric, report) {} \
- \
- void UpdateAggregateData(const logger::EventRecord &event_record, \
- AggregateData *aggregate_data, \
- AggregationPeriodBucket * /*bucket*/) override; \
- \
- [[nodiscard]] std::string DebugString() const override { return #type " NUMERIC_STAT"; } \
- \
- private: \
- int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override; \
- }
+class SumNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+ explicit SumNumericStatAggregationProcedure(const MetricDefinition &metric,
+ const ReportDefinition &report)
+ : NumericStatAggregationProcedure(metric, report) {}
-PROCEDURE(Sum);
-PROCEDURE(Min);
-PROCEDURE(Max);
-PROCEDURE(Mean);
-PROCEDURE(Median);
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+ AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
+ [[nodiscard]] std::string DebugString() const override { return "SUM_NUMERIC_STAT"; }
+
+ private:
+ int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MinNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+ explicit MinNumericStatAggregationProcedure(const MetricDefinition &metric,
+ const ReportDefinition &report)
+ : NumericStatAggregationProcedure(metric, report) {}
+
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+ AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
+ [[nodiscard]] std::string DebugString() const override { return "MIN_NUMERIC_STAT"; }
+
+ private:
+ int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MaxNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+ explicit MaxNumericStatAggregationProcedure(const MetricDefinition &metric,
+ const ReportDefinition &report)
+ : NumericStatAggregationProcedure(metric, report) {}
+
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+ AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
+ [[nodiscard]] std::string DebugString() const override { return "MAX_NUMERIC_STAT"; }
+
+ private:
+ int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MeanNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+ explicit MeanNumericStatAggregationProcedure(const MetricDefinition &metric,
+ const ReportDefinition &report)
+ : NumericStatAggregationProcedure(metric, report) {}
+
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+ AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
+ [[nodiscard]] std::string DebugString() const override { return "MEAN_NUMERIC_STAT"; }
+
+ private:
+ int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MedianNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+ explicit MedianNumericStatAggregationProcedure(const MetricDefinition &metric,
+ const ReportDefinition &report)
+ : NumericStatAggregationProcedure(metric, report) {}
+
+ void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+ AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
+ [[nodiscard]] std::string DebugString() const override { return "MEDIAN_NUMERIC_STAT"; }
+
+ private:
+ int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
class PercentileNNumericStatAggregationProcedure : public MedianNumericStatAggregationProcedure {
public:
@@ -66,7 +133,7 @@
: MedianNumericStatAggregationProcedure(metric, report),
percentile_n_(report.local_aggregation_procedure_percentile_n()) {}
- [[nodiscard]] std::string DebugString() const override { return "PercentileN NUMERIC_STAT"; }
+ [[nodiscard]] std::string DebugString() const override { return "PERCENTILE_N_NUMERIC_STAT"; }
private:
int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
index a3ac06e..9e096ef 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
@@ -7,6 +7,7 @@
#include <limits>
#include <memory>
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "src/lib/util/datetime_util.h"
@@ -22,6 +23,8 @@
namespace cobalt::local_aggregation {
using TimeInfo = util::TimeInfo;
+using ::testing::IsEmpty;
+using ::testing::UnorderedElementsAre;
class NumericStatAggregationProcedureTest
: public testing::TestAggregationProcedure,
@@ -130,23 +133,23 @@
void CheckDebugString(int64_t daily_report_type, AggregationProcedure *procedure) {
switch (daily_report_type) {
case kIntegerMetricUniqueDeviceNumericStatsReport7DaySumReportIndex:
- ASSERT_EQ(procedure->DebugString(), "Sum NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "SUM_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMinReportIndex:
- ASSERT_EQ(procedure->DebugString(), "Min NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "MIN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMaxReportIndex:
- ASSERT_EQ(procedure->DebugString(), "Max NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "MAX_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMeanReportIndex:
- ASSERT_EQ(procedure->DebugString(), "Mean NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "MEAN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7DayMedianReportIndex:
- ASSERT_EQ(procedure->DebugString(), "Median NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "MEDIAN_NUMERIC_STAT");
break;
case kIntegerMetricUniqueDeviceNumericStatsReport7Day75thPercentileReportIndex:
case kIntegerMetricUniqueDeviceNumericStatsReport7Day99thPercentileReportIndex:
- ASSERT_EQ(procedure->DebugString(), "PercentileN NUMERIC_STAT");
+ ASSERT_EQ(procedure->DebugString(), "PERCENTILE_N_NUMERIC_STAT");
break;
}
}
@@ -163,7 +166,7 @@
ReportAggregate report_aggregate;
const uint32_t kDayIndex = 7;
- const uint64_t system_profile_hash = 987ULL;
+ const uint64_t system_profile_hash = uint64_t{987};
int64_t window_size = static_cast<int64_t>(integer_sequence.size()) / kDayIndex;
std::vector<int64_t> window;
@@ -209,7 +212,7 @@
ReportAggregate report_aggregate;
const uint32_t kHourId = 20;
- const uint64_t system_profile_hash = 987ULL;
+ const uint64_t system_profile_hash = uint64_t{987};
for (uint32_t hour = 0; hour <= kHourId; hour += 2) {
LogIntegerSequence(integer_sequence, TimeInfo::FromHourId(hour), system_profile_hash,
@@ -284,4 +287,370 @@
}),
TestName);
+class SumNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<SumNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<SumNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.set_integer_value(200);
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_value(), 300);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.set_integer_value(200);
+ AggregateData merged_data;
+
+ std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_value(), 200);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MinNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<MinNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<MinNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.set_integer_value(-15);
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.set_integer_value(-15);
+ AggregateData merged_data;
+
+ std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MaxNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<MaxNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<MaxNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.set_integer_value(-15);
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.set_integer_value(-15);
+ AggregateData merged_data;
+
+ std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.set_integer_value(100);
+
+ std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.has_integer_value());
+ EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MeanNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<MeanNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<MeanNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_sum_and_count()->set_count(10);
+ data.mutable_sum_and_count()->set_sum(200);
+ AggregateData merged_data;
+ merged_data.mutable_sum_and_count()->set_count(20);
+ merged_data.mutable_sum_and_count()->set_sum(100);
+
+ std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 30);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 0);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_sum_and_count()->set_count(10);
+ data.mutable_sum_and_count()->set_sum(200);
+ AggregateData merged_data;
+
+ std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 10);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_sum_and_count()->set_count(20);
+ merged_data.mutable_sum_and_count()->set_sum(100);
+
+ std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 20);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
+}
+
+class MedianNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<MedianNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<MedianNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_integer_values()->add_value(-15);
+ data.mutable_integer_values()->add_value(150);
+ AggregateData merged_data;
+ merged_data.mutable_integer_values()->add_value(100);
+ merged_data.mutable_integer_values()->add_value(10);
+
+ std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_integer_values()->add_value(-15);
+ data.mutable_integer_values()->add_value(150);
+ AggregateData merged_data;
+
+ std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_integer_values()->add_value(100);
+ merged_data.mutable_integer_values()->add_value(10);
+
+ std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+ GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
+}
+
+class PercentileNNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<PercentileNNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<PercentileNNumericStatAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
+};
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_integer_values()->add_value(-15);
+ data.mutable_integer_values()->add_value(150);
+ AggregateData merged_data;
+ merged_data.mutable_integer_values()->add_value(100);
+ merged_data.mutable_integer_values()->add_value(10);
+
+ std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+ kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+ kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_integer_values()->add_value(-15);
+ data.mutable_integer_values()->add_value(150);
+ AggregateData merged_data;
+
+ std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+ kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_integer_values()->add_value(100);
+ merged_data.mutable_integer_values()->add_value(10);
+
+ std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+ kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
+}
+
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
index cef9424..23138ea 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
@@ -23,6 +23,18 @@
aggregate_data->mutable_at_least_once()->set_at_least_once(true);
}
+void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ if (aggregate_data.at_least_once().at_least_once()) {
+ merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+ if (aggregate_data.at_least_once().last_day_index() >
+ merged_aggregate_data->at_least_once().last_day_index()) {
+ merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+ aggregate_data.at_least_once().last_day_index());
+ }
+ }
+}
+
std::string SelectFirstAggregationProcedure::DebugString() const { return "SELECT_FIRST"; }
lib::statusor::StatusOr<std::unique_ptr<Observation>>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
index 814cee1..0a540a7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
@@ -18,6 +18,10 @@
void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
// Call observation generation more frequently so that data can be sent as soon as possible.
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
index 75ec93c..1f6ee10 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
@@ -23,7 +23,8 @@
return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
}
- std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+ std::unique_ptr<SelectFirstAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
return std::make_unique<SelectFirstAggregationProcedure>(GetMetricDef(metric_id),
GetReportDef(metric_id, report_index));
}
@@ -36,7 +37,7 @@
std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
const uint32_t kDayIndex = 10000;
- const uint64_t system_profile_hash = 999ULL;
+ const uint64_t system_profile_hash = uint64_t{999};
std::unique_ptr<logger::EventRecord> record = GetEventRecord(metric_id);
record->event()->set_day_index(kDayIndex);
@@ -57,6 +58,67 @@
ASSERT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_at_least_once()->set_at_least_once(true);
+ data.mutable_at_least_once()->set_last_day_index(101);
+ AggregateData merged_data;
+ merged_data.mutable_at_least_once()->set_at_least_once(true);
+ merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+ std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_at_least_once());
+ EXPECT_FALSE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 0);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_at_least_once()->set_at_least_once(true);
+ data.mutable_at_least_once()->set_last_day_index(101);
+ AggregateData merged_data;
+
+ std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_at_least_once()->set_at_least_once(true);
+ merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+ std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+ EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
+}
+
TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation1DayReport) {
uint32_t metric_id = kOccurrenceMetricMetricId;
int report_index = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex;
@@ -65,7 +127,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 999ULL;
+ const uint64_t system_profile_hash = uint64_t{999};
const uint32_t kNumEventCodes = 2;
@@ -108,7 +170,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 999ULL;
+ const uint64_t system_profile_hash = uint64_t{999};
const uint32_t kNumEventCodes = 2;
@@ -170,7 +232,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 999ULL;
+ const uint64_t system_profile_hash = uint64_t{999};
const uint32_t kNumEventCodes = 2;
@@ -237,7 +299,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 999ULL;
+ const uint64_t system_profile_hash = uint64_t{999};
const uint32_t kNumEventCodes = 2;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
index f276749..75d7e4d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
@@ -22,6 +22,11 @@
event_record.event()->occurrence_event().count());
}
+void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+}
+
std::string SelectMostCommonAggregationProcedure::DebugString() const {
return "SELECT_MOST_COMMON";
}
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
index e4f67a4..e2430ff 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
@@ -20,6 +20,10 @@
void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket * /*bucket*/) override;
+
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
[[nodiscard]] std::string DebugString() const override;
[[nodiscard]] bool IsDaily() const override { return true; }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
index cf1a45d..67ae97e 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
@@ -23,7 +23,8 @@
return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
}
- std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+ std::unique_ptr<SelectMostCommonAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
return std::make_unique<SelectMostCommonAggregationProcedure>(
GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
}
@@ -37,7 +38,7 @@
std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
const uint32_t kDayIndex = 10000;
- const uint64_t system_profile_hash = 1234568ULL;
+ const uint64_t system_profile_hash = uint64_t{1234568};
uint32_t kNumEventCodes = 100;
ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
@@ -56,6 +57,58 @@
}
}
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.set_count(100);
+ AggregateData merged_data;
+ merged_data.set_count(121);
+
+ std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 221);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 0);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.set_count(100);
+ AggregateData merged_data;
+
+ std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 100);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.set_count(121);
+
+ std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId,
+ kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.count(), 121);
+}
+
TEST_F(SelectMostCommonAggregationProcedureTest, GenerateObservation1DayReportNoEvents) {
uint32_t metric_id = kOccurrenceMetricMetricId;
int report_index = kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex;
@@ -83,7 +136,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 1234568ULL;
+ const uint64_t system_profile_hash = uint64_t{1234568};
const uint32_t kNumEventCodes = 100;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -122,7 +175,7 @@
const uint32_t kDayIndex = 10000;
util::TimeInfo time_info;
time_info.day_index = kDayIndex;
- const uint64_t system_profile_hash = 1234568ULL;
+ const uint64_t system_profile_hash = uint64_t{1234568};
const uint32_t kNumEventCodes = 7;
ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
index bf4cd9b..6f232e3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
@@ -37,6 +37,18 @@
}
}
+void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) {
+ // Merge in the aggregate data's mapping of indexes to their count.
+ // This only works correctly if the AggregateData objects are both part of the same
+ // AggregationPeriodBucket, such that their string_index values both refer to the same repeated
+ // string_hashes field in the bucket.
+ for (const auto &[string_index, count] : aggregate_data.string_histogram().histogram()) {
+ (*merged_aggregate_data->mutable_string_histogram()->mutable_histogram())[string_index] +=
+ count;
+ }
+}
+
lib::statusor::StatusOr<std::unique_ptr<Observation>>
StringHistogramAggregationProcedure::GenerateHourlyObservation(
const AggregateDataToGenerate &bucket) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
index 60edd72..aa67ed8 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
@@ -21,6 +21,9 @@
void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket *bucket) override;
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
const AggregateDataToGenerate &bucket) override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
index d370ce4..6f358e6 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h"
+
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -17,7 +19,7 @@
using ::testing::Contains;
class StringHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
- public:
+ protected:
void LogStringEvents(uint32_t hour_id, uint32_t num_event_codes,
const std::vector<std::string>& strings, uint64_t system_profile_hash,
AggregationProcedure* procedure, ReportAggregate* aggregate) {
@@ -34,6 +36,12 @@
}
}
}
+
+ std::unique_ptr<StringHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<StringHistogramAggregationProcedure>(
+ GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+ }
};
TEST_F(StringHistogramAggregationProcedureTest, UpdateAggregateWorks) {
@@ -43,7 +51,7 @@
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 100;
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 111111ULL;
+ const uint64_t system_profile_hash = uint64_t{111111};
const std::vector<std::string> kTestStrings = {
"Nunc dictum justo ac arcu.",
"Suspendisse ullamcorper mi vel pulvinar dictum.",
@@ -62,6 +70,73 @@
ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
}
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_string_histogram()->mutable_histogram()->insert({0, 10});
+ data.mutable_string_histogram()->mutable_histogram()->insert({1, 20});
+ AggregateData merged_data;
+ merged_data.mutable_string_histogram()->mutable_histogram()->insert({1, 30});
+ merged_data.mutable_string_histogram()->mutable_histogram()->insert({2, 40});
+
+ std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.string_histogram().histogram_size(), 3);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(0), 10);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 50);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(2));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(2), 40);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_FALSE(merged_data.has_string_histogram());
+ EXPECT_EQ(merged_data.string_histogram().histogram_size(), 0);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_string_histogram()->mutable_histogram()->insert({0, 10});
+ data.mutable_string_histogram()->mutable_histogram()->insert({1, 20});
+ AggregateData merged_data;
+
+ std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(0), 10);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 20);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_string_histogram()->mutable_histogram()->insert({1, 30});
+ merged_data.mutable_string_histogram()->mutable_histogram()->insert({2, 40});
+
+ std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+ GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 30);
+ ASSERT_TRUE(merged_data.string_histogram().histogram().contains(2));
+ EXPECT_EQ(merged_data.string_histogram().histogram().at(2), 40);
+}
+
TEST_F(StringHistogramAggregationProcedureTest, GenerateObservationWorks) {
std::unique_ptr<AggregationProcedure> procedure =
GetProcedureFor(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
@@ -69,7 +144,7 @@
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 10;
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 111111ULL;
+ const uint64_t system_profile_hash = uint64_t{111111};
const std::vector<std::string> kTestStrings = {
"Nunc dictum justo ac arcu.",
"Suspendisse ullamcorper mi vel pulvinar dictum.",
@@ -122,7 +197,7 @@
ReportAggregate aggregate;
const uint32_t kNumEventCodes = 10;
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 111111ULL;
+ const uint64_t system_profile_hash = uint64_t{111111};
// The string_buffer_max is 5, this is too many
const std::vector<std::string> kTestStrings = {
"Nunc dictum justo ac arcu.",
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
index ab6dc27..a7572dd 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
@@ -22,6 +22,13 @@
sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
}
+void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData* merged_aggregate_data,
+ const AggregateData& aggregate_data) {
+ SumAndCount* sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+ sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
+ sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
+}
+
lib::statusor::StatusOr<std::unique_ptr<Observation>>
SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate& bucket) {
std::vector<std::tuple<std::vector<uint32_t>, int64_t, uint32_t>> data;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
index 62cd875..fbc28b3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
@@ -20,6 +20,9 @@
void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
AggregationPeriodBucket *bucket) override;
+ void MergeAggregateData(AggregateData *merged_aggregate_data,
+ const AggregateData &aggregate_data) override;
+
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
const AggregateDataToGenerate &bucket) override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
index 15d8876..0bfed7b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h"
+
#include <gtest/gtest.h>
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
@@ -11,7 +13,14 @@
namespace cobalt::local_aggregation {
-class SumAndCountAggregationProcedureTest : public testing::TestAggregationProcedure {};
+class SumAndCountAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+ std::unique_ptr<SumAndCountAggregationProcedure> GetProcedure(uint32_t metric_id,
+ int report_index) {
+ return std::make_unique<SumAndCountAggregationProcedure>(GetMetricDef(metric_id),
+ GetReportDef(metric_id, report_index));
+ }
+};
TEST_F(SumAndCountAggregationProcedureTest, UpdateAggregateWorks) {
uint32_t metric_id = kIntegerMetricMetricId;
@@ -24,7 +33,7 @@
const int64_t kValue = 42;
const uint32_t kHourId = 1;
- const uint64_t system_profile_hash = 634354ULL;
+ const uint64_t system_profile_hash = uint64_t{634354};
AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, procedure.get(),
&aggregate);
@@ -41,6 +50,62 @@
}
}
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataBothSet) {
+ AggregateData data;
+ data.mutable_sum_and_count()->set_count(10);
+ data.mutable_sum_and_count()->set_sum(200);
+ AggregateData merged_data;
+ merged_data.mutable_sum_and_count()->set_count(20);
+ merged_data.mutable_sum_and_count()->set_sum(100);
+
+ std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 30);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+ AggregateData data;
+ AggregateData merged_data;
+
+ std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 0);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataFromSet) {
+ AggregateData data;
+ data.mutable_sum_and_count()->set_count(10);
+ data.mutable_sum_and_count()->set_sum(200);
+ AggregateData merged_data;
+
+ std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 10);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataToSet) {
+ AggregateData data;
+ AggregateData merged_data;
+ merged_data.mutable_sum_and_count()->set_count(20);
+ merged_data.mutable_sum_and_count()->set_sum(100);
+
+ std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+ GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+ procedure->MergeAggregateData(&merged_data, data);
+
+ EXPECT_EQ(merged_data.sum_and_count().count(), 20);
+ EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
+}
+
TEST_F(SumAndCountAggregationProcedureTest, GenerateObservationWorks) {
uint32_t metric_id = kIntegerMetricMetricId;
int report_index = kIntegerMetricFleetwideMeansReportReportIndex;
@@ -52,7 +117,7 @@
const int64_t kValue = 42;
const uint32_t kEndHourId = 11;
- const uint64_t system_profile_hash = 634354ULL;
+ const uint64_t system_profile_hash = uint64_t{634354};
for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, procedure.get(),
&aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
index cd13e23..194d2a7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
@@ -48,8 +48,8 @@
current_metric_id_ = metric_id;
current_report_index_ = report_index;
- const auto& metric = GetMetricDef(current_metric_id_);
- const auto& report = metric.reports(current_report_index_);
+ const MetricDefinition& metric = GetMetricDef(current_metric_id_);
+ const ReportDefinition& report = metric.reports(current_report_index_);
current_event_vector_buffer_max_ = report.event_vector_buffer_max();
auto agg_or = AggregationProcedure::Get(metric, report);
@@ -83,7 +83,7 @@
AggregationProcedure* procedure, ReportAggregate* aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
auto record = MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
- auto* event = record->event()->mutable_occurrence_event();
+ OccurrenceEvent* event = record->event()->mutable_occurrence_event();
event->add_event_code(0);
event->set_count(count);
@@ -110,7 +110,7 @@
AggregationProcedure* procedure, ReportAggregate* aggregate,
std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
- auto* event = record->event()->mutable_occurrence_event();
+ OccurrenceEvent* event = record->event()->mutable_occurrence_event();
event->add_event_code(0);
event->set_count(count);
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
index 2c1b08a..68b17a3 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
+++ b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
@@ -23,12 +23,14 @@
"$cobalt_root/src/lib/util:hash",
"$cobalt_root/src/lib/util:protected_fields",
"$cobalt_root/src/local_aggregation_1_1:proto",
+ "$cobalt_root/src/local_aggregation_1_1/aggregation_procedures",
"$cobalt_root/src/logger:internal_metrics",
"$cobalt_root/src/logger:logger_interface",
"$cobalt_root/src/logger:project_context_factory",
"$cobalt_root/src/pb:metadata_builder",
"$cobalt_root/src/public/lib:registry_identifiers",
"$cobalt_root/src/public/lib:status",
+ "$cobalt_root/src/public/lib/statusor",
"$cobalt_root/src/registry:cobalt_registry_proto",
]
configs += [ "$cobalt_root:cobalt_config" ]
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
index 9e56cdf..a03fcf0 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
@@ -101,7 +101,7 @@
storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
ASSERT_TRUE(agg_or.ok());
MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
- agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
+ agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile());
agg.Save();
}
@@ -430,7 +430,7 @@
storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
ASSERT_TRUE(agg_or.ok());
MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
- agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
+ agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile());
agg.Save();
}
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
index bbc2eaa..99d6fad 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
@@ -8,10 +8,12 @@
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
+#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
+#include "src/public/lib/statusor/statusor.h"
namespace cobalt::local_aggregation {
@@ -76,6 +78,40 @@
StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile);
}
+bool LocalAggregateStorage::MigrateReportAllData(AggregationPeriodBucket& bucket,
+ const MetricDefinition& metric_definition,
+ const ReportDefinition& report_def) {
+ lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> procedure_or =
+ AggregationProcedure::Get(metric_definition, report_def);
+ if (!procedure_or.ok()) {
+ LOG(ERROR) << "Failed to create aggregation procedure to migrate REPORT_ALL data for report "
+ << metric_definition.id() << "-" << report_def.id() << ": " << procedure_or.status();
+ return false;
+ }
+ std::unique_ptr<AggregationProcedure> procedure = procedure_or.ConsumeValueOrDie();
+ if (procedure == nullptr) {
+ // This is a non cobalt 1.1 report type, should be silently ignored.
+ return false;
+ }
+
+ google::protobuf::RepeatedPtrField<SystemProfileAggregate>* system_profile_aggregates =
+ bucket.mutable_system_profile_aggregates();
+ auto system_profile_aggregates_it = system_profile_aggregates->begin();
+
+ // Merge all the other aggregates into the first entry.
+ SystemProfileAggregate& merged_system_profile_aggregate = *system_profile_aggregates_it;
+
+ ++system_profile_aggregates_it;
+ while (system_profile_aggregates_it != system_profile_aggregates->end()) {
+ procedure->MergeSystemProfileAggregates(&merged_system_profile_aggregate,
+ *system_profile_aggregates_it);
+
+ // After merging, remove the old aggregate from the repeated field.
+ system_profile_aggregates_it = system_profile_aggregates->erase(system_profile_aggregates_it);
+ }
+ return true;
+}
+
bool LocalAggregateStorage::MigrateStoredData(MetricAggregate& metric,
const MetricDefinition& metric_definition,
const MetadataBuilder* metadata_builder) {
@@ -121,6 +157,15 @@
"for report: "
<< report_id << " hour id: " << hour_id;
}
+ if (report_def->system_profile_selection() != REPORT_ALL &&
+ bucket.system_profile_aggregates_size() > 1) {
+ if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
+ changed = true;
+ LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
+ << SystemProfileSelectionPolicy_Name(
+ report_def->system_profile_selection());
+ }
+ }
}
break;
case ReportAggregate::kDaily:
@@ -134,6 +179,15 @@
"for report: "
<< report_id << " day index: " << day_index;
}
+ if (report_def->system_profile_selection() != REPORT_ALL &&
+ bucket.system_profile_aggregates_size() > 1) {
+ if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
+ changed = true;
+ LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
+ << SystemProfileSelectionPolicy_Name(
+ report_def->system_profile_selection());
+ }
+ }
}
break;
default:
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
index 196ca33..0b0e916 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
@@ -193,6 +193,10 @@
const ReportDefinition &report_def,
const MetadataBuilder *metadata_builder);
+ static bool MigrateReportAllData(AggregationPeriodBucket &bucket,
+ const MetricDefinition &metric_definition,
+ const ReportDefinition &report_def);
+
const int64_t per_project_reserved_bytes_;
struct ByteTracking {
// The total number of bytes used across all projects.
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
index 70b59dd..99d0bdd 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
@@ -4,16 +4,22 @@
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logging.h"
+#include "src/public/lib/statusor/statusor.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
+using lib::statusor::StatusOr;
+using ::testing::Contains;
+using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
+
namespace {
std::unique_ptr<CobaltRegistry> GetRegistry() {
@@ -86,4 +92,155 @@
EXPECT_EQ(storage_->AmountStored(proj), 0);
}
+TEST_F(LocalAggregateStorageTest, MigrateDailyReportAllToSelectLast) {
+ // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for
+ // a REPORT_ALL metric.
+ uint64_t system_profile_hash_1 = uint64_t{1234};
+ uint64_t system_profile_hash_2 = uint64_t{5678};
+ uint32_t day_index = 1000;
+ {
+ StatusOr<MetricAggregateRef> agg_or =
+ storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+ .ForProject(kProjectId)
+ .ForMetric(kOccurrenceMetricMetricId));
+ ASSERT_TRUE(agg_or.ok());
+ MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
+
+ ReportAggregate& report_agg =
+ (*agg.aggregate()
+ ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
+ report_agg.mutable_daily()->set_last_day_index(day_index);
+ AggregationPeriodBucket& mutable_bucket =
+ (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
+
+ SystemProfileAggregate* system_profile_aggregate =
+ mutable_bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(system_profile_hash_1);
+ system_profile_aggregate->set_first_seen_timestamp(9000);
+ system_profile_aggregate->set_last_seen_timestamp(9001);
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(1);
+ data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+
+ system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(system_profile_hash_2);
+ system_profile_aggregate->set_first_seen_timestamp(10000);
+ system_profile_aggregate->set_last_seen_timestamp(10001);
+ data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(2);
+ data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+
+ ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
+ } // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
+
+ ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
+ .ConsumeValueOrDie(),
+ Contains("1"));
+
+ // Reload the storage, triggering a migration.
+ ReplaceRegistry();
+
+ StatusOr<MetricAggregateRef> agg_or =
+ storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+ .ForProject(kProjectId)
+ .ForMetric(kOccurrenceMetricMetricId));
+ ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
+ MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();
+ const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at(
+ kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);
+ ASSERT_TRUE(report_agg.daily().by_day_index().contains(day_index));
+ const AggregationPeriodBucket& bucket = report_agg.daily().by_day_index().at(day_index);
+
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
+ const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
+ EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_2);
+ EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000);
+ EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001);
+
+ ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2);
+ const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0);
+ ASSERT_EQ(data.event_codes_size(), 1);
+ EXPECT_EQ(data.event_codes(0), 1);
+ EXPECT_TRUE(data.data().at_least_once().at_least_once());
+ const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1);
+ ASSERT_EQ(data2.event_codes_size(), 1);
+ EXPECT_EQ(data2.event_codes(0), 2);
+ EXPECT_TRUE(data2.data().at_least_once().at_least_once());
+}
+
+TEST_F(LocalAggregateStorageTest, MigrateHourlyReportAllToSelectFirst) {
+ // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for
+ // a REPORT_ALL metric.
+ uint64_t system_profile_hash_1 = uint64_t{1234};
+ uint64_t system_profile_hash_2 = uint64_t{5678};
+ uint32_t hour_id = 100001;
+ {
+ StatusOr<MetricAggregateRef> agg_or =
+ storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+ .ForProject(kProjectId)
+ .ForMetric(kOccurrenceMetricMetricId));
+ ASSERT_TRUE(agg_or.ok());
+ MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
+
+ ReportAggregate& report_agg =
+ (*agg.aggregate()->mutable_by_report_id())[kOccurrenceMetricHourlyDeviceHistogramsReportId];
+ report_agg.mutable_hourly()->set_last_hour_id(hour_id);
+ AggregationPeriodBucket& mutable_bucket =
+ (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id];
+
+ SystemProfileAggregate* system_profile_aggregate =
+ mutable_bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(system_profile_hash_1);
+ system_profile_aggregate->set_first_seen_timestamp(9000);
+ system_profile_aggregate->set_last_seen_timestamp(9001);
+ EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(1);
+ data->mutable_data()->set_count(3);
+
+ system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(system_profile_hash_2);
+ system_profile_aggregate->set_first_seen_timestamp(10000);
+ system_profile_aggregate->set_last_seen_timestamp(10001);
+ data = system_profile_aggregate->add_by_event_code();
+ data->add_event_codes(2);
+ data->mutable_data()->set_count(5);
+
+ ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
+ } // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
+
+ ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
+ .ConsumeValueOrDie(),
+ Contains("1"));
+
+ // Reload the storage, triggering a migration.
+ ReplaceRegistry();
+
+ StatusOr<MetricAggregateRef> agg_or =
+ storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+ .ForProject(kProjectId)
+ .ForMetric(kOccurrenceMetricMetricId));
+ ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
+ MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();
+ const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at(
+ kOccurrenceMetricHourlyDeviceHistogramsReportId);
+ ASSERT_TRUE(report_agg.hourly().by_hour_id().contains(hour_id));
+ const AggregationPeriodBucket& bucket = report_agg.hourly().by_hour_id().at(hour_id);
+
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
+ const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
+ EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_1);
+ EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000);
+ EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001);
+
+ ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2);
+ const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0);
+ ASSERT_EQ(data.event_codes_size(), 1);
+ EXPECT_EQ(data.event_codes(0), 1);
+ EXPECT_EQ(data.data().count(), 3);
+ const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1);
+ ASSERT_EQ(data2.event_codes_size(), 1);
+ EXPECT_EQ(data2.event_codes(0), 2);
+ EXPECT_EQ(data2.data().count(), 5);
+}
+
} // namespace cobalt::local_aggregation