[EventAggregator] Generate PerDeviceCountObservations

The EventAggregator's GenerateObservations() method now
generates observations for PER_DEVICE_COUNT_STATS reports.

Change-Id: I158ccdb81e015886457e61aa19808274520d0c23
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index 706ef72..26ce7e9 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -5,8 +5,10 @@
 #include "logger/event_aggregator.h"
 
 #include <algorithm>
+#include <map>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "algorithms/rappor/rappor_config_helper.h"
 #include "config/metric_definition.pb.h"
@@ -427,6 +429,12 @@
   if (final_day_index_local == 0u) {
     final_day_index_local = final_day_index_utc;
   }
+  if (std::min(final_day_index_utc, final_day_index_local) <
+      backfill_days_ + kMaxAllowedAggregationWindowSize) {
+    LOG(ERROR) << "GenerateObservations: Day index of Observation must be >= "
+                  "backfill_days_ + kMaxAllowedAggregationWindowSize.";
+    return kInvalidArguments;
+  }
   // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
   // generate observations.
   auto local_aggregate_store = CopyLocalAggregateStore();
@@ -484,10 +492,27 @@
             if (status != kOK) {
               return status;
             }
+            break;
           }
           default:
             continue;
         }
+        break;
+      }
+      case MetricDefinition::EVENT_COUNT: {
+        switch (report.report_type()) {
+          case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+            auto status = GeneratePerDeviceCountObservations(
+                metric_ref, pair.first, pair.second, final_day_index);
+            if (status != kOK) {
+              return status;
+            }
+            break;
+          }
+          default:
+            continue;
+        }
+        break;
       }
       default:
         continue;
@@ -501,6 +526,12 @@
   if (day_index_local == 0u) {
     day_index_local = day_index_utc;
   }
+  if (std::min(day_index_utc, day_index_local) <
+      backfill_days_ + kMaxAllowedAggregationWindowSize) {
+    LOG(ERROR) << "GarbageCollect: Day index must be >= backfill_days_ + "
+                  "kMaxAllowedAggregationWindowSize.";
+    return kInvalidArguments;
+  }
   auto locked = protected_aggregate_store_.lock();
   for (auto pair : locked->local_aggregate_store.by_report_key()) {
     uint32_t day_index;
@@ -532,10 +563,6 @@
                     "window size.";
       return kInvalidConfig;
     }
-    if (day_index < backfill_days_ + max_window_size) {
-      LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size.";
-      return kInvalidArguments;
-    }
     // For each ReportAggregates, descend to and iterate over the sub-map of
     // local aggregates keyed by day index. Keep buckets with day indices
     // greater than |day_index| - |backfill_days_| - |max_window_size|, and
@@ -723,6 +750,10 @@
     const MetricRef metric_ref, const std::string& report_key,
     const ReportAggregates& report_aggregates, uint32_t num_event_codes,
     uint32_t final_day_index) {
+  // The earliest day index for which we might need to generate an Observation.
+  // GenerateObservations() has checked that this value is > 0.
+  auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
+
   for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
     auto daily_aggregates =
         report_aggregates.unique_actives_aggregates().by_event_code().find(
@@ -746,8 +777,7 @@
       // generated on this invocation.
       auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
                                                          window_size);
-      auto first_day_index =
-          std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_));
+      auto first_day_index = std::max(last_gen + 1, backfill_period_start);
       // The latest day index on which |event_type| is known to have
       // occurred, so far. This value will be updated as we search
       // forward from the earliest day index belonging to a window of
@@ -794,5 +824,203 @@
   return kOK;
 }
 
+////////// GeneratePerDeviceCountObservations and helper methods /////////////
+
+uint32_t EventAggregator::PerDeviceCountLastGeneratedDayIndex(
+    const std::string& report_key, const std::string& component,
+    uint32_t event_code, uint32_t window_size) const {
+  const auto& report_history = obs_history_.by_report_key().find(report_key);
+  if (report_history == obs_history_.by_report_key().end()) {
+    return 0u;
+  }
+  if (!report_history->second.has_per_device_count_history()) {
+    return 0u;
+  }
+  const auto& component_history =
+      report_history->second.per_device_count_history().by_component().find(
+          component);
+  if (component_history ==
+      report_history->second.per_device_count_history().by_component().end()) {
+    return 0u;
+  }
+  const auto& event_code_history =
+      component_history->second.by_event_code().find(event_code);
+  if (event_code_history == component_history->second.by_event_code().end()) {
+    return 0u;
+  }
+  const auto& window_size_history =
+      event_code_history->second.by_window_size().find(window_size);
+  if (window_size_history ==
+      event_code_history->second.by_window_size().end()) {
+    return 0u;
+  }
+  return window_size_history->second;
+}
+
+Status EventAggregator::GenerateSinglePerDeviceCountObservation(
+    const MetricRef metric_ref, const ReportDefinition* report,
+    uint32_t obs_day_index, const std::string& component, uint32_t event_code,
+    uint32_t window_size, int64_t count) const {
+  auto encoder_result = encoder_->EncodePerDeviceCountObservation(
+      metric_ref, report, obs_day_index, component, event_code, count,
+      window_size);
+  if (encoder_result.status != kOK) {
+    return encoder_result.status;
+  }
+  if (encoder_result.observation == nullptr ||
+      encoder_result.metadata == nullptr) {
+    LOG(ERROR) << "Failed to encode PerDeviceCountObservation";
+    return kOther;
+  }
+
+  const auto& writer_status = observation_writer_->WriteObservation(
+      *encoder_result.observation, std::move(encoder_result.metadata));
+  if (writer_status != kOK) {
+    return writer_status;
+  }
+  return kOK;
+}
+
+Status EventAggregator::GeneratePerDeviceCountObservations(
+    const MetricRef metric_ref, const std::string& report_key,
+    const ReportAggregates& report_aggregates, uint32_t final_day_index) {
+  // Get the window sizes for this report and sort them in increasing order.
+  // TODO(pesk): Instead, have UpdateAggregationConfigs() store the window sizes
+  // in increasing order.
+  std::vector<uint32_t> window_sizes;
+  for (uint32_t window_size :
+       report_aggregates.aggregation_config().report().window_size()) {
+    if (window_size > kMaxAllowedAggregationWindowSize) {
+      LOG(WARNING) << "GeneratePerDeviceCountObservations ignoring a window "
+                      "size exceeding the maximum allowed value";
+      continue;
+    }
+    window_sizes.push_back(window_size);
+  }
+  std::sort(window_sizes.begin(), window_sizes.end());
+
+  // The first day index for which we might have to generate an Observation.
+  // GenerateObservations() has checked that this value is > 0.
+  auto backfill_period_start = uint32_t(final_day_index - backfill_days_);
+
+  // Generate any necessary PerDeviceCountObservations for this report.
+  for (const auto& component_pair :
+       report_aggregates.count_aggregates().by_component()) {
+    const auto& component = component_pair.first;
+    for (const auto& event_code_pair : component_pair.second.by_event_code()) {
+      auto event_code = event_code_pair.first;
+      const auto& event_code_aggregates = event_code_pair.second;
+      // Populate a helper map keyed by day indices which belong to the range
+      // [|backfill_period_start|, |final_day_index|]. The value at a day index
+      // is the list of window sizes, in increasing order, for which an
+      // Observation should be generated for that day index.
+      std::map<uint32_t, std::vector<uint32_t>> window_sizes_by_obs_day;
+      for (auto window_size : window_sizes) {
+        auto last_gen = PerDeviceCountLastGeneratedDayIndex(
+            report_key, component, event_code, window_size);
+        auto first_day_index = std::max(last_gen + 1, backfill_period_start);
+        for (auto obs_day_index = first_day_index;
+             obs_day_index <= final_day_index; obs_day_index++) {
+          window_sizes_by_obs_day[obs_day_index].push_back(window_size);
+        }
+      }
+      // Iterate over the day indices |obs_day_index| for which we might need to
+      // generate an Observation. For each day index, generate an Observation
+      // for each needed window size.
+      //
+      // More precisely, for each needed window size, compute the count over the
+      // window and generate a PerDeviceCountObservation only if the count is
+      // nonzero. Whether or not the count was zero, update the
+      // AggregatedObservationHistory for this report, component, event code,
+      // and window size with |obs_day_index| as the most recent date of
+      // Observation generation. This reflects the fact that the count was
+      // computed for the window ending on that date, even though an Observation
+      // is only sent if the count is nonzero.
+      for (auto obs_day_index = backfill_period_start;
+           obs_day_index <= final_day_index; obs_day_index++) {
+        const auto& window_sizes = window_sizes_by_obs_day.find(obs_day_index);
+        if (window_sizes == window_sizes_by_obs_day.end()) {
+          continue;
+        }
+        int64_t count = 0;
+        uint32_t num_days = 0;
+        for (auto window_size : window_sizes->second) {
+          while (num_days < window_size) {
+            const auto& day_aggregates =
+                event_code_aggregates.by_day_index().find(obs_day_index -
+                                                          num_days);
+            if (day_aggregates != event_code_aggregates.by_day_index().end()) {
+              count += day_aggregates->second.count_daily_aggregate().count();
+            }
+            num_days++;
+          }
+          if (count != 0) {
+            auto status = GenerateSinglePerDeviceCountObservation(
+                metric_ref, &report_aggregates.aggregation_config().report(),
+                obs_day_index, component, event_code, window_size, count);
+            if (status != kOK) {
+              return status;
+            }
+          }
+          // Update |obs_history_| with the latest date of Observation
+          // generation for this report, component, event code, and window
+          // size.
+          (*(*(*(*obs_history_.mutable_by_report_key())[report_key]
+                    .mutable_per_device_count_history()
+                    ->mutable_by_component())[component]
+                  .mutable_by_event_code())[event_code]
+                .mutable_by_window_size())[window_size] = obs_day_index;
+        }
+      }
+    }
+  }
+  // Generate any necessary ReportParticipationObservations for this report.
+  auto participation_last_gen =
+      ReportParticipationLastGeneratedDayIndex(report_key);
+  auto participation_first_day_index =
+      std::max(participation_last_gen + 1, backfill_period_start);
+  for (auto obs_day_index = participation_first_day_index;
+       obs_day_index <= final_day_index; obs_day_index++) {
+    GenerateSingleReportParticipationObservation(
+        metric_ref, &report_aggregates.aggregation_config().report(),
+        obs_day_index);
+    (*obs_history_.mutable_by_report_key())[report_key]
+        .mutable_report_participation_history()
+        ->set_last_generated(obs_day_index);
+  }
+  return kOK;
+}
+
+uint32_t EventAggregator::ReportParticipationLastGeneratedDayIndex(
+    const std::string& report_key) const {
+  const auto& report_history = obs_history_.by_report_key().find(report_key);
+  if (report_history == obs_history_.by_report_key().end()) {
+    return 0u;
+  }
+  return report_history->second.report_participation_history().last_generated();
+}
+
+Status EventAggregator::GenerateSingleReportParticipationObservation(
+    const MetricRef metric_ref, const ReportDefinition* report,
+    uint32_t obs_day_index) const {
+  auto encoder_result = encoder_->EncodeReportParticipationObservation(
+      metric_ref, report, obs_day_index);
+  if (encoder_result.status != kOK) {
+    return encoder_result.status;
+  }
+  if (encoder_result.observation == nullptr ||
+      encoder_result.metadata == nullptr) {
+    LOG(ERROR) << "Failed to encode ReportParticipationObservation";
+    return kOther;
+  }
+
+  const auto& writer_status = observation_writer_->WriteObservation(
+      *encoder_result.observation, std::move(encoder_result.metadata));
+  if (writer_status != kOK) {
+    return writer_status;
+  }
+  return kOK;
+}
+
 }  // namespace logger
 }  // namespace cobalt
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h
index 8fbaf24..a742ce3 100644
--- a/logger/event_aggregator.h
+++ b/logger/event_aggregator.h
@@ -27,20 +27,19 @@
 namespace cobalt {
 namespace logger {
 
-// The EventAggregator class manages an in-memory store of aggregated values
-// of Events logged for locally aggregated report types. For each day, this
-// LocalAggregateStore contains an aggregate of the values of logged Events of
-// a given event code over that day. These daily aggregates are used to
-// produce Observations of values aggregated over a rolling window of size
-// specified in the ReportDefinition.
+// The EventAggregator manages an in-memory store of aggregated Event values,
+// indexed by report, day index, and other dimensions specific to the report
+// type (e.g. event code). Periodically, this data is used to generate
+// Observations representing aggregates of Event values over a day, week, month,
+// etc.
 //
 // Each Logger interacts with the EventAggregator in the following way:
 // (1) When the Logger is created, it calls UpdateAggregationConfigs() to
-// provide the EventAggregator with the configurations of its locally
-// aggregated reports.
-// (2) When logging an Event for a locally aggregated report, a Logger calls an
-// Update*Aggregation() method with the Event and the ReportAggregationKey of
-// the report.
+// provide the EventAggregator with its view of Cobalt's metric and report
+// registry.
+// (2) When logging an Event for a locally aggregated report, a Logger
+// calls an Update*Aggregation() method with the Event and the
+// ReportAggregationKey of the report.
 //
 // A worker thread does the following tasks at intervals specified in the
 // EventAggregator's constructor:
@@ -115,8 +114,8 @@
   // Start the worker thread.
   void Start();
 
-  // Updates the EventAggregator's view of the set of locally aggregated
-  // report configurations.
+  // Updates the EventAggregator's view of the Cobalt metric and report
+  // registry.
   //
   // This method may be called multiple times during the EventAggregator's
   // lifetime. If the EventAggregator is provided with a report whose tuple
@@ -266,6 +265,22 @@
                                               uint32_t event_code,
                                               uint32_t window_size) const;
 
+  // Returns the most recent day index for which an Observation was generated
+  // for a given PER_DEVICE_COUNT_STATS report, component, event code,
+  // and window size, according to |obs_history_|. Returns 0 if no Observation
+  // has been generated for the given arguments.
+  uint32_t PerDeviceCountLastGeneratedDayIndex(const std::string& report_key,
+                                               const std::string& component,
+                                               uint32_t event_code,
+                                               uint32_t window_size) const;
+
+  // Returns the most recent day index for which a
+  // ReportParticipationObservation was generated for a given report, according
+  // to |obs_history_|. Returns 0 if no Observation has been generated for the
+  // given arguments.
+  uint32_t ReportParticipationLastGeneratedDayIndex(
+      const std::string& report_key) const;
+
   // For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
   // for each event code of the parent metric, for each window size of the
   // report, for the window of that size ending on |final_day_index|, unless
@@ -290,6 +305,40 @@
                                                 uint32_t window_size,
                                                 bool was_active) const;
 
+  // For a fixed report of type PER_DEVICE_COUNT_STATS, generates a
+  // PerDeviceCountObservation for each tuple (component, event code, window
+  // size) for which a positive number of events with that event code occurred
+  // with that component during the window of that size ending on
+  // |final_day_index|, unless an Observation with those parameters has been
+  // generated in the past. Also generates PerDeviceCountObservations for days
+  // in the backfill period if needed.
+  //
+  // In addition to PerDeviceCountObservations, generates a
+  // ReportParticipationObservation for |final_day_index| and any needed days in
+  // the backfill period. These ReportParticipationObservations are used by the
+  // PerDeviceCount report generator to infer the fleet-wide number of devices
+  // for which the event count associated to each tuple (component, event code,
+  // window size) was zero.
+  //
+  // Observations are not generated for aggregation windows larger than
+  // |kMaxAllowedAggregationWindowSize|.
+  Status GeneratePerDeviceCountObservations(
+      const MetricRef metric_ref, const std::string& report_key,
+      const ReportAggregates& report_aggregates, uint32_t final_day_index);
+
+  // Helper method called by GeneratePerDeviceCountObservations() to generate
+  // and write a single PerDeviceCountObservation.
+  Status GenerateSinglePerDeviceCountObservation(
+      const MetricRef metric_ref, const ReportDefinition* report,
+      uint32_t obs_day_index, const std::string& component, uint32_t event_code,
+      uint32_t window_size, int64_t count) const;
+
+  // Helper method called by GeneratePerDeviceCountObservations() to generate
+  // and write a single ReportParticipationObservation.
+  Status GenerateSingleReportParticipationObservation(
+      const MetricRef metric_ref, const ReportDefinition* report,
+      uint32_t obs_day_index) const;
+
   // Returns a copy of the LocalAggregateStore. Does not assume that the
   // caller holds the mutex of |protected_aggregate_store_|.
   LocalAggregateStore CopyLocalAggregateStore() {
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 6ba882e..fe266bc 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -34,13 +34,17 @@
 
 namespace logger {
 
+using testing::CheckPerDeviceCountObservations;
 using testing::CheckUniqueActivesObservations;
 using testing::ExpectedAggregationParams;
+using testing::ExpectedPerDeviceCountObservations;
+using testing::ExpectedReportParticipationObservations;
 using testing::ExpectedUniqueActivesObservations;
 using testing::FakeObservationStore;
 using testing::FetchAggregatedObservations;
 using testing::MakeAggregationConfig;
 using testing::MakeAggregationKey;
+using testing::MakeExpectedReportParticipationObservations;
 using testing::MakeNullExpectedUniqueActivesObservations;
 using testing::MockConsistentProtoStore;
 using testing::PopulateMetricDefinitions;
@@ -122,14 +126,14 @@
 static const ExpectedAggregationParams kExpectedParams = {
     /* The total number of locally aggregated Observations that should be
        generated for each day index. */
-    9,
+    10,
     /* The MetricReportIds of the locally aggregated reports in this
        configuration. */
     {kErrorsOccurredMetricReportId, kConnectionFailuresMetricReportId},
     /* The number of Observations which should be generated for each day index,
        broken down by MetricReportId. */
     {{kErrorsOccurredMetricReportId, 9},
-     {kConnectionFailuresMetricReportId, 0}},
+     {kConnectionFailuresMetricReportId, 1}},
     /* The number of event codes for each report of type UNIQUE_N_DAY_ACTIVES,
        by MetricReportId. */
     {{kErrorsOccurredMetricReportId, 3}},
@@ -345,21 +349,19 @@
 // Properties of the locally aggregated Observations which should be generated
 // for the reports in |kMetricDefinitions|, assuming that no events have ever
 // been logged for those reports.
-//
-// TODO(pesk): update these fields once the EventAggregator is generating
-// observations for PerDeviceCount reports.
 static const ExpectedAggregationParams kPerDeviceCountExpectedParams = {
     /* The total number of Observations that should be generated for a day
        index. */
-    0,
+    2,
     /* The MetricReportIds of the locally aggregated reports in this
        configuration. */
     {kConnectionFailuresMetricReportId, kSettingsChangedMetricReportId},
     /* The number of Observations which should be generated for a day index,
        broken down by MetricReportId. */
-    {{kConnectionFailuresMetricReportId, 0},
-     {kSettingsChangedMetricReportId, 0}},
-    /* Omitted because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
+    {{kConnectionFailuresMetricReportId, 1},
+     {kSettingsChangedMetricReportId, 1}},
+    /* The number of event codes for each UNIQUE_N_DAY_ACTIVES report. Omitted
+       because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
     {},
     /* The set of window sizes for each MetricReportId. */
     {{kConnectionFailuresMetricReportId, {1}},
@@ -472,11 +474,11 @@
     event_aggregator_.reset(new EventAggregator(
         encoder_.get(), observation_writer_.get(),
         local_aggregate_proto_store_.get(), obs_history_proto_store_.get()));
-    // Provide the EventAggregator with a mock clock starting at 1 year after
+    // Provide the EventAggregator with a mock clock starting at 10 years after
     // the beginning of time.
     mock_clock_ = new IncrementingClock(std::chrono::system_clock::duration(0));
-    mock_clock_->set_time(
-        std::chrono::system_clock::time_point(std::chrono::seconds(kYear)));
+    mock_clock_->set_time(std::chrono::system_clock::time_point(
+        std::chrono::seconds(10 * kYear)));
     event_aggregator_->SetClock(mock_clock_);
     day_store_created_ = CurrentDayIndex();
   }
@@ -2101,7 +2103,7 @@
 // Tests that the expected UniqueActivesObservations are generated when events
 // are logged over multiple days and when Observations are backfilled for some
 // days during that period, and when the LocalAggregateStore is
-// garbage-collected after each all to GenerateObservations().
+// garbage-collected after each call to GenerateObservations().
 //
 // The test sets the number of backfill days to 3.
 //
@@ -2361,6 +2363,851 @@
   }
 }
 
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated after
+// some CountEvents have been logged for PerDeviceCount reports, without any
+// garbage collection.
+//
+// For 35 days, logs a positive number of events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Each day, calls GenerateObservations() with the day index of the previous
+// day. Checks that a positive status is returned and that the
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservations) {
+  int num_days = 1;
+  std::vector<Observation2> observations(0);
+  ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams;
+  for (int offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    observations.clear();
+    ResetObservationStore();
+    EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+    EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+                                            observation_store_.get(),
+                                            update_recipient_.get()));
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                            day_index, "component_A", 0u, 1));
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                            day_index, "component_B", 0u, 5));
+    }
+    // If this is the first time we're logging events, update the expected
+    // numbers of generated Observations to account for the logged events.
+    // For each report, for each window size, expect 1 Observation more than if
+    // no events had been logged.
+    if (offset == 0) {
+      expected_params.daily_num_obs += 3;
+      expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] +=
+          1;
+      expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2;
+    }
+    AdvanceClock(kDay);
+  }
+  observations.clear();
+  ResetObservationStore();
+  EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex() - 1));
+  EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+                                          observation_store_.get(),
+                                          update_recipient_.get()));
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated after
+// some CountEvents have been logged for PerDeviceCount reports over multiple
+// days, and when the LocalAggregateStore is garbage-collected each day.
+//
+// For 35 days, logs a positive number of events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Each day, calls GenerateObservations() with the day index of the previous
+// day. Checks that a positive status is returned and that the
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kPerDeviceCountMetricDefinitions|.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithGc) {
+  int num_days = 35;
+  std::vector<Observation2> observations(0);
+  ExpectedAggregationParams expected_params = kPerDeviceCountExpectedParams;
+  for (int offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    observations.clear();
+    ResetObservationStore();
+    EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+    EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+                                            observation_store_.get(),
+                                            update_recipient_.get()));
+    EXPECT_EQ(kOK, GarbageCollect(day_index));
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                            day_index, "component_A", 0u, 1));
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                            day_index, "component_B", 0u, 5));
+    }
+    // If this is the first time we're logging events, update the expected
+    // numbers of generated Observations to account for the logged events.
+    // For each report, for each window size, expect 1 Observation more than if
+    // no events had been logged.
+    if (offset == 0) {
+      expected_params.daily_num_obs += 3;
+      expected_params.num_obs_per_report[kConnectionFailuresMetricReportId] +=
+          1;
+      expected_params.num_obs_per_report[kSettingsChangedMetricReportId] += 2;
+    }
+    AdvanceClock(kDay);
+  }
+  observations.clear();
+  ResetObservationStore();
+  auto day_index = CurrentDayIndex();
+  EXPECT_EQ(kOK, GenerateObservations(day_index - 1));
+  EXPECT_TRUE(FetchAggregatedObservations(&observations, expected_params,
+                                          observation_store_.get(),
+                                          update_recipient_.get()));
+  EXPECT_EQ(kOK, GarbageCollect(day_index));
+}
+
+// Tests that GenerateObservations() returns a positive status and that the
+// expected number of Observations is generated when events are logged over
+// multiple days and some of those days' Observations are backfilled, without
+// any garbage collection of the LocalAggregateStore.
+//
+// Sets the |backfill_days_| field of the EventAggregator to 3.
+//
+// Logging pattern:
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
+//
+// Observation generation pattern:
+// Calls GenerateObservations() on the 1st through 5th and the 7th out of
+// every 10 days, for 35 days.
+//
+// Expected numbers of Observations:
+// It is expected that 4 days' worth of Observations are generated on
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
+// are generated on the 2nd through 5th day of every 10, that 2 days'
+// worth of Observations are generated on the 7th day of every 10 (the
+// day index for which GenerateObservations() was called, plus 1 day of
+// backfill), and that no Observations are generated on the remaining days.
+TEST_F(PerDeviceCountEventAggregatorTest, GenerateObservationsWithBackfill) {
+  // Set |backfill_days_| to 3.
+  size_t backfill_days = 3;
+  SetBackfillDays(backfill_days);
+  // Log 2 events each day for 35 days. Call GenerateObservations() on the
+  // first 5 day indices, and the 7th, out of every 10.
+  for (int offset = 0; offset < 35; offset++) {
+    auto day_index = CurrentDayIndex();
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                            day_index, "component_A", 0u, 1));
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                            day_index, "component_B", 0u, 5));
+    }
+    auto num_obs_before = observation_store_->messages_received.size();
+    if (offset % 10 < 5 || offset % 10 == 6) {
+      EXPECT_EQ(kOK, GenerateObservations(day_index));
+    }
+    auto num_obs_after = observation_store_->messages_received.size();
+    EXPECT_GE(num_obs_after, num_obs_before);
+    // Check that the expected daily number of Observations was generated.
+    switch (offset % 10) {
+      case 0:
+        // If this is the first day of logging, expect 3 Observations for each
+        // day in the backfill period and 6 Observations for the current day.
+        if (offset == 0) {
+          EXPECT_EQ(
+              (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) +
+                  kPerDeviceCountExpectedParams.daily_num_obs + 3,
+              num_obs_after - num_obs_before);
+        } else {
+          // If this is another day whose offset is a multiple of 10, expect 6
+          // Observations for each day in the backfill period as well as the
+          // current day.
+          EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) *
+                        (backfill_days + 1),
+                    num_obs_after - num_obs_before);
+        }
+        break;
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+        // Expect 6 Observations for this day.
+        EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3,
+                  num_obs_after - num_obs_before);
+        break;
+      case 6:
+        // Expect 6 Observations for each of today and yesterday.
+        EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2,
+                  num_obs_after - num_obs_before);
+        break;
+      default:
+        EXPECT_EQ(num_obs_after, num_obs_before);
+    }
+    AdvanceClock(kDay);
+  }
+}
+
+// Tests that GenerateObservations() returns a positive status and that the
+// expected number of Observations is generated when events are logged over
+// multiple days and some of those days' Observations are backfilled, and when
+// the LocalAggregateStore is garbage-collected after each call to
+// GenerateObservations().
+//
+// Sets the |backfill_days_| field of the EventAggregator to 3.
+//
+// Logging pattern:
+// For 35 days, logs 2 events each day for the
+// ConnectionFailures_PerDeviceCount report with "component_A" and 2 events for
+// the SettingsChanged_PerDeviceCount report with "component_B", all with event
+// code 0.
+//
+// Observation generation pattern:
+// Calls GenerateObservations() on the 1st through 5th and the 7th out of
+// every 10 days, for 35 days. Garbage-collects the LocalAggregateStore after
+// each call.
+//
+// Expected numbers of Observations:
+// It is expected that 4 days' worth of Observations are generated on
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
+// are generated on the 2nd through 5th day of every 10, that 2 days'
+// worth of Observations are generated on the 7th day of every 10 (the
+// day index for which GenerateObservations() was called, plus 1 day of
+// backfill), and that no Observations are generated on the remaining days.
+TEST_F(PerDeviceCountEventAggregatorTest,
+       GenerateObservationsWithBackfillAndGc) {
+  int num_days = 35;
+  // Set |backfill_days_| to 3.
+  size_t backfill_days = 3;
+  SetBackfillDays(backfill_days);
+  // Log 2 events each day for 35 days. Call GenerateObservations() on the
+  // first 5 day indices, and the 7th, out of every 10.
+  for (int offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                            day_index, "component_A", 0u, 1));
+      EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                            day_index, "component_B", 0u, 5));
+    }
+    auto num_obs_before = observation_store_->messages_received.size();
+    if (offset % 10 < 5 || offset % 10 == 6) {
+      EXPECT_EQ(kOK, GenerateObservations(day_index));
+      EXPECT_EQ(kOK, GarbageCollect(day_index));
+    }
+    auto num_obs_after = observation_store_->messages_received.size();
+    EXPECT_GE(num_obs_after, num_obs_before);
+    // Check that the expected daily number of Observations was generated.
+    switch (offset % 10) {
+      case 0:
+        // If this is the first day of logging, expect 3 Observations for each
+        // day in the backfill period and 6 Observations for the current day.
+        if (offset == 0) {
+          EXPECT_EQ(
+              (kPerDeviceCountExpectedParams.daily_num_obs * backfill_days) +
+                  kPerDeviceCountExpectedParams.daily_num_obs + 3,
+              num_obs_after - num_obs_before);
+        } else {
+          // If this is another day whose offset is a multiple of 10, expect 6
+          // Observations for each day in the backfill period as well as the
+          // current day.
+          EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) *
+                        (backfill_days + 1),
+                    num_obs_after - num_obs_before);
+        }
+        break;
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+        // Expect 6 Observations for this day.
+        EXPECT_EQ(kPerDeviceCountExpectedParams.daily_num_obs + 3,
+                  num_obs_after - num_obs_before);
+        break;
+      case 6:
+        // Expect 6 Observations for each of today and yesterday.
+        EXPECT_EQ((kPerDeviceCountExpectedParams.daily_num_obs + 3) * 2,
+                  num_obs_after - num_obs_before);
+        break;
+      default:
+        EXPECT_EQ(num_obs_after, num_obs_before);
+    }
+    AdvanceClock(kDay);
+  }
+}
+
+// Generate Observations without logging any events, and check that the
+// resulting Observations are as expected: 1 ReportParticipationObservation for
+// each PER_DEVICE_COUNT_STATS report in the config, and no
+// PerDeviceCountObservations.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesNoEvents) {
+  auto current_day_index = CurrentDayIndex();
+  EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+  auto expected_report_participation_obs =
+      MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams,
+                                                  current_day_index);
+  EXPECT_TRUE(CheckPerDeviceCountObservations(
+      {}, expected_report_participation_obs, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Check that the expected PerDeviceCountObservations and
+// ReportParticipationObservations are generated when GenerateObservations() is
+// called after logging some events for PER_DEVICE_COUNT_STATS reports over a
+// single day index.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesSingleDay) {
+  auto day_index = CurrentDayIndex();
+  // Log several events on |day_index|.
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                        day_index, "component_A", 0u, 5));
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                        day_index, "component_B", 0u, 5));
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                        day_index, "component_A", 0u, 5));
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                        day_index, "component_A", 1u, 5));
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                        day_index, "component_C", 0u, 5));
+  EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                        day_index, "component_C", 0u, 5));
+  // Generate locally aggregated Observations for |day_index|.
+  EXPECT_EQ(kOK, GenerateObservations(day_index));
+
+  // Form the expected Observations.
+  auto expected_report_participation_obs =
+      MakeExpectedReportParticipationObservations(kPerDeviceCountExpectedParams,
+                                                  day_index);
+  ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+  expected_per_device_count_obs[{kConnectionFailuresMetricReportId,
+                                 day_index}][1] = {
+      {"component_A", 0u, 10}, {"component_A", 1u, 5}, {"component_B", 0u, 5}};
+  expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}]
+                               [7] = {{"component_C", 0u, 10}};
+  expected_per_device_count_obs[{kSettingsChangedMetricReportId, day_index}]
+                               [30] = {{"component_C", 0u, 10}};
+  EXPECT_TRUE(CheckPerDeviceCountObservations(
+      expected_per_device_count_obs, expected_report_participation_obs,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+// Checks that PerDeviceCountObservations with the expected values are
+// generated when some events have been logged for a PER_DEVICE_COUNT
+// report for over multiple days and GenerateObservations() is called each
+// day, without garbage collection or backfill.
+//
+// Logged events for the SettingsChanged_PerDeviceCount metric on the i-th day:
+//
+//  i            (component, event code, count)
+// -----------------------------------------------------------------------
+//  0
+//  1          ("A", 1, 3)
+//  2          ("A", 1, 3), ("A", 2, 3), ("B", 1, 2)
+//  3          ("A", 1, 3)
+//  4          ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2)
+//  5          ("A", 1, 3)
+//  6          ("A", 1, 3), ("A", 2, 3), ("B", 1, 2)
+//  7          ("A", 1, 3)
+//  8          ("A", 1, 3), ("A", 2, 3), ("B", 1, 2), ("B", 2, 2)
+//  9          ("A", 1, 3)
+//
+// Expected PerDeviceCountObservations for the SettingsChanged_PerDeviceCount
+// report on the i-th day:
+//
+// (i, window size)          (component, event code, count)
+// -----------------------------------------------------------------------
+// (0, 7)
+// (0, 30)
+// (1, 7)     ("A", 1,  3)
+// (1, 30)    ("A", 1,  3)
+// (2, 7)     ("A", 1,  6),  ("A", 2,  3), ("B", 1, 2)
+// (2, 30)    ("A", 1,  6),  ("A", 2,  3), ("B", 1, 2)
+// (3, 7)     ("A", 1,  9),  ("A", 2,  3), ("B", 1, 2)
+// (3, 30)    ("A", 1,  9),  ("A", 2,  3), ("B", 1, 2)
+// (4, 7)     ("A", 1, 12),  ("A", 2,  6), ("B", 1, 4), ("B", 2, 2)
+// (4, 30)    ("A", 1, 12),  ("A", 2,  6), ("B", 1, 4), ("B", 2, 2)
+// (5, 7)     ("A", 1, 15),  ("A", 2,  6), ("B", 1, 4), ("B", 2, 2)
+// (5, 30)    ("A", 1, 15),  ("A", 2,  6), ("B", 1, 4), ("B", 2, 2)
+// (6, 7)     ("A", 1, 18),  ("A", 2,  9), ("B", 1, 6), ("B", 2, 2)
+// (6, 30)    ("A", 1, 18),  ("A", 2,  9), ("B", 1, 6), ("B", 2, 2)
+// (7, 7)     ("A", 1, 21),  ("A", 2,  9), ("B", 1, 6), ("B", 2, 2)
+// (7, 30)    ("A", 1, 21),  ("A", 2,  9), ("B", 1, 6), ("B", 2, 2)
+// (8, 7)     ("A", 1, 21),  ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+// (8, 30)    ("A", 1, 24),  ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+// (9, 7)     ("A", 1, 21),  ("A", 2,  9), ("B", 1, 6), ("B", 2, 4)
+// (9, 30)    ("A", 1, 27),  ("A", 2, 12), ("B", 1, 8), ("B", 2, 4)
+//
+// In addition, expect 2 ReportParticipationObservations each day, 1 for each
+// of ConnectionFailures_PerDeviceCount and SettingsChanged_PerDeviceCount.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesMultiDay) {
+  auto start_day_index = CurrentDayIndex();
+  // Form expected Observations for the 10 days of logging.
+  uint32_t num_days = 10;
+  std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs(
+      num_days);
+  std::vector<ExpectedReportParticipationObservations>
+      expected_report_participation_obs(num_days);
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    expected_report_participation_obs[offset] =
+        MakeExpectedReportParticipationObservations(
+            kPerDeviceCountExpectedParams, start_day_index + offset);
+  }
+  expected_per_device_count_obs[0] = {};
+  expected_per_device_count_obs[1][{kSettingsChangedMetricReportId,
+                                    start_day_index + 1}] = {
+      {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}};
+  expected_per_device_count_obs[2][{kSettingsChangedMetricReportId,
+                                    start_day_index + 2}] = {
+      {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+      {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+  expected_per_device_count_obs[3][{kSettingsChangedMetricReportId,
+                                    start_day_index + 3}] = {
+      {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+      {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+  expected_per_device_count_obs[4][{kSettingsChangedMetricReportId,
+                                    start_day_index + 4}] = {
+      {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[5][{kSettingsChangedMetricReportId,
+                                    start_day_index + 5}] = {
+      {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[6][{kSettingsChangedMetricReportId,
+                                    start_day_index + 6}] = {
+      {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[7][{kSettingsChangedMetricReportId,
+                                    start_day_index + 7}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[8][{kSettingsChangedMetricReportId,
+                                    start_day_index + 8}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+      {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+  expected_per_device_count_obs[9][{kSettingsChangedMetricReportId,
+                                    start_day_index + 9}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}},
+      {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+
+  for (uint32_t offset = 0; offset < 1; offset++) {
+    auto day_index = CurrentDayIndex();
+    for (uint32_t event_code = 1; event_code < 3; event_code++) {
+      if (offset > 0 && offset % event_code == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "A", event_code, 3));
+      }
+      if (offset > 0 && offset % (2 * event_code) == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "B", event_code, 2));
+      }
+    }
+    // Clear the FakeObservationStore.
+    ResetObservationStore();
+    // Generate locally aggregated Observations.
+    EXPECT_EQ(kOK, GenerateObservations(day_index));
+    EXPECT_TRUE(CheckPerDeviceCountObservations(
+        expected_per_device_count_obs[offset],
+        expected_report_participation_obs[offset], observation_store_.get(),
+        update_recipient_.get()))
+        << "offset = " << offset;
+    AdvanceClock(kDay);
+  }
+}
+
+// Repeat the CheckObservationValuesMultiDay test, this time calling
+// GarbageCollect() after each call to GenerateObservations.
+//
+// The logging pattern and set of Observations for each day index is the same
+// as in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest,
+       CheckObservationValuesMultiDayWithGarbageCollection) {
+  auto start_day_index = CurrentDayIndex();
+  // Form expected Observations for the 10 days of logging.
+  uint32_t num_days = 10;
+  std::vector<ExpectedPerDeviceCountObservations> expected_per_device_count_obs(
+      num_days);
+  std::vector<ExpectedReportParticipationObservations>
+      expected_report_participation_obs(num_days);
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    expected_report_participation_obs[offset] =
+        MakeExpectedReportParticipationObservations(
+            kPerDeviceCountExpectedParams, start_day_index + offset);
+  }
+  expected_per_device_count_obs[0] = {};
+  expected_per_device_count_obs[1][{kSettingsChangedMetricReportId,
+                                    start_day_index + 1}] = {
+      {7, {{"A", 1u, 3}}}, {30, {{"A", 1u, 3}}}};
+  expected_per_device_count_obs[2][{kSettingsChangedMetricReportId,
+                                    start_day_index + 2}] = {
+      {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+      {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+  expected_per_device_count_obs[3][{kSettingsChangedMetricReportId,
+                                    start_day_index + 3}] = {
+      {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+      {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+  expected_per_device_count_obs[4][{kSettingsChangedMetricReportId,
+                                    start_day_index + 4}] = {
+      {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[5][{kSettingsChangedMetricReportId,
+                                    start_day_index + 5}] = {
+      {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[6][{kSettingsChangedMetricReportId,
+                                    start_day_index + 6}] = {
+      {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[7][{kSettingsChangedMetricReportId,
+                                    start_day_index + 7}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+      {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+  expected_per_device_count_obs[8][{kSettingsChangedMetricReportId,
+                                    start_day_index + 8}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+      {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+  expected_per_device_count_obs[9][{kSettingsChangedMetricReportId,
+                                    start_day_index + 9}] = {
+      {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 4}}},
+      {30, {{"A", 1u, 27}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+
+  for (uint32_t offset = 0; offset < 10; offset++) {
+    auto day_index = CurrentDayIndex();
+    for (uint32_t event_code = 1; event_code < 3; event_code++) {
+      if (offset > 0 && offset % event_code == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "A", event_code, 3));
+      }
+      if (offset > 0 && offset % (2 * event_code) == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "B", event_code, 2));
+      }
+    }
+    // Advance |mock_clock_| by 1 day.
+    AdvanceClock(kDay);
+    // Clear the FakeObservationStore.
+    ResetObservationStore();
+    // Generate locally aggregated Observations and garbage-collect the
+    // LocalAggregateStore, both for the previous day as measured by
+    // |mock_clock_|. Back up the LocalAggregateStore and
+    // AggregatedObservationHistoryStore.
+    DoScheduledTasksNow();
+    EXPECT_TRUE(CheckPerDeviceCountObservations(
+        expected_per_device_count_obs[offset],
+        expected_report_participation_obs[offset], observation_store_.get(),
+        update_recipient_.get()));
+  }
+}
+
+// Tests that the expected PerDeviceCountObservations are generated when events
+// are logged over multiple days and when Observations are backfilled for some
+// days during that period, without any garbage-collection of the
+// LocalAggregateStore.
+//
+// The logging pattern and set of Observations for each day index is the same as
+// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest, CheckObservationValuesWithBackfill) {
+  auto start_day_index = CurrentDayIndex();
+  // Set |backfill_days_| to 3.
+  size_t backfill_days = 3;
+  SetBackfillDays(backfill_days);
+  // Log events for 9 days. Call GenerateObservations() on the first 6 day
+  // indices, and the 9th.
+  uint32_t num_days = 9;
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    ResetObservationStore();
+    for (uint32_t event_code = 1; event_code < 3; event_code++) {
+      if (offset > 0 && (offset % event_code == 0)) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "A", event_code, 3));
+      }
+      if (offset > 0 && offset % (2 * event_code) == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "B", event_code, 2));
+      }
+    }
+    if (offset < 6 || offset == 8) {
+      EXPECT_EQ(kOK, GenerateObservations(day_index));
+    }
+    // Make the set of Observations which are expected to be generated on
+    // |start_day_index + offset| and check it against the contents of the
+    // FakeObservationStore.
+    ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+    ExpectedReportParticipationObservations expected_report_participation_obs;
+    switch (offset) {
+      case 0: {
+        for (uint32_t day_index = start_day_index - backfill_days;
+             day_index <= start_day_index; day_index++) {
+          for (const auto& pair : MakeExpectedReportParticipationObservations(
+                   kPerDeviceCountExpectedParams, day_index)) {
+            expected_report_participation_obs.insert(pair);
+          }
+        }
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 1: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {{7, {{"A", 1u, 3}}},
+                                                      {30, {{"A", 1u, 3}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 2: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+            {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 3: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+            {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 4: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 5: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 8: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 6}] = {
+            {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 7}] = {
+            {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 8}] = {
+            {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+            {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+        for (uint32_t day_index = start_day_index + 6;
+             day_index <= start_day_index + 8; day_index++) {
+          for (const auto& pair : MakeExpectedReportParticipationObservations(
+                   kPerDeviceCountExpectedParams, day_index)) {
+            expected_report_participation_obs.insert(pair);
+          }
+        }
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      default:
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+    }
+    AdvanceClock(kDay);
+  }
+}
+
+// Tests that the expected Observations are generated for PerDeviceCount reports
+// when events are logged over multiple days and when Observations are
+// backfilled for some days during that period, and when the
+// LocalAggregatedStore is garbage-collected after each call to
+// GenerateObservations().
+//
+// The logging pattern and set of Observations for each day index is the same as
+// in PerDeviceCountEventAggregatorTest::CheckObservationValuesMultiDay. See
+// that test for documentation.
+TEST_F(PerDeviceCountEventAggregatorTest,
+       CheckObservationValuesWithBackfillAndGc) {
+  auto start_day_index = CurrentDayIndex();
+  // Set |backfill_days_| to 3.
+  size_t backfill_days = 3;
+  SetBackfillDays(backfill_days);
+  // Log events for 9 days. Call GenerateObservations() on the first 6 day
+  // indices, and the 9th.
+  uint32_t num_days = 9;
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    ResetObservationStore();
+    for (uint32_t event_code = 1; event_code < 3; event_code++) {
+      if (offset > 0 && (offset % event_code == 0)) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "A", event_code, 3));
+      }
+      if (offset > 0 && offset % (2 * event_code) == 0) {
+        EXPECT_EQ(kOK, LogPerDeviceCountEvent(kSettingsChangedMetricReportId,
+                                              day_index, "B", event_code, 2));
+      }
+    }
+    // Advance |mock_clock_| by 1 day.
+    AdvanceClock(kDay);
+    if (offset < 6 || offset == 8) {
+      // Generate Observations and garbage-collect, both for the previous day
+      // index according to |mock_clock_|. Back up the LocalAggregateStore and
+      // the AggregatedObservationHistoryStore.
+      DoScheduledTasksNow();
+    }
+    // Make the set of Observations which are expected to be generated on
+    // |start_day_index + offset| and check it against the contents of the
+    // FakeObservationStore.
+    ExpectedPerDeviceCountObservations expected_per_device_count_obs;
+    ExpectedReportParticipationObservations expected_report_participation_obs;
+    switch (offset) {
+      case 0: {
+        for (uint32_t day_index = start_day_index - backfill_days;
+             day_index <= start_day_index; day_index++) {
+          for (const auto& pair : MakeExpectedReportParticipationObservations(
+                   kPerDeviceCountExpectedParams, day_index)) {
+            expected_report_participation_obs.insert(pair);
+          }
+        }
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 1: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {{7, {{"A", 1u, 3}}},
+                                                      {30, {{"A", 1u, 3}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 2: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}},
+            {30, {{"A", 1u, 6}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 3: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}},
+            {30, {{"A", 1u, 9}, {"A", 2u, 3}, {"B", 1u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 4: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 12}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 5: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       day_index}] = {
+            {7, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 15}, {"A", 2u, 6}, {"B", 1u, 4}, {"B", 2u, 2}}}};
+        expected_report_participation_obs =
+            MakeExpectedReportParticipationObservations(
+                kPerDeviceCountExpectedParams, day_index);
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      case 8: {
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 6}] = {
+            {7, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 18}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 7}] = {
+            {7, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}},
+            {30, {{"A", 1u, 21}, {"A", 2u, 9}, {"B", 1u, 6}, {"B", 2u, 2}}}};
+        expected_per_device_count_obs[{kSettingsChangedMetricReportId,
+                                       start_day_index + 8}] = {
+            {7, {{"A", 1u, 21}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}},
+            {30, {{"A", 1u, 24}, {"A", 2u, 12}, {"B", 1u, 8}, {"B", 2u, 4}}}};
+        for (uint32_t day_index = start_day_index + 6;
+             day_index <= start_day_index + 8; day_index++) {
+          for (const auto& pair : MakeExpectedReportParticipationObservations(
+                   kPerDeviceCountExpectedParams, day_index)) {
+            expected_report_participation_obs.insert(pair);
+          }
+        }
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+        break;
+      }
+      default:
+        EXPECT_TRUE(CheckPerDeviceCountObservations(
+            expected_per_device_count_obs, expected_report_participation_obs,
+            observation_store_.get(), update_recipient_.get()));
+    }
+  }
+}
+
 // Tests GenerateObservations() and GarbageCollect() in the case where the
 // LocalAggregateStore contains aggregates for metrics with both UTC and LOCAL
 // time zone policies, and where the day index in local time may be less than
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto
index 28ceb2f..0f7837d 100644
--- a/logger/local_aggregation.proto
+++ b/logger/local_aggregation.proto
@@ -98,18 +98,30 @@
   ReportDefinition report = 3;
 }
 
-// A container used by the EventAggregator to store the latest day index for
-// which an Observation has been generated for each report, event code, and
-// window size.
+// A container used by the EventAggregator to store the history of generated
+// Observations, indexed by report.
 message AggregatedObservationHistoryStore {
   // Keyed by base64-encoded serializations of ReportAggregationKey messages.
   map<string, AggregatedObservationHistory> by_report_key = 1;
 }
 
+// A container for the history of Observations that have been generated for a
+// single report.
 message AggregatedObservationHistory {
+  // If this is a locally aggregated report, then AggregatedObservationHistory
+  // has one of the following types.
   oneof type {
+    // A container for the history of UniqueActivesObservations generated
+    // for a UNIQUE_N_DAY_ACTIVES report.
     UniqueActivesObservationHistory unique_actives_history = 1;
+    // A container for the history of PerDeviceCountObservations generated
+    // for a PER_DEVICE_COUNT_STATS report.
+    PerDeviceCountObservationHistory per_device_count_history = 2;
   }
+  // A container for the history of ReportParticipationObservations generated
+  // for this report. Unset if ReportParticipationObservations should not be
+  // generated for this report.
+  ReportParticipationObservationHistory report_participation_history = 100;
 }
 
 message UniqueActivesObservationHistory {
@@ -117,6 +129,27 @@
   map<uint32, HistoryByWindowSize> by_event_code = 1;
 }
 
+message PerDeviceCountObservationHistory {
+  // The history of PerDeviceCount Observations. Keyed by component string.
+  map<string, HistoryByEventCode> by_component = 1;
+}
+
+message ReportParticipationObservationHistory {
+  // The last day index for which a ReportParticipationObservation was
+  // generated for this report.
+  //
+  // TODO(pesk): if ReportParticipationObservation has a window_size
+  // field in the future, consider changing this to a repeated field
+  // where the k-th last_generated field holds the value for window
+  // size k.
+  uint32 last_generated = 1;
+}
+
+message HistoryByEventCode {
+  // Keyed by event code.
+  map<uint32, HistoryByWindowSize> by_event_code = 1;
+}
+
 message HistoryByWindowSize {
   // Keyed by window size. The value at a window size is the latest day index
   // for which an Observation has been generated for this report, event code,
diff --git a/logger/logger_test_utils.cc b/logger/logger_test_utils.cc
index b8eeba4..a998fac 100644
--- a/logger/logger_test_utils.cc
+++ b/logger/logger_test_utils.cc
@@ -24,6 +24,8 @@
 
 namespace cobalt {
 
+using crypto::byte;
+using crypto::hash::DIGEST_SIZE;
 using encoder::ClientSecret;
 using rappor::BasicRapporEncoder;
 using rappor::RapporConfigHelper;
@@ -33,6 +35,27 @@
 namespace logger {
 namespace testing {
 
+namespace {
+// Populates |*hash_out| with the SHA256 of |component|, unless |component|
+// is empty in which case *hash_out is set to the empty string also. An
+// empty string indicates that the component_name feature is not being used.
+// We expect this to be a common case and in this case there is no point
+// in using 32 bytes to represent the empty string. Returns true on success
+// and false on failure (unexpected).
+bool HashComponentNameIfNotEmpty(const std::string& component,
+                                 std::string* hash_out) {
+  CHECK(hash_out);
+  if (component.empty()) {
+    hash_out->resize(0);
+    return true;
+  }
+  hash_out->resize(DIGEST_SIZE);
+  return cobalt::crypto::hash::Hash(
+      reinterpret_cast<const byte*>(component.data()), component.size(),
+      reinterpret_cast<byte*>(&hash_out->front()));
+}
+}  // namespace
+
 bool PopulateMetricDefinitions(const char metric_string[],
                                MetricDefinitions* metric_definitions) {
   google::protobuf::TextFormat::Parser parser;
@@ -87,6 +110,17 @@
   return expected_obs;
 }
 
+ExpectedReportParticipationObservations
+MakeExpectedReportParticipationObservations(
+    const ExpectedAggregationParams& expected_params, uint32_t day_index) {
+  ExpectedReportParticipationObservations expected_obs;
+
+  for (const auto& report_pair : expected_params.window_sizes) {
+    expected_obs.insert({report_pair.first, day_index});
+  }
+  return expected_obs;
+}
+
 bool FetchObservations(std::vector<Observation2>* observations,
                        const std::vector<uint32_t>& expected_report_ids,
                        FakeObservationStore* observation_store,
@@ -261,8 +295,8 @@
   // when this method is called.
   ExpectedAggregationParams expected_params;
   // A container for the strings expected to appear in the |data| field of the
-  // BasicRapporObservation wrapped by the UniqueActivesObservation for a given
-  // MetricReportId, day index, window size, and event code.
+  // BasicRapporObservation wrapped by the UniqueActivesObservation for a
+  // given MetricReportId, day index, window size, and event code.
   std::map<std::pair<MetricReportId, uint32_t>,
            std::map<uint32_t, std::map<uint32_t, std::string>>>
       expected_values;
@@ -348,6 +382,123 @@
   return true;
 }
 
+bool CheckPerDeviceCountObservations(
+    ExpectedPerDeviceCountObservations expected_per_device_count_obs,
+    ExpectedReportParticipationObservations expected_report_participation_obs,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient) {
+  // An ExpectedAggregationParams struct describing the number of
+  // Observations for each report ID which are expected to be
+  // in the FakeObservationStore when this method is called.
+  ExpectedAggregationParams expected_params;
+  std::map<std::string, std::string> component_hashes;
+  for (const auto& id_pair : expected_report_participation_obs) {
+    expected_params.daily_num_obs++;
+    expected_params.num_obs_per_report[id_pair.first]++;
+    expected_params.metric_report_ids.insert(id_pair.first);
+  }
+  for (const auto& id_pair : expected_per_device_count_obs) {
+    for (const auto& window_size_pair : id_pair.second) {
+      auto window_size = window_size_pair.first;
+      expected_params.window_sizes[id_pair.first.first].insert(window_size);
+      for (const auto& expected_obs : window_size_pair.second) {
+        expected_params.daily_num_obs++;
+        expected_params.num_obs_per_report[id_pair.first.first]++;
+        std::string component = std::get<0>(expected_obs);
+        std::string component_hash;
+        HashComponentNameIfNotEmpty(component, &component_hash);
+        component_hashes[component_hash] = component;
+      }
+    }
+  }
+  // Fetch the contents of the ObservationStore and check that each
+  // received Observation corresponds to an element of |expected_values|.
+  std::vector<Observation2> observations;
+  if (!FetchAggregatedObservations(&observations, expected_params,
+                                   observation_store, update_recipient)) {
+    return false;
+  }
+  std::vector<Observation2> report_participation_obs;
+  std::vector<ObservationMetadata> report_participation_metadata;
+  std::vector<Observation2> per_device_count_obs;
+  std::vector<ObservationMetadata> per_device_count_metadata;
+  for (size_t i = 0; i < observations.size(); i++) {
+    if (observations.at(i).has_report_participation()) {
+      report_participation_obs.push_back(observations.at(i));
+      report_participation_metadata.push_back(
+          *observation_store->metadata_received[i]);
+    } else if (observations.at(i).has_per_device_count()) {
+      per_device_count_obs.push_back(observations.at(i));
+      per_device_count_metadata.push_back(
+          *observation_store->metadata_received[i]);
+    } else {
+      return false;
+    }
+  }
+  // Check the received PerDeviceCountObservations
+  for (size_t i = 0; i < per_device_count_obs.size(); i++) {
+    auto obs_key =
+        std::make_pair(MetricReportId(per_device_count_metadata[i].metric_id(),
+                                      per_device_count_metadata[i].report_id()),
+                       per_device_count_metadata[i].day_index());
+    auto report_iter = expected_per_device_count_obs.find(obs_key);
+    if (report_iter == expected_per_device_count_obs.end()) {
+      return false;
+    }
+    auto obs = per_device_count_obs.at(i);
+    uint32_t obs_window_size = obs.per_device_count().window_size();
+    auto window_iter = report_iter->second.find(obs_window_size);
+    if (window_iter == report_iter->second.end()) {
+      return false;
+    }
+    std::string obs_component_hash =
+        obs.per_device_count().integer_event_obs().component_name_hash();
+    std::string obs_component;
+    auto hash_iter = component_hashes.find(obs_component_hash);
+    if (hash_iter == component_hashes.end()) {
+      return false;
+    } else {
+      obs_component = component_hashes[obs_component_hash];
+    }
+    auto obs_tuple = std::make_tuple(
+        obs_component, obs.per_device_count().integer_event_obs().event_code(),
+        obs.per_device_count().integer_event_obs().value());
+    auto obs_iter = window_iter->second.find(obs_tuple);
+    if (obs_iter == window_iter->second.end()) {
+      return false;
+    }
+    expected_per_device_count_obs.at(obs_key)
+        .at(obs_window_size)
+        .erase(obs_tuple);
+    if (expected_per_device_count_obs.at(obs_key).at(obs_window_size).empty()) {
+      expected_per_device_count_obs.at(obs_key).erase(obs_window_size);
+    }
+    if (expected_per_device_count_obs.at(obs_key).empty()) {
+      expected_per_device_count_obs.erase(obs_key);
+    }
+  }
+  if (!expected_per_device_count_obs.empty()) {
+    return false;
+  }
+
+  // Check the received ReportParticipationObservations
+  for (size_t i = 0; i < report_participation_obs.size(); i++) {
+    auto obs_key = std::make_pair(
+        MetricReportId(report_participation_metadata[i].metric_id(),
+                       report_participation_metadata[i].report_id()),
+        report_participation_metadata[i].day_index());
+    if (expected_report_participation_obs.count(obs_key) == 0) {
+      return false;
+    }
+    expected_report_participation_obs.erase(obs_key);
+  }
+  if (!expected_report_participation_obs.empty()) {
+    return false;
+  }
+
+  return true;
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace cobalt
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index 16a25b6..a10ccad 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -9,6 +9,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -129,6 +130,25 @@
                  std::map<uint32_t, std::vector<bool>>>
     ExpectedUniqueActivesObservations;
 
+// A representation of a set of expected PerDeviceCountObservations. Used to
+// check the values of PerDeviceCountObservations generated by the
+// EventAggregator.
+//
+// The outer map is keyed by pairs (MetricReportId, day_index), where the day
+// index represents the day index of the expected Observation.
+typedef std::map<
+    std::pair<MetricReportId, uint32_t>,
+    std::map<uint32_t, std::set<std::tuple<std::string, uint32_t, int64_t>>>>
+    ExpectedPerDeviceCountObservations;
+
+// A representation of a set of expected ReportParticipationObservations. Used
+// to check the values of ReportParticipationObservations generated by the
+// EventAggregator. The first element of each pair is the MetricReportId of a
+// report, and the second element represents the day index of an expected
+// Observation for that report. a pair is a a set of window sizes.
+typedef std::set<std::pair<MetricReportId, uint32_t>>
+    ExpectedReportParticipationObservations;
+
 // Populates a MetricDefinitions proto message from a serialized representation.
 bool PopulateMetricDefinitions(const char metric_string[],
                                MetricDefinitions* metric_definitions);
@@ -156,6 +176,14 @@
 ExpectedUniqueActivesObservations MakeNullExpectedUniqueActivesObservations(
     const ExpectedAggregationParams& expected_params, uint32_t day_index);
 
+// Given an ExpectedAggregationParams struct |expected_params|, return an
+// ExpectedReportParticipationObservations containing a pair
+// (|metric_report_id|, |day_index|) for each MetricReportId |metric_report_id|
+// in |expected_params|.
+ExpectedReportParticipationObservations
+MakeExpectedReportParticipationObservations(
+    const ExpectedAggregationParams& expected_params, uint32_t day_index);
+
 // Populates |observations| with the contents of a FakeObservationStore.
 // |observations| should be a vector whose size is equal to the number
 // of expected observations. Checks the the ObservationStore contains
@@ -207,6 +235,18 @@
     FakeObservationStore* observation_store,
     TestUpdateRecipient* update_recipient);
 
+// Checks that the Observations contained in a FakeObservationStore are exactly
+// the PerDeviceCountObservations and ReportParticipationObservations that
+// should be generated for a single day index given a representation of the
+// expected activity indicators for that day, for each PerDeviceCount report,
+// for each window size and event code, for a config whose locally aggregated
+// reports are all of type PER_DEVICE_COUNT_STATS.
+bool CheckPerDeviceCountObservations(
+    ExpectedPerDeviceCountObservations expected_per_device_count_obs,
+    ExpectedReportParticipationObservations expected_report_participation_obs,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient);
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace cobalt