[EventAggregator] Generate PerDeviceCountObservations The EventAggregator's GenerateObservations() method now generates observations for PER_DEVICE_COUNT_STATS reports. Change-Id: I158ccdb81e015886457e61aa19808274520d0c23
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc index 706ef72..26ce7e9 100644 --- a/logger/event_aggregator.cc +++ b/logger/event_aggregator.cc
@@ -5,8 +5,10 @@ #include "logger/event_aggregator.h" #include <algorithm> +#include <map> #include <string> #include <utility> +#include <vector> #include "algorithms/rappor/rappor_config_helper.h" #include "config/metric_definition.pb.h" @@ -427,6 +429,12 @@ if (final_day_index_local == 0u) { final_day_index_local = final_day_index_utc; } + if (std::min(final_day_index_utc, final_day_index_local) < + backfill_days_ + kMaxAllowedAggregationWindowSize) { + LOG(ERROR) << "GenerateObservations: Day index of Observation must be >= " + "backfill_days_ + kMaxAllowedAggregationWindowSize."; + return kInvalidArguments; + } // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to // generate observations. auto local_aggregate_store = CopyLocalAggregateStore(); @@ -484,10 +492,27 @@ if (status != kOK) { return status; } + break; } default: continue; } + break; + } + case MetricDefinition::EVENT_COUNT: { + switch (report.report_type()) { + case ReportDefinition::PER_DEVICE_COUNT_STATS: { + auto status = GeneratePerDeviceCountObservations( + metric_ref, pair.first, pair.second, final_day_index); + if (status != kOK) { + return status; + } + break; + } + default: + continue; + } + break; } default: continue; @@ -501,6 +526,12 @@ if (day_index_local == 0u) { day_index_local = day_index_utc; } + if (std::min(day_index_utc, day_index_local) < + backfill_days_ + kMaxAllowedAggregationWindowSize) { + LOG(ERROR) << "GarbageCollect: Day index must be >= backfill_days_ + " + "kMaxAllowedAggregationWindowSize."; + return kInvalidArguments; + } auto locked = protected_aggregate_store_.lock(); for (auto pair : locked->local_aggregate_store.by_report_key()) { uint32_t day_index; @@ -532,10 +563,6 @@ "window size."; return kInvalidConfig; } - if (day_index < backfill_days_ + max_window_size) { - LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size."; - return kInvalidArguments; - } // For each ReportAggregates, descend to and iterate over the sub-map of // local aggregates keyed by day index. Keep buckets with day indices // greater than |day_index| - |backfill_days_| - |max_window_size|, and @@ -723,6 +750,10 @@ const MetricRef metric_ref, const std::string& report_key, const ReportAggregates& report_aggregates, uint32_t num_event_codes, uint32_t final_day_index) { + // The earliest day index for which we might need to generate an Observation. + // GenerateObservations() has checked that this value is > 0. + auto backfill_period_start = uint32_t(final_day_index - backfill_days_); + for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) { auto daily_aggregates = report_aggregates.unique_actives_aggregates().by_event_code().find( @@ -746,8 +777,7 @@ // generated on this invocation. auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code, window_size); - auto first_day_index = - std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_)); + auto first_day_index = std::max(last_gen + 1, backfill_period_start); // The latest day index on which |event_type| is known to have // occurred, so far. This value will be updated as we search // forward from the earliest day index belonging to a window of @@ -794,5 +824,203 @@ return kOK; } +////////// GeneratePerDeviceCountObservations and helper methods ///////////// + +uint32_t EventAggregator::PerDeviceCountLastGeneratedDayIndex( + const std::string& report_key, const std::string& component, + uint32_t event_code, uint32_t window_size) const { + const auto& report_history = obs_history_.by_report_key().find(report_key); + if (report_history == obs_history_.by_report_key().end()) { + return 0u; + } + if (!report_history->second.has_per_device_count_history()) { + return 0u; + } + const auto& component_history = + report_history->second.per_device_count_history().by_component().find( + component); + if (component_history == + report_history->second.per_device_count_history().by_component().end()) { + return 0u; + } + const auto& event_code_history = + component_history->second.by_event_code().find(event_code); + if (event_code_history == component_history->second.by_event_code().end()) { + return 0u; + } + const auto& window_size_history = + event_code_history->second.by_window_size().find(window_size); + if (window_size_history == + event_code_history->second.by_window_size().end()) { + return 0u; + } + return window_size_history->second; +} + +Status EventAggregator::GenerateSinglePerDeviceCountObservation( + const MetricRef metric_ref, const ReportDefinition* report, + uint32_t obs_day_index, const std::string& component, uint32_t event_code, + uint32_t window_size, int64_t count) const { + auto encoder_result = encoder_->EncodePerDeviceCountObservation( + metric_ref, report, obs_day_index, component, event_code, count, + window_size); + if (encoder_result.status != kOK) { + return encoder_result.status; + } + if (encoder_result.observation == nullptr || + encoder_result.metadata == nullptr) { + LOG(ERROR) << "Failed to encode PerDeviceCountObservation"; + return kOther; + } + + const auto& writer_status = observation_writer_->WriteObservation( + *encoder_result.observation, std::move(encoder_result.metadata)); + if (writer_status != kOK) { + return writer_status; + } + return kOK; +} + +Status EventAggregator::GeneratePerDeviceCountObservations( + const MetricRef metric_ref, const std::string& report_key, + const ReportAggregates& report_aggregates, uint32_t final_day_index) { + // Get the window sizes for this report and sort them in increasing order. + // TODO(pesk): Instead, have UpdateAggregationConfigs() store the window sizes + // in increasing order. + std::vector<uint32_t> window_sizes; + for (uint32_t window_size : + report_aggregates.aggregation_config().report().window_size()) { + if (window_size > kMaxAllowedAggregationWindowSize) { + LOG(WARNING) << "GeneratePerDeviceCountObservations ignoring a window " + "size exceeding the maximum allowed value"; + continue; + } + window_sizes.push_back(window_size); + } + std::sort(window_sizes.begin(), window_sizes.end()); + + // The first day index for which we might have to generate an Observation. + // GenerateObservations() has checked that this value is > 0. + auto backfill_period_start = uint32_t(final_day_index - backfill_days_); + + // Generate any necessary PerDeviceCountObservations for this report. + for (const auto& component_pair : + report_aggregates.count_aggregates().by_component()) { + const auto& component = component_pair.first; + for (const auto& event_code_pair : component_pair.second.by_event_code()) { + auto event_code = event_code_pair.first; + const auto& event_code_aggregates = event_code_pair.second; + // Populate a helper map keyed by day indices which belong to the range + // [|backfill_period_start|, |final_day_index|]. The value at a day index + // is the list of window sizes, in increasing order, for which an + // Observation should be generated for that day index. + std::map<uint32_t, std::vector<uint32_t>> window_sizes_by_obs_day; + for (auto window_size : window_sizes) { + auto last_gen = PerDeviceCountLastGeneratedDayIndex( + report_key, component, event_code, window_size); + auto first_day_index = std::max(last_gen + 1, backfill_period_start); + for (auto obs_day_index = first_day_index; + obs_day_index <= final_day_index; obs_day_index++) { + window_sizes_by_obs_day[obs_day_index].push_back(window_size); + } + } + // Iterate over the day indices |obs_day_index| for which we might need to + // generate an Observation. For each day index, generate an Observation + // for each needed window size. + // + // More precisely, for each needed window size, compute the count over the + // window and generate a PerDeviceCountObservation only if the count is + // nonzero. Whether or not the count was zero, update the + // AggregatedObservationHistory for this report, component, event code, + // and window size with |obs_day_index| as the most recent date of + // Observation generation. This reflects the fact that the count was + // computed for the window ending on that date, even though an Observation + // is only sent if the count is nonzero. + for (auto obs_day_index = backfill_period_start; + obs_day_index <= final_day_index; obs_day_index++) { + const auto& window_sizes = window_sizes_by_obs_day.find(obs_day_index); + if (window_sizes == window_sizes_by_obs_day.end()) { + continue; + } + int64_t count = 0; + uint32_t num_days = 0; + for (auto window_size : window_sizes->second) { + while (num_days < window_size) { + const auto& day_aggregates = + event_code_aggregates.by_day_index().find(obs_day_index - + num_days); + if (day_aggregates != event_code_aggregates.by_day_index().end()) { + count += day_aggregates->second.count_daily_aggregate().count(); + } + num_days++; + } + if (count != 0) { + auto status = GenerateSinglePerDeviceCountObservation( + metric_ref, &report_aggregates.aggregation_config().report(), + obs_day_index, component, event_code, window_size, count); + if (status != kOK) { + return status; + } + } + // Update |obs_history_| with the latest date of Observation + // generation for this report, component, event code, and window + // size. + (*(*(*(*obs_history_.mutable_by_report_key())[report_key] + .mutable_per_device_count_history() + ->mutable_by_component())[component] + .mutable_by_event_code())[event_code] + .mutable_by_window_size())[window_size] = obs_day_index; + } + } + } + } + // Generate any necessary ReportParticipationObservations for this report. + auto participation_last_gen = + ReportParticipationLastGeneratedDayIndex(report_key); + auto participation_first_day_index = + std::max(participation_last_gen + 1, backfill_period_start); + for (auto obs_day_index = participation_first_day_index; + obs_day_index <= final_day_index; obs_day_index++) { + GenerateSingleReportParticipationObservation( + metric_ref, &report_aggregates.aggregation_config().report(), + obs_day_index); + (*obs_history_.mutable_by_report_key())[report_key] + .mutable_report_participation_history() + ->set_last_generated(obs_day_index); + } + return kOK; +} + +uint32_t EventAggregator::ReportParticipationLastGeneratedDayIndex( + const std::string& report_key) const { + const auto& report_history = obs_history_.by_report_key().find(report_key); + if (report_history == obs_history_.by_report_key().end()) { + return 0u; + } + return report_history->second.report_participation_history().last_generated(); +} + +Status EventAggregator::GenerateSingleReportParticipationObservation( + const MetricRef metric_ref, const ReportDefinition* report, + uint32_t obs_day_index) const { + auto encoder_result = encoder_->EncodeReportParticipationObservation( + metric_ref, report, obs_day_index); + if (encoder_result.status != kOK) { + return encoder_result.status; + } + if (encoder_result.observation == nullptr || + encoder_result.metadata == nullptr) { + LOG(ERROR) << "Failed to encode ReportParticipationObservation"; + return kOther; + } + + const auto& writer_status = observation_writer_->WriteObservation( + *encoder_result.observation, std::move(encoder_result.metadata)); + if (writer_status != kOK) { + return writer_status; + } + return kOK; +} + } // namespace logger } // namespace cobalt
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h index 8fbaf24..a742ce3 100644 --- a/logger/event_aggregator.h +++ b/logger/event_aggregator.h
@@ -27,20 +27,19 @@ namespace cobalt { namespace logger { -// The EventAggregator class manages an in-memory store of aggregated values -// of Events logged for locally aggregated report types. For each day, this -// LocalAggregateStore contains an aggregate of the values of logged Events of -// a given event code over that day. These daily aggregates are used to -// produce Observations of values aggregated over a rolling window of size -// specified in the ReportDefinition. +// The EventAggregator manages an in-memory store of aggregated Event values, +// indexed by report, day index, and other dimensions specific to the report +// type (e.g. event code). Periodically, this data is used to generate +// Observations representing aggregates of Event values over a day, week, month, +// etc. // // Each Logger interacts with the EventAggregator in the following way: // (1) When the Logger is created, it calls UpdateAggregationConfigs() to -// provide the EventAggregator with the configurations of its locally -// aggregated reports. -// (2) When logging an Event for a locally aggregated report, a Logger calls an -// Update*Aggregation() method with the Event and the ReportAggregationKey of -// the report. +// provide the EventAggregator with its view of Cobalt's metric and report +// registry. +// (2) When logging an Event for a locally aggregated report, a Logger +// calls an Update*Aggregation() method with the Event and the +// ReportAggregationKey of the report. // // A worker thread does the following tasks at intervals specified in the // EventAggregator's constructor: @@ -115,8 +114,8 @@ // Start the worker thread. void Start(); - // Updates the EventAggregator's view of the set of locally aggregated - // report configurations. + // Updates the EventAggregator's view of the Cobalt metric and report + // registry. // // This method may be called multiple times during the EventAggregator's // lifetime. If the EventAggregator is provided with a report whose tuple @@ -266,6 +265,22 @@ uint32_t event_code, uint32_t window_size) const; + // Returns the most recent day index for which an Observation was generated + // for a given PER_DEVICE_COUNT_STATS report, component, event code, + // and window size, according to |obs_history_|. Returns 0 if no Observation + // has been generated for the given arguments. + uint32_t PerDeviceCountLastGeneratedDayIndex(const std::string& report_key, + const std::string& component, + uint32_t event_code, + uint32_t window_size) const; + + // Returns the most recent day index for which a + // ReportParticipationObservation was generated for a given report, according + // to |obs_history_|. Returns 0 if no Observation has been generated for the + // given arguments. + uint32_t ReportParticipationLastGeneratedDayIndex( + const std::string& report_key) const; + // For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation // for each event code of the parent metric, for each window size of the // report, for the window of that size ending on |final_day_index|, unless @@ -290,6 +305,40 @@ uint32_t window_size, bool was_active) const; + // For a fixed report of type PER_DEVICE_COUNT_STATS, generates a + // PerDeviceCountObservation for each tuple (component, event code, window + // size) for which a positive number of events with that event code occurred + // with that component during the window of that size ending on + // |final_day_index|, unless an Observation with those parameters has been + // generated in the past. Also generates PerDeviceCountObservations for days + // in the backfill period if needed. + // + // In addition to PerDeviceCountObservations, generates a + // ReportParticipationObservation for |final_day_index| and any needed days in + // the backfill period. These ReportParticipationObservations are used by the + // PerDeviceCount report generator to infer the fleet-wide number of devices + // for which the event count associated to each tuple (component, event code, + // window size) was zero. + // + // Observations are not generated for aggregation windows larger than + // |kMaxAllowedAggregationWindowSize|. + Status GeneratePerDeviceCountObservations( + const MetricRef metric_ref, const std::string& report_key, + const ReportAggregates& report_aggregates, uint32_t final_day_index); + + // Helper method called by GeneratePerDeviceCountObservations() to generate + // and write a single PerDeviceCountObservation. + Status GenerateSinglePerDeviceCountObservation( + const MetricRef metric_ref, const ReportDefinition* report, + uint32_t obs_day_index, const std::string& component, uint32_t event_code, + uint32_t window_size, int64_t count) const; + + // Helper method called by GeneratePerDeviceCountObservations() to generate + // and write a single ReportParticipationObservation. + Status GenerateSingleReportParticipationObservation( + const MetricRef metric_ref, const ReportDefinition* report, + uint32_t obs_day_index) const; + // Returns a copy of the LocalAggregateStore. Does not assume that the // caller holds the mutex of |protected_aggregate_store_|. LocalAggregateStore CopyLocalAggregateStore() {
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc index 6ba882e..fe266bc 100644 --- a/logger/event_aggregator_test.cc +++ b/logger/event_aggregator_test.cc
@@ -34,13 +34,17 @@ namespace logger { +using testing::CheckPerDeviceCountObservations; using testing::CheckUniqueActivesObservations; using testing::ExpectedAggregationParams; +using testing::ExpectedPerDeviceCountObservations; +using testing::ExpectedReportParticipationObservations; using testing::ExpectedUniqueActivesObservations; using testing::FakeObservationStore; using testing::FetchAggregatedObservations; using testing::MakeAggregationConfig; using testing::MakeAggregationKey; +using testing::MakeExpectedReportParticipationObservations; using testing::MakeNullExpectedUniqueActivesObservations; using testing::MockConsistentProtoStore; using testing::PopulateMetricDefinitions; @@ -122,14 +126,14 @@ static const ExpectedAggregationParams kExpectedParams = { /* The total number of locally aggregated Observations that should be generated for each day index. */ - 9, + 10, /* The MetricReportIds of the locally aggregated reports in this configuration. */ {kErrorsOccurredMetricReportId, kConnectionFailuresMetricReportId}, /* The number of Observations which should be generated for each day index, broken down by MetricReportId. */ {{kErrorsOccurredMetricReportId, 9}, - {kConnectionFailuresMetricReportId, 0}}, + {kConnectionFailuresMetricReportId, 1}}, /* The number of event codes for each report of type UNIQUE_N_DAY_ACTIVES, by MetricReportId. */ {{kErrorsOccurredMetricReportId, 3}}, @@ -345,21 +349,19 @@ // Properties of the locally aggregated Observations which should be generated // for the reports in |kMetricDefinitions|, assuming that no events have ever // been logged for those reports. -// -// TODO(pesk): update these fields once the EventAggregator is generating -// observations for PerDeviceCount reports. static const ExpectedAggregationParams kPerDeviceCountExpectedParams = { /* The total number of Observations that should be generated for a day index. */ - 0, + 2, /* The MetricReportIds of the locally aggregated reports in this configuration. */ {kConnectionFailuresMetricReportId, kSettingsChangedMetricReportId}, /* The number of Observations which should be generated for a day index, broken down by MetricReportId. */ - {{kConnectionFailuresMetricReportId, 0}, - {kSettingsChangedMetricReportId, 0}}, - /* Omitted because this config contains no UNIQUE_N_DAY_ACTIVES reports. */ + {{kConnectionFailuresMetricReportId, 1}, + {kSettingsChangedMetricReportId, 1}}, + /* The number of event codes for each UNIQUE_N_DAY_ACTIVES report. Omitted + because this config contains no UNIQUE_N_DAY_ACTIVES reports. */ {}, /* The set of window sizes for each MetricReportId. */ {{kConnectionFailuresMetricReportId, {1}}, @@ -472,11 +474,11 @@ event_aggregator_.reset(new EventAggregator( encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(), obs_history_proto_store_.get())); - // Provide the EventAggregator with a mock clock starting at 1 year after + // Provide the EventAggregator with a mock clock starting at 10 years after // the beginning of time. mock_clock_ = new IncrementingClock(std::chrono::system_clock::duration(0)); - mock_clock_->set_time( - std::chrono::system_clock::time_point(std::chrono::seconds(kYear))); + mock_clock_->set_time(std::chrono::system_clock::time_point( + std::chrono::seconds(10 * kYear))); event_aggregator_->SetClock(mock_clock_); day_store_created_ = CurrentDayIndex(); } @@ -2101,7 +2103,7 @@ // Tests that the expected UniqueActivesObservations are generated when events // are logged over multiple days and when Observations are backfilled for some // days during that period, and when the LocalAggregateStore is -// garbage-collected after each all to GenerateObservations(). +// garbage-collected after each call to GenerateObservations(). // // The test sets the number of backfill days to 3. // @@ -2361,6 +2363,851 @@ } } +// Tests that EventAggregator::GenerateObservations() returns a positive +// status and that the expected number of Observations is generated after +// some CountEvents have been logged for PerDeviceCount reports, without any +// garbage collection. +// +// For 35 days, logs a positive number of events each day for the +// ConnectionFailures_PerDeviceCount report with "component_A" and for +// the SettingsChanged_PerDeviceCount report with "component_B", all with event +// code 0. +// +// Each day, calls GenerateObservations() with the day index of the previous +// day. Checks that a positive status is returned and that the +// FakeObservationStore has received the expected number of new observations +// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|. +TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservations) { + int num_days = 1; + std::vector<Observation2> observations(0); + ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams; + for (int offset = 0; offset < num_days; offset++) { + auto day_index = CurrentDayIndex(); + observations.clear(); + ResetObservationStore(); + EXPECT_EQ(kOK, GenerateObservations(day_index - 1)); + EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params, + observation_store_.get(), + update_recipient_.get())); + for (int i = 0; i < 2; i++) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 1)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_B", 0u, 5)); + } + // If this is the first time we're logging events, update the expected + // numbers of generated Observations to account for the logged events. + // For each report, for each window size, expect 1 Observation more than if + // no events had been logged. + if (offset == 0) { + expected_params.daily_num_obs += 3; + expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] += + 1; + expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2; + } + AdvanceClock(kDay); + } + observations.clear(); + ResetObservationStore(); + EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex() - 1)); + EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params, + observation_store_.get(), + update_recipient_.get())); +} + +// Tests that EventAggregator::GenerateObservations() returns a positive +// status and that the expected number of Observations is generated after +// some CountEvents have been logged for PerDeviceCount reports over multiple +// days, and when the LocalAggregateStore is garbage-collected each day. +// +// For 35 days, logs a positive number of events each day for the +// ConnectionFailures_PerDeviceCount report with "component_A" and for +// the SettingsChanged_PerDeviceCount report with "component_B", all with event +// code 0. +// +// Each day, calls GenerateObservations() with the day index of the previous +// day. Checks that a positive status is returned and that the +// FakeObservationStore has received the expected number of new observations +// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|. +TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithGc) { + int num_days = 35; + std::vector<Observation2> observations(0); + ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams; + for (int offset = 0; offset < num_days; offset++) { + auto day_index = CurrentDayIndex(); + observations.clear(); + ResetObservationStore(); + EXPECT_EQ(kOK, GenerateObservations(day_index - 1)); + EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params, + observation_store_.get(), + update_recipient_.get())); + EXPECT_EQ(kOK, GarbageCollect(day_index)); + for (int i = 0; i < 2; i++) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 1)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_B", 0u, 5)); + } + // If this is the first time we're logging events, update the expected + // numbers of generated Observations to account for the logged events. + // For each report, for each window size, expect 1 Observation more than if + // no events had been logged. + if (offset == 0) { + expected_params.daily_num_obs += 3; + expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] += + 1; + expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2; + } + AdvanceClock(kDay); + } + observations.clear(); + ResetObservationStore(); + auto day_index = CurrentDayIndex(); + EXPECT_EQ(kOK, GenerateObservations(day_index - 1)); + EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params, + observation_store_.get(), + update_recipient_.get())); + EXPECT_EQ(kOK, GarbageCollect(day_index)); +} + +// Tests that GenerateObservations() returns a positive status and that the +// expected number of Observations is generated when events are logged over +// multiple days and some of those days' Observations are backfilled, without +// any garbage collection of the LocalAggregateStore. +// +// Sets the |backfill_days_| field of the EventAggregator to 3. +// +// Logging pattern: +// For 35 days, logs 2 events each day for the +// SomeErrorsOccurred_UniqueDevices report and 2 events for the +// SomeFeaturesActive_Unique_Devices report, all with event code 0. +// +// Observation generation pattern: +// Calls GenerateObservations() on the 1st through 5th and the 7th out of +// every 10 days, for 35 days. +// +// Expected numbers of Observations: +// It is expected that 4 days' worth of Observations are generated on +// the first day of every 10 (the day index for which GenerateObservations() +// was called, plus 3 days of backfill), that 1 day's worth of Observations +// are generated on the 2nd through 5th day of every 10, that 2 days' +// worth of Observations are generated on the 7th day of every 10 (the +// day index for which GenerateObservations() was called, plus 1 day of +// backfill), and that no Observations are generated on the remaining days. +TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithBackfill) { + // Set |backfill_days_| to 3. + size_t backfill_days = 3; + SetBackfillDays(backfill_days); + // Log 2 events each day for 35 days. Call GenerateObservations() on the + // first 5 day indices, and the 7th, out of every 10. + for (int offset = 0; offset < 35; offset++) { + auto day_index = CurrentDayIndex(); + for (int i = 0; i < 2; i++) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 1)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_B", 0u, 5)); + } + auto num_obs_before = observation_store_->messages_received.size(); + if (offset % 10 < 5 || offset % 10 == 6) { + EXPECT_EQ(kOK, GenerateObservations(day_index)); + } + auto num_obs_after = observation_store_->messages_received.size(); + EXPECT_GE(num_obs_after, num_obs_before); + // Check that the expected daily number of Observations was generated. + switch (offset % 10) { + case 0: + // If this is the first day of logging, expect 3 Observations for each + // day in the backfill period and 6 Observations for the current day. + if (offset == 0) { + EXPECT_EQ( + (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) + + kPerDeviceCountExpectedParams.daily_num_obs + 3, + num_obs_after - num_obs_before); + } else { + // If this is another day whose offset is a multiple of 10, expect 6 + // Observations for each day in the backfill period as well as the + // current day. + EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * + (backfill_days + 1), + num_obs_after - num_obs_before); + } + break; + case 1: + case 2: + case 3: + case 4: + // Expect 6 Observations for this day. + EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3, + num_obs_after - num_obs_before); + break; + case 6: + // Expect 6 Observations for each of today and yesterday. + EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2, + num_obs_after - num_obs_before); + break; + default: + EXPECT_EQ(num_obs_after, num_obs_before); + } + AdvanceClock(kDay); + } +} + +// Tests that GenerateObservations() returns a positive status and that the +// expected number of Observations is generated when events are logged over +// multiple days and some of those days' Observations are backfilled, and when +// the LocalAggregateStore is garbage-collected after each call to +// GenerateObservations(). +// +// Sets the |backfill_days_| field of the EventAggregator to 3. +// +// Logging pattern: +// For 35 days, logs 2 events each day for the +// ConnectionFailures_PerDeviceCount report with "component_A" and 2 events for +// the SettingsChanged_PerDeviceCount report with "component_B", all with event +// code 0. +// +// Observation generation pattern: +// Calls GenerateObservations() on the 1st through 5th and the 7th out of +// every 10 days, for 35 days. Garbage-collects the LocalAggregateStore after +// each call. +// +// Expected numbers of Observations: +// It is expected that 4 days' worth of Observations are generated on +// the first day of every 10 (the day index for which GenerateObservations() +// was called, plus 3 days of backfill), that 1 day's worth of Observations +// are generated on the 2nd through 5th day of every 10, that 2 days' +// worth of Observations are generated on the 7th day of every 10 (the +// day index for which GenerateObservations() was called, plus 1 day of +// backfill), and that no Observations are generated on the remaining days. +TEST_F(PerDeviceCountEventAggregatorTest, + GenerateObservationsWithBackfillAndGc) { + int num_days = 35; + // Set |backfill_days_| to 3. + size_t backfill_days = 3; + SetBackfillDays(backfill_days); + // Log 2 events each day for 35 days. Call GenerateObservations() on the + // first 5 day indices, and the 7th, out of every 10. + for (int offset = 0; offset < num_days; offset++) { + auto day_index = CurrentDayIndex(); + for (int i = 0; i < 2; i++) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 1)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_B", 0u, 5)); + } + auto num_obs_before = observation_store_->messages_received.size(); + if (offset % 10 < 5 || offset % 10 == 6) { + EXPECT_EQ(kOK, GenerateObservations(day_index)); + EXPECT_EQ(kOK, GarbageCollect(day_index)); + } + auto num_obs_after = observation_store_->messages_received.size(); + EXPECT_GE(num_obs_after, num_obs_before); + // Check that the expected daily number of Observations was generated. + switch (offset % 10) { + case 0: + // If this is the first day of logging, expect 3 Observations for each + // day in the backfill period and 6 Observations for the current day. + if (offset == 0) { + EXPECT_EQ( + (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) + + kPerDeviceCountExpectedParams.daily_num_obs + 3, + num_obs_after - num_obs_before); + } else { + // If this is another day whose offset is a multiple of 10, expect 6 + // Observations for each day in the backfill period as well as the + // current day. + EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * + (backfill_days + 1), + num_obs_after - num_obs_before); + } + break; + case 1: + case 2: + case 3: + case 4: + // Expect 6 Observations for this day. + EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3, + num_obs_after - num_obs_before); + break; + case 6: + // Expect 6 Observations for each of today and yesterday. + EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2, + num_obs_after - num_obs_before); + break; + default: + EXPECT_EQ(num_obs_after, num_obs_before); + } + AdvanceClock(kDay); + } +} + +// Generate Observations without logging any events, and check that the +// resulting Observations are as expected: 1 ReportParticipationObservation for +// each PER_DEVICE_COUNT_STATS report in the config, and no +// PerDeviceCountObservations. +TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesNoEvents) { + auto current_day_index = CurrentDayIndex(); + EXPECT_EQ(kOK, GenerateObservations(current_day_index)); + auto expected_report_participation_obs = + MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams, + current_day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + {}, expected_report_participation_obs, observation_store_.get(), + update_recipient_.get())); +} + +// Check that the expected PerDeviceCountObservations and +// ReportParticipationObservations are generated when GenerateObservations() is +// called after logging some events for PER_DEVICE_COUNT_STATS reports over a +// single day index. +TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesSingleDay) { + auto day_index = CurrentDayIndex(); + // Log several events on |day_index|. + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 5)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_B", 0u, 5)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 0u, 5)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId, + day_index, "component_A", 1u, 5)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_C", 0u, 5)); + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "component_C", 0u, 5)); + // Generate locally aggregated Observations for |day_index|. + EXPECT_EQ(kOK, GenerateObservations(day_index)); + + // Form the expected Observations. + auto expected_report_participation_obs = + MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams, + day_index); + ExpectedPerDeviceCountObservations expected_per_device_count_obs; + expected_per_device_count_obs[{kConnectionFailuresMetricReportId, + day_index}][1] = { + {"component_A", 0u, 10}, {"component_A", 1u, 5}, {"component_B", 0u, 5}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}] + [7] = {{"component_C", 0u, 10}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}] + [30] = {{"component_C", 0u, 10}}; + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); +} + +// Checks that PerDeviceCountObservations with the expected values are +// generated when some events have been logged for a PER_DEVICE_COUNT +// report for over multiple days and GenerateObservations() is called each +// day, without garbage collection or backfill. +// +// Logged events for the SettingsChanged_PerDeviceCount metric on the i-th day: +// +// i (component, event code, count) +// ----------------------------------------------------------------------- +// 0 +// 1 ("A", 1, 3) +// 2 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2) +// 3 ("A", 1, 3) +// 4 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2) +// 5 ("A", 1, 3) +// 6 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2) +// 7 ("A", 1, 3) +// 8 ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2) +// 9 ("A", 1, 3) +// +// Expected PerDeviceCountObservations for the SettingsChanged_PerDeviceCount +// report on the i-th day: +// +// (i, window size) (component, event code, count) +// ----------------------------------------------------------------------- +// (0, 7) +// (0, 30) +// (1, 7) ("A", 1, 3) +// (1, 30) ("A", 1, 3) +// (2, 7) ("A", 1, 6), ("A", 2, 3), ("B", 1, 2) +// (2, 30) ("A", 1, 6), ("A", 2, 3), ("B", 1, 2) +// (3, 7) ("A", 1, 9), ("A", 2, 3), ("B", 1, 2) +// (3, 30) ("A", 1, 9), ("A", 2, 3), ("B", 1, 2) +// (4, 7) ("A", 1, 12), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2) +// (4, 30) ("A", 1, 12), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2) +// (5, 7) ("A", 1, 15), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2) +// (5, 30) ("A", 1, 15), ("A", 2, 6), ("B", 1, 4), ("B", 2, 2) +// (6, 7) ("A", 1, 18), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2) +// (6, 30) ("A", 1, 18), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2) +// (7, 7) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2) +// (7, 30) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 2) +// (8, 7) ("A", 1, 21), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4) +// (8, 30) ("A", 1, 24), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4) +// (9, 7) ("A", 1, 21), ("A", 2, 9), ("B", 1, 6), ("B", 2, 4) +// (9, 30) ("A", 1, 27), ("A", 2, 12), ("B", 1, 8), ("B", 2, 4) +// +// In addition, expect 2 ReportParticipationObservations each day, 1 for each +// of ConnectionFailures_PerDeviceCount and SettingsChanged_PerDeviceCount. +TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesMultiDay) { + auto start_day_index = CurrentDayIndex(); + // Form expected Observations for the 10 days of logging. + uint32_t num_days = 10; + std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs( + num_days); + std::vector<ExpectedReportParticipationObservations> + expected_report_participation_obs(num_days); + for (uint32_t offset = 0; offset < num_days; offset++) { + expected_report_participation_obs[offset] = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, start_day_index + offset); + } + expected_per_device_count_obs[0] = {}; + expected_per_device_count_obs[1][{kSettingsChangedMetricReportId, + start_day_index + 1}] = { + {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}}; + expected_per_device_count_obs[2][{kSettingsChangedMetricReportId, + start_day_index + 2}] = { + {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_per_device_count_obs[3][{kSettingsChangedMetricReportId, + start_day_index + 3}] = { + {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_per_device_count_obs[4][{kSettingsChangedMetricReportId, + start_day_index + 4}] = { + {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[5][{kSettingsChangedMetricReportId, + start_day_index + 5}] = { + {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[6][{kSettingsChangedMetricReportId, + start_day_index + 6}] = { + {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[7][{kSettingsChangedMetricReportId, + start_day_index + 7}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[8][{kSettingsChangedMetricReportId, + start_day_index + 8}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + expected_per_device_count_obs[9][{kSettingsChangedMetricReportId, + start_day_index + 9}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + + for (uint32_t offset = 0; offset < 1; offset++) { + auto day_index = CurrentDayIndex(); + for (uint32_t event_code = 1; event_code < 3; event_code++) { + if (offset > 0 && offset % event_code == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "A", event_code, 3)); + } + if (offset > 0 && offset % (2 * event_code) == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "B", event_code, 2)); + } + } + // Clear the FakeObservationStore. + ResetObservationStore(); + // Generate locally aggregated Observations. + EXPECT_EQ(kOK, GenerateObservations(day_index)); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs[offset], + expected_report_participation_obs[offset], observation_store_.get(), + update_recipient_.get())) + << "offset = " << offset; + AdvanceClock(kDay); + } +} + +// Repeat the CheckObservationValuesMultiDay test, this time calling +// GarbageCollect() after each call to GenerateObservations. +// +// The logging pattern and set of Observations for each day index is the same +// as in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See +// that test for documentation. +TEST_F(PerDeviceCountEventAggregatorTest, + CheckObservationValuesMultiDayWithGarbageCollection) { + auto start_day_index = CurrentDayIndex(); + // Form expected Observations for the 10 days of logging. + uint32_t num_days = 10; + std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs( + num_days); + std::vector<ExpectedReportParticipationObservations> + expected_report_participation_obs(num_days); + for (uint32_t offset = 0; offset < num_days; offset++) { + expected_report_participation_obs[offset] = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, start_day_index + offset); + } + expected_per_device_count_obs[0] = {}; + expected_per_device_count_obs[1][{kSettingsChangedMetricReportId, + start_day_index + 1}] = { + {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}}; + expected_per_device_count_obs[2][{kSettingsChangedMetricReportId, + start_day_index + 2}] = { + {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_per_device_count_obs[3][{kSettingsChangedMetricReportId, + start_day_index + 3}] = { + {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_per_device_count_obs[4][{kSettingsChangedMetricReportId, + start_day_index + 4}] = { + {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[5][{kSettingsChangedMetricReportId, + start_day_index + 5}] = { + {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[6][{kSettingsChangedMetricReportId, + start_day_index + 6}] = { + {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[7][{kSettingsChangedMetricReportId, + start_day_index + 7}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[8][{kSettingsChangedMetricReportId, + start_day_index + 8}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + expected_per_device_count_obs[9][{kSettingsChangedMetricReportId, + start_day_index + 9}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + + for (uint32_t offset = 0; offset < 10; offset++) { + auto day_index = CurrentDayIndex(); + for (uint32_t event_code = 1; event_code < 3; event_code++) { + if (offset > 0 && offset % event_code == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "A", event_code, 3)); + } + if (offset > 0 && offset % (2 * event_code) == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "B", event_code, 2)); + } + } + // Advance |mock_clock_| by 1 day. + AdvanceClock(kDay); + // Clear the FakeObservationStore. + ResetObservationStore(); + // Generate locally aggregated Observations and garbage-collect the + // LocalAggregateStore, both for the previous day as measured by + // |mock_clock_|. Back up the LocalAggregateStore and + // AggregatedObservationHistoryStore. + DoScheduledTasksNow(); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs[offset], + expected_report_participation_obs[offset], observation_store_.get(), + update_recipient_.get())); + } +} + +// Tests that the expected PerDeviceCountObservations are generated when events +// are logged over multiple days and when Observations are backfilled for some +// days during that period, without any garbage-collection of the +// LocalAggregateStore. +// +// The logging pattern and set of Observations for each day index is the same as +// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See +// that test for documentation. +TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesWithBackfill) { + auto start_day_index = CurrentDayIndex(); + // Set |backfill_days_| to 3. + size_t backfill_days = 3; + SetBackfillDays(backfill_days); + // Log events for 9 days. Call GenerateObservations() on the first 6 day + // indices, and the 9th. + uint32_t num_days = 9; + for (uint32_t offset = 0; offset < num_days; offset++) { + auto day_index = CurrentDayIndex(); + ResetObservationStore(); + for (uint32_t event_code = 1; event_code < 3; event_code++) { + if (offset > 0 && (offset % event_code == 0)) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "A", event_code, 3)); + } + if (offset > 0 && offset % (2 * event_code) == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "B", event_code, 2)); + } + } + if (offset < 6 || offset == 8) { + EXPECT_EQ(kOK, GenerateObservations(day_index)); + } + // Make the set of Observations which are expected to be generated on + // |start_day_index + offset| and check it against the contents of the + // FakeObservationStore. + ExpectedPerDeviceCountObservations expected_per_device_count_obs; + ExpectedReportParticipationObservations expected_report_participation_obs; + switch (offset) { + case 0: { + for (uint32_t day_index = start_day_index - backfill_days; + day_index <= start_day_index; day_index++) { + for (const auto& pair : MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index)) { + expected_report_participation_obs.insert(pair); + } + } + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 1: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = {{7, {{"A", 1u, 3}}}, + {30, {{"A", 1u, 3}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 2: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 3: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 4: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 5: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 8: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 6}] = { + {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 7}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 8}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + for (uint32_t day_index = start_day_index + 6; + day_index <= start_day_index + 8; day_index++) { + for (const auto& pair : MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index)) { + expected_report_participation_obs.insert(pair); + } + } + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + default: + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + } + AdvanceClock(kDay); + } +} + +// Tests that the expected Observations are generated for PerDeviceCount reports +// when events are logged over multiple days and when Observations are +// backfilled for some days during that period, and when the +// LocalAggregatedStore is garbage-collected after each call to +// GenerateObservations(). +// +// The logging pattern and set of Observations for each day index is the same as +// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See +// that test for documentation. +TEST_F(PerDeviceCountEventAggregatorTest, + CheckObservationValuesWithBackfillAndGc) { + auto start_day_index = CurrentDayIndex(); + // Set |backfill_days_| to 3. + size_t backfill_days = 3; + SetBackfillDays(backfill_days); + // Log events for 9 days. Call GenerateObservations() on the first 6 day + // indices, and the 9th. + uint32_t num_days = 9; + for (uint32_t offset = 0; offset < num_days; offset++) { + auto day_index = CurrentDayIndex(); + ResetObservationStore(); + for (uint32_t event_code = 1; event_code < 3; event_code++) { + if (offset > 0 && (offset % event_code == 0)) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "A", event_code, 3)); + } + if (offset > 0 && offset % (2 * event_code) == 0) { + EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId, + day_index, "B", event_code, 2)); + } + } + // Advance |mock_clock_| by 1 day. + AdvanceClock(kDay); + if (offset < 6 || offset == 8) { + // Generate Observations and garbage-collect, both for the previous day + // index according to |mock_clock_|. Back up the LocalAggregateStore and + // the AggregatedObservationHistoryStore. + DoScheduledTasksNow(); + } + // Make the set of Observations which are expected to be generated on + // |start_day_index + offset| and check it against the contents of the + // FakeObservationStore. + ExpectedPerDeviceCountObservations expected_per_device_count_obs; + ExpectedReportParticipationObservations expected_report_participation_obs; + switch (offset) { + case 0: { + for (uint32_t day_index = start_day_index - backfill_days; + day_index <= start_day_index; day_index++) { + for (const auto& pair : MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index)) { + expected_report_participation_obs.insert(pair); + } + } + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 1: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = {{7, {{"A", 1u, 3}}}, + {30, {{"A", 1u, 3}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 2: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 3: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}, + {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 4: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 5: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + day_index}] = { + {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}}; + expected_report_participation_obs = + MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index); + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + case 8: { + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 6}] = { + {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 7}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}, + {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}}; + expected_per_device_count_obs[{kSettingsChangedMetricReportId, + start_day_index + 8}] = { + {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}, + {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}}; + for (uint32_t day_index = start_day_index + 6; + day_index <= start_day_index + 8; day_index++) { + for (const auto& pair : MakeExpectedReportParticipationObservations( + kPerDeviceCountExpectedParams, day_index)) { + expected_report_participation_obs.insert(pair); + } + } + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + break; + } + default: + EXPECT_TRUE(CheckPerDeviceCountObservations( + expected_per_device_count_obs, expected_report_participation_obs, + observation_store_.get(), update_recipient_.get())); + } + } +} + // Tests GenerateObservations() and GarbageCollect() in the case where the // LocalAggregateStore contains aggregates for metrics with both UTC and LOCAL // time zone policies, and where the day index in local time may be less than
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto index 28ceb2f..0f7837d 100644 --- a/logger/local_aggregation.proto +++ b/logger/local_aggregation.proto
@@ -98,18 +98,30 @@ ReportDefinition report = 3; } -// A container used by the EventAggregator to store the latest day index for -// which an Observation has been generated for each report, event code, and -// window size. +// A container used by the EventAggregator to store the history of generated +// Observations, indexed by report. message AggregatedObservationHistoryStore { // Keyed by base64-encoded serializations of ReportAggregationKey messages. map<string, AggregatedObservationHistory> by_report_key = 1; } +// A container for the history of Observations that have been generated for a +// single report. message AggregatedObservationHistory { + // If this is a locally aggregated report, then AggregatedObservationHistory + // has one of the following types. oneof type { + // A container for the history of UniqueActivesObservations generated + // for a UNIQUE_N_DAY_ACTIVES report. UniqueActivesObservationHistory unique_actives_history = 1; + // A container for the history of PerDeviceCountObservations generated + // for a PER_DEVICE_COUNT_STATS report. + PerDeviceCountObservationHistory per_device_count_history = 2; } + // A container for the history of ReportParticipationObservations generated + // for this report. Unset if ReportParticipationObservations should not be + // generated for this report. + ReportParticipationObservationHistory report_participation_history = 100; } message UniqueActivesObservationHistory { @@ -117,6 +129,27 @@ map<uint32, HistoryByWindowSize> by_event_code = 1; } +message PerDeviceCountObservationHistory { + // The history of PerDeviceCount Observations. Keyed by component string. + map<string, HistoryByEventCode> by_component = 1; +} + +message ReportParticipationObservationHistory { + // The last day index for which a ReportParticipationObservation was + // generated for this report. + // + // TODO(pesk): if ReportParticipationObservation has a window_size + // field in the future, consider changing this to a repeated field + // where the k-th last_generated field holds the value for window + // size k. + uint32 last_generated = 1; +} + +message HistoryByEventCode { + // Keyed by event code. + map<uint32, HistoryByWindowSize> by_event_code = 1; +} + message HistoryByWindowSize { // Keyed by window size. The value at a window size is the latest day index // for which an Observation has been generated for this report, event code,
diff --git a/logger/logger_test_utils.cc b/logger/logger_test_utils.cc index b8eeba4..a998fac 100644 --- a/logger/logger_test_utils.cc +++ b/logger/logger_test_utils.cc
@@ -24,6 +24,8 @@ namespace cobalt { +using crypto::byte; +using crypto::hash::DIGEST_SIZE; using encoder::ClientSecret; using rappor::BasicRapporEncoder; using rappor::RapporConfigHelper; @@ -33,6 +35,27 @@ namespace logger { namespace testing { +namespace { +// Populates |*hash_out| with the SHA256 of |component|, unless |component| +// is empty in which case *hash_out is set to the empty string also. An +// empty string indicates that the component_name feature is not being used. +// We expect this to be a common case and in this case there is no point +// in using 32 bytes to represent the empty string. Returns true on success +// and false on failure (unexpected). +bool HashComponentNameIfNotEmpty(const std::string& component, + std::string* hash_out) { + CHECK(hash_out); + if (component.empty()) { + hash_out->resize(0); + return true; + } + hash_out->resize(DIGEST_SIZE); + return cobalt::crypto::hash::Hash( + reinterpret_cast<const byte*>(component.data()), component.size(), + reinterpret_cast<byte*>(&hash_out->front())); +} +} // namespace + bool PopulateMetricDefinitions(const char metric_string[], MetricDefinitions* metric_definitions) { google::protobuf::TextFormat::Parser parser; @@ -87,6 +110,17 @@ return expected_obs; } +ExpectedReportParticipationObservations +MakeExpectedReportParticipationObservations( + const ExpectedAggregationParams& expected_params, uint32_t day_index) { + ExpectedReportParticipationObservations expected_obs; + + for (const auto& report_pair : expected_params.window_sizes) { + expected_obs.insert({report_pair.first, day_index}); + } + return expected_obs; +} + bool FetchObservations(std::vector<Observation2>* observations, const std::vector<uint32_t>& expected_report_ids, FakeObservationStore* observation_store, @@ -261,8 +295,8 @@ // when this method is called. ExpectedAggregationParams expected_params; // A container for the strings expected to appear in the |data| field of the - // BasicRapporObservation wrapped by the UniqueActivesObservation for a given - // MetricReportId, day index, window size, and event code. + // BasicRapporObservation wrapped by the UniqueActivesObservation for a + // given MetricReportId, day index, window size, and event code. std::map<std::pair<MetricReportId, uint32_t>, std::map<uint32_t, std::map<uint32_t, std::string>>> expected_values; @@ -348,6 +382,123 @@ return true; } +bool CheckPerDeviceCountObservations( + ExpectedPerDeviceCountObservations expected_per_device_count_obs, + ExpectedReportParticipationObservations expected_report_participation_obs, + FakeObservationStore* observation_store, + TestUpdateRecipient* update_recipient) { + // An ExpectedAggregationParams struct describing the number of + // Observations for each report ID which are expected to be + // in the FakeObservationStore when this method is called. + ExpectedAggregationParams expected_params; + std::map<std::string, std::string> component_hashes; + for (const auto& id_pair : expected_report_participation_obs) { + expected_params.daily_num_obs++; + expected_params.num_obs_per_report[id_pair.first]++; + expected_params.metric_report_ids.insert(id_pair.first); + } + for (const auto& id_pair : expected_per_device_count_obs) { + for (const auto& window_size_pair : id_pair.second) { + auto window_size = window_size_pair.first; + expected_params.window_sizes[id_pair.first.first].insert(window_size); + for (const auto& expected_obs : window_size_pair.second) { + expected_params.daily_num_obs++; + expected_params.num_obs_per_report[id_pair.first.first]++; + std::string component = std::get<0>(expected_obs); + std::string component_hash; + HashComponentNameIfNotEmpty(component, &component_hash); + component_hashes[component_hash] = component; + } + } + } + // Fetch the contents of the ObservationStore and check that each + // received Observation corresponds to an element of |expected_values|. + std::vector<Observation2> observations; + if (!FetchAggregatedObservations(&observations, expected_params, + observation_store, update_recipient)) { + return false; + } + std::vector<Observation2> report_participation_obs; + std::vector<ObservationMetadata> report_participation_metadata; + std::vector<Observation2> per_device_count_obs; + std::vector<ObservationMetadata> per_device_count_metadata; + for (size_t i = 0; i < observations.size(); i++) { + if (observations.at(i).has_report_participation()) { + report_participation_obs.push_back(observations.at(i)); + report_participation_metadata.push_back( + *observation_store->metadata_received[i]); + } else if (observations.at(i).has_per_device_count()) { + per_device_count_obs.push_back(observations.at(i)); + per_device_count_metadata.push_back( + *observation_store->metadata_received[i]); + } else { + return false; + } + } + // Check the received PerDeviceCountObservations + for (size_t i = 0; i < per_device_count_obs.size(); i++) { + auto obs_key = + std::make_pair(MetricReportId(per_device_count_metadata[i].metric_id(), + per_device_count_metadata[i].report_id()), + per_device_count_metadata[i].day_index()); + auto report_iter = expected_per_device_count_obs.find(obs_key); + if (report_iter == expected_per_device_count_obs.end()) { + return false; + } + auto obs = per_device_count_obs.at(i); + uint32_t obs_window_size = obs.per_device_count().window_size(); + auto window_iter = report_iter->second.find(obs_window_size); + if (window_iter == report_iter->second.end()) { + return false; + } + std::string obs_component_hash = + obs.per_device_count().integer_event_obs().component_name_hash(); + std::string obs_component; + auto hash_iter = component_hashes.find(obs_component_hash); + if (hash_iter == component_hashes.end()) { + return false; + } else { + obs_component = component_hashes[obs_component_hash]; + } + auto obs_tuple = std::make_tuple( + obs_component, obs.per_device_count().integer_event_obs().event_code(), + obs.per_device_count().integer_event_obs().value()); + auto obs_iter = window_iter->second.find(obs_tuple); + if (obs_iter == window_iter->second.end()) { + return false; + } + expected_per_device_count_obs.at(obs_key) + .at(obs_window_size) + .erase(obs_tuple); + if (expected_per_device_count_obs.at(obs_key).at(obs_window_size).empty()) { + expected_per_device_count_obs.at(obs_key).erase(obs_window_size); + } + if (expected_per_device_count_obs.at(obs_key).empty()) { + expected_per_device_count_obs.erase(obs_key); + } + } + if (!expected_per_device_count_obs.empty()) { + return false; + } + + // Check the received ReportParticipationObservations + for (size_t i = 0; i < report_participation_obs.size(); i++) { + auto obs_key = std::make_pair( + MetricReportId(report_participation_metadata[i].metric_id(), + report_participation_metadata[i].report_id()), + report_participation_metadata[i].day_index()); + if (expected_report_participation_obs.count(obs_key) == 0) { + return false; + } + expected_report_participation_obs.erase(obs_key); + } + if (!expected_report_participation_obs.empty()) { + return false; + } + + return true; +} + } // namespace testing } // namespace logger } // namespace cobalt
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h index 16a25b6..a10ccad 100644 --- a/logger/logger_test_utils.h +++ b/logger/logger_test_utils.h
@@ -9,6 +9,7 @@ #include <memory> #include <set> #include <string> +#include <tuple> #include <utility> #include <vector> @@ -129,6 +130,25 @@ std::map<uint32_t, std::vector<bool>>> ExpectedUniqueActivesObservations; +// A representation of a set of expected PerDeviceCountObservations. Used to +// check the values of PerDeviceCountObservations generated by the +// EventAggregator. +// +// The outer map is keyed by pairs (MetricReportId, day_index), where the day +// index represents the day index of the expected Observation. +typedef std::map< + std::pair<MetricReportId, uint32_t>, + std::map<uint32_t, std::set<std::tuple<std::string, uint32_t, int64_t>>>> + ExpectedPerDeviceCountObservations; + +// A representation of a set of expected ReportParticipationObservations. Used +// to check the values of ReportParticipationObservations generated by the +// EventAggregator. The first element of each pair is the MetricReportId of a +// report, and the second element represents the day index of an expected +// Observation for that report. a pair is a a set of window sizes. +typedef std::set<std::pair<MetricReportId, uint32_t>> + ExpectedReportParticipationObservations; + // Populates a MetricDefinitions proto message from a serialized representation. bool PopulateMetricDefinitions(const char metric_string[], MetricDefinitions* metric_definitions); @@ -156,6 +176,14 @@ ExpectedUniqueActivesObservations MakeNullExpectedUniqueActivesObservations( const ExpectedAggregationParams& expected_params, uint32_t day_index); +// Given an ExpectedAggregationParams struct |expected_params|, return an +// ExpectedReportParticipationObservations containing a pair +// (|metric_report_id|, |day_index|) for each MetricReportId |metric_report_id| +// in |expected_params|. +ExpectedReportParticipationObservations +MakeExpectedReportParticipationObservations( + const ExpectedAggregationParams& expected_params, uint32_t day_index); + // Populates |observations| with the contents of a FakeObservationStore. // |observations| should be a vector whose size is equal to the number // of expected observations. Checks the the ObservationStore contains @@ -207,6 +235,18 @@ FakeObservationStore* observation_store, TestUpdateRecipient* update_recipient); +// Checks that the Observations contained in a FakeObservationStore are exactly +// the PerDeviceCountObservations and ReportParticipationObservations that +// should be generated for a single day index given a representation of the +// expected activity indicators for that day, for each PerDeviceCount report, +// for each window size and event code, for a config whose locally aggregated +// reports are all of type PER_DEVICE_COUNT_STATS. +bool CheckPerDeviceCountObservations( + ExpectedPerDeviceCountObservations expected_per_device_count_obs, + ExpectedReportParticipationObservations expected_report_participation_obs, + FakeObservationStore* observation_store, + TestUpdateRecipient* update_recipient); + } // namespace testing } // namespace logger } // namespace cobalt