[REPORT_ALL] Re-implement REPORT_ALL aggregation
This change builds on top of the work done for fxbug.dev/91520 that
stores the SystemProfile in a convenient location.
Bug: 87403
Bug: 87271
Bug: 91520
Change-Id: I6eb001e5f835ee702f635a2400dec40d0e156dbe
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/598781
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
Reviewed-by: Laura Peskin <pesk@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/bin/config_parser/src/config_validator/report_definitions.go b/src/bin/config_parser/src/config_validator/report_definitions.go
index 3cfa421..cb4ca4d 100644
--- a/src/bin/config_parser/src/config_validator/report_definitions.go
+++ b/src/bin/config_parser/src/config_validator/report_definitions.go
@@ -195,10 +195,6 @@
reportErrors.addError("privacy_level", fmt.Errorf("The privacy_level field is required for reports of type %s", r.ReportType))
}
- if r.SystemProfileSelection == config.SystemProfileSelectionPolicy_REPORT_ALL {
- reportErrors.addError("system_profile_selection", fmt.Errorf("The REPORT_ALL system_profile_selection is not yet ready to be used"))
- }
-
if err := validateMinValueMaxValue(r); err != nil {
reportErrors.addError("min_value or max_value", err)
}
diff --git a/src/bin/config_parser/src/config_validator/report_definitions_test.go b/src/bin/config_parser/src/config_validator/report_definitions_test.go
index 2ff46a3..cb8a2b6 100644
--- a/src/bin/config_parser/src/config_validator/report_definitions_test.go
+++ b/src/bin/config_parser/src/config_validator/report_definitions_test.go
@@ -974,17 +974,3 @@
t.Errorf("Rejected report with expedited_sending and valid report type: %v", err)
}
}
-
-func TestValidateReportAll(t *testing.T) {
- m := makeValidMetric(config.MetricDefinition_OCCURRENCE)
- r := makeValidReportWithType(config.ReportDefinition_FLEETWIDE_OCCURRENCE_COUNTS)
- r.PrivacyLevel = config.ReportDefinition_NO_ADDED_PRIVACY
- r.SystemProfileSelection = config.SystemProfileSelectionPolicy_REPORT_ALL
- if err := validateReportsAddedInCobalt11(m, r); err == nil {
- t.Error("Accepted report with system_profile_selection policy REPORT_ALL")
- }
- r.SystemProfileSelection = config.SystemProfileSelectionPolicy_SELECT_LAST
- if err := validateReportsAddedInCobalt11(m, r); err != nil {
- t.Errorf("Rejected report with system_profile_selection policy SELECT_LAST: %v", err)
- }
-}
diff --git a/src/local_aggregation_1_1/BUILD.gn b/src/local_aggregation_1_1/BUILD.gn
index e056556..d90ec25 100644
--- a/src/local_aggregation_1_1/BUILD.gn
+++ b/src/local_aggregation_1_1/BUILD.gn
@@ -52,6 +52,7 @@
"local_aggregate_storage",
"local_aggregate_storage:tests",
"testing:report_all_test_registry_default_system_profile_selection",
+ "testing:report_all_test_registry_with_report_all_set",
"//third_party/googletest:gmock",
"//third_party/googletest:gtest",
]
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
index 1739024..e2cb77a 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
@@ -177,7 +177,23 @@
return GetAggregateData(system_profile_aggregate, event_codes);
}
- // TODO(fxbug.dev/85440): add support for finding or adding the system profile for REPORT_ALL.
+
+ if (system_profile_selection_policy_ == REPORT_ALL) {
+ SystemProfileAggregate *system_profile_aggregate = nullptr;
+ for (SystemProfileAggregate &aggregate : *(bucket->mutable_system_profile_aggregates())) {
+ if (aggregate.system_profile_hash() == system_profile_hash) {
+ system_profile_aggregate = &aggregate;
+ }
+ }
+ if (system_profile_aggregate == nullptr) {
+ system_profile_aggregate = bucket->add_system_profile_aggregates();
+ system_profile_aggregate->set_system_profile_hash(system_profile_hash);
+ system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
+ }
+ system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
+
+ return GetAggregateData(system_profile_aggregate, event_codes);
+ }
return nullptr;
}
@@ -185,6 +201,8 @@
AggregateData *AggregationProcedure::GetAggregateData(
SystemProfileAggregate *system_profile_aggregate,
google::protobuf::RepeatedField<uint32_t> event_codes) const {
+ CHECK(system_profile_aggregate != nullptr)
+ << "Null SystemProfileAggregate passed to GetAggregateData";
for (EventCodesAggregateData &aggregate_data :
*system_profile_aggregate->mutable_by_event_code()) {
// Find the event codes that match the event's.
@@ -275,42 +293,61 @@
util::TimeInfo start_time_info = GetStartTimeInfo(time_info);
if (IsDaily()) {
- // TODO(fxbug.dev/85440): add multiple system profiles to data_to_generate for REPORT_ALL.
- uint64_t system_profile_hash;
- uint32_t last_seen_timestamp = 0;
- uint32_t first_seen_timestamp = UINT32_MAX;
- std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile;
- for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
- if (!aggregate->daily().by_day_index().contains(i)) {
- continue;
- }
- AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
- AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
- for (SystemProfileAggregate &system_profile_aggregate :
- *agg->mutable_system_profile_aggregates()) {
- // For SELECT_FIRST and SELECT_LAST there should only be one SystemProfileAggregate, but
- // for multi-day reports the correct system profile to use has to be determined from the
- // multiple AggregationPeriodBuckets for the days in the aggregation window.
- if ((system_profile_selection_policy_ == SELECT_LAST ||
- system_profile_selection_policy_ == SELECT_DEFAULT) &&
- system_profile_aggregate.last_seen_timestamp() >= last_seen_timestamp) {
- system_profile_hash = system_profile_aggregate.system_profile_hash();
- last_seen_timestamp = system_profile_aggregate.last_seen_timestamp();
- } else if (system_profile_selection_policy_ == SELECT_FIRST &&
- system_profile_aggregate.first_seen_timestamp() < first_seen_timestamp) {
- system_profile_hash = system_profile_aggregate.system_profile_hash();
- first_seen_timestamp = system_profile_aggregate.first_seen_timestamp();
+ if (system_profile_selection_policy_ == SELECT_LAST ||
+ system_profile_selection_policy_ == SELECT_FIRST ||
+ system_profile_selection_policy_ == SELECT_DEFAULT) {
+ uint64_t system_profile_hash;
+ uint32_t last_seen_timestamp = 0;
+ uint32_t first_seen_timestamp = UINT32_MAX;
+ std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile;
+ for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
+ if (!aggregate->daily().by_day_index().contains(i)) {
+ continue;
}
- for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
- agg_to_generate.aggregate_data.push_back(&data);
+ AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+ AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
+ for (SystemProfileAggregate &system_profile_aggregate :
+ *agg->mutable_system_profile_aggregates()) {
+ // For SELECT_FIRST and SELECT_LAST there should only be one SystemProfileAggregate, but
+ // for multi-day reports the correct system profile to use has to be determined from the
+ // multiple AggregationPeriodBuckets for the days in the aggregation window.
+ if ((system_profile_selection_policy_ == SELECT_LAST ||
+ system_profile_selection_policy_ == SELECT_DEFAULT) &&
+ system_profile_aggregate.last_seen_timestamp() >= last_seen_timestamp) {
+ system_profile_hash = system_profile_aggregate.system_profile_hash();
+ last_seen_timestamp = system_profile_aggregate.last_seen_timestamp();
+ } else if (system_profile_selection_policy_ == SELECT_FIRST &&
+ system_profile_aggregate.first_seen_timestamp() < first_seen_timestamp) {
+ system_profile_hash = system_profile_aggregate.system_profile_hash();
+ first_seen_timestamp = system_profile_aggregate.first_seen_timestamp();
+ }
+ for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
+ agg_to_generate.aggregate_data.push_back(&data);
+ }
+ }
+ if (!agg_to_generate.aggregate_data.empty()) {
+ data_to_generate_for_system_profile.emplace_back(std::move(agg_to_generate));
}
}
- if (!agg_to_generate.aggregate_data.empty()) {
- data_to_generate_for_system_profile.emplace_back(std::move(agg_to_generate));
+ if (!data_to_generate_for_system_profile.empty()) {
+ data_to_generate[system_profile_hash] = std::move(data_to_generate_for_system_profile);
}
- }
- if (!data_to_generate_for_system_profile.empty()) {
- data_to_generate[system_profile_hash] = std::move(data_to_generate_for_system_profile);
+ } else if (system_profile_selection_policy_ == REPORT_ALL) {
+ for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
+ if (!aggregate->daily().by_day_index().contains(i)) {
+ continue;
+ }
+ AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+ for (SystemProfileAggregate &system_profile_aggregate :
+ *agg->mutable_system_profile_aggregates()) {
+ AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
+ for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
+ agg_to_generate.aggregate_data.push_back(&data);
+ }
+ data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
+ std::move(agg_to_generate));
+ }
+ }
}
} else {
if (aggregate->hourly().by_hour_id().contains(start_time_info.hour_id)) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
index b59f028..7b53ab9 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
@@ -141,6 +141,61 @@
}
}
+TEST_F(AggregationProcedureTest, UpdateHourlyAggregateReportAllSystemProfile) {
+ std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportAllReportIndex);
+
+ uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
+ ASSERT_GT(event_vector_buffer_max, 0u);
+
+ uint32_t kHourId = 10000;
+ ReportAggregate aggregate;
+
+ int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
+ uint64_t first_system_profile_hash = uint64_t{213};
+ uint64_t num_events = event_vector_buffer_max + 1;
+ AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
+ count_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(first_event_time));
+
+ int64_t second_event_time = first_event_time + 1000;
+ uint64_t second_system_profile_hash = uint64_t{324};
+ AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
+ count_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(second_event_time));
+
+ ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
+ ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 2u);
+
+ {
+ // Check aggregate 1 (first_system_profile_hash)
+ const SystemProfileAggregate& system_profile_agg =
+ aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(0);
+ EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
+ EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
+ EXPECT_EQ(system_profile_agg.last_seen_timestamp(), first_event_time);
+ ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
+ for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+ EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
+ EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
+ }
+ }
+
+ {
+ // Check aggregate 2 (second_system_profile_hash)
+ const SystemProfileAggregate& system_profile_agg =
+ aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates(1);
+ EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
+ EXPECT_EQ(system_profile_agg.first_seen_timestamp(), second_event_time);
+ EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
+ ASSERT_EQ(system_profile_agg.by_event_code_size(), event_vector_buffer_max);
+ for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+ EXPECT_EQ(system_profile_agg.by_event_code(i).event_codes(0), i + 1);
+ EXPECT_EQ(system_profile_agg.by_event_code(i).data().count(), i + 1);
+ }
+ }
+}
+
TEST_F(AggregationProcedureTest, UpdateDailyAggregate) {
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
@@ -242,10 +297,62 @@
EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
}
+TEST_F(AggregationProcedureTest, UpdateDailyAggregateReportAll) {
+ std::unique_ptr<AggregationProcedure> select_first_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
+
+ uint64_t event_vector_buffer_max = GetEventVectorBufferMax();
+ ASSERT_GT(event_vector_buffer_max, 0u);
+
+ const uint32_t kDayIndex = 10000;
+ ReportAggregate aggregate;
+ uint32_t kNumEventCodes = 2;
+
+ int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
+ uint64_t first_system_profile_hash = uint64_t{213};
+ // Add events for 2 different event vectors: {1} and {2}.
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
+ select_first_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(first_event_time));
+
+ int64_t second_event_time = first_event_time + 7200;
+ uint64_t second_system_profile_hash = uint64_t{324};
+ AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
+ select_first_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(second_event_time));
+
+ ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
+ ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 2u);
+
+ {
+ // Check aggregate 1 (first_system_profile_hash)
+ const SystemProfileAggregate& system_profile_agg =
+ aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(0);
+ EXPECT_EQ(system_profile_agg.system_profile_hash(), first_system_profile_hash);
+ EXPECT_EQ(system_profile_agg.first_seen_timestamp(), first_event_time);
+ EXPECT_EQ(system_profile_agg.last_seen_timestamp(), first_event_time);
+ ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
+ ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
+ EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
+ }
+
+ {
+ // Check aggregate 2 (second_system_profile_hash)
+ const SystemProfileAggregate& system_profile_agg =
+ aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates(1);
+ EXPECT_EQ(system_profile_agg.system_profile_hash(), second_system_profile_hash);
+ EXPECT_EQ(system_profile_agg.first_seen_timestamp(), second_event_time);
+ EXPECT_EQ(system_profile_agg.last_seen_timestamp(), second_event_time);
+ ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
+ ASSERT_EQ(system_profile_agg.by_event_code(0).event_codes(0), 1);
+ EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
+ }
+}
+
TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectLast) {
const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
ReportDefinition report =
- metric.reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+ metric.reports(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
ASSERT_GE(event_vector_buffer_max, 0u);
@@ -253,34 +360,23 @@
const uint32_t kDayIndex = 10000;
ReportAggregate aggregate;
- // Manually create some events for the report as they would be with a REPORT_ALL policy.
- // TODO(fxbug.dev/87403): use AddOccurrenceEventsForDay once REPORT_ALL is supported.
+ std::unique_ptr<AggregationProcedure> report_all_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportIndex);
int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
uint64_t first_system_profile_hash = uint64_t{213};
- AggregationPeriodBucket& bucket = (*aggregate.mutable_daily()->mutable_by_day_index())[kDayIndex];
- SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
- system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
- system_profile_aggregate->set_first_seen_timestamp(first_event_time);
- system_profile_aggregate->set_last_seen_timestamp(first_event_time);
- for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
- EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
- data->add_event_codes(i);
- data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
- }
+ AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, first_system_profile_hash,
+ report_all_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(first_event_time));
+ AggregationPeriodBucket& bucket =
+ aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
+ ASSERT_EQ(bucket.system_profile_aggregates(0).by_event_code_size(), event_vector_buffer_max);
int64_t second_event_time = first_event_time + 7200;
uint64_t second_system_profile_hash = uint64_t{324};
- system_profile_aggregate = bucket.add_system_profile_aggregates();
- system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
- system_profile_aggregate->set_first_seen_timestamp(second_event_time);
- system_profile_aggregate->set_last_seen_timestamp(second_event_time);
- // Add more event_codes than can be supported in a single aggregate.
- for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
- EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
- data->add_event_codes(i);
- data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
- }
+ AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, second_system_profile_hash,
+ report_all_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(second_event_time));
// Merge the second system profile aggregates into the first one.
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
@@ -314,34 +410,25 @@
uint32_t kHourId = 10000;
ReportAggregate aggregate;
- // Manually create some events for the report as they would be with a REPORT_ALL policy.
- // TODO(fxbug.dev/87403): use AddOccurrenceEventsForHour once REPORT_ALL is supported.
+ std::unique_ptr<AggregationProcedure> report_all_aggregation_procedure = GetProcedureFor(
+ kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportAllReportIndex);
int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
uint64_t first_system_profile_hash = uint64_t{213};
- AggregationPeriodBucket& bucket = (*aggregate.mutable_hourly()->mutable_by_hour_id())[kHourId];
- SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
- system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
- system_profile_aggregate->set_first_seen_timestamp(first_event_time);
- system_profile_aggregate->set_last_seen_timestamp(first_event_time);
- for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
- EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
- data->add_event_codes(i);
- data->mutable_data()->set_count(i);
- }
+ AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, first_system_profile_hash,
+ report_all_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(first_event_time));
+ AggregationPeriodBucket& bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 1u);
+ ASSERT_EQ(bucket.system_profile_aggregates(0).by_event_code_size(), event_vector_buffer_max);
int64_t second_event_time = first_event_time + 1000;
uint64_t second_system_profile_hash = uint64_t{324};
- system_profile_aggregate = bucket.add_system_profile_aggregates();
- system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
- system_profile_aggregate->set_first_seen_timestamp(second_event_time);
- system_profile_aggregate->set_last_seen_timestamp(second_event_time);
- // Add more event_codes than can be supported in a single aggregate.
- for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
- EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
- data->add_event_codes(i);
- data->mutable_data()->set_count(i);
- }
+ AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, second_system_profile_hash,
+ report_all_aggregation_procedure.get(), &aggregate,
+ util::FromUnixSeconds(second_event_time));
+ ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
+ ASSERT_EQ(bucket.system_profile_aggregates(1).by_event_code_size(), event_vector_buffer_max);
// Merge the second system profile aggregates into the first one.
std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
@@ -361,8 +448,7 @@
for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
// Middle event code is double due to merge from both system profiles.
- EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(),
- i + 1 == event_vector_buffer_max / 2 ? 2 * (i + 1) : i + 1);
+ EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(), 2 * (i + 1));
}
}
diff --git a/src/local_aggregation_1_1/local_aggregation_test.cc b/src/local_aggregation_1_1/local_aggregation_test.cc
index e07952f..f19f95e 100644
--- a/src/local_aggregation_1_1/local_aggregation_test.cc
+++ b/src/local_aggregation_1_1/local_aggregation_test.cc
@@ -14,6 +14,7 @@
#include "src/lib/util/clock.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/testing/report_all_test_registry_default_system_profile_selection.cb.h"
+#include "src/local_aggregation_1_1/testing/report_all_test_registry_with_report_all_set.cb.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/event_record.h"
#include "src/logger/logger_test_utils.h"
@@ -238,14 +239,18 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 2 observations created, for fleetwide_occurrence_counts_report and hourly_device_histograms.
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
+ // hourly_device_histograms_report_all.
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {4});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{4});
+ VerifyStoredIntegerObservation(
+ 2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
+ "100", {4});
// An hour later at 9:10PM, no more events, next aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
@@ -253,10 +258,10 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // Still only the original 2 observations created.
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ // Still only the original 3 observations created.
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
- // 45 minutes later, 2 more events occur, with a new system version.
+ // 45 minutes later, 3 more events occur, with a new system version.
mock_clock_->increment_by(std::chrono::minutes(45));
now = std::chrono::system_clock::to_time_t(mock_clock_->now());
record->event()->set_day_index(day_index);
@@ -272,14 +277,17 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 2 new observations created, this time with an incremental count of 2.
- ASSERT_EQ(test_writer_->num_observations_added(), 4);
+ // 3 new observations created, this time with an incremental count of 3.
+ ASSERT_EQ(test_writer_->num_observations_added(), 6);
VerifyStoredIntegerObservation(
- 2, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
+ 3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"101", {2});
VerifyStoredIntegerObservation(
- 3, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "101",
+ 4, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "101",
{2});
+ VerifyStoredIntegerObservation(
+ 5, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
+ "101", {2});
// An hour later at 11:10PM, no more events, next aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
@@ -287,8 +295,8 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // Still only the expected 4 observations created.
- ASSERT_EQ(test_writer_->num_observations_added(), 4);
+ // Still only the expected 6 observations created.
+ ASSERT_EQ(test_writer_->num_observations_added(), 6);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
mock_clock_->increment_by(std::chrono::minutes(60));
@@ -296,26 +304,45 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 6 new observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 10);
+ // 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS
+ // reports, two for each of the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 16);
VerifyStoredIntegerObservation(
- 4, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
+ 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"101", {1});
VerifyStoredIntegerObservation(
- 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
+ 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"101", {1});
// SELECT_FIRST reports use the first system version, others use the second.
VerifyStoredIntegerObservation(
- 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
+ 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
+ 9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
+
+ // This report is REPORT_ALL and generates 2 observations
VerifyStoredIntegerObservation(
- 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
+ 10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "101", {1});
VerifyStoredIntegerObservation(
- 9,
+ 11, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
+ day_index, "100", {1});
+
+ // This report is REPORT_ALL and generates 2 observations
+ VerifyStoredIntegerObservation(
+ 12, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "101", {1});
+ VerifyStoredIntegerObservation(
+ 13, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "100", {1});
+
+ VerifyStoredIntegerObservation(
+ 14,
+ metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
+ day_index, "101", {1});
+ VerifyStoredIntegerObservation(
+ 15,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "101", {1});
@@ -335,14 +362,17 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 2 new observations created, this time with an incremental count of 1.
- ASSERT_EQ(test_writer_->num_observations_added(), 12);
+ // 3 new observations created, this time with an incremental count of 1.
+ ASSERT_EQ(test_writer_->num_observations_added(), 19);
VerifyStoredIntegerObservation(
- 10, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
+ 16, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"102", {1});
VerifyStoredIntegerObservation(
- 11, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "102",
+ 17, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "102",
{1});
+ VerifyStoredIntegerObservation(
+ 18, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
+ "102", {1});
// The rest of the day, no more events occur so 22 hourly aggregation runs create no new
// observations.
@@ -351,7 +381,7 @@
aggregation->GenerateAggregatedObservations(
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- ASSERT_EQ(test_writer_->num_observations_added(), 12);
+ ASSERT_EQ(test_writer_->num_observations_added(), 19);
}
// At 12:10AM (just after midnight UTC), daily event aggregation run happens.
@@ -360,28 +390,45 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 6 new observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 18);
+ // 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS
+ // reports, 4 for each of the 7-day REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 29);
VerifyStoredIntegerObservation(
- 12, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
+ 19, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"102", {1});
VerifyStoredIntegerObservation(
- 13, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
+ 20, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"102", {1});
// SELECT_FIRST 1-day report uses the new system version.
VerifyStoredIntegerObservation(
- 14, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
+ 21, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "102", {1});
// SELECT_FIRST 7-day report continues to use the original system version.
VerifyStoredIntegerObservation(
- 15, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
+ 22, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 16,
+ 23, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
+ day_index, "102", {1});
+
+ // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
+ // is deterministic based on the hash value)
+ VerifyStoredIntegerObservation(
+ 24, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "101", {1});
+ VerifyStoredIntegerObservation(
+ 25, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "100", {1});
+ VerifyStoredIntegerObservation(
+ 26, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "102", {1});
+
+ VerifyStoredIntegerObservation(
+ 27,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
day_index, "102", {1});
VerifyStoredIntegerObservation(
- 17,
+ 28,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "102", {1});
@@ -391,7 +438,7 @@
aggregation->GenerateAggregatedObservations(
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- ASSERT_EQ(test_writer_->num_observations_added(), 18);
+ ASSERT_EQ(test_writer_->num_observations_added(), 29);
}
// Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
@@ -400,19 +447,32 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
- // 3 new observations are created for the next day, only for the 7-day window
- // UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 21);
+ // 6 new observations are created for the next day, only for the 7-day window
+ // UNIQUE_DEVICE_COUNTS reports, (3 for the REPORT_ALL 7-day report).
+ ASSERT_EQ(test_writer_->num_observations_added(), 35);
day_index += 1;
VerifyStoredIntegerObservation(
- 18, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
+ 29, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"102", {1});
// SELECT_FIRST 7-day report continues to use the original system version.
VerifyStoredIntegerObservation(
- 19, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
+ 30, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
+ day_index, "100", {1});
+
+ // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
+ // is deterministic based on the hash value)
+ VerifyStoredIntegerObservation(
+ 31, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "101", {1});
+ VerifyStoredIntegerObservation(
+ 32, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 20,
+ 33, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "102", {1});
+
+ VerifyStoredIntegerObservation(
+ 34,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "102", {1});
}
@@ -471,14 +531,18 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
ASSERT_EQ(status.error_code(), StatusCode::OK);
- // 2 observations created, for fleetwide_occurrence_counts_report and hourly_device_histograms.
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
+ // hourly_device_histograms_report_all.
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {4});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{4});
+ VerifyStoredIntegerObservation(
+ 2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
+ "100", {4});
// 45 minutes later, 2 more events occur.
mock_clock_->increment_by(std::chrono::minutes(45));
@@ -497,7 +561,7 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
ASSERT_NE(status.error_code(), StatusCode::OK);
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
// 30 minutes later, 1 more event occurs.
mock_clock_->increment_by(std::chrono::minutes(30));
@@ -513,7 +577,7 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
ASSERT_NE(status.error_code(), StatusCode::OK);
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens and fails.
mock_clock_->increment_by(std::chrono::minutes(60));
@@ -521,7 +585,7 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
ASSERT_NE(status.error_code(), StatusCode::OK);
- ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ ASSERT_EQ(test_writer_->num_observations_added(), 3);
// An hour later at 1:10AM (after midnight UTC), now aggregation succeeds, both daily and hourly.
test_writer_->SetFailCalls(false);
@@ -531,40 +595,47 @@
util::TimeInfo::FromTimePointPrevious(mock_clock_->now(), MetricDefinition::LOCAL));
ASSERT_EQ(status.error_code(), StatusCode::OK);
- // 8 total new observations created.
- // 4 new hourly observations created, 2 with an incremental count of 2, 2 with count of 1.
- // 6 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 12);
- VerifyStoredIntegerObservation(
- 2, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
- "100", {2});
+ // 15 total new observations created.
+ // 6 new hourly observations created, 3 with an incremental count of 2, 3 with count of 1.
+ // 9 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 17);
VerifyStoredIntegerObservation(
3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
+ "100", {2});
+ VerifyStoredIntegerObservation(
+ 4, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
- 4, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
+ 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
- 5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
+ 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
"100", {1});
VerifyStoredIntegerObservation(
- 6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
+ 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
+ 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
+ 9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 9,
+ 10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
+ day_index, "100", {1});
+ VerifyStoredIntegerObservation(
+ 11,
+ metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
+ day_index, "100", {1});
+ VerifyStoredIntegerObservation(
+ 12,
metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
day_index, "100", {1});
VerifyStoredIntegerObservation(
- 10, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
+ 13, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{2});
VerifyStoredIntegerObservation(
- 11, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
+ 14, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
{1});
}
@@ -592,7 +663,7 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
ASSERT_EQ(test_writer_->num_observations_added(), 0);
- // 10 minutes later, 4 events occur with event code 45.
+ // 10 minutes later, 4 events occur with event code 45, with two different system_versions.
system_time += std::chrono::minutes(10);
std::unique_ptr<logger::EventRecord> record =
logger::EventRecord::MakeEventRecord(project_context, kExpeditedOccurrenceMetricMetricId)
@@ -605,11 +676,12 @@
record->event()->set_day_index(day_index);
record->event()->set_hour_id(
cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+ record->system_profile()->set_system_version("99");
+ ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
+ ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
record->system_profile()->set_system_version("100");
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
- ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
- ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
// 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
system_time += std::chrono::minutes(50);
@@ -617,20 +689,33 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- // 4 observations created, for the expedited UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 4);
+ // 8 observations created, one for each expedited non-REPORT_ALL UNIQUE_DEVICE_COUNTS reports, two
+ // for the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 8);
VerifyStoredIntegerObservation(
0, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
1, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
2, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst1DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
VerifyStoredIntegerObservation(
3, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 4, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "99", {1},
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 5, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 6, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", {1},
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 7, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
// An hour later at 9:10PM, no more events, next aggregation run happens.
system_time += std::chrono::minutes(60);
@@ -639,7 +724,7 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
// Still only the original observations created, expedited observations are not re-sent.
- ASSERT_EQ(test_writer_->num_observations_added(), 4);
+ ASSERT_EQ(test_writer_->num_observations_added(), 8);
// 45 minutes later, 2 more events occur, one with a different event code 46.
system_time += std::chrono::minutes(45);
@@ -660,16 +745,22 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- // 2 new observation for the AT_LEAST_ONCE reports, with the new event code of 46.
+ // 4 new observation for the AT_LEAST_ONCE reports, with the new event code of 46.
// Note that the expedited SELECT_FIRST report was already sent, so doesn't get new observations.
// Also, no new observations for event code 45 are created, as they were already sent.
- ASSERT_EQ(test_writer_->num_observations_added(), 6);
+ ASSERT_EQ(test_writer_->num_observations_added(), 12);
VerifyStoredIntegerObservation(
- 4, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}});
+ 8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
- 5, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}});
+ 9, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
+ VerifyStoredIntegerObservation(
+ 10, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
+ VerifyStoredIntegerObservation(
+ 11, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// An hour later at 11:10PM, no more events, next aggregation run happens.
system_time += std::chrono::minutes(60);
@@ -678,7 +769,7 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
// Still only the expected original observation created.
- ASSERT_EQ(test_writer_->num_observations_added(), 6);
+ ASSERT_EQ(test_writer_->num_observations_added(), 12);
// An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
system_time += std::chrono::minutes(60);
@@ -687,18 +778,27 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
// Note that the daily expedited reports were already sent, so they don't get new observations.
- // But 2 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE
- // report containing the codes 45 and 46, and a SELECT_FIRST report for the code 45.
- ASSERT_EQ(test_writer_->num_observations_added(), 8);
+ // But 4 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE
+ // report containing the codes 45 and 46, a SELECT_FIRST report for the code 45, and two
+ // REPORT_ALL reports for the two system_versions reported earlier.
+ ASSERT_EQ(test_writer_->num_observations_added(), 16);
day_index += 1;
VerifyStoredIntegerObservation(
- 6, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
+ 12, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1, 1},
- {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45},
- {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}});
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
- 7, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ 13, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 14, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 15, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
+ {1, 1},
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// The next day, no events occur so all 23 hourly aggregation runs create no new observations.
for (int i = 1; i <= 23; i++) {
@@ -706,7 +806,7 @@
aggregation->GenerateAggregatedObservations(
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- ASSERT_EQ(test_writer_->num_observations_added(), 8);
+ ASSERT_EQ(test_writer_->num_observations_added(), 16);
}
// Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
@@ -715,17 +815,25 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- // 2 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 10);
+ // 4 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 20);
day_index += 1;
VerifyStoredIntegerObservation(
- 8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
+ 16, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
{1, 1},
- {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45},
- {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}});
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
- 9, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
- {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ 17, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 18, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
+ {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 19, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
+ {1, 1},
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
// For the next 4 days (3 days of data have been sent above) no more events occur, and the 7-day
// reports continue to be sent at the start of the day.
@@ -736,7 +844,7 @@
aggregation->GenerateAggregatedObservations(
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- ASSERT_EQ(test_writer_->num_observations_added(), 10 + 2 * j);
+ ASSERT_EQ(test_writer_->num_observations_added(), 20 + 4 * j);
}
// The 12:10AM (just after midnight UTC) daily event aggregation run.
@@ -745,17 +853,25 @@
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- // 2 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports.
- ASSERT_EQ(test_writer_->num_observations_added(), 12 + 2 * j);
+ // 3 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports.
+ ASSERT_EQ(test_writer_->num_observations_added(), 24 + 4 * j);
day_index += 1;
VerifyStoredIntegerObservation(
- 10 + 2 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId),
+ 20 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId),
day_index, "100", {1, 1},
- {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45},
- {ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46}});
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
VerifyStoredIntegerObservation(
- 11 + 2 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId),
- day_index, "100", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45}});
+ 21 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId),
+ day_index, "100", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 22 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
+ "99", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
+ VerifyStoredIntegerObservation(
+ 23 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
+ "100", {1, 1},
+ {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
+ {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
}
// Finally, on the 8th day, and all subsequent days, no observations are created at all.
@@ -764,7 +880,7 @@
aggregation->GenerateAggregatedObservations(
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::UTC),
util::TimeInfo::FromTimePointPrevious(system_time, MetricDefinition::LOCAL));
- ASSERT_EQ(test_writer_->num_observations_added(), 18);
+ ASSERT_EQ(test_writer_->num_observations_added(), 36);
}
}
@@ -827,4 +943,189 @@
"There is no space in slush");
}
+TEST_F(LocalAggregationTest, MigrateToReportAll) {
+ OverrideRegistry(report_all_test::default_system_profile_selection::kRegistryBase64);
+ std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
+
+ std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
+ uint32_t day_index;
+
+ {
+ auto record =
+ logger::EventRecord::MakeEventRecord(
+ project_context,
+ report_all_test::default_system_profile_selection::kOccurrenceMetricReportAllMetricId)
+ .ValueOrDie();
+ record->event()->mutable_occurrence_event()->set_count(1);
+ time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now());
+ day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy());
+ record->event()->set_day_index(day_index);
+ record->event()->set_hour_id(
+ cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+
+ record->system_profile()->set_system_version("100");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ record->system_profile()->set_system_version("101");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ }
+
+ OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
+ project_context = GetProjectContext();
+ aggregation = MakeLocalAggregation();
+ {
+ auto record = logger::EventRecord::MakeEventRecord(
+ project_context,
+ report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
+ .ValueOrDie();
+ record->event()->mutable_occurrence_event()->set_count(1);
+ time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now());
+ day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy());
+ record->event()->set_day_index(day_index);
+ record->event()->set_hour_id(
+ cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+
+ record->system_profile()->set_system_version("100");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ record->system_profile()->set_system_version("101");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ }
+
+ aggregation->GenerateAggregatedObservations(
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::UTC),
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::LOCAL));
+
+ ASSERT_EQ(test_writer_->num_observations_added(), 2);
+ EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);
+
+ lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(
+ report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId);
+
+ // The first two metrics will be logged to the non partitioned data portion. When the
+ // migration happens, those two will be moved to whatever value the SystemProfile currently
+ // holds. Thus 2 ungrouped + 1 grouped = 3
+ VerifyStoredIntegerObservation(
+ 0,
+ metric_id.ForReport(report_all_test::with_report_all_set::
+ kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId),
+ day_index, "101", {3});
+ VerifyStoredIntegerObservation(
+ 1,
+ metric_id.ForReport(report_all_test::with_report_all_set::
+ kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId),
+ day_index, "100", {1});
+}
+
+TEST_F(LocalAggregationTest, MigrateFromReportAll) {
+ OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
+ std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
+
+ std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
+ uint32_t day_index;
+
+ {
+ auto record = logger::EventRecord::MakeEventRecord(
+ project_context,
+ report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
+ .ValueOrDie();
+ record->event()->mutable_occurrence_event()->set_count(1);
+ time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now());
+ day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy());
+ record->event()->set_day_index(day_index);
+ record->event()->set_hour_id(
+ cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+
+ record->system_profile()->set_system_version("100");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ record->system_profile()->set_system_version("101");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ }
+
+ OverrideRegistry(report_all_test::default_system_profile_selection::kRegistryBase64);
+ project_context = GetProjectContext();
+ aggregation = MakeLocalAggregation();
+ {
+ auto record =
+ logger::EventRecord::MakeEventRecord(
+ project_context,
+ report_all_test::default_system_profile_selection::kOccurrenceMetricReportAllMetricId)
+ .ValueOrDie();
+ record->event()->mutable_occurrence_event()->set_count(1);
+ time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now());
+ day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy());
+ record->event()->set_day_index(day_index);
+ record->event()->set_hour_id(
+ cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+
+ record->system_profile()->set_system_version("100");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ record->system_profile()->set_system_version("101");
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ }
+
+ aggregation->GenerateAggregatedObservations(
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::UTC),
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::LOCAL));
+
+ ASSERT_EQ(test_writer_->num_observations_added(), 1);
+ EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);
+
+ lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(
+ report_all_test::default_system_profile_selection::kOccurrenceMetricReportAllMetricId);
+
+ // All 4 metrics should be grouped back together under system_version 101 (the most recently
+ // seen).
+ VerifyStoredIntegerObservation(
+ 0,
+ metric_id.ForReport(report_all_test::default_system_profile_selection::
+ kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId),
+ day_index, "101", {4});
+}
+
+TEST_F(LocalAggregationTest, ReportAllWorks) {
+ const int32_t kNumPartitions = 15;
+ OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
+ std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
+
+ std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
+
+ auto record =
+ logger::EventRecord::MakeEventRecord(
+ project_context, report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
+ .ValueOrDie();
+ record->event()->mutable_occurrence_event()->set_count(1);
+ time_t now = std::chrono::system_clock::to_time_t(mock_clock_->now());
+ uint32_t day_index = cobalt::util::TimeToDayIndex(now, record->metric()->time_zone_policy());
+ record->event()->set_day_index(day_index);
+ record->event()->set_hour_id(
+ cobalt::util::TimeToHourId(now, record->metric()->time_zone_policy()));
+ for (int32_t i = 1; i <= kNumPartitions; i++) {
+ for (int32_t j = i; j <= kNumPartitions; j++) {
+ record->system_profile()->set_system_version(absl::StrCat("Version ", j));
+ ASSERT_EQ(aggregation->AddEvent(*record, mock_clock_->now()).error_code(), StatusCode::OK);
+ }
+ }
+
+ aggregation->GenerateAggregatedObservations(
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::UTC),
+ util::TimeInfo::FromTimePoint(mock_clock_->now(), MetricDefinition::LOCAL));
+
+ ASSERT_EQ(test_writer_->num_observations_added(), kNumPartitions);
+ ASSERT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);
+ size_t i = 0;
+ for (const std::unique_ptr<observation_store::StoredObservation>& obs :
+ test_writer_->messages_received) {
+ ASSERT_TRUE(obs->has_unencrypted());
+
+ Observation observation = obs->unencrypted();
+ ASSERT_TRUE(observation.has_integer());
+ ASSERT_GT(observation.integer().values_size(), 0);
+
+ std::string system_version =
+ test_writer_->metadata_received[i]->system_profile().system_version();
+ int system_version_number = std::stoi(system_version.substr(system_version.find(' ')));
+ ASSERT_EQ(observation.integer().values(0).value(), system_version_number);
+ ++i;
+ }
+}
+
} // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation_1_1/observation_generator_test.cc b/src/local_aggregation_1_1/observation_generator_test.cc
index c667b81..88bd49e 100644
--- a/src/local_aggregation_1_1/observation_generator_test.cc
+++ b/src/local_aggregation_1_1/observation_generator_test.cc
@@ -454,6 +454,71 @@
}
}
+TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsWithReportAllAsExpected) {
+ SetRegistry(
+ testing::MutateProject(GetRegistry(), kCustomerId, kProjectId, [](ProjectConfig* project) {
+ project->mutable_metrics(kOccurrenceMetricMetricIndex)
+ ->mutable_reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex)
+ ->set_system_profile_selection(SystemProfileSelectionPolicy::REPORT_ALL);
+ }));
+ const uint32_t kMaxDayIndex = 5;
+ SystemProfile system_profile;
+ system_profile.set_os(SystemProfile::FUCHSIA);
+ system_profile.set_system_version("100");
+ uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
+
+ {
+ MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
+ ReportAggregate* report =
+ &(*aggregate.aggregate()
+ ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
+ for (uint32_t i = 1; i <= kMaxDayIndex; i += 1) {
+ SystemProfileAggregate* system_profile_agg =
+ (*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
+ system_profile_agg->set_system_profile_hash(system_profile_hash);
+ system_profile_agg->add_by_event_code()
+ ->mutable_data()
+ ->mutable_at_least_once()
+ ->set_at_least_once(true);
+ }
+ aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
+ ASSERT_TRUE(aggregate.Save().ok());
+ }
+
+ std::unique_ptr<ObservationMetadata> last_metadata;
+ std::unique_ptr<Observation> last_observation;
+ std::string last_contribution_id;
+ TestObservationStoreWriter test_writer(
+ [&last_metadata, &last_observation, &last_contribution_id](
+ std::unique_ptr<observation_store::StoredObservation> observation,
+ std::unique_ptr<ObservationMetadata> metadata) {
+ if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
+ last_metadata = std::move(metadata);
+ if (observation->has_unencrypted()) {
+ last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
+ last_contribution_id = observation->contribution_id();
+ }
+ }
+ });
+
+ logger::ObservationWriter observation_writer(&test_writer, nullptr);
+ ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
+
+ for (uint32_t i = 1; i <= kMaxDayIndex; i += 1) {
+ GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i));
+
+ EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
+ ASSERT_TRUE(last_metadata);
+ EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
+ EXPECT_EQ(last_metadata->day_index(), i);
+ ASSERT_TRUE(last_observation);
+ EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
+ ASSERT_TRUE(last_observation->has_integer());
+ ASSERT_EQ(last_observation->integer().values_size(), 1);
+ EXPECT_EQ(last_observation->integer().values(0).value(), 1);
+ }
+}
+
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservations) {
uint32_t kMaxHourId = 101;
int kNumPrivateObs = 2;
diff --git a/src/local_aggregation_1_1/testing/BUILD.gn b/src/local_aggregation_1_1/testing/BUILD.gn
index 2a01f62..ae6e408 100644
--- a/src/local_aggregation_1_1/testing/BUILD.gn
+++ b/src/local_aggregation_1_1/testing/BUILD.gn
@@ -32,3 +32,17 @@
generate_cc = true
}
+
+metrics_registry("report_all_test_registry_with_report_all_set") {
+ customer_id = 123
+ project_id = 100
+ features = [
+ "generate-config-base64",
+ "testing",
+ ]
+ source = "report_all_test_registry/with_report_all_set.yaml"
+ namespace = "cobalt.report_all_test.with_report_all_set"
+ var_name = "registry_base64"
+
+ generate_cc = true
+}
diff --git a/src/local_aggregation_1_1/testing/report_all_test_registry/default_system_profile_selection.yaml b/src/local_aggregation_1_1/testing/report_all_test_registry/default_system_profile_selection.yaml
index 7d9a41b..ee56ec0 100644
--- a/src/local_aggregation_1_1/testing/report_all_test_registry/default_system_profile_selection.yaml
+++ b/src/local_aggregation_1_1/testing/report_all_test_registry/default_system_profile_selection.yaml
@@ -13,4 +13,4 @@
local_aggregation_period: WINDOW_1_DAY
privacy_level: NO_ADDED_PRIVACY
event_vector_buffer_max: 100
- system_profile_field: [CHANNEL]
+ system_profile_field: [SYSTEM_VERSION]
diff --git a/src/local_aggregation_1_1/testing/report_all_test_registry/with_report_all_set.yaml b/src/local_aggregation_1_1/testing/report_all_test_registry/with_report_all_set.yaml
index faed8ee..0953a28 100644
--- a/src/local_aggregation_1_1/testing/report_all_test_registry/with_report_all_set.yaml
+++ b/src/local_aggregation_1_1/testing/report_all_test_registry/with_report_all_set.yaml
@@ -14,4 +14,4 @@
system_profile_selection: REPORT_ALL
privacy_level: NO_ADDED_PRIVACY
event_vector_buffer_max: 100
- system_profile_field: [CHANNEL]
+ system_profile_field: [SYSTEM_VERSION]
diff --git a/src/local_aggregation_1_1/testing/test_registry/CustomerA/ProjectA1/metrics.yaml b/src/local_aggregation_1_1/testing/test_registry/CustomerA/ProjectA1/metrics.yaml
index f756e56..71b9236 100644
--- a/src/local_aggregation_1_1/testing/test_registry/CustomerA/ProjectA1/metrics.yaml
+++ b/src/local_aggregation_1_1/testing/test_registry/CustomerA/ProjectA1/metrics.yaml
@@ -49,6 +49,24 @@
system_profile_field: [OS, SYSTEM_VERSION]
system_profile_selection: SELECT_FIRST
- id: 6
+ report_name: "unique_device_counts_report_all_report_1_day"
+ report_type: UNIQUE_DEVICE_COUNTS
+ local_aggregation_procedure: AT_LEAST_ONCE
+ local_aggregation_period: WINDOW_1_DAY
+ privacy_level: NO_ADDED_PRIVACY
+ event_vector_buffer_max: 100
+ system_profile_field: [OS, SYSTEM_VERSION, EXPERIMENT_TOKENS]
+ system_profile_selection: REPORT_ALL
+ - id: 7
+ report_name: "unique_device_counts_report_all_report_7_days"
+ report_type: UNIQUE_DEVICE_COUNTS
+ local_aggregation_procedure: AT_LEAST_ONCE
+ local_aggregation_period: WINDOW_7_DAYS
+ privacy_level: NO_ADDED_PRIVACY
+ event_vector_buffer_max: 100
+ system_profile_field: [OS, SYSTEM_VERSION]
+ system_profile_selection: REPORT_ALL
+ - id: 8
report_name: "unique_device_counts_select_most_common_report_1_day"
report_type: UNIQUE_DEVICE_COUNTS
local_aggregation_procedure: SELECT_MOST_COMMON
@@ -57,7 +75,7 @@
event_vector_buffer_max: 100
system_profile_field: [OS, SYSTEM_VERSION]
system_profile_selection: SELECT_LAST
- - id: 7
+ - id: 9
report_name: "unique_device_counts_select_most_common_report_7_days"
report_type: UNIQUE_DEVICE_COUNTS
local_aggregation_procedure: SELECT_MOST_COMMON
@@ -66,7 +84,7 @@
event_vector_buffer_max: 100
system_profile_field: [OS, SYSTEM_VERSION]
system_profile_selection: SELECT_LAST
- - id: 8
+ - id: 10
report_name: "hourly_device_histograms"
report_type: HOURLY_VALUE_HISTOGRAMS
privacy_level: NO_ADDED_PRIVACY
@@ -78,6 +96,18 @@
event_vector_buffer_max: 100
system_profile_field: [OS, SYSTEM_VERSION]
system_profile_selection: SELECT_FIRST
+ - id: 11
+ report_name: "hourly_device_histograms_report_all"
+ report_type: HOURLY_VALUE_HISTOGRAMS
+ privacy_level: NO_ADDED_PRIVACY
+ int_buckets:
+ linear:
+ floor: 1
+ num_buckets: 100
+ step_size: 1
+ event_vector_buffer_max: 100
+ system_profile_field: [OS, SYSTEM_VERSION]
+ system_profile_selection: REPORT_ALL
- id: 2
metric_name: "integer_metric"
metric_type: INTEGER
@@ -301,3 +331,23 @@
expedited_sending: true
privacy_level: NO_ADDED_PRIVACY
system_profile_field: [OS, SYSTEM_VERSION]
+ - id: 5
+ report_name: "report_all_1_day"
+ report_type: UNIQUE_DEVICE_COUNTS
+ local_aggregation_procedure: AT_LEAST_ONCE
+ local_aggregation_period: WINDOW_1_DAY
+ expedited_sending: true
+ privacy_level: NO_ADDED_PRIVACY
+ event_vector_buffer_max: 100
+ system_profile_field: [OS, SYSTEM_VERSION]
+ system_profile_selection: REPORT_ALL
+ - id: 6
+ report_name: "report_all_7_day"
+ report_type: UNIQUE_DEVICE_COUNTS
+ local_aggregation_procedure: AT_LEAST_ONCE
+ local_aggregation_period: WINDOW_7_DAYS
+ expedited_sending: true
+ privacy_level: NO_ADDED_PRIVACY
+ event_vector_buffer_max: 100
+ system_profile_field: [OS, SYSTEM_VERSION]
+ system_profile_selection: REPORT_ALL