[EventAggregator] Generate PerDeviceCountObservations
The EventAggregator's GenerateObservations() method now
generates observations for PER_DEVICE_COUNT_STATS reports.
Change-Id: I158ccdb81e015886457e61aa19808274520d0c23
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index 706ef72..26ce7e9 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -5,8 +5,10 @@
#include "logger/event_aggregator.h"
#include <algorithm>
+#include <map>
#include <string>
#include <utility>
+#include <vector>
#include "algorithms/rappor/rappor_config_helper.h"
#include "config/metric_definition.pb.h"
@@ -427,6 +429,12 @@
if (final_day_index_local == 0u) {
final_day_index_local = final_day_index_utc;
}
+ if (std::min(final_day_index_utc, final_day_index_local) <
+ backfill_days_ + kMaxAllowedAggregationWindowSize) {
+ LOG(ERROR) << "GenerateObservations: Day index of Observation must be >= "
+ "backfill_days_ + kMaxAllowedAggregationWindowSize.";
+ return kInvalidArguments;
+ }
// Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
// generate observations.
auto local_aggregate_store = CopyLocalAggregateStore();
@@ -484,10 +492,27 @@
if (status != kOK) {
return status;
}
+ break;
}
default:
continue;
}
+ break;
+ }
+ case MetricDefinition::EVENT_COUNT: {
+ switch (report.report_type()) {
+ case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+ auto status = GeneratePerDeviceCountObservations(
+ metric_ref, pair.first, pair.second, final_day_index);
+ if (status != kOK) {
+ return status;
+ }
+ break;
+ }
+ default:
+ continue;
+ }
+ break;
}
default:
continue;
@@ -501,6 +526,12 @@
if (day_index_local == 0u) {
day_index_local = day_index_utc;
}
+ if (std::min(day_index_utc, day_index_local) <
+ backfill_days_ + kMaxAllowedAggregationWindowSize) {
+ LOG(ERROR) << "GarbageCollect: Day index must be >= backfill_days_ + "
+ "kMaxAllowedAggregationWindowSize.";
+ return kInvalidArguments;
+ }
auto locked = protected_aggregate_store_.lock();
for (auto pair : locked->local_aggregate_store.by_report_key()) {
uint32_t day_index;
@@ -532,10 +563,6 @@
"window size.";
return kInvalidConfig;
}
- if (day_index < backfill_days_ + max_window_size) {
- LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size.";
- return kInvalidArguments;
- }
// For each ReportAggregates, descend to and iterate over the sub-map of
// local aggregates keyed by day index. Keep buckets with day indices
// greater than |day_index| - |backfill_days_| - |max_window_size|, and
@@ -723,6 +750,10 @@
const MetricRef metric_ref, const std::string& report_key,
const ReportAggregates& report_aggregates, uint32_t num_event_codes,
uint32_t final_day_index) {
+ // The earliest day index for which we might need to generate an Observation.
+ // GenerateObservations() has checked that this value is > 0.
+ auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
+
for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
auto daily_aggregates =
report_aggregates.unique_actives_aggregates().by_event_code().find(
@@ -746,8 +777,7 @@
// generated on this invocation.
auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
window_size);
- auto first_day_index =
- std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_));
+ auto first_day_index = std::max(last_gen + 1, backfill_period_start);
// The latest day index on which |event_type| is known to have
// occurred, so far. This value will be updated as we search
// forward from the earliest day index belonging to a window of
@@ -794,5 +824,203 @@
return kOK;
}
+////////// GeneratePerDeviceCountObservations and helper methods /////////////
+
+uint32_t EventAggregator::PerDeviceCountLastGeneratedDayIndex(
+ const std::string& report_key, const std::string& component,
+ uint32_t event_code, uint32_t window_size) const {
+ const auto& report_history = obs_history_.by_report_key().find(report_key);
+ if (report_history == obs_history_.by_report_key().end()) {
+ return 0u;
+ }
+ if (!report_history->second.has_per_device_count_history()) {
+ return 0u;
+ }
+ const auto& component_history =
+ report_history->second.per_device_count_history().by_component().find(
+ component);
+ if (component_history ==
+ report_history->second.per_device_count_history().by_component().end()) {
+ return 0u;
+ }
+ const auto& event_code_history =
+ component_history->second.by_event_code().find(event_code);
+ if (event_code_history == component_history->second.by_event_code().end()) {
+ return 0u;
+ }
+ const auto& window_size_history =
+ event_code_history->second.by_window_size().find(window_size);
+ if (window_size_history ==
+ event_code_history->second.by_window_size().end()) {
+ return 0u;
+ }
+ return window_size_history->second;
+}
+
+Status EventAggregator::GenerateSinglePerDeviceCountObservation(
+ const MetricRef metric_ref, const ReportDefinition* report,
+ uint32_t obs_day_index, const std::string& component, uint32_t event_code,
+ uint32_t window_size, int64_t count) const {
+ auto encoder_result = encoder_->EncodePerDeviceCountObservation(
+ metric_ref, report, obs_day_index, component, event_code, count,
+ window_size);
+ if (encoder_result.status != kOK) {
+ return encoder_result.status;
+ }
+ if (encoder_result.observation == nullptr ||
+ encoder_result.metadata == nullptr) {
+ LOG(ERROR) << "Failed to encode PerDeviceCountObservation";
+ return kOther;
+ }
+
+ const auto& writer_status = observation_writer_->WriteObservation(
+ *encoder_result.observation, std::move(encoder_result.metadata));
+ if (writer_status != kOK) {
+ return writer_status;
+ }
+ return kOK;
+}
+
+Status EventAggregator::GeneratePerDeviceCountObservations(
+ const MetricRef metric_ref, const std::string& report_key,
+ const ReportAggregates& report_aggregates, uint32_t final_day_index) {
+ // Get the window sizes for this report and sort them in increasing order.
+ // TODO(pesk): Instead, have UpdateAggregationConfigs() store the window sizes
+ // in increasing order.
+ std::vector<uint32_t> window_sizes;
+ for (uint32_t window_size :
+ report_aggregates.aggregation_config().report().window_size()) {
+ if (window_size > kMaxAllowedAggregationWindowSize) {
+ LOG(WARNING) << "GeneratePerDeviceCountObservations ignoring a window "
+ "size exceeding the maximum allowed value";
+ continue;
+ }
+ window_sizes.push_back(window_size);
+ }
+ std::sort(window_sizes.begin(), window_sizes.end());
+
+ // The first day index for which we might have to generate an Observation.
+ // GenerateObservations() has checked that this value is > 0.
+ auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
+
+ // Generate any necessary PerDeviceCountObservations for this report.
+ for (const auto& component_pair :
+ report_aggregates.count_aggregates().by_component()) {
+ const auto& component = component_pair.first;
+ for (const auto& event_code_pair : component_pair.second.by_event_code()) {
+ auto event_code = event_code_pair.first;
+ const auto& event_code_aggregates = event_code_pair.second;
+ // Populate a helper map keyed by day indices which belong to the range
+ // [|backfill_period_start|, |final_day_index|]. The value at a day index
+ // is the list of window sizes, in increasing order, for which an
+ // Observation should be generated for that day index.
+ std::map<uint32_t, std::vector<uint32_t>> window_sizes_by_obs_day;
+ for (auto window_size : window_sizes) {
+ auto last_gen = PerDeviceCountLastGeneratedDayIndex(
+ report_key, component, event_code, window_size);
+ auto first_day_index = std::max(last_gen + 1, backfill_period_start);
+ for (auto obs_day_index = first_day_index;
+ obs_day_index <= final_day_index; obs_day_index++) {
+ window_sizes_by_obs_day[obs_day_index].push_back(window_size);
+ }
+ }
+ // Iterate over the day indices |obs_day_index| for which we might need to
+ // generate an Observation. For each day index, generate an Observation
+ // for each needed window size.
+ //
+ // More precisely, for each needed window size, compute the count over the
+ // window and generate a PerDeviceCountObservation only if the count is
+ // nonzero. Whether or not the count was zero, update the
+ // AggregatedObservationHistory for this report, component, event code,
+ // and window size with |obs_day_index| as the most recent date of
+ // Observation generation. This reflects the fact that the count was
+ // computed for the window ending on that date, even though an Observation
+ // is only sent if the count is nonzero.
+ for (auto obs_day_index = backfill_period_start;
+ obs_day_index <= final_day_index; obs_day_index++) {
+ const auto& window_sizes = window_sizes_by_obs_day.find(obs_day_index);
+ if (window_sizes == window_sizes_by_obs_day.end()) {
+ continue;
+ }
+ int64_t count = 0;
+ uint32_t num_days = 0;
+ for (auto window_size : window_sizes->second) {
+ while (num_days < window_size) {
+ const auto& day_aggregates =
+ event_code_aggregates.by_day_index().find(obs_day_index -
+ num_days);
+ if (day_aggregates != event_code_aggregates.by_day_index().end()) {
+ count += day_aggregates->second.count_daily_aggregate().count();
+ }
+ num_days++;
+ }
+ if (count != 0) {
+ auto status = GenerateSinglePerDeviceCountObservation(
+ metric_ref, &report_aggregates.aggregation_config().report(),
+ obs_day_index, component, event_code, window_size, count);
+ if (status != kOK) {
+ return status;
+ }
+ }
+ // Update |obs_history_| with the latest date of Observation
+ // generation for this report, component, event code, and window
+ // size.
+ (*(*(*(*obs_history_.mutable_by_report_key())[report_key]
+ .mutable_per_device_count_history()
+ ->mutable_by_component())[component]
+ .mutable_by_event_code())[event_code]
+ .mutable_by_window_size())[window_size] = obs_day_index;
+ }
+ }
+ }
+ }
+ // Generate any necessary ReportParticipationObservations for this report.
+ auto participation_last_gen =
+ ReportParticipationLastGeneratedDayIndex(report_key);
+ auto participation_first_day_index =
+ std::max(participation_last_gen + 1, backfill_period_start);
+ for (auto obs_day_index = participation_first_day_index;
+ obs_day_index <= final_day_index; obs_day_index++) {
+ GenerateSingleReportParticipationObservation(
+ metric_ref, &report_aggregates.aggregation_config().report(),
+ obs_day_index);
+ (*obs_history_.mutable_by_report_key())[report_key]
+ .mutable_report_participation_history()
+ ->set_last_generated(obs_day_index);
+ }
+ return kOK;
+}
+
+uint32_t EventAggregator::ReportParticipationLastGeneratedDayIndex(
+ const std::string& report_key) const {
+ const auto& report_history = obs_history_.by_report_key().find(report_key);
+ if (report_history == obs_history_.by_report_key().end()) {
+ return 0u;
+ }
+ return report_history->second.report_participation_history().last_generated();
+}
+
+Status EventAggregator::GenerateSingleReportParticipationObservation(
+ const MetricRef metric_ref, const ReportDefinition* report,
+ uint32_t obs_day_index) const {
+ auto encoder_result = encoder_->EncodeReportParticipationObservation(
+ metric_ref, report, obs_day_index);
+ if (encoder_result.status != kOK) {
+ return encoder_result.status;
+ }
+ if (encoder_result.observation == nullptr ||
+ encoder_result.metadata == nullptr) {
+ LOG(ERROR) << "Failed to encode ReportParticipationObservation";
+ return kOther;
+ }
+
+ const auto& writer_status = observation_writer_->WriteObservation(
+ *encoder_result.observation, std::move(encoder_result.metadata));
+ if (writer_status != kOK) {
+ return writer_status;
+ }
+ return kOK;
+}
+
} // namespace logger
} // namespace cobalt
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h
index 8fbaf24..a742ce3 100644
--- a/logger/event_aggregator.h
+++ b/logger/event_aggregator.h
@@ -27,20 +27,19 @@
namespace cobalt {
namespace logger {
-// The EventAggregator class manages an in-memory store of aggregated values
-// of Events logged for locally aggregated report types. For each day, this
-// LocalAggregateStore contains an aggregate of the values of logged Events of
-// a given event code over that day. These daily aggregates are used to
-// produce Observations of values aggregated over a rolling window of size
-// specified in the ReportDefinition.
+// The EventAggregator manages an in-memory store of aggregated Event values,
+// indexed by report, day index, and other dimensions specific to the report
+// type (e.g. event code). Periodically, this data is used to generate
+// Observations representing aggregates of Event values over a day, week, month,
+// etc.
//
// Each Logger interacts with the EventAggregator in the following way:
// (1) When the Logger is created, it calls UpdateAggregationConfigs() to
-// provide the EventAggregator with the configurations of its locally
-// aggregated reports.
-// (2) When logging an Event for a locally aggregated report, a Logger calls an
-// Update*Aggregation() method with the Event and the ReportAggregationKey of
-// the report.
+// provide the EventAggregator with its view of Cobalt's metric and report
+// registry.
+// (2) When logging an Event for a locally aggregated report, a Logger
+// calls an Update*Aggregation() method with the Event and the
+// ReportAggregationKey of the report.
//
// A worker thread does the following tasks at intervals specified in the
// EventAggregator's constructor:
@@ -115,8 +114,8 @@
// Start the worker thread.
void Start();
- // Updates the EventAggregator's view of the set of locally aggregated
- // report configurations.
+ // Updates the EventAggregator's view of the Cobalt metric and report
+ // registry.
//
// This method may be called multiple times during the EventAggregator's
// lifetime. If the EventAggregator is provided with a report whose tuple
@@ -266,6 +265,22 @@
uint32_t event_code,
uint32_t window_size) const;
+ // Returns the most recent day index for which an Observation was generated
+ // for a given PER_DEVICE_COUNT_STATS report, component, event code,
+ // and window size, according to |obs_history_|. Returns 0 if no Observation
+ // has been generated for the given arguments.
+ uint32_t PerDeviceCountLastGeneratedDayIndex(const std::string& report_key,
+ const std::string& component,
+ uint32_t event_code,
+ uint32_t window_size) const;
+
+ // Returns the most recent day index for which a
+ // ReportParticipationObservation was generated for a given report, according
+ // to |obs_history_|. Returns 0 if no Observation has been generated for the
+ // given arguments.
+ uint32_t ReportParticipationLastGeneratedDayIndex(
+ const std::string& report_key) const;
+
// For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
// for each event code of the parent metric, for each window size of the
// report, for the window of that size ending on |final_day_index|, unless
@@ -290,6 +305,40 @@
uint32_t window_size,
bool was_active) const;
+ // For a fixed report of type PER_DEVICE_COUNT_STATS, generates a
+ // PerDeviceCountObservation for each tuple (component, event code, window
+ // size) for which a positive number of events with that event code occurred
+ // with that component during the window of that size ending on
+ // |final_day_index|, unless an Observation with those parameters has been
+ // generated in the past. Also generates PerDeviceCountObservations for days
+ // in the backfill period if needed.
+ //
+ // In addition to PerDeviceCountObservations, generates a
+ // ReportParticipationObservation for |final_day_index| and any needed days in
+ // the backfill period. These ReportParticipationObservations are used by the
+ // PerDeviceCount report generator to infer the fleet-wide number of devices
+ // for which the event count associated to each tuple (component, event code,
+ // window size) was zero.
+ //
+ // Observations are not generated for aggregation windows larger than
+ // |kMaxAllowedAggregationWindowSize|.
+ Status GeneratePerDeviceCountObservations(
+ const MetricRef metric_ref, const std::string& report_key,
+ const ReportAggregates& report_aggregates, uint32_t final_day_index);
+
+ // Helper method called by GeneratePerDeviceCountObservations() to generate
+ // and write a single PerDeviceCountObservation.
+ Status GenerateSinglePerDeviceCountObservation(
+ const MetricRef metric_ref, const ReportDefinition* report,
+ uint32_t obs_day_index, const std::string& component, uint32_t event_code,
+ uint32_t window_size, int64_t count) const;
+
+ // Helper method called by GeneratePerDeviceCountObservations() to generate
+ // and write a single ReportParticipationObservation.
+ Status GenerateSingleReportParticipationObservation(
+ const MetricRef metric_ref, const ReportDefinition* report,
+ uint32_t obs_day_index) const;
+
// Returns a copy of the LocalAggregateStore. Does not assume that the
// caller holds the mutex of |protected_aggregate_store_|.
LocalAggregateStore CopyLocalAggregateStore() {
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 6ba882e..fe266bc 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -34,13 +34,17 @@
namespace logger {
+using testing::CheckPerDeviceCountObservations;
using testing::CheckUniqueActivesObservations;
using testing::ExpectedAggregationParams;
+using testing::ExpectedPerDeviceCountObservations;
+using testing::ExpectedReportParticipationObservations;
using testing::ExpectedUniqueActivesObservations;
using testing::FakeObservationStore;
using testing::FetchAggregatedObservations;
using testing::MakeAggregationConfig;
using testing::MakeAggregationKey;
+using testing::MakeExpectedReportParticipationObservations;
using testing::MakeNullExpectedUniqueActivesObservations;
using testing::MockConsistentProtoStore;
using testing::PopulateMetricDefinitions;
@@ -122,14 +126,14 @@
static const ExpectedAggregationParams kExpectedParams = {
/* The total number of locally aggregated Observations that should be
generated for each day index. */
- 9,
+ 10,
/* The MetricReportIds of the locally aggregated reports in this
configuration. */
{kErrorsOccurredMetricReportId, kConnectionFailuresMetricReportId},
/* The number of Observations which should be generated for each day index,
broken down by MetricReportId. */
{{kErrorsOccurredMetricReportId, 9},
- {kConnectionFailuresMetricReportId, 0}},
+ {kConnectionFailuresMetricReportId, 1}},
/* The number of event codes for each report of type UNIQUE_N_DAY_ACTIVES,
by MetricReportId. */
{{kErrorsOccurredMetricReportId, 3}},
@@ -345,21 +349,19 @@
// Properties of the locally aggregated Observations which should be generated
// for the reports in |kMetricDefinitions|, assuming that no events have ever
// been logged for those reports.
-//
-// TODO(pesk): update these fields once the EventAggregator is generating
-// observations for PerDeviceCount reports.
static const ExpectedAggregationParams kPerDeviceCountExpectedParams = {
/* The total number of Observations that should be generated for a day
index. */
- 0,
+ 2,
/* The MetricReportIds of the locally aggregated reports in this
configuration. */
{kConnectionFailuresMetricReportId, kSettingsChangedMetricReportId},
/* The number of Observations which should be generated for a day index,
broken down by MetricReportId. */
- {{kConnectionFailuresMetricReportId, 0},
- {kSettingsChangedMetricReportId, 0}},
- /* Omitted because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
+ {{kConnectionFailuresMetricReportId, 1},
+ {kSettingsChangedMetricReportId, 1}},
+ /* The number of event codes for each UNIQUE_N_DAY_ACTIVES report. Omitted
+ because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
{},
/* The set of window sizes for each MetricReportId. */
{{kConnectionFailuresMetricReportId, {1}},
@@ -472,11 +474,11 @@
event_aggregator_.reset(new EventAggregator(
encoder_.get(), observation_writer_.get(),
local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
- // Provide the EventAggregator with a mock clock starting at 1 year after
+ // Provide the EventAggregator with a mock clock starting at 10 years after
// the beginning of time.
mock_clock_ = new IncrementingClock(std::chrono::system_clock::duration(0));
- mock_clock_->set_time(
- std::chrono::system_clock::time_point(std::chrono::seconds(kYear)));
+ mock_clock_->set_time(std::chrono::system_clock::time_point(
+ std::chrono::seconds(10 * kYear)));
event_aggregator_->SetClock(mock_clock_);
day_store_created_ = CurrentDayIndex();
}
@@ -2101,7 +2103,7 @@
// Tests that the expected UniqueActivesObservations are generated when events
// are logged over multiple days and when Observations are backfilled for some
// days during that period, and when the LocalAggregateStore is
-// garbage-collected after each all to GenerateObservations().
+// garbage-collected after each call to GenerateObservations().
//
// The test sets the number of backfill days to 3.
//
@@ -2361,6 +2363,851 @@
}
}
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated after
+// some CountEvents have been logged for PerDeviceCount reports, without any
+// garbage collection.
+//
+// For 35 days, logs a positive number of events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Each day, calls GenerateObservations() with the day index of the previous
+// day. Checks that a positive status is returned and that the
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservations) {
+ int num_days = 1;
+ std::vector<Observation2> observations(0);
+ ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams;
+ for (int offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ observations.clear();
+ ResetObservationStore();
+ EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+ observation_store_.get(),
+ update_recipient_.get()));
+ for (int i = 0; i < 2; i++) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 1));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_B", 0u, 5));
+ }
+ // If this is the first time we're logging events, update the expected
+ // numbers of generated Observations to account for the logged events.
+ // For each report, for each window size, expect 1 Observation more than if
+ // no events had been logged.
+ if (offset == 0) {
+ expected_params.daily_num_obs += 3;
+ expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] +=
+ 1;
+ expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2;
+ }
+ AdvanceClock(kDay);
+ }
+ observations.clear();
+ ResetObservationStore();
+ EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex() - 1));
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+ observation_store_.get(),
+ update_recipient_.get()));
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated after
+// some CountEvents have been logged for PerDeviceCount reports over multiple
+// days, and when the LocalAggregateStore is garbage-collected each day.
+//
+// For 35 days, logs a positive number of events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Each day, calls GenerateObservations() with the day index of the previous
+// day. Checks that a positive status is returned and that the
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithGc) {
+ int num_days = 35;
+ std::vector<Observation2> observations(0);
+ ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams;
+ for (int offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ observations.clear();
+ ResetObservationStore();
+ EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+ observation_store_.get(),
+ update_recipient_.get()));
+ EXPECT_EQ(kOK, GarbageCollect(day_index));
+ for (int i = 0; i < 2; i++) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 1));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_B", 0u, 5));
+ }
+ // If this is the first time we're logging events, update the expected
+ // numbers of generated Observations to account for the logged events.
+ // For each report, for each window size, expect 1 Observation more than if
+ // no events had been logged.
+ if (offset == 0) {
+ expected_params.daily_num_obs += 3;
+ expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] +=
+ 1;
+ expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2;
+ }
+ AdvanceClock(kDay);
+ }
+ observations.clear();
+ ResetObservationStore();
+ auto day_index = CurrentDayIndex();
+ EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+ EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+ observation_store_.get(),
+ update_recipient_.get()));
+ EXPECT_EQ(kOK, GarbageCollect(day_index));
+}
+
+// Tests that GenerateObservations() returns a positive status and that the
+// expected number of Observations is generated when events are logged over
+// multiple days and some of those days' Observations are backfilled, without
+// any garbage collection of the LocalAggregateStore.
+//
+// Sets the |backfill_days_| field of the EventAggregator to 3.
+//
+// Logging pattern:
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
+//
+// Observation generation pattern:
+// Calls GenerateObservations() on the 1st through 5th and the 7th out of
+// every 10 days, for 35 days.
+//
+// Expected numbers of Observations:
+// It is expected that 4 days' worth of Observations are generated on
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
+// are generated on the 2nd through 5th day of every 10, that 2 days'
+// worth of Observations are generated on the 7th day of every 10 (the
+// day index for which GenerateObservations() was called, plus 1 day of
+// backfill), and that no Observations are generated on the remaining days.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithBackfill) {
+ // Set |backfill_days_| to 3.
+ size_t backfill_days = 3;
+ SetBackfillDays(backfill_days);
+ // Log 2 events each day for 35 days. Call GenerateObservations() on the
+ // first 5 day indices, and the 7th, out of every 10.
+ for (int offset = 0; offset < 35; offset++) {
+ auto day_index = CurrentDayIndex();
+ for (int i = 0; i < 2; i++) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 1));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_B", 0u, 5));
+ }
+ auto num_obs_before = observation_store_->messages_received.size();
+ if (offset % 10 < 5 || offset % 10 == 6) {
+ EXPECT_EQ(kOK, GenerateObservations(day_index));
+ }
+ auto num_obs_after = observation_store_->messages_received.size();
+ EXPECT_GE(num_obs_after, num_obs_before);
+ // Check that the expected daily number of Observations was generated.
+ switch (offset % 10) {
+ case 0:
+ // If this is the first day of logging, expect 3 Observations for each
+ // day in the backfill period and 6 Observations for the current day.
+ if (offset == 0) {
+ EXPECT_EQ(
+ (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) +
+ kPerDeviceCountExpectedParams.daily_num_obs + 3,
+ num_obs_after - num_obs_before);
+ } else {
+ // If this is another day whose offset is a multiple of 10, expect 6
+ // Observations for each day in the backfill period as well as the
+ // current day.
+ EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) *
+ (backfill_days + 1),
+ num_obs_after - num_obs_before);
+ }
+ break;
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ // Expect 6 Observations for this day.
+ EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3,
+ num_obs_after - num_obs_before);
+ break;
+ case 6:
+ // Expect 6 Observations for each of today and yesterday.
+ EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2,
+ num_obs_after - num_obs_before);
+ break;
+ default:
+ EXPECT_EQ(num_obs_after, num_obs_before);
+ }
+ AdvanceClock(kDay);
+ }
+}
+
+// Tests that GenerateObservations() returns a positive status and that the
+// expected number of Observations is generated when events are logged over
+// multiple days and some of those days' Observations are backfilled, and when
+// the LocalAggregateStore is garbage-collected after each call to
+// GenerateObservations().
+//
+// Sets the |backfill_days_| field of the EventAggregator to 3.
+//
+// Logging pattern:
+// For 35 days, logs 2 events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and 2 events for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Observation generation pattern:
+// Calls GenerateObservations() on the 1st through 5th and the 7th out of
+// every 10 days, for 35 days. Garbage-collects the LocalAggregateStore after
+// each call.
+//
+// Expected numbers of Observations:
+// It is expected that 4 days' worth of Observations are generated on
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
+// are generated on the 2nd through 5th day of every 10, that 2 days'
+// worth of Observations are generated on the 7th day of every 10 (the
+// day index for which GenerateObservations() was called, plus 1 day of
+// backfill), and that no Observations are generated on the remaining days.
+TEST_F(PerDeviceCountEventAggregatorTest,
+ GenerateObservationsWithBackfillAndGc) {
+ int num_days = 35;
+ // Set |backfill_days_| to 3.
+ size_t backfill_days = 3;
+ SetBackfillDays(backfill_days);
+ // Log 2 events each day for 35 days. Call GenerateObservations() on the
+ // first 5 day indices, and the 7th, out of every 10.
+ for (int offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ for (int i = 0; i < 2; i++) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 1));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_B", 0u, 5));
+ }
+ auto num_obs_before = observation_store_->messages_received.size();
+ if (offset % 10 < 5 || offset % 10 == 6) {
+ EXPECT_EQ(kOK, GenerateObservations(day_index));
+ EXPECT_EQ(kOK, GarbageCollect(day_index));
+ }
+ auto num_obs_after = observation_store_->messages_received.size();
+ EXPECT_GE(num_obs_after, num_obs_before);
+ // Check that the expected daily number of Observations was generated.
+ switch (offset % 10) {
+ case 0:
+ // If this is the first day of logging, expect 3 Observations for each
+ // day in the backfill period and 6 Observations for the current day.
+ if (offset == 0) {
+ EXPECT_EQ(
+ (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) +
+ kPerDeviceCountExpectedParams.daily_num_obs + 3,
+ num_obs_after - num_obs_before);
+ } else {
+ // If this is another day whose offset is a multiple of 10, expect 6
+ // Observations for each day in the backfill period as well as the
+ // current day.
+ EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) *
+ (backfill_days + 1),
+ num_obs_after - num_obs_before);
+ }
+ break;
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ // Expect 6 Observations for this day.
+ EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3,
+ num_obs_after - num_obs_before);
+ break;
+ case 6:
+ // Expect 6 Observations for each of today and yesterday.
+ EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2,
+ num_obs_after - num_obs_before);
+ break;
+ default:
+ EXPECT_EQ(num_obs_after, num_obs_before);
+ }
+ AdvanceClock(kDay);
+ }
+}
+
+// Generate Observations without logging any events, and check that the
+// resulting Observations are as expected: 1 ReportParticipationObservation for
+// each PER_DEVICE_COUNT_STATS report in the config, and no
+// PerDeviceCountObservations.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesNoEvents) {
+ auto current_day_index = CurrentDayIndex();
+ EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+ auto expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams,
+ current_day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ {}, expected_report_participation_obs, observation_store_.get(),
+ update_recipient_.get()));
+}
+
+// Check that the expected PerDeviceCountObservations and
+// ReportParticipationObservations are generated when GenerateObservations() is
+// called after logging some events for PER_DEVICE_COUNT_STATS reports over a
+// single day index.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesSingleDay) {
+ auto day_index = CurrentDayIndex();
+ // Log several events on |day_index|.
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 5));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_B", 0u, 5));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 0u, 5));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+ day_index, "component_A", 1u, 5));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_C", 0u, 5));
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "component_C", 0u, 5));
+ // Generate locally aggregated Observations for |day_index|.
+ EXPECT_EQ(kOK, GenerateObservations(day_index));
+
+ // Form the expected Observations.
+ auto expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams,
+ day_index);
+ ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+ expected_per_device_count_obs[{kConnectionFailuresMetricReportId,
+ day_index}][1] = {
+ {"component_A", 0u, 10}, {"component_A", 1u, 5}, {"component_B", 0u, 5}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}]
+ [7] = {{"component_C", 0u, 10}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}]
+ [30] = {{"component_C", 0u, 10}};
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+}
+
+// Checks that PerDeviceCountObservations with the expected values are
+// generated when some events have been logged for a PER_DEVICE_COUNT
+// report for over multiple days and GenerateObservations() is called each
+// day, without garbage collection or backfill.
+//
+// Logged events for the SettingsChanged_PerDeviceCount metric on the i-th day:
+//
+// i (component, event code, count)
+// -----------------------------------------------------------------------
+// 0
+// 1 ("A", 1, 3)
+// 2 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2)
+// 3 ("A", 1, 3)
+// 4 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2)
+// 5 ("A", 1, 3)
+// 6 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2)
+// 7 ("A", 1, 3)
+// 8 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2)
+// 9 ("A", 1, 3)
+//
+// Expected PerDeviceCountObservations for the SettingsChanged_PerDeviceCount
+// report on the i-th day:
+//
+// (i, window size) (component, event code, count)
+// -----------------------------------------------------------------------
+// (0, 7)
+// (0, 30)
+// (1, 7) ("A", 1, 3)
+// (1, 30) ("A", 1, 3)
+// (2, 7) ("A", 1, 6), ("A", 2, 3), ("B", 1, 2)
+// (2, 30) ("A", 1, 6), ("A", 2, 3), ("B", 1, 2)
+// (3, 7) ("A", 1, 9), ("A", 2, 3), ("B", 1, 2)
+// (3, 30) ("A", 1, 9), ("A", 2, 3), ("B", 1, 2)
+// (4, 7) ("A", 1, 12), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2)
+// (4, 30) ("A", 1, 12), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2)
+// (5, 7) ("A", 1, 15), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2)
+// (5, 30) ("A", 1, 15), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2)
+// (6, 7) ("A", 1, 18), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2)
+// (6, 30) ("A", 1, 18), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2)
+// (7, 7) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2)
+// (7, 30) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2)
+// (8, 7) ("A", 1, 21), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+// (8, 30) ("A", 1, 24), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+// (9, 7) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 4)
+// (9, 30) ("A", 1, 27), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+//
+// In addition, expect 2 ReportParticipationObservations each day, 1 for each
+// of ConnectionFailures_PerDeviceCount and SettingsChanged_PerDeviceCount.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesMultiDay) {
+ auto start_day_index = CurrentDayIndex();
+ // Form expected Observations for the 10 days of logging.
+ uint32_t num_days = 10;
+ std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs(
+ num_days);
+ std::vector<ExpectedReportParticipationObservations>
+ expected_report_participation_obs(num_days);
+ for (uint32_t offset = 0; offset < num_days; offset++) {
+ expected_report_participation_obs[offset] =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, start_day_index + offset);
+ }
+ expected_per_device_count_obs[0] = {};
+ expected_per_device_count_obs[1][{kSettingsChangedMetricReportId,
+ start_day_index + 1}] = {
+ {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}};
+ expected_per_device_count_obs[2][{kSettingsChangedMetricReportId,
+ start_day_index + 2}] = {
+ {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_per_device_count_obs[3][{kSettingsChangedMetricReportId,
+ start_day_index + 3}] = {
+ {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_per_device_count_obs[4][{kSettingsChangedMetricReportId,
+ start_day_index + 4}] = {
+ {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[5][{kSettingsChangedMetricReportId,
+ start_day_index + 5}] = {
+ {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[6][{kSettingsChangedMetricReportId,
+ start_day_index + 6}] = {
+ {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[7][{kSettingsChangedMetricReportId,
+ start_day_index + 7}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[8][{kSettingsChangedMetricReportId,
+ start_day_index + 8}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+ expected_per_device_count_obs[9][{kSettingsChangedMetricReportId,
+ start_day_index + 9}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+
+ for (uint32_t offset = 0; offset < 1; offset++) {
+ auto day_index = CurrentDayIndex();
+ for (uint32_t event_code = 1; event_code < 3; event_code++) {
+ if (offset > 0 && offset % event_code == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "A", event_code, 3));
+ }
+ if (offset > 0 && offset % (2 * event_code) == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "B", event_code, 2));
+ }
+ }
+ // Clear the FakeObservationStore.
+ ResetObservationStore();
+ // Generate locally aggregated Observations.
+ EXPECT_EQ(kOK, GenerateObservations(day_index));
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs[offset],
+ expected_report_participation_obs[offset], observation_store_.get(),
+ update_recipient_.get()))
+ << "offset = " << offset;
+ AdvanceClock(kDay);
+ }
+}
+
+// Repeat the CheckObservationValuesMultiDay test, this time calling
+// GarbageCollect() after each call to GenerateObservations.
+//
+// The logging pattern and set of Observations for each day index is the same
+// as in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest,
+ CheckObservationValuesMultiDayWithGarbageCollection) {
+ auto start_day_index = CurrentDayIndex();
+ // Form expected Observations for the 10 days of logging.
+ uint32_t num_days = 10;
+ std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs(
+ num_days);
+ std::vector<ExpectedReportParticipationObservations>
+ expected_report_participation_obs(num_days);
+ for (uint32_t offset = 0; offset < num_days; offset++) {
+ expected_report_participation_obs[offset] =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, start_day_index + offset);
+ }
+ expected_per_device_count_obs[0] = {};
+ expected_per_device_count_obs[1][{kSettingsChangedMetricReportId,
+ start_day_index + 1}] = {
+ {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}};
+ expected_per_device_count_obs[2][{kSettingsChangedMetricReportId,
+ start_day_index + 2}] = {
+ {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_per_device_count_obs[3][{kSettingsChangedMetricReportId,
+ start_day_index + 3}] = {
+ {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_per_device_count_obs[4][{kSettingsChangedMetricReportId,
+ start_day_index + 4}] = {
+ {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[5][{kSettingsChangedMetricReportId,
+ start_day_index + 5}] = {
+ {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[6][{kSettingsChangedMetricReportId,
+ start_day_index + 6}] = {
+ {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[7][{kSettingsChangedMetricReportId,
+ start_day_index + 7}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[8][{kSettingsChangedMetricReportId,
+ start_day_index + 8}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+ expected_per_device_count_obs[9][{kSettingsChangedMetricReportId,
+ start_day_index + 9}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+
+ for (uint32_t offset = 0; offset < 10; offset++) {
+ auto day_index = CurrentDayIndex();
+ for (uint32_t event_code = 1; event_code < 3; event_code++) {
+ if (offset > 0 && offset % event_code == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "A", event_code, 3));
+ }
+ if (offset > 0 && offset % (2 * event_code) == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "B", event_code, 2));
+ }
+ }
+ // Advance |mock_clock_| by 1 day.
+ AdvanceClock(kDay);
+ // Clear the FakeObservationStore.
+ ResetObservationStore();
+ // Generate locally aggregated Observations and garbage-collect the
+ // LocalAggregateStore, both for the previous day as measured by
+ // |mock_clock_|. Back up the LocalAggregateStore and
+ // AggregatedObservationHistoryStore.
+ DoScheduledTasksNow();
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs[offset],
+ expected_report_participation_obs[offset], observation_store_.get(),
+ update_recipient_.get()));
+ }
+}
+
+// Tests that the expected PerDeviceCountObservations are generated when events
+// are logged over multiple days and when Observations are backfilled for some
+// days during that period, without any garbage-collection of the
+// LocalAggregateStore.
+//
+// The logging pattern and set of Observations for each day index is the same as
+// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesWithBackfill) {
+ auto start_day_index = CurrentDayIndex();
+ // Set |backfill_days_| to 3.
+ size_t backfill_days = 3;
+ SetBackfillDays(backfill_days);
+ // Log events for 9 days. Call GenerateObservations() on the first 6 day
+ // indices, and the 9th.
+ uint32_t num_days = 9;
+ for (uint32_t offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ ResetObservationStore();
+ for (uint32_t event_code = 1; event_code < 3; event_code++) {
+ if (offset > 0 && (offset % event_code == 0)) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "A", event_code, 3));
+ }
+ if (offset > 0 && offset % (2 * event_code) == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "B", event_code, 2));
+ }
+ }
+ if (offset < 6 || offset == 8) {
+ EXPECT_EQ(kOK, GenerateObservations(day_index));
+ }
+ // Make the set of Observations which are expected to be generated on
+ // |start_day_index + offset| and check it against the contents of the
+ // FakeObservationStore.
+ ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+ ExpectedReportParticipationObservations expected_report_participation_obs;
+ switch (offset) {
+ case 0: {
+ for (uint32_t day_index = start_day_index - backfill_days;
+ day_index <= start_day_index; day_index++) {
+ for (const auto& pair : MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index)) {
+ expected_report_participation_obs.insert(pair);
+ }
+ }
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 1: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {{7, {{"A", 1u, 3}}},
+ {30, {{"A", 1u, 3}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 2: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 3: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 4: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 5: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 8: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 6}] = {
+ {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 7}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 8}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+ for (uint32_t day_index = start_day_index + 6;
+ day_index <= start_day_index + 8; day_index++) {
+ for (const auto& pair : MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index)) {
+ expected_report_participation_obs.insert(pair);
+ }
+ }
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ default:
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ }
+ AdvanceClock(kDay);
+ }
+}
+
+// Tests that the expected Observations are generated for PerDeviceCount reports
+// when events are logged over multiple days and when Observations are
+// backfilled for some days during that period, and when the
+// LocalAggregatedStore is garbage-collected after each call to
+// GenerateObservations().
+//
+// The logging pattern and set of Observations for each day index is the same as
+// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest,
+ CheckObservationValuesWithBackfillAndGc) {
+ auto start_day_index = CurrentDayIndex();
+ // Set |backfill_days_| to 3.
+ size_t backfill_days = 3;
+ SetBackfillDays(backfill_days);
+ // Log events for 9 days. Call GenerateObservations() on the first 6 day
+ // indices, and the 9th.
+ uint32_t num_days = 9;
+ for (uint32_t offset = 0; offset < num_days; offset++) {
+ auto day_index = CurrentDayIndex();
+ ResetObservationStore();
+ for (uint32_t event_code = 1; event_code < 3; event_code++) {
+ if (offset > 0 && (offset % event_code == 0)) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "A", event_code, 3));
+ }
+ if (offset > 0 && offset % (2 * event_code) == 0) {
+ EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+ day_index, "B", event_code, 2));
+ }
+ }
+ // Advance |mock_clock_| by 1 day.
+ AdvanceClock(kDay);
+ if (offset < 6 || offset == 8) {
+ // Generate Observations and garbage-collect, both for the previous day
+ // index according to |mock_clock_|. Back up the LocalAggregateStore and
+ // the AggregatedObservationHistoryStore.
+ DoScheduledTasksNow();
+ }
+ // Make the set of Observations which are expected to be generated on
+ // |start_day_index + offset| and check it against the contents of the
+ // FakeObservationStore.
+ ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+ ExpectedReportParticipationObservations expected_report_participation_obs;
+ switch (offset) {
+ case 0: {
+ for (uint32_t day_index = start_day_index - backfill_days;
+ day_index <= start_day_index; day_index++) {
+ for (const auto& pair : MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index)) {
+ expected_report_participation_obs.insert(pair);
+ }
+ }
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 1: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {{7, {{"A", 1u, 3}}},
+ {30, {{"A", 1u, 3}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 2: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 3: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+ {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 4: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 5: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ day_index}] = {
+ {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+ expected_report_participation_obs =
+ MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index);
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ case 8: {
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 6}] = {
+ {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 7}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+ {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+ expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+ start_day_index + 8}] = {
+ {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+ {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+ for (uint32_t day_index = start_day_index + 6;
+ day_index <= start_day_index + 8; day_index++) {
+ for (const auto& pair : MakeExpectedReportParticipationObservations(
+ kPerDeviceCountExpectedParams, day_index)) {
+ expected_report_participation_obs.insert(pair);
+ }
+ }
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ break;
+ }
+ default:
+ EXPECT_TRUE(CheckPerDeviceCountObservations(
+ expected_per_device_count_obs, expected_report_participation_obs,
+ observation_store_.get(), update_recipient_.get()));
+ }
+ }
+}
+
// Tests GenerateObservations() and GarbageCollect() in the case where the
// LocalAggregateStore contains aggregates for metrics with both UTC and LOCAL
// time zone policies, and where the day index in local time may be less than
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto
index 28ceb2f..0f7837d 100644
--- a/logger/local_aggregation.proto
+++ b/logger/local_aggregation.proto
@@ -98,18 +98,30 @@
ReportDefinition report = 3;
}
-// A container used by the EventAggregator to store the latest day index for
-// which an Observation has been generated for each report, event code, and
-// window size.
+// A container used by the EventAggregator to store the history of generated
+// Observations, indexed by report.
message AggregatedObservationHistoryStore {
// Keyed by base64-encoded serializations of ReportAggregationKey messages.
map<string, AggregatedObservationHistory> by_report_key = 1;
}
+// A container for the history of Observations that have been generated for a
+// single report.
message AggregatedObservationHistory {
+ // If this is a locally aggregated report, then AggregatedObservationHistory
+ // has one of the following types.
oneof type {
+ // A container for the history of UniqueActivesObservations generated
+ // for a UNIQUE_N_DAY_ACTIVES report.
UniqueActivesObservationHistory unique_actives_history = 1;
+ // A container for the history of PerDeviceCountObservations generated
+ // for a PER_DEVICE_COUNT_STATS report.
+ PerDeviceCountObservationHistory per_device_count_history = 2;
}
+ // A container for the history of ReportParticipationObservations generated
+ // for this report. Unset if ReportParticipationObservations should not be
+ // generated for this report.
+ ReportParticipationObservationHistory report_participation_history = 100;
}
message UniqueActivesObservationHistory {
@@ -117,6 +129,27 @@
map<uint32, HistoryByWindowSize> by_event_code = 1;
}
+message PerDeviceCountObservationHistory {
+ // The history of PerDeviceCount Observations. Keyed by component string.
+ map<string, HistoryByEventCode> by_component = 1;
+}
+
+message ReportParticipationObservationHistory {
+ // The last day index for which a ReportParticipationObservation was
+ // generated for this report.
+ //
+ // TODO(pesk): if ReportParticipationObservation has a window_size
+ // field in the future, consider changing this to a repeated field
+ // where the k-th last_generated field holds the value for window
+ // size k.
+ uint32 last_generated = 1;
+}
+
+message HistoryByEventCode {
+ // Keyed by event code.
+ map<uint32, HistoryByWindowSize> by_event_code = 1;
+}
+
message HistoryByWindowSize {
// Keyed by window size. The value at a window size is the latest day index
// for which an Observation has been generated for this report, event code,
diff --git a/logger/logger_test_utils.cc b/logger/logger_test_utils.cc
index b8eeba4..a998fac 100644
--- a/logger/logger_test_utils.cc
+++ b/logger/logger_test_utils.cc
@@ -24,6 +24,8 @@
namespace cobalt {
+using crypto::byte;
+using crypto::hash::DIGEST_SIZE;
using encoder::ClientSecret;
using rappor::BasicRapporEncoder;
using rappor::RapporConfigHelper;
@@ -33,6 +35,27 @@
namespace logger {
namespace testing {
+namespace {
+// Populates |*hash_out| with the SHA256 of |component|, unless |component|
+// is empty in which case *hash_out is set to the empty string also. An
+// empty string indicates that the component_name feature is not being used.
+// We expect this to be a common case and in this case there is no point
+// in using 32 bytes to represent the empty string. Returns true on success
+// and false on failure (unexpected).
+bool HashComponentNameIfNotEmpty(const std::string& component,
+ std::string* hash_out) {
+ CHECK(hash_out);
+ if (component.empty()) {
+ hash_out->resize(0);
+ return true;
+ }
+ hash_out->resize(DIGEST_SIZE);
+ return cobalt::crypto::hash::Hash(
+ reinterpret_cast<const byte*>(component.data()), component.size(),
+ reinterpret_cast<byte*>(&hash_out->front()));
+}
+} // namespace
+
bool PopulateMetricDefinitions(const char metric_string[],
MetricDefinitions* metric_definitions) {
google::protobuf::TextFormat::Parser parser;
@@ -87,6 +110,17 @@
return expected_obs;
}
+ExpectedReportParticipationObservations
+MakeExpectedReportParticipationObservations(
+ const ExpectedAggregationParams& expected_params, uint32_t day_index) {
+ ExpectedReportParticipationObservations expected_obs;
+
+ for (const auto& report_pair : expected_params.window_sizes) {
+ expected_obs.insert({report_pair.first, day_index});
+ }
+ return expected_obs;
+}
+
bool FetchObservations(std::vector<Observation2>* observations,
const std::vector<uint32_t>& expected_report_ids,
FakeObservationStore* observation_store,
@@ -261,8 +295,8 @@
// when this method is called.
ExpectedAggregationParams expected_params;
// A container for the strings expected to appear in the |data| field of the
- // BasicRapporObservation wrapped by the UniqueActivesObservation for a given
- // MetricReportId, day index, window size, and event code.
+ // BasicRapporObservation wrapped by the UniqueActivesObservation for a
+ // given MetricReportId, day index, window size, and event code.
std::map<std::pair<MetricReportId, uint32_t>,
std::map<uint32_t, std::map<uint32_t, std::string>>>
expected_values;
@@ -348,6 +382,123 @@
return true;
}
+bool CheckPerDeviceCountObservations(
+ ExpectedPerDeviceCountObservations expected_per_device_count_obs,
+ ExpectedReportParticipationObservations expected_report_participation_obs,
+ FakeObservationStore* observation_store,
+ TestUpdateRecipient* update_recipient) {
+ // An ExpectedAggregationParams struct describing the number of
+ // Observations for each report ID which are expected to be
+ // in the FakeObservationStore when this method is called.
+ ExpectedAggregationParams expected_params;
+ std::map<std::string, std::string> component_hashes;
+ for (const auto& id_pair : expected_report_participation_obs) {
+ expected_params.daily_num_obs++;
+ expected_params.num_obs_per_report[id_pair.first]++;
+ expected_params.metric_report_ids.insert(id_pair.first);
+ }
+ for (const auto& id_pair : expected_per_device_count_obs) {
+ for (const auto& window_size_pair : id_pair.second) {
+ auto window_size = window_size_pair.first;
+ expected_params.window_sizes[id_pair.first.first].insert(window_size);
+ for (const auto& expected_obs : window_size_pair.second) {
+ expected_params.daily_num_obs++;
+ expected_params.num_obs_per_report[id_pair.first.first]++;
+ std::string component = std::get<0>(expected_obs);
+ std::string component_hash;
+ HashComponentNameIfNotEmpty(component, &component_hash);
+ component_hashes[component_hash] = component;
+ }
+ }
+ }
+ // Fetch the contents of the ObservationStore and check that each
+ // received Observation corresponds to an element of |expected_values|.
+ std::vector<Observation2> observations;
+ if (!FetchAggregatedObservations(&observations, expected_params,
+ observation_store, update_recipient)) {
+ return false;
+ }
+ std::vector<Observation2> report_participation_obs;
+ std::vector<ObservationMetadata> report_participation_metadata;
+ std::vector<Observation2> per_device_count_obs;
+ std::vector<ObservationMetadata> per_device_count_metadata;
+ for (size_t i = 0; i < observations.size(); i++) {
+ if (observations.at(i).has_report_participation()) {
+ report_participation_obs.push_back(observations.at(i));
+ report_participation_metadata.push_back(
+ *observation_store->metadata_received[i]);
+ } else if (observations.at(i).has_per_device_count()) {
+ per_device_count_obs.push_back(observations.at(i));
+ per_device_count_metadata.push_back(
+ *observation_store->metadata_received[i]);
+ } else {
+ return false;
+ }
+ }
+ // Check the received PerDeviceCountObservations
+ for (size_t i = 0; i < per_device_count_obs.size(); i++) {
+ auto obs_key =
+ std::make_pair(MetricReportId(per_device_count_metadata[i].metric_id(),
+ per_device_count_metadata[i].report_id()),
+ per_device_count_metadata[i].day_index());
+ auto report_iter = expected_per_device_count_obs.find(obs_key);
+ if (report_iter == expected_per_device_count_obs.end()) {
+ return false;
+ }
+ auto obs = per_device_count_obs.at(i);
+ uint32_t obs_window_size = obs.per_device_count().window_size();
+ auto window_iter = report_iter->second.find(obs_window_size);
+ if (window_iter == report_iter->second.end()) {
+ return false;
+ }
+ std::string obs_component_hash =
+ obs.per_device_count().integer_event_obs().component_name_hash();
+ std::string obs_component;
+ auto hash_iter = component_hashes.find(obs_component_hash);
+ if (hash_iter == component_hashes.end()) {
+ return false;
+ } else {
+ obs_component = component_hashes[obs_component_hash];
+ }
+ auto obs_tuple = std::make_tuple(
+ obs_component, obs.per_device_count().integer_event_obs().event_code(),
+ obs.per_device_count().integer_event_obs().value());
+ auto obs_iter = window_iter->second.find(obs_tuple);
+ if (obs_iter == window_iter->second.end()) {
+ return false;
+ }
+ expected_per_device_count_obs.at(obs_key)
+ .at(obs_window_size)
+ .erase(obs_tuple);
+ if (expected_per_device_count_obs.at(obs_key).at(obs_window_size).empty()) {
+ expected_per_device_count_obs.at(obs_key).erase(obs_window_size);
+ }
+ if (expected_per_device_count_obs.at(obs_key).empty()) {
+ expected_per_device_count_obs.erase(obs_key);
+ }
+ }
+ if (!expected_per_device_count_obs.empty()) {
+ return false;
+ }
+
+ // Check the received ReportParticipationObservations
+ for (size_t i = 0; i < report_participation_obs.size(); i++) {
+ auto obs_key = std::make_pair(
+ MetricReportId(report_participation_metadata[i].metric_id(),
+ report_participation_metadata[i].report_id()),
+ report_participation_metadata[i].day_index());
+ if (expected_report_participation_obs.count(obs_key) == 0) {
+ return false;
+ }
+ expected_report_participation_obs.erase(obs_key);
+ }
+ if (!expected_report_participation_obs.empty()) {
+ return false;
+ }
+
+ return true;
+}
+
} // namespace testing
} // namespace logger
} // namespace cobalt
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index 16a25b6..a10ccad 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -9,6 +9,7 @@
#include <memory>
#include <set>
#include <string>
+#include <tuple>
#include <utility>
#include <vector>
@@ -129,6 +130,25 @@
std::map<uint32_t, std::vector<bool>>>
ExpectedUniqueActivesObservations;
+// A representation of a set of expected PerDeviceCountObservations. Used to
+// check the values of PerDeviceCountObservations generated by the
+// EventAggregator.
+//
+// The outer map is keyed by pairs (MetricReportId, day_index), where the day
+// index represents the day index of the expected Observation.
+typedef std::map<
+ std::pair<MetricReportId, uint32_t>,
+ std::map<uint32_t, std::set<std::tuple<std::string, uint32_t, int64_t>>>>
+ ExpectedPerDeviceCountObservations;
+
+// A representation of a set of expected ReportParticipationObservations. Used
+// to check the values of ReportParticipationObservations generated by the
+// EventAggregator. The first element of each pair is the MetricReportId of a
+// report, and the second element represents the day index of an expected
+// Observation for that report. a pair is a a set of window sizes.
+typedef std::set<std::pair<MetricReportId, uint32_t>>
+ ExpectedReportParticipationObservations;
+
// Populates a MetricDefinitions proto message from a serialized representation.
bool PopulateMetricDefinitions(const char metric_string[],
MetricDefinitions* metric_definitions);
@@ -156,6 +176,14 @@
ExpectedUniqueActivesObservations MakeNullExpectedUniqueActivesObservations(
const ExpectedAggregationParams& expected_params, uint32_t day_index);
+// Given an ExpectedAggregationParams struct |expected_params|, return an
+// ExpectedReportParticipationObservations containing a pair
+// (|metric_report_id|, |day_index|) for each MetricReportId |metric_report_id|
+// in |expected_params|.
+ExpectedReportParticipationObservations
+MakeExpectedReportParticipationObservations(
+ const ExpectedAggregationParams& expected_params, uint32_t day_index);
+
// Populates |observations| with the contents of a FakeObservationStore.
// |observations| should be a vector whose size is equal to the number
// of expected observations. Checks the the ObservationStore contains
@@ -207,6 +235,18 @@
FakeObservationStore* observation_store,
TestUpdateRecipient* update_recipient);
+// Checks that the Observations contained in a FakeObservationStore are exactly
+// the PerDeviceCountObservations and ReportParticipationObservations that
+// should be generated for a single day index given a representation of the
+// expected activity indicators for that day, for each PerDeviceCount report,
+// for each window size and event code, for a config whose locally aggregated
+// reports are all of type PER_DEVICE_COUNT_STATS.
+bool CheckPerDeviceCountObservations(
+ ExpectedPerDeviceCountObservations expected_per_device_count_obs,
+ ExpectedReportParticipationObservations expected_report_participation_obs,
+ FakeObservationStore* observation_store,
+ TestUpdateRecipient* update_recipient);
+
} // namespace testing
} // namespace logger
} // namespace cobalt