[EventAggregator] Log events for PER_DEVICE_COUNT_STATS reports

Adds a method to log CountEvents to the EventAggregator for reports
of type PER_DEVICE_COUNT_STATS. Updates the config update
method and the garbage collection method to handle the new type of
local aggregates.

Observation generation for PER_DEVICE_COUNT_STATS reports
will be added in a follow-up CL.

Change-Id: Ic522f07e1264540ba02cb1dc721e72c1b033b38b
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
index cd7a20b..706ef72 100644
--- a/logger/event_aggregator.cc
+++ b/logger/event_aggregator.cc
@@ -26,6 +26,80 @@
 
 namespace logger {
 
+namespace {
+
+// Creates an AggregationConfig from a ProjectContext, MetricDefinition, and
+// ReportDefinition and populates the aggregation_config field of a specified
+// ReportAggregates. Also sets the type of the ReportAggregates based on the
+// ReportDefinition's type.
+bool PopulateReportAggregates(const ProjectContext& project_context,
+                              const MetricDefinition& metric,
+                              const ReportDefinition& report,
+                              ReportAggregates* report_aggregates) {
+  AggregationConfig* aggregation_config =
+      report_aggregates->mutable_aggregation_config();
+  *aggregation_config->mutable_project() = project_context.project();
+  *aggregation_config->mutable_metric() =
+      *project_context.GetMetric(metric.id());
+  *aggregation_config->mutable_report() = report;
+  switch (report.report_type()) {
+    case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+      report_aggregates->set_allocated_unique_actives_aggregates(
+          new UniqueActivesReportAggregates);
+      return true;
+    }
+    case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+      report_aggregates->set_allocated_count_aggregates(
+          new PerDeviceCountReportAggregates);
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
+// Populates a ReportAggregationKey proto message and then populates a string
+// with the base64 encoding of the serialized proto.
+bool PopulateReportKey(uint32_t customer_id, uint32_t project_id,
+                       uint32_t metric_id, uint32_t report_id,
+                       std::string* key) {
+  ReportAggregationKey key_data;
+  key_data.set_customer_id(customer_id);
+  key_data.set_project_id(project_id);
+  key_data.set_metric_id(metric_id);
+  key_data.set_report_id(report_id);
+  return SerializeToBase64(key_data, key);
+}
+
+// Given a ProjectContext, MetricDefinition, and ReportDefinition and a pointer
+// to the LocalAggregateStore, checks whether a key with the same customer,
+// project, metric, and report ID already exists in the LocalAggregateStore. If
+// not, creates and inserts a new key and value. Returns kInvalidArguments if
+// creation of the key or value fails, and kOK otherwise. The caller should hold
+// the mutex protecting the LocalAggregateStore.
+Status MaybeInsertReportConfig(const ProjectContext& project_context,
+                               const MetricDefinition& metric,
+                               const ReportDefinition& report,
+                               LocalAggregateStore* store) {
+  std::string key;
+  if (!PopulateReportKey(project_context.project().customer_id(),
+                         project_context.project().project_id(), metric.id(),
+                         report.id(), &key)) {
+    return kInvalidArguments;
+  }
+  ReportAggregates report_aggregates;
+  if (store->by_report_key().count(key) == 0) {
+    if (!PopulateReportAggregates(project_context, metric, report,
+                                  &report_aggregates)) {
+      return kInvalidArguments;
+    }
+    (*store->mutable_by_report_key())[key] = report_aggregates;
+  }
+  return kOK;
+}
+
+}  // namespace
+
 EventAggregator::EventAggregator(
     const Encoder* encoder, const ObservationWriter* observation_writer,
     ConsistentProtoStore* local_aggregate_proto_store,
@@ -42,9 +116,9 @@
          "generate_obs_interval";
   CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
       << "aggregate_backup_interval must be less than or equal to gc_interval";
-  CHECK_LE(backfill_days, kEventAggregatorMaxAllowedBackfillDays)
+  CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
       << "backfill_days must be less than or equal to "
-      << kEventAggregatorMaxAllowedBackfillDays;
+      << kMaxAllowedBackfillDays;
   aggregate_backup_interval_ = aggregate_backup_interval;
   generate_obs_interval_ = generate_obs_interval;
   gc_interval_ = gc_interval;
@@ -76,14 +150,14 @@
   auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
   switch (restore_history_status.error_code()) {
     case StatusCode::OK: {
-      VLOG(4) << "Read AggregatedObservationHistory from disk.";
+      VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
       break;
     }
     case StatusCode::NOT_FOUND: {
-      VLOG(4)
-          << "No file found for obs_history_proto_store. Proceeding "
-             "with empty AggregatedObservationHistory. File will be created on "
-             "first snapshot of the AggregatedObservationHistory.";
+      VLOG(4) << "No file found for obs_history_proto_store. Proceeding "
+                 "with empty AggregatedObservationHistoryStore. File will be "
+                 "created on first snapshot of the "
+                 "AggregatedObservationHistoryStore.";
       break;
     }
     default: {
@@ -93,7 +167,7 @@
                  << "\nError details: "
                  << restore_history_status.error_details()
                  << "\nProceeding with empty AggregatedObservationHistory.";
-      obs_history_ = AggregatedObservationHistory();
+      obs_history_ = AggregatedObservationHistoryStore();
     }
   }
   clock_.reset(new SystemClock());
@@ -111,38 +185,40 @@
 // sizes are <= |kMaxAllowedAggregationWindowSize|. Additionally, have
 // this method filter out any window sizes larger than
 // |kMaxAllowedAggregationWindowSize|.
+//
+// TODO(pesk): update the EventAggregator's view of a Metric
+// or ReportDefinition when appropriate.
 Status EventAggregator::UpdateAggregationConfigs(
     const ProjectContext& project_context) {
   auto locked = protected_aggregate_store_.lock();
-  std::string key;
-  ReportAggregationKey key_data;
-  key_data.set_customer_id(project_context.project().customer_id());
-  key_data.set_project_id(project_context.project().project_id());
+  Status status;
   for (const auto& metric : project_context.metric_definitions()->metric()) {
     switch (metric.metric_type()) {
       case MetricDefinition::EVENT_OCCURRED: {
-        key_data.set_metric_id(metric.id());
         for (const auto& report : metric.reports()) {
           switch (report.report_type()) {
             case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
-              key_data.set_report_id(report.id());
-              if (!SerializeToBase64(key_data, &key)) {
-                return kInvalidArguments;
+              status =
+                  MaybeInsertReportConfig(project_context, metric, report,
+                                          &(locked->local_aggregate_store));
+              if (status != kOK) {
+                return status;
               }
-              // TODO(pesk): update the EventAggregator's view of a Metric
-              // or ReportDefinition when appropriate.
-              if (locked->local_aggregate_store.aggregates().count(key) == 0) {
-                AggregationConfig aggregation_config;
-                *aggregation_config.mutable_project() =
-                    project_context.project();
-                *aggregation_config.mutable_metric() =
-                    *project_context.GetMetric(metric.id());
-                *aggregation_config.mutable_report() = report;
-                ReportAggregates report_aggregates;
-                *report_aggregates.mutable_aggregation_config() =
-                    aggregation_config;
-                (*locked->local_aggregate_store.mutable_aggregates())[key] =
-                    report_aggregates;
+            }
+            default:
+              continue;
+          }
+        }
+      }
+      case MetricDefinition::EVENT_COUNT: {
+        for (const auto& report : metric.reports()) {
+          switch (report.report_type()) {
+            case ReportDefinition::PER_DEVICE_COUNT_STATS: {
+              status =
+                  MaybeInsertReportConfig(project_context, metric, report,
+                                          &(locked->local_aggregate_store));
+              if (status != kOK) {
+                return status;
               }
             }
             default:
@@ -164,30 +240,73 @@
                   "accept OccurrenceEvents.";
     return kInvalidArguments;
   }
-  ReportAggregationKey key_data;
-  key_data.set_customer_id(event_record->metric->customer_id());
-  key_data.set_project_id(event_record->metric->project_id());
-  key_data.set_metric_id(event_record->metric->id());
-  key_data.set_report_id(report_id);
   std::string key;
-  if (!SerializeToBase64(key_data, &key)) {
+  if (!PopulateReportKey(event_record->metric->customer_id(),
+                         event_record->metric->project_id(),
+                         event_record->metric->id(), report_id, &key)) {
     return kInvalidArguments;
   }
   auto locked = protected_aggregate_store_.lock();
   auto aggregates =
-      locked->local_aggregate_store.mutable_aggregates()->find(key);
-  if (aggregates == locked->local_aggregate_store.mutable_aggregates()->end()) {
+      locked->local_aggregate_store.mutable_by_report_key()->find(key);
+  if (aggregates ==
+      locked->local_aggregate_store.mutable_by_report_key()->end()) {
     LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
     return kInvalidArguments;
   }
-  (*(*aggregates->second.mutable_by_event_code())
-        [event_record->event->occurrence_event().event_code()]
-            .mutable_by_day_index())[event_record->event->day_index()]
+  if (!aggregates->second.has_unique_actives_aggregates()) {
+    LOG(ERROR) << "The local aggregates for this report key are not of type "
+                  "UniqueActivesReportAggregates.";
+    return kInvalidArguments;
+  }
+  (*(*aggregates->second.mutable_unique_actives_aggregates()
+          ->mutable_by_event_code())[event_record->event->occurrence_event()
+                                         .event_code()]
+        .mutable_by_day_index())[event_record->event->day_index()]
       .mutable_activity_daily_aggregate()
       ->set_activity_indicator(true);
   return kOK;
 }
 
+Status EventAggregator::LogPerDeviceCountEvent(uint32_t report_id,
+                                               EventRecord* event_record) {
+  if (!event_record->event->has_count_event()) {
+    LOG(ERROR) << "EventAggregator::LogPerDeviceCountEvent can only accept "
+                  "CountEvents.";
+    return kInvalidArguments;
+  }
+  std::string key;
+  if (!PopulateReportKey(event_record->metric->customer_id(),
+                         event_record->metric->project_id(),
+                         event_record->metric->id(), report_id, &key)) {
+    return kInvalidArguments;
+  }
+  auto locked = protected_aggregate_store_.lock();
+  auto aggregates =
+      locked->local_aggregate_store.mutable_by_report_key()->find(key);
+  if (aggregates ==
+      locked->local_aggregate_store.mutable_by_report_key()->end()) {
+    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
+    return kInvalidArguments;
+  }
+  if (!aggregates->second.has_count_aggregates()) {
+    LOG(ERROR) << "The local aggregates for this report key are not of type "
+                  "PerDeviceCountReportAggregates.";
+    return kInvalidArguments;
+  }
+  auto daily_aggregate =
+      (*(*(*aggregates->second.mutable_count_aggregates()
+                ->mutable_by_component())[event_record->event->count_event()
+                                              .component()]
+              .mutable_by_event_code())[event_record->event->count_event()
+                                            .event_code(0)]
+            .mutable_by_day_index())[event_record->event->day_index()]
+          .mutable_count_daily_aggregate();
+  daily_aggregate->set_count(daily_aggregate->count() +
+                             event_record->event->count_event().count());
+  return kOK;
+}
+
 Status EventAggregator::GenerateObservationsNoWorker(
     uint32_t final_day_index_utc, uint32_t final_day_index_local) {
   if (worker_thread_.joinable()) {
@@ -216,7 +335,7 @@
 Status EventAggregator::BackUpObservationHistory() {
   auto status = obs_history_proto_store_->Write(obs_history_);
   if (!status.ok()) {
-    LOG(ERROR) << "Failed to back up the AggregatedObservationHistory. "
+    LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
                   "::cobalt::util::Status error code: "
                << status.error_code()
                << "\nError message: " << status.error_message()
@@ -248,19 +367,22 @@
   // Acquire the mutex protecting the shutdown flag and condition variable.
   auto locked = protected_shutdown_flag_.lock();
   while (true) {
-    // If shutdown has been requested, back up the LocalAggregateStore and exit.
+    // If shutdown has been requested, back up the LocalAggregateStore and
+    // exit.
     if (locked->shut_down) {
       BackUpLocalAggregateStore();
       return;
     }
-    // Sleep until the next scheduled backup of the LocalAggregateStore or until
-    // notified of shutdown. Back up the LocalAggregateStore after waking.
+    // Sleep until the next scheduled backup of the LocalAggregateStore or
+    // until notified of shutdown. Back up the LocalAggregateStore after
+    // waking.
     auto shutdown_requested = locked.wait_for_with(
         &(locked->shutdown_notifier), aggregate_backup_interval_,
         [&locked]() { return locked->shut_down; });
     BackUpLocalAggregateStore();
-    // If the worker thread was woken up by a shutdown request, exit. Otherwise,
-    // complete any scheduled Observation generation and garbage collection.
+    // If the worker thread was woken up by a shutdown request, exit.
+    // Otherwise, complete any scheduled Observation generation and garbage
+    // collection.
     if (shutdown_requested) {
       return;
     }
@@ -308,7 +430,7 @@
   // Lock, copy the LocalAggregateStore, and release the lock. Use the copy to
   // generate observations.
   auto local_aggregate_store = CopyLocalAggregateStore();
-  for (auto pair : local_aggregate_store.aggregates()) {
+  for (auto pair : local_aggregate_store.by_report_key()) {
     const auto& config = pair.second.aggregation_config();
 
     const auto& metric = config.metric();
@@ -380,7 +502,7 @@
     day_index_local = day_index_utc;
   }
   auto locked = protected_aggregate_store_.lock();
-  for (auto pair : locked->local_aggregate_store.aggregates()) {
+  for (auto pair : locked->local_aggregate_store.by_report_key()) {
     uint32_t day_index;
     switch (pair.second.aggregation_config().metric().time_zone_policy()) {
       case MetricDefinition::UTC: {
@@ -414,41 +536,114 @@
       LOG(ERROR) << "day_index must be >= backfill_days_ + max_window_size.";
       return kInvalidArguments;
     }
-    // For each event code, iterate over the sub-map of local aggregates
-    // keyed by day index. Keep buckets with day indices greater than
-    // |day_index| - |backfill_days_| - |max_window_size|, and remove
-    // all buckets with smaller day indices.
-    for (auto event_code_aggregates : pair.second.by_event_code()) {
-      for (auto day_aggregates : event_code_aggregates.second.by_day_index()) {
-        if (day_aggregates.first <=
-            day_index - backfill_days_ - max_window_size) {
-          locked->local_aggregate_store.mutable_aggregates()
-              ->at(pair.first)
-              .mutable_by_event_code()
-              ->at(event_code_aggregates.first)
-              .mutable_by_day_index()
-              ->erase(day_aggregates.first);
+    // For each ReportAggregates, descend to and iterate over the sub-map of
+    // local aggregates keyed by day index. Keep buckets with day indices
+    // greater than |day_index| - |backfill_days_| - |max_window_size|, and
+    // remove all buckets with smaller day indices.
+    switch (pair.second.type_case()) {
+      case ReportAggregates::kUniqueActivesAggregates: {
+        for (auto event_code_aggregates :
+             pair.second.unique_actives_aggregates().by_event_code()) {
+          for (auto day_aggregates :
+               event_code_aggregates.second.by_day_index()) {
+            if (day_aggregates.first <=
+                day_index - backfill_days_ - max_window_size) {
+              locked->local_aggregate_store.mutable_by_report_key()
+                  ->at(pair.first)
+                  .mutable_unique_actives_aggregates()
+                  ->mutable_by_event_code()
+                  ->at(event_code_aggregates.first)
+                  .mutable_by_day_index()
+                  ->erase(day_aggregates.first);
+            }
+          }
+          // If the day index map under this event code is empty, remove the
+          // event code from the event code-keyed map under this
+          // ReportAggregationKey.
+          if (locked->local_aggregate_store.by_report_key()
+                  .at(pair.first)
+                  .unique_actives_aggregates()
+                  .by_event_code()
+                  .at(event_code_aggregates.first)
+                  .by_day_index()
+                  .empty()) {
+            locked->local_aggregate_store.mutable_by_report_key()
+                ->at(pair.first)
+                .mutable_unique_actives_aggregates()
+                ->mutable_by_event_code()
+                ->erase(event_code_aggregates.first);
+          }
         }
+        break;
       }
-      // If the day index map under this event code is empty, remove the event
-      // code from the event code map under this ReportAggregationKey.
-      if (locked->local_aggregate_store.aggregates()
-              .at(pair.first)
-              .by_event_code()
-              .at(event_code_aggregates.first)
-              .by_day_index()
-              .empty()) {
-        locked->local_aggregate_store.mutable_aggregates()
-            ->at(pair.first)
-            .mutable_by_event_code()
-            ->erase(event_code_aggregates.first);
+      case ReportAggregates::kCountAggregates: {
+        for (auto component_aggregates :
+             pair.second.count_aggregates().by_component()) {
+          for (auto event_code_aggregates :
+               component_aggregates.second.by_event_code()) {
+            for (auto day_aggregates :
+                 event_code_aggregates.second.by_day_index()) {
+              if (day_aggregates.first <=
+                  day_index - backfill_days_ - max_window_size) {
+                locked->local_aggregate_store.mutable_by_report_key()
+                    ->at(pair.first)
+                    .mutable_count_aggregates()
+                    ->mutable_by_component()
+                    ->at(component_aggregates.first)
+                    .mutable_by_event_code()
+                    ->at(event_code_aggregates.first)
+                    .mutable_by_day_index()
+                    ->erase(day_aggregates.first);
+              }
+            }
+            // If the day index map under this event code is empty, remove the
+            // event code from the event code-keyed map under this
+            // ReportAggregationKey.
+            if (locked->local_aggregate_store.by_report_key()
+                    .at(pair.first)
+                    .count_aggregates()
+                    .by_component()
+                    .at(component_aggregates.first)
+                    .by_event_code()
+                    .at(event_code_aggregates.first)
+                    .by_day_index()
+                    .empty()) {
+              locked->local_aggregate_store.mutable_by_report_key()
+                  ->at(pair.first)
+                  .mutable_count_aggregates()
+                  ->mutable_by_component()
+                  ->at(component_aggregates.first)
+                  .mutable_by_event_code()
+                  ->erase(event_code_aggregates.first);
+            }
+          }
+          // If the event code map under this component string is empty,
+          // remove the component string from the component-keyed map under
+          // this ReportAggregationKey.
+          if (locked->local_aggregate_store.by_report_key()
+                  .at(pair.first)
+                  .count_aggregates()
+                  .by_component()
+                  .at(component_aggregates.first)
+                  .by_event_code()
+                  .empty()) {
+            locked->local_aggregate_store.mutable_by_report_key()
+                ->at(pair.first)
+                .mutable_count_aggregates()
+                ->mutable_by_component()
+                ->erase(component_aggregates.first);
+          }
+        }
+        break;
       }
+      default:
+        continue;
     }
   }
   return kOK;
 }
 
-////////// GenerateUniqueActivesObservations and helper methods ////////////////
+////////// GenerateUniqueActivesObservations and helper methods ///////////////
 
 // Given the set of daily aggregates for a fixed event code, and the size and
 // end date of an aggregation window, returns the first day index within that
@@ -478,16 +673,18 @@
           active_day_index > obs_day_index - window_size);
 }
 
-uint32_t EventAggregator::LastGeneratedDayIndex(const std::string& report_key,
-                                                uint32_t event_code,
-                                                uint32_t window_size) const {
+uint32_t EventAggregator::UniqueActivesLastGeneratedDayIndex(
+    const std::string& report_key, uint32_t event_code,
+    uint32_t window_size) const {
   auto report_history = obs_history_.by_report_key().find(report_key);
   if (report_history == obs_history_.by_report_key().end()) {
     return 0u;
   }
   auto event_code_history =
-      report_history->second.by_event_code().find(event_code);
-  if (event_code_history == report_history->second.by_event_code().end()) {
+      report_history->second.unique_actives_history().by_event_code().find(
+          event_code);
+  if (event_code_history ==
+      report_history->second.unique_actives_history().by_event_code().end()) {
     return 0u;
   }
   auto window_size_history =
@@ -527,10 +724,13 @@
     const ReportAggregates& report_aggregates, uint32_t num_event_codes,
     uint32_t final_day_index) {
   for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
-    auto daily_aggregates = report_aggregates.by_event_code().find(event_code);
+    auto daily_aggregates =
+        report_aggregates.unique_actives_aggregates().by_event_code().find(
+            event_code);
     // Have any events ever been logged for this report and event code?
     bool found_event_code =
-        (daily_aggregates != report_aggregates.by_event_code().end());
+        (daily_aggregates !=
+         report_aggregates.unique_actives_aggregates().by_event_code().end());
     for (uint32_t window_size :
          report_aggregates.aggregation_config().report().window_size()) {
       // Skip any window size larger than
@@ -544,8 +744,8 @@
       // been generated for this report, event code, and window size. If
       // that day index is later than |final_day_index|, no Observation is
       // generated on this invocation.
-      auto last_gen =
-          LastGeneratedDayIndex(report_key, event_code, window_size);
+      auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code,
+                                                         window_size);
       auto first_day_index =
           std::max(last_gen + 1, uint32_t(final_day_index - backfill_days_));
       // The latest day index on which |event_type| is known to have
@@ -585,7 +785,8 @@
         // Update |obs_history_| with the latest date of Observation
         // generation for this report, event code, and window size.
         (*(*(*obs_history_.mutable_by_report_key())[report_key]
-                .mutable_by_event_code())[event_code]
+                .mutable_unique_actives_history()
+                ->mutable_by_event_code())[event_code]
               .mutable_by_window_size())[window_size] = obs_day_index;
       }
     }
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h
index b9b9a2b..8fbaf24 100644
--- a/logger/event_aggregator.h
+++ b/logger/event_aggregator.h
@@ -27,17 +27,6 @@
 namespace cobalt {
 namespace logger {
 
-// Maximum value of |backfill_days| allowed by the constructor of
-// EventAggregator.
-static const size_t kEventAggregatorMaxAllowedBackfillDays = 1000;
-// EventAggregator::GenerateObservations() ignores all aggregation window
-// sizes larger than this value.
-static const size_t kMaxAllowedAggregationWindowSize = 365;
-// The number of seconds in an hour.
-static const size_t kHour = 3600;
-// The number of seconds in a day.
-static const size_t kDay = kHour * 24;
-
 // The EventAggregator class manages an in-memory store of aggregated values
 // of Events logged for locally aggregated report types. For each day, this
 // LocalAggregateStore contains an aggregate of the values of logged Events of
@@ -64,6 +53,12 @@
 // needed to compute aggregates for any windows of interest in the future.
 class EventAggregator {
  public:
+  // Maximum value of |backfill_days| allowed by the constructor.
+  static const size_t kMaxAllowedBackfillDays = 1000;
+  // GenerateObservations() ignores all aggregation window sizes larger than
+  // this value.
+  static const size_t kMaxAllowedAggregationWindowSize = 365;
+
   // Constructs an EventAggregator.
   //
   // An EventAggregator maintains daily aggregates of Events in a
@@ -110,10 +105,9 @@
       util::ConsistentProtoStore* obs_history_proto_store,
       const size_t backfill_days = 0,
       const std::chrono::seconds aggregate_backup_interval =
-          std::chrono::seconds(60),
-      const std::chrono::seconds generate_obs_interval =
-          std::chrono::seconds(kHour),
-      const std::chrono::seconds gc_interval = std::chrono::seconds(kDay));
+          std::chrono::minutes(1),
+      const std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
+      const std::chrono::seconds gc_interval = std::chrono::hours(24));
 
   // Shut down the worker thread before destructing the EventAggregator.
   ~EventAggregator() { ShutDown(); }
@@ -155,6 +149,23 @@
   // called.
   Status LogUniqueActivesEvent(uint32_t report_id, EventRecord* event_record);
 
+  // Logs an Event associated to a report of type
+  // PER_DEVICE_COUNT_STATS to the EventAggregator.
+  //
+  // report_id: the ID of the report associated to the logged Event.
+  //
+  // event_record: an EventRecord wrapping an Event of type CountEvent
+  // and the MetricDefinition for which the Event is to be logged.
+  //
+  // Returns kOK if the LocalAggregateStore was successfully updated, and
+  // kInvalidArguments if either a lookup key corresponding to |report_id| was
+  // not found in the LocalAggregateStore, or if the Event wrapped by
+  // EventRecord is not of type CountEvent.
+  //
+  // Currently compatible only with EVENT_COUNT metrics with a single event code
+  // dimension. TODO(pesk, zmbush): support multiple event codes.
+  Status LogPerDeviceCountEvent(uint32_t report_id, EventRecord* event_record);
+
   // Checks that the worker thread is shut down, and if so, calls the private
   // method GenerateObservations() and returns its result. Returns kOther if the
   // worker thread is joinable. See the documentation on GenerateObservations()
@@ -248,12 +259,12 @@
   Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u);
 
   // Returns the most recent day index for which an Observation was generated
-  // for a given report, event code, and window size, according to
-  // |obs_history_|. Returns 0 if no Observation has been generated for the
-  // given arguments.
-  uint32_t LastGeneratedDayIndex(const std::string& report_key,
-                                 uint32_t event_code,
-                                 uint32_t window_size) const;
+  // for a given UNIQUE_N_DAY_ACTIVES report, event code, and window size,
+  // according to |obs_history_|. Returns 0 if no Observation has been generated
+  // for the given arguments.
+  uint32_t UniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+                                              uint32_t event_code,
+                                              uint32_t window_size) const;
 
   // For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
   // for each event code of the parent metric, for each window size of the
@@ -305,7 +316,7 @@
   util::ConsistentProtoStore* obs_history_proto_store_;
   util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
   // Not protected by a mutex. Should only be accessed by |worker_thread_|.
-  AggregatedObservationHistory obs_history_;
+  AggregatedObservationHistoryStore obs_history_;
   size_t backfill_days_ = 0;
 
   std::thread worker_thread_;
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
index 6958cea..6ba882e 100644
--- a/logger/event_aggregator_test.cc
+++ b/logger/event_aggregator_test.cc
@@ -47,6 +47,8 @@
 using testing::TestUpdateRecipient;
 
 namespace {
+// Number of seconds in a day
+const int kDay = 60 * 60 * 24;
 // Number of seconds in an ideal year
 const int kYear = kDay * 365;
 
@@ -67,6 +69,73 @@
 const MetricReportId kFeaturesActiveMetricReportId = MetricReportId(20, 201);
 const MetricReportId kErrorsOccurredMetricReportId = MetricReportId(30, 302);
 const MetricReportId kEventsOccurredMetricReportId = MetricReportId(40, 402);
+const MetricReportId kConnectionFailuresMetricReportId =
+    MetricReportId(50, 501);
+const MetricReportId kSettingsChangedMetricReportId = MetricReportId(60, 601);
+
+// A set of metric definitions of various types, each with a locally aggregated
+// report.
+static const char kMetricDefinitions[] = R"(
+metric {
+  metric_name: "ErrorsOccurred"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 30
+  max_event_code: 2
+  reports: {
+    report_name: "ErrorsOccurred_SimpleCount"
+    id: 301
+    report_type: SIMPLE_OCCURRENCE_COUNT
+    local_privacy_noise_level: NONE
+  }
+  reports: {
+    report_name: "ErrorsOccurred_UniqueDevices"
+    id: 302
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: LARGE
+    window_size: 1
+    window_size: 7
+    window_size: 30
+  }
+}
+
+metric {
+  metric_name: "ConnectionFailures"
+  metric_type: EVENT_COUNT
+  customer_id: 1
+  project_id: 1
+  id: 50
+  reports: {
+    report_name: "ConnectionFailures_PerDeviceCount"
+    id: 501
+    report_type: PER_DEVICE_COUNT_STATS
+    window_size: 1
+  }
+}
+
+)";
+
+// Properties of the locally aggregated Observations which should be generated
+// for the reports in |kMetricDefinitions|, assuming that no events have ever
+// been logged for those reports.
+static const ExpectedAggregationParams kExpectedParams = {
+    /* The total number of locally aggregated Observations that should be
+       generated for each day index. */
+    9,
+    /* The MetricReportIds of the locally aggregated reports in this
+       configuration. */
+    {kErrorsOccurredMetricReportId, kConnectionFailuresMetricReportId},
+    /* The number of Observations which should be generated for each day index,
+       broken down by MetricReportId. */
+    {{kErrorsOccurredMetricReportId, 9},
+     {kConnectionFailuresMetricReportId, 0}},
+    /* The number of event codes for each report of type UNIQUE_N_DAY_ACTIVES,
+       by MetricReportId. */
+    {{kErrorsOccurredMetricReportId, 3}},
+    /* The set of window sizes for each MetricReportId. */
+    {{kErrorsOccurredMetricReportId, {1, 7, 30}},
+     {kConnectionFailuresMetricReportId, {1}}}};
 
 // A set of metric definitions of type EVENT_OCCURRED, each of which has a
 // UNIQUE_N_DAY_ACTIVES report.
@@ -239,6 +308,63 @@
      {kFeaturesActiveMetricReportId, {1, 7, 30}},
      {kEventsOccurredMetricReportId, {1, 7}}}};
 
+// A set of MetricDefinitions of type EVENT_COUNT, each of which has a
+// ReportDefinition of type PER_DEVICE_COUNT_STATS.
+static const char kPerDeviceCountMetricDefinitions[] = R"(
+metric {
+  metric_name: "ConnectionFailures"
+  metric_type: EVENT_COUNT
+  customer_id: 1
+  project_id: 1
+  id: 50
+  reports: {
+    report_name: "ConnectionFailures_PerDeviceCount"
+    id: 501
+    report_type: PER_DEVICE_COUNT_STATS
+    window_size: 1
+  }
+}
+
+metric {
+  metric_name: "SettingsChanged"
+  metric_type: EVENT_COUNT
+  customer_id: 1
+  project_id: 1
+  id: 60
+  reports: {
+    report_name: "SettingsChanged_PerDeviceCount"
+    id: 601
+    report_type: PER_DEVICE_COUNT_STATS
+    window_size: 7
+    window_size: 30
+  }
+}
+
+)";
+
+// Properties of the locally aggregated Observations which should be generated
+// for the reports in |kMetricDefinitions|, assuming that no events have ever
+// been logged for those reports.
+//
+// TODO(pesk): update these fields once the EventAggregator is generating
+// observations for PerDeviceCount reports.
+static const ExpectedAggregationParams kPerDeviceCountExpectedParams = {
+    /* The total number of Observations that should be generated for a day
+       index. */
+    0,
+    /* The MetricReportIds of the locally aggregated reports in this
+       configuration. */
+    {kConnectionFailuresMetricReportId, kSettingsChangedMetricReportId},
+    /* The number of Observations which should be generated for a day index,
+       broken down by MetricReportId. */
+    {{kConnectionFailuresMetricReportId, 0},
+     {kSettingsChangedMetricReportId, 0}},
+    /* Omitted because this config contains no UNIQUE_N_DAY_ACTIVES reports. */
+    {},
+    /* The set of window sizes for each MetricReportId. */
+    {{kConnectionFailuresMetricReportId, {1}},
+     {kSettingsChangedMetricReportId, {7, 30}}}};
+
 // A set of MetricDefinitions including one with TimeZonePolicy UTC and one with
 // TimeZonePolicy LOCAL.
 static const char kNoiseFreeMixedTimeZoneMetricDefinitions[] = R"(
@@ -300,6 +426,15 @@
 typedef std::map<std::string, std::map<uint32_t, std::set<uint32_t>>>
     LoggedActivity;
 
+// A map used in tests as a record, external to the LocalAggregateStore, of the
+// activity logged for PER_DEVICE_COUNT_STATS reports. The keys are, in
+// descending order, serialized ReportAggregationKeys, components, event codes,
+// and day indices. The innermost value is a count.
+typedef std::map<
+    std::string,
+    std::map<std::string, std::map<uint32_t, std::map<uint32_t, int64_t>>>>
+    LoggedCounts;
+
 // Given a string representing a MetricDefinitions proto message, creates a
 // ProjectContext from that MetricDefinitions and returns a unique pointer.
 std::unique_ptr<ProjectContext> MakeProjectContext(const char metric_string[]) {
@@ -416,7 +551,7 @@
 
   // Given a ProjectContext |project_context| and the MetricReportId of a
   // UNIQUE_N_DAY_ACTIVES report in |project_context|, as well as a day index
-  // and an event code, logs a UniqueActivesEvent to the EventAggregator for
+  // and an event code, logs an OccurrenceEvent to the EventAggregator for
   // that report, day index, and event code. If a non-null LoggedActivity map is
   // provided, updates the map with information about the logged Event.
   Status LogUniqueActivesEvent(const ProjectContext& project_context,
@@ -441,9 +576,43 @@
     return status;
   }
 
-  // Given a LoggedActivity map describing the events that have been logged to
-  // the EventAggregator, checks whether the contents of the LocalAggregateStore
-  // are as expected, accounting for any garbage collection.
+  // Given a ProjectContext |project_context| and the MetricReportId of an
+  // PER_DEVICE_COUNT_STATS report in |project_context|, as well as a
+  // day index, a component string, and an event code, logs a CountEvent to the
+  // EventAggregator for that report, day index, component, and event code. If a
+  // non-null LoggedCounts map is provided, updates the map with information
+  // about the logged Event.
+  Status LogPerDeviceCountEvent(const ProjectContext& project_context,
+                                const MetricReportId& metric_report_id,
+                                uint32_t day_index,
+                                const std::string& component,
+                                uint32_t event_code, int64_t count,
+                                LoggedCounts* logged_counts = nullptr) {
+    EventRecord event_record;
+    event_record.metric = project_context.GetMetric(metric_report_id.first);
+    event_record.event->set_day_index(day_index);
+    auto count_event = event_record.event->mutable_count_event();
+    count_event->set_component(component);
+    count_event->add_event_code(event_code);
+    count_event->set_count(count);
+    auto status = event_aggregator_->LogPerDeviceCountEvent(
+        metric_report_id.second, &event_record);
+    if (logged_counts == nullptr) {
+      return status;
+    }
+    std::string key;
+    if (!SerializeToBase64(
+            MakeAggregationKey(project_context, metric_report_id), &key)) {
+      return kInvalidArguments;
+    }
+    (*logged_counts)[key][component][event_code][day_index] += count;
+    return status;
+  }
+
+  // Given a LoggedActivity map describing the events that have been logged
+  // to the EventAggregator, checks whether the contents of the
+  // LocalAggregateStore are as expected, accounting for any garbage
+  // collection.
   //
   // logged_activity: a LoggedActivity representing event occurrences
   // since the LocalAggregateStore was created. All day indices should be
@@ -452,26 +621,33 @@
   //
   // current_day_index: The day index of the current day in the test's frame
   // of reference.
-  bool CheckAggregateStore(const LoggedActivity& logged_activity,
-                           uint32_t current_day_index) {
+  bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
+                                    uint32_t current_day_index) {
     auto local_aggregate_store = event_aggregator_->CopyLocalAggregateStore();
-    // Check that the LocalAggregateStore contains no more aggregates than
-    // |logged_activity| and |day_last_garbage_collected_| should imply.
-    for (const auto& report_pair : local_aggregate_store.aggregates()) {
-      const auto& report_key = report_pair.first;
+    // Check that the LocalAggregateStore contains no more UniqueActives
+    // aggregates than |logged_activity| and |day_last_garbage_collected_|
+    // should imply.
+    for (const auto& report_pair : local_aggregate_store.by_report_key()) {
       const auto& aggregates = report_pair.second;
+      if (aggregates.type_case() !=
+          ReportAggregates::kUniqueActivesAggregates) {
+        continue;
+      }
+      const auto& report_key = report_pair.first;
       // Check whether this ReportAggregationKey is in |logged_activity|. If
       // not, expect that its by_event_code map is empty.
       auto report_activity = logged_activity.find(report_key);
       if (report_activity == logged_activity.end()) {
-        EXPECT_TRUE(aggregates.by_event_code().empty());
-        if (!aggregates.by_event_code().empty()) {
+        EXPECT_TRUE(
+            aggregates.unique_actives_aggregates().by_event_code().empty());
+        if (!aggregates.unique_actives_aggregates().by_event_code().empty()) {
           return false;
         }
         break;
       }
       auto expected_events = report_activity->second;
-      for (const auto& event_pair : aggregates.by_event_code()) {
+      for (const auto& event_pair :
+           aggregates.unique_actives_aggregates().by_event_code()) {
         // Check that this event code is in |logged_activity| under this
         // ReportAggregationKey.
         auto event_code = event_pair.first;
@@ -508,26 +684,37 @@
     for (const auto& logged_pair : logged_activity) {
       const auto& logged_key = logged_pair.first;
       const auto& logged_event_map = logged_pair.second;
-      // Check that this ReportAggregationKey is in the LocalAggregateStore.
+      // Check that this ReportAggregationKey is in the LocalAggregateStore, and
+      // that the aggregates are of the expected type.
       auto report_aggregates =
-          local_aggregate_store.aggregates().find(logged_key);
-      EXPECT_NE(report_aggregates, local_aggregate_store.aggregates().end());
-      if (report_aggregates == local_aggregate_store.aggregates().end()) {
+          local_aggregate_store.by_report_key().find(logged_key);
+      EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
+      if (report_aggregates == local_aggregate_store.by_report_key().end()) {
         return false;
       }
+      if (report_aggregates->second.type_case() !=
+          ReportAggregates::kUniqueActivesAggregates) {
+        return false;
+      }
+      // Compute the earliest day index that should appear among the aggregates
+      // for this report.
+      auto earliest_allowed = EarliestAllowedDayIndex(
+          report_aggregates->second.aggregation_config());
       for (const auto& logged_event_pair : logged_event_map) {
         const auto& logged_event_code = logged_event_pair.first;
         const auto& logged_days = logged_event_pair.second;
-        auto earliest_allowed = EarliestAllowedDayIndex(
-            report_aggregates->second.aggregation_config());
         // Check whether this event code is in the LocalAggregateStore
         // under this ReportAggregationKey. If not, check that all day indices
         // for this event code are smaller than the day index of the earliest
         // allowed aggregate.
         auto event_code_aggregates =
-            report_aggregates->second.by_event_code().find(logged_event_code);
+            report_aggregates->second.unique_actives_aggregates()
+                .by_event_code()
+                .find(logged_event_code);
         if (event_code_aggregates ==
-            report_aggregates->second.by_event_code().end()) {
+            report_aggregates->second.unique_actives_aggregates()
+                .by_event_code()
+                .end()) {
           for (auto day_index : logged_days) {
             EXPECT_LT(day_index, earliest_allowed);
             if (day_index >= earliest_allowed) {
@@ -565,6 +752,172 @@
     return true;
   }
 
+  bool CheckPerDeviceCountAggregates(const LoggedCounts& logged_counts,
+                                     uint32_t current_day_index) {
+    auto local_aggregate_store = event_aggregator_->CopyLocalAggregateStore();
+    // Check that the LocalAggregateStore contains no more PerDeviceCount
+    // aggregates than |logged_counts| and |day_last_garbage_collected_| should
+    // imply.
+    for (const auto& report_pair : local_aggregate_store.by_report_key()) {
+      const auto& aggregates = report_pair.second;
+      if (aggregates.type_case() != ReportAggregates::kCountAggregates) {
+        continue;
+      }
+      const auto& report_key = report_pair.first;
+      // Check whether this ReportAggregationKey is in |logged_counts|. If not,
+      // expect that its by_component map is empty.
+      auto report_counts = logged_counts.find(report_key);
+      if (report_counts == logged_counts.end()) {
+        EXPECT_TRUE(aggregates.count_aggregates().by_component().empty());
+        if (!aggregates.count_aggregates().by_component().empty()) {
+          return false;
+        }
+        break;
+      }
+      auto expected_components = report_counts->second;
+      for (const auto& component_pair :
+           aggregates.count_aggregates().by_component()) {
+        // Check that this component is in |logged_counts| under this
+        // ReportAggregationKey.
+        auto component = component_pair.first;
+        auto component_counts = expected_components.find(component);
+        EXPECT_NE(component_counts, expected_components.end());
+        if (component_counts == expected_components.end()) {
+          return false;
+        }
+        const auto& expected_events = component_counts->second;
+        for (const auto& event_pair : component_pair.second.by_event_code()) {
+          // Check that this event code is in |logged_counts| under this
+          // ReportAggregationKey and component.
+          const auto& event_code = event_pair.first;
+          auto event_counts = expected_events.find(event_code);
+          EXPECT_NE(event_counts, expected_events.end());
+          if (event_counts == expected_events.end()) {
+            return false;
+          }
+          const auto& expected_days = event_counts->second;
+          for (const auto& day_pair : event_pair.second.by_day_index()) {
+            // Check that this day index is in |logged_counts| under this
+            // ReportAggregationKey, component, and event code.
+            const auto& day_index = day_pair.first;
+            auto day_count = expected_days.find(day_index);
+            EXPECT_NE(day_count, expected_days.end());
+            if (day_count == expected_days.end()) {
+              return false;
+            }
+            // Check that the day index is no earlier than is implied by the
+            // dates of store creation and garbage collection.
+            EXPECT_GE(day_index,
+                      EarliestAllowedDayIndex(aggregates.aggregation_config()));
+            if (day_index <
+                EarliestAllowedDayIndex(aggregates.aggregation_config())) {
+              return false;
+            }
+          }
+        }
+      }
+    }
+
+    // Check that the LocalAggregateStore contains aggregates for all counts in
+    // |logged_counts|, as long as they are recent enough to have survived any
+    // garbage collection.
+    for (const auto& logged_pair : logged_counts) {
+      const auto& logged_key = logged_pair.first;
+      const auto& logged_component_map = logged_pair.second;
+      // Check that this ReportAggregationKey is in the LocalAggregateStore, and
+      // that the aggregates are of the expected type.
+      auto report_aggregates =
+          local_aggregate_store.by_report_key().find(logged_key);
+      EXPECT_NE(report_aggregates, local_aggregate_store.by_report_key().end());
+      if (report_aggregates == local_aggregate_store.by_report_key().end()) {
+        return false;
+      }
+      if (report_aggregates->second.type_case() !=
+          ReportAggregates::kCountAggregates) {
+        return false;
+      }
+      // Compute the earliest day index that should appear among the aggregates
+      // for this report.
+      auto earliest_allowed = EarliestAllowedDayIndex(
+          report_aggregates->second.aggregation_config());
+      for (const auto& logged_component_pair : logged_component_map) {
+        const auto& logged_component = logged_component_pair.first;
+        const auto& logged_event_code_map = logged_component_pair.second;
+        // Check whether this component is in the LocalAggregateStore under this
+        // ReportAggregationKey. If not, check that all day indices for all
+        // entries in |logged_counts| under this component are smaller than the
+        // day index of the earliest allowed aggregate.
+        bool component_found = false;
+        auto component_aggregates =
+            report_aggregates->second.count_aggregates().by_component().find(
+                logged_component);
+        if (component_aggregates !=
+            report_aggregates->second.count_aggregates().by_component().end()) {
+          component_found = true;
+        }
+        for (const auto& logged_event_pair : logged_event_code_map) {
+          const auto& logged_event_code = logged_event_pair.first;
+          const auto& logged_day_map = logged_event_pair.second;
+          // Check whether this event code is in the LocalAggregateStore under
+          // this ReportAggregationKey. If not, check that all day indices in
+          // |logged_counts| under this component are smaller than the day index
+          // of the earliest allowed aggregate.
+          bool event_code_found = false;
+          if (component_found) {
+            auto event_code_aggregates =
+                component_aggregates->second.by_event_code().find(
+                    logged_event_code);
+            if (event_code_aggregates !=
+                component_aggregates->second.by_event_code().end()) {
+              event_code_found = true;
+            }
+            if (event_code_found) {
+              // Check that all of the day indices in |logged_counts| under this
+              // ReportAggregationKey, component, and event code are in the
+              // LocalAggregateStore, as long as they are recent enough to have
+              // survived any garbage collection. Check that each aggregate has
+              // the expected count.
+              for (const auto& logged_day_pair : logged_day_map) {
+                auto logged_day_index = logged_day_pair.first;
+                auto logged_count = logged_day_pair.second;
+                auto day_aggregate =
+                    event_code_aggregates->second.by_day_index().find(
+                        logged_day_index);
+                if (logged_day_index >= earliest_allowed) {
+                  EXPECT_NE(day_aggregate,
+                            event_code_aggregates->second.by_day_index().end());
+                  if (day_aggregate ==
+                      event_code_aggregates->second.by_day_index().end()) {
+                    return false;
+                  }
+                  EXPECT_EQ(
+                      day_aggregate->second.count_daily_aggregate().count(),
+                      logged_count);
+                  if (day_aggregate->second.count_daily_aggregate().count() !=
+                      logged_count) {
+                    return false;
+                  }
+                }
+              }
+            }
+          }
+          if (!component_found | !event_code_found) {
+            for (auto logged_day_pair : logged_day_map) {
+              auto logged_day_index = logged_day_pair.first;
+              EXPECT_LT(logged_day_index, earliest_allowed);
+              if (logged_day_index >= earliest_allowed) {
+                return false;
+              }
+            }
+            break;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
   // Given the AggregationConfig of a locally aggregated report, returns the
   // earliest (smallest) day index for which an aggregate may exist in the
   // LocalAggregateStore for that report, accounting for garbage
@@ -583,9 +936,9 @@
       // Otherwise, it is the later of:
       // (a) The day index on which the store was created minus the number
       // of backfill days.
-      // (b) The day index for which the store was last garbage-collected minus
-      // the number of backfill days, minus the largest window size in the
-      // report associated to |config|, plus 1.
+      // (b) The day index for which the store was last garbage-collected
+      // minus the number of backfill days, minus the largest window size in
+      // the report associated to |config|, plus 1.
       EXPECT_GE(day_last_garbage_collected_, backfill_days)
           << "The day index of last garbage collection must be larger than "
              "the number of backfill days.";
@@ -639,7 +992,7 @@
     event_aggregator_->UpdateAggregationConfigs(*project_context_);
   }
 
-  // Logs a UniqueActivesEvent for the MetricReportId of a locally
+  // Logs an OccurrenceEvent for the MetricReportId of a locally
   // aggregated report in |metric_string|. Overrides the method
   // EventAggregatorTest::LogUniqueActivesEvent.
   Status LogUniqueActivesEvent(const MetricReportId& metric_report_id,
@@ -650,6 +1003,19 @@
         logged_activity);
   }
 
+  // Logs a CountEvent for the MetricReportId of a locally
+  // aggregated report in |metric_string|. Overrides the method
+  // EventAggregatorTest::LogPerDeviceCountEvent.
+  Status LogPerDeviceCountEvent(const MetricReportId& metric_report_id,
+                                uint32_t day_index,
+                                const std::string& component,
+                                uint32_t event_code, int64_t count,
+                                LoggedCounts* logged_counts = nullptr) {
+    return EventAggregatorTest::LogPerDeviceCountEvent(
+        *project_context_, metric_report_id, day_index, component, event_code,
+        count, logged_counts);
+  }
+
  private:
   // A ProjectContext wrapping the MetricDefinitions passed to the
   // constructor in |metric_string|.
@@ -677,6 +1043,16 @@
 };
 
 // Creates an EventAggregator as in EventAggregatorTest and provides it with
+// |kPerDeviceCountMetricDefinitions|.
+class PerDeviceCountEventAggregatorTest
+    : public EventAggregatorTestWithProjectContext {
+ protected:
+  PerDeviceCountEventAggregatorTest()
+      : EventAggregatorTestWithProjectContext(
+            kPerDeviceCountMetricDefinitions) {}
+};
+
+// Creates an EventAggregator as in EventAggregatorTest and provides it with
 // |kNoiseFreeMixedTimeZoneMetricDefinitions|.
 class NoiseFreeMixedTimeZoneEventAggregatorTest
     : public EventAggregatorTestWithProjectContext {
@@ -726,34 +1102,30 @@
 
 // Tests that an empty LocalAggregateStore is updated with
 // ReportAggregationKeys and AggregationConfigs as expected when
-// EventAggregator::UpdateAggregationConfigs is called.
+// EventAggregator::UpdateAggregationConfigs is called with a ProjectContext
+// containing at least one report for each locally aggregated report type.
 TEST_F(EventAggregatorTest, UpdateAggregationConfigs) {
   // Check that the LocalAggregateStore is empty.
-  EXPECT_EQ(0u, CopyLocalAggregateStore().aggregates().size());
-  // Provide |kUniqueActivesMetricDefinitions| to the EventAggregator.
-  auto unique_actives_project_context =
-      MakeProjectContext(kUniqueActivesMetricDefinitions);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
-                     *unique_actives_project_context));
+  EXPECT_EQ(0u, CopyLocalAggregateStore().by_report_key().size());
+  // Provide |kMetricDefinitions| to the EventAggregator.
+  auto project_context = MakeProjectContext(kMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of locally aggregated reports in
-  // |kUniqueActivesMetricDefinitions|.
-  EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
-            CopyLocalAggregateStore().aggregates().size());
+  // |kMetricDefinitions|.
+  EXPECT_EQ(kExpectedParams.metric_report_ids.size(),
+            CopyLocalAggregateStore().by_report_key().size());
   // Check that the LocalAggregateStore contains the expected
   // ReportAggregationKey and AggregationConfig for each locally aggregated
   // report in |kUniqueActivesMetricDefinitions|,
-  for (const auto& metric_report_id :
-       kUniqueActivesExpectedParams.metric_report_ids) {
+  for (const auto& metric_report_id : kExpectedParams.metric_report_ids) {
     std::string key;
-    SerializeToBase64(
-        MakeAggregationKey(*unique_actives_project_context, metric_report_id),
-        &key);
-    auto config = MakeAggregationConfig(*unique_actives_project_context,
-                                        metric_report_id);
+    SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id),
+                      &key);
+    auto config = MakeAggregationConfig(*project_context, metric_report_id);
     LocalAggregateStore local_aggregate_store = CopyLocalAggregateStore();
-    auto report_aggregates = local_aggregate_store.aggregates().find(key);
-    EXPECT_NE(local_aggregate_store.aggregates().end(), report_aggregates);
+    auto report_aggregates = local_aggregate_store.by_report_key().find(key);
+    EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
     EXPECT_TRUE(MessageDifferencer::Equals(
         config, report_aggregates->second.aggregation_config()));
   }
@@ -780,7 +1152,7 @@
   // now equal to the number of locally aggregated reports in
   // |kUniqueActivesMetricDefinitions|.
   EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
-            CopyLocalAggregateStore().aggregates().size());
+            CopyLocalAggregateStore().by_report_key().size());
   // Provide the EventAggregator with
   // |kNoiseFreeUniqueActivesMetricDefinitions|.
   auto noise_free_unique_actives_project_context =
@@ -792,7 +1164,7 @@
   // aggregated reports in |kUniqueActivesMetricDefinitions| and
   // |kNoiseFreeUniqueActivesMetricDefinitions|.
   auto local_aggregate_store = CopyLocalAggregateStore();
-  EXPECT_EQ(4u, local_aggregate_store.aggregates().size());
+  EXPECT_EQ(4u, local_aggregate_store.by_report_key().size());
   // The MetricReportId |kFeaturesActiveMetricReportId| appears in both
   // |kUniqueActivesMetricDefinitions| and
   // |kNoiseFreeUniqueActivesMetricDefinitions|. The associated
@@ -810,8 +1182,8 @@
                         &key));
   auto unique_actives_config = MakeAggregationConfig(
       *unique_actives_project_context, kFeaturesActiveMetricReportId);
-  auto report_aggregates = local_aggregate_store.aggregates().find(key);
-  EXPECT_NE(local_aggregate_store.aggregates().end(), report_aggregates);
+  auto report_aggregates = local_aggregate_store.by_report_key().find(key);
+  EXPECT_NE(local_aggregate_store.by_report_key().end(), report_aggregates);
   EXPECT_TRUE(MessageDifferencer::Equals(
       unique_actives_config, report_aggregates->second.aggregation_config()));
   auto noise_free_config =
@@ -821,39 +1193,84 @@
       noise_free_config, report_aggregates->second.aggregation_config()));
 }
 
-// Tests that EventAggregator::LogUniqueActivesEvent returns
-// |kInvalidArguments| when passed a report ID which is not associated to a
-// key of the LocalAggregateStore, or when passed an EventRecord containing
-// an Event proto message which is not of type OccurrenceEvent.
+// Tests that EventAggregator::Log*Event returns |kInvalidArguments| when
+// passed a report ID which is not associated to a key of the
+// LocalAggregateStore, or when passed an EventRecord containing an Event
+// proto message which is not of the appropriate event type.
 TEST_F(EventAggregatorTest, LogBadEvents) {
-  // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
-  auto unique_actives_project_context =
-      MakeProjectContext(kUniqueActivesMetricDefinitions);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
-                     *unique_actives_project_context));
-  // Attempt to log a UniqueActivesEvent for
-  // |kEventsOccurredMetricReportId|, which is not in
-  // |kUniqueActivesMetricDefinitions|. Check that the result is
-  // |kInvalidArguments|.
+  // Provide the EventAggregator with |kMetricDefinitions|.
+  auto project_context = MakeProjectContext(kUniqueActivesMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  // Attempt to log events for |kEventsOccurredMetricReportId|, which is not
+  // in |kMetricDefinitions|. Check that the result is |kInvalidArguments|.
   auto noise_free_project_context =
       MakeProjectContext(kNoiseFreeUniqueActivesMetricDefinitions);
   EventRecord bad_event_record;
   bad_event_record.metric = noise_free_project_context->GetMetric(
       kEventsOccurredMetricReportId.first);
   bad_event_record.event->set_day_index(CurrentDayIndex());
-  bad_event_record.event->mutable_occurrence_event()->set_event_code(0u);
+  bad_event_record.event->mutable_occurrence_event();
   EXPECT_EQ(kInvalidArguments,
             event_aggregator_->LogUniqueActivesEvent(
                 kEventsOccurredMetricReportId.second, &bad_event_record));
+  bad_event_record.event->mutable_count_event();
+  EXPECT_EQ(kInvalidArguments,
+            event_aggregator_->LogPerDeviceCountEvent(
+                kEventsOccurredMetricReportId.second, &bad_event_record));
   // Attempt to call LogUniqueActivesEvent() with a valid metric and report
   // ID, but with an EventRecord wrapping an Event which is not an
   // OccurrenceEvent. Check that the result is |kInvalidArguments|.
-  bad_event_record.metric = unique_actives_project_context->GetMetric(
-      kFeaturesActiveMetricReportId.first);
+  bad_event_record.metric =
+      project_context->GetMetric(kErrorsOccurredMetricReportId.first);
   bad_event_record.event->mutable_count_event();
   EXPECT_EQ(kInvalidArguments,
             event_aggregator_->LogUniqueActivesEvent(
-                kFeaturesActiveMetricReportId.second, &bad_event_record));
+                kErrorsOccurredMetricReportId.second, &bad_event_record));
+  // Attempt to call LogPerDeviceCountEvent() with a valid metric and report
+  // ID, but with an EventRecord wrapping an Event which is not a
+  // CountEvent. Check that the result is |kInvalidArguments|.
+  bad_event_record.metric =
+      project_context->GetMetric(kConnectionFailuresMetricReportId.first);
+  bad_event_record.event->mutable_occurrence_event();
+  EXPECT_EQ(kInvalidArguments,
+            event_aggregator_->LogPerDeviceCountEvent(
+                kConnectionFailuresMetricReportId.second, &bad_event_record));
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive
+// status and that the expected number of Observations is generated when no
+// Events have been logged to the EventAggregator.
+TEST_F(EventAggregatorTest, GenerateObservationsNoEvents) {
+  // Provide the EventAggregator with |kMetricDefinitions|.
+  auto project_context = MakeProjectContext(kMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  // Generate locally aggregated Observations for the current day index.
+  EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
+  std::vector<Observation2> observations(0);
+  EXPECT_TRUE(FetchAggregatedObservations(&observations, kExpectedParams,
+                                          observation_store_.get(),
+                                          update_recipient_.get()));
+}
+
+// Tests that EventAggregator::GenerateObservations() only generates
+// Observations the first time it is called for a given day index.
+TEST_F(EventAggregatorTest, GenerateObservationsTwice) {
+  // Provide the EventAggregator with |kMetricDefinitions|.
+  auto project_context = MakeProjectContext(kMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  // Check that Observations are generated when GenerateObservations is called
+  // for the current day index for the first time.
+  auto current_day_index = CurrentDayIndex();
+  EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+  std::vector<Observation2> observations(0);
+  EXPECT_TRUE(FetchAggregatedObservations(&observations, kExpectedParams,
+                                          observation_store_.get(),
+                                          update_recipient_.get()));
+  // Check that no Observations are generated when GenerateObservations is
+  // called for the currentday index for the second time.
+  ResetObservationStore();
+  EXPECT_EQ(kOK, GenerateObservations(current_day_index));
+  EXPECT_EQ(0u, observation_store_->messages_received.size());
 }
 
 // Tests that the LocalAggregateStore is updated as expected when
@@ -864,7 +1281,7 @@
 //
 // Logs some valid events each day for 35 days, checking the contents of the
 // LocalAggregateStore each day.
-TEST_F(UniqueActivesEventAggregatorTest, LogUniqueActivesEvents) {
+TEST_F(UniqueActivesEventAggregatorTest, LogEvents) {
   LoggedActivity logged_activity;
   uint32_t num_days = 35;
   for (uint32_t offset = 0; offset < num_days; offset++) {
@@ -874,12 +1291,12 @@
     auto day_index = CurrentDayIndex();
     EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
                                          day_index, 0u, &logged_activity));
-    EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+    EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
     // Log another event for the same report, event code, and day index.
     // Check the contents of the LocalAggregateStore.
     EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
                                          day_index, 0u, &logged_activity));
-    EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+    EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
     // Log several more events for various valid reports and event codes.
     // Check the contents of the LocalAggregateStore.
     EXPECT_EQ(kOK, LogUniqueActivesEvent(kDeviceBootsMetricReportId, day_index,
@@ -888,15 +1305,15 @@
                                          day_index, 4u, &logged_activity));
     EXPECT_EQ(kOK, LogUniqueActivesEvent(kErrorsOccurredMetricReportId,
                                          day_index, 1u, &logged_activity));
-    EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+    EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
     AdvanceClock(kDay);
   }
 }
 
-// Tests the method EventAggregator::GarbageCollect().
+// Tests GarbageCollect() for UniqueActivesReportAggregates.
 //
 // For each value of N in the range [0, 34], logs some UniqueActivesEvents
-// each day for N consecutive days and then garbage-collect the
+// each day for N consecutive days and then garbage-collects the
 // LocalAggregateStore. After garbage collection, verifies the contents of
 // the LocalAggregateStore.
 TEST_F(UniqueActivesEventAggregatorTest, GarbageCollect) {
@@ -915,16 +1332,18 @@
                                              &logged_activity));
         EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 0u,
                                              &logged_activity));
-        // Log 1 event with event code 1.
-        EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 1u,
-                                             &logged_activity));
+        if (offset < 3) {
+          // Log 1 event with event code 1.
+          EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 1u,
+                                               &logged_activity));
+        }
       }
       AdvanceClock(kDay);
     }
     auto end_day_index = CurrentDayIndex();
     EXPECT_EQ(kOK, GarbageCollect(end_day_index));
     day_last_garbage_collected_ = end_day_index;
-    EXPECT_TRUE(CheckAggregateStore(logged_activity, end_day_index));
+    EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, end_day_index));
     TearDown();
   }
 }
@@ -969,8 +1388,8 @@
 //
 // Each day, calls GenerateObservations() with the day index of the previous
 // day. Checks that a positive status is returned and that the
-// FakeObservationStore has received the expected number of new observations for
-// each locally aggregated report ID in |kUniqueActivesMetricDefinitions|.
+// FakeObservationStore has received the expected number of new observations
+// for each locally aggregated report ID in |kUniqueActivesMetricDefinitions|.
 TEST_F(UniqueActivesEventAggregatorTest, GenerateObservations) {
   int num_days = 35;
   std::vector<Observation2> observations(0);
@@ -999,8 +1418,8 @@
 }
 
 // Tests that GenerateObservations() returns a positive status and that the
-// expected number of Observations is generated each day when Events are logged
-// for UNIQUE_N_DAY_ACTIVES reports over multiple days, and when the
+// expected number of Observations is generated each day when Events are
+// logged for UNIQUE_N_DAY_ACTIVES reports over multiple days, and when the
 // LocalAggregateStore is garbage-collected each day.
 //
 // For 35 days, logs 2 events each day for the ErrorsOccurred_UniqueDevices
@@ -1050,9 +1469,9 @@
 // Sets the |backfill_days_| field of the EventAggregator to 3.
 //
 // Logging pattern:
-// For 35 days, logs 2 events each day for the SomeErrorsOccurred_UniqueDevices
-// report and 2 events for the SomeFeaturesActive_Unique_Devices report, all
-// with event code 0.
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
 //
 // Observation generation pattern:
 // Calls GenerateObservations() on the 1st through 5th and the 7th out of
@@ -1060,8 +1479,8 @@
 //
 // Expected numbers of Observations:
 // It is expected that 4 days' worth of Observations are generated on
-// the first day of every 10 (the day index for which GenerateObservations() was
-// called, plus 3 days of backfill), that 1 day's worth of Observations
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
 // are generated on the 2nd through 5th day of every 10, that 2 days'
 // worth of Observations are generated on the 7th day of every 10 (the
 // day index for which GenerateObservations() was called, plus 1 day of
@@ -1070,8 +1489,8 @@
   // Set |backfill_days_| to 3.
   size_t backfill_days = 3;
   SetBackfillDays(backfill_days);
-  // Log 2 events each day for 35 days. Call GenerateObservations() on the first
-  // 5 day indices, and the 7th, out of every 10.
+  // Log 2 events each day for 35 days. Call GenerateObservations() on the
+  // first 5 day indices, and the 7th, out of every 10.
   for (int offset = 0; offset < 35; offset++) {
     auto day_index = CurrentDayIndex();
     for (int i = 0; i < 2; i++) {
@@ -1118,9 +1537,9 @@
 // Sets the |backfill_days_| field of the EventAggregator to 3.
 //
 // Logging pattern:
-// For 35 days, logs 2 events each day for the SomeErrorsOccurred_UniqueDevices
-// report and 2 events for the SomeFeaturesActive_Unique_Devices report, all
-// with event code 0.
+// For 35 days, logs 2 events each day for the
+// SomeErrorsOccurred_UniqueDevices report and 2 events for the
+// SomeFeaturesActive_Unique_Devices report, all with event code 0.
 //
 // Observation generation pattern:
 // Calls GenerateObservations() on the 1st through 5th and the 7th out of
@@ -1129,8 +1548,8 @@
 //
 // Expected numbers of Observations:
 // It is expected that 4 days' worth of Observations are generated on
-// the first day of every 10 (the day index for which GenerateObservations() was
-// called, plus 3 days of backfill), that 1 day's worth of Observations
+// the first day of every 10 (the day index for which GenerateObservations()
+// was called, plus 3 days of backfill), that 1 day's worth of Observations
 // are generated on the 2nd through 5th day of every 10, that 2 days'
 // worth of Observations are generated on the 7th day of every 10 (the
 // day index for which GenerateObservations() was called, plus 1 day of
@@ -1141,8 +1560,8 @@
   // Set |backfill_days_| to 3.
   size_t backfill_days = 3;
   SetBackfillDays(backfill_days);
-  // Log 2 events each day for 35 days. Call GenerateObservations() on the first
-  // 5 day indices, and the 7th, out of every 10.
+  // Log 2 events each day for 35 days. Call GenerateObservations() on the
+  // first 5 day indices, and the 7th, out of every 10.
   for (int offset = 0; offset < num_days; offset++) {
     auto day_index = CurrentDayIndex();
     for (int i = 0; i < 2; i++) {
@@ -1198,15 +1617,15 @@
       expected_obs, observation_store_.get(), update_recipient_.get()));
 }
 
-// Checks that UniqueActivesObservations with the expected values are generated
-// when GenerateObservations() is called for a single day index after logging
-// some events for UNIQUE_N_DAY_ACTIVES reports for that day index, without any
-// garbage collection or backfill.
+// Checks that UniqueActivesObservations with the expected values are
+// generated when GenerateObservations() is called for a single day index
+// after logging some events for UNIQUE_N_DAY_ACTIVES reports for that day
+// index, without any garbage collection or backfill.
 //
 // Logging pattern:
 // Logs 2 occurrences of event code 0 for the FeaturesActives_UniqueDevices
-// report, and 1 occurrence of event code 1 for the EventsOccurred_UniqueDevices
-// report, all on the same day.
+// report, and 1 occurrence of event code 1 for the
+// EventsOccurred_UniqueDevices report, all on the same day.
 //
 // Observation generation pattern:
 // Calls GenerateObservations() after logging all events.
@@ -1265,8 +1684,8 @@
 
 // Checks that UniqueActivesObservations with the expected values are
 // generated when some events have been logged for a UNIQUE_N_DAY_ACTIVES
-// report for over multiple days and GenerateObservations() is called each day,
-// without garbage collection or backfill.
+// report for over multiple days and GenerateObservations() is called each
+// day, without garbage collection or backfill.
 //
 // Logging pattern:
 // Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
@@ -1283,12 +1702,13 @@
 //
 // Expected number of Observations:
 // Each call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of |kNoisefreeUniqueActivesExpectedParams|.
+// equal to the daily_num_obs field of
+// |kNoisefreeUniqueActivesExpectedParams|.
 //
 // Expected Observation values:
 // The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report on the i-th
-// day are:
+// the expected activity indicators of Observations for that report on the
+// i-th day are:
 //
 // (i, window size)            active for event codes
 // ------------------------------------------------------
@@ -1381,9 +1801,9 @@
 
 // Checks that UniqueActivesObservations with the expected values are
 // generated when some events have been logged for a UNIQUE_N_DAY_ACTIVES
-// report for over multiple days and GenerateObservations() is called each day,
-// and when the LocalAggregateStore is garbage-collected after each call to
-// GenerateObservations().
+// report for over multiple days and GenerateObservations() is called each
+// day, and when the LocalAggregateStore is garbage-collected after each call
+// to GenerateObservations().
 //
 // Logging pattern:
 // Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
@@ -1400,12 +1820,13 @@
 //
 // Expected number of Observations:
 // Each call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of |kNoisefreeUniqueActivesExpectedParams|.
+// equal to the daily_num_obs field of
+// |kNoisefreeUniqueActivesExpectedParams|.
 //
 // Expected Observation values:
 // The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report on the i-th
-// day are:
+// the expected activity indicators of Observations for that report on the
+// i-th day are:
 //
 // (i, window size)            active for event codes
 // ------------------------------------------------------
@@ -1491,7 +1912,7 @@
     // Generate locally aggregated Observations and garbage-collect the
     // LocalAggregateStore, both for the previous day as measured by
     // |mock_clock_|. Back up the LocalAggregateStore and
-    // AggregatedObservationHistory.
+    // AggregatedObservationHistoryStore.
     DoScheduledTasksNow();
     // Check the generated Observations against the expectation.
     EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs[offset],
@@ -1516,18 +1937,18 @@
 // |start_day_index + i| and |start_day_index + 2*i|.
 //
 // Observation generation pattern:
-// The test calls GenerateObservations() on day |start_day_index + i| for i = 0
-// through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
+// The test calls GenerateObservations() on day |start_day_index + i| for i =
+// 0 through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
 // |start_day_index + 7|.
 //
 // Expected numbers of Observations:
 // It is expected that 4 days' worth of Observations are generated on the
-// first day (the day index for which GenerateObservations() was called, plus 3
-// days of backfill), that 1 day's worth of Observations is generated on the
-// 2nd through 6th day, that 3 days' worth of Observations are generated on the
-// 9th day (the day index for which GenerateObservations() was called, plus 2
-// days of backfill), and that no Observations are generated on the remaining
-// days.
+// first day (the day index for which GenerateObservations() was called, plus
+// 3 days of backfill), that 1 day's worth of Observations is generated on the
+// 2nd through 6th day, that 3 days' worth of Observations are generated on
+// the 9th day (the day index for which GenerateObservations() was called,
+// plus 2 days of backfill), and that no Observations are generated on the
+// remaining days.
 //
 // Expected Observation values:
 // The expected activity indicators of Observations for the
@@ -1693,18 +2114,18 @@
 // |start_day_index + i| and |start_day_index + 2*i|.
 //
 // Observation generation pattern:
-// The test calls GenerateObservations() on day |start_day_index + i| for i = 0
-// through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
+// The test calls GenerateObservations() on day |start_day_index + i| for i =
+// 0 through i = 5 and for i = 8, skipping the days |start_day_index + 6| and
 // |start_day_index + 7|.
 //
 // Expected numbers of Observations:
 // It is expected that 4 days' worth of Observations are generated on the
-// first day (the day index for which GenerateObservations() was called, plus 3
-// days of backfill), that 1 day's worth of Observations is generated on the
-// 2nd through 6th day, that 3 days' worth of Observations are generated on the
-// 9th day (the day index for which GenerateObservations() was called, plus 2
-// days of backfill), and that no Observations are generated on the remaining
-// days.
+// first day (the day index for which GenerateObservations() was called, plus
+// 3 days of backfill), that 1 day's worth of Observations is generated on the
+// 2nd through 6th day, that 3 days' worth of Observations are generated on
+// the 9th day (the day index for which GenerateObservations() was called,
+// plus 2 days of backfill), and that no Observations are generated on the
+// remaining days.
 //
 // Expected Observation values:
 // The expected activity indicators of Observations for the
@@ -1757,7 +2178,7 @@
     if (offset < 6 || offset == 9) {
       // Generate Observations and garbage-collect, both for the previous day
       // index according to |mock_clock_|. Back up the LocalAggregateStore and
-      // the AggregatedObservationHistory.
+      // the AggregatedObservationHistoryStore.
       DoScheduledTasksNow();
     }
     // Make the set of Observations which are expected to be generated on
@@ -1858,6 +2279,88 @@
   }
 }
 
+// Tests that the LocalAggregateStore is updated as expected when
+// EventAggregator::LogPerDeviceCountEvent() is called with valid arguments;
+// i.e., with a report ID associated to an existing key of the
+// LocalAggregateStore, and with an EventRecord which wraps a CountEvent.
+//
+// Logs some valid events each day for 35 days, checking the contents of the
+// LocalAggregateStore each day.
+TEST_F(PerDeviceCountEventAggregatorTest, LogEvents) {
+  LoggedCounts logged_counts;
+  uint32_t num_days = 35;
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    auto day_index = CurrentDayIndex();
+    EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                          day_index, "component_A", 0u, 5,
+                                          &logged_counts));
+    EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                          day_index, "component_A", 0u, 7,
+                                          &logged_counts));
+    EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                          day_index, "component_A", 1u, 3,
+                                          &logged_counts));
+    EXPECT_EQ(kOK,
+              LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+                                     "component_B", 0u, 10, &logged_counts));
+    EXPECT_EQ(kOK,
+              LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+                                     "component_A", 0u, 2, &logged_counts));
+    EXPECT_EQ(kOK, LogPerDeviceCountEvent(kConnectionFailuresMetricReportId,
+                                          day_index, "component_C", 0u, 15,
+                                          &logged_counts));
+    EXPECT_EQ(kOK,
+              LogPerDeviceCountEvent(kSettingsChangedMetricReportId, day_index,
+                                     "component_B", 0u, 4, &logged_counts));
+    EXPECT_TRUE(CheckPerDeviceCountAggregates(logged_counts, day_index));
+    AdvanceClock(kDay);
+  }
+}
+
+// Tests GarbageCollect() for PerDeviceCountReportAggregates.
+//
+// For each value of N in the range [0, 34], logs some CountEvents for a
+// PerDeviceCount report each day for N consecutive days, and then
+// garbage-collects the LocalAggregateStore. After garbage collection, verifies
+// the contents of the LocalAggregateStore.
+TEST_F(PerDeviceCountEventAggregatorTest, GarbageCollect) {
+  uint32_t max_days_before_gc = 35;
+  for (uint32_t days_before_gc = 0; days_before_gc < max_days_before_gc;
+       days_before_gc++) {
+    SetUp();
+    day_last_garbage_collected_ = 0u;
+    LoggedCounts logged_counts;
+    for (uint32_t offset = 0; offset < days_before_gc; offset++) {
+      auto day_index = CurrentDayIndex();
+      for (const auto& metric_report_id :
+           kPerDeviceCountExpectedParams.metric_report_ids) {
+        for (const auto& component :
+             {"component_A", "component_B", "component_C"}) {
+          // Log 2 events with event code 0, for each component A, B, C.
+          EXPECT_EQ(kOK,
+                    LogPerDeviceCountEvent(metric_report_id, day_index,
+                                           component, 0u, 2, &logged_counts));
+          EXPECT_EQ(kOK,
+                    LogPerDeviceCountEvent(metric_report_id, day_index,
+                                           component, 0u, 3, &logged_counts));
+        }
+        if (offset < 3) {
+          // Log 1 event for component D and event code 1.
+          EXPECT_EQ(kOK, LogPerDeviceCountEvent(metric_report_id, day_index,
+                                                "component_D", 1u, 4,
+                                                &logged_counts));
+        }
+      }
+      AdvanceClock(kDay);
+    }
+    auto end_day_index = CurrentDayIndex();
+    EXPECT_EQ(kOK, GarbageCollect(end_day_index));
+    day_last_garbage_collected_ = end_day_index;
+    EXPECT_TRUE(CheckPerDeviceCountAggregates(logged_counts, end_day_index));
+    TearDown();
+  }
+}
+
 // Tests GenerateObservations() and GarbageCollect() in the case where the
 // LocalAggregateStore contains aggregates for metrics with both UTC and LOCAL
 // time zone policies, and where the day index in local time may be less than
@@ -1889,25 +2392,27 @@
   LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 1u);
   GenerateObservations(start_day_index, start_day_index - 1);
   GarbageCollect(start_day_index, start_day_index - 1);
-  // Form the expected contents of the FakeObservationStore. Since Observations
-  // have already been generated for the DeviceBoots_UniqueDevices report for
-  // |start_day_index - 1|, expect no Observations for that report.
+  // Form the expected contents of the FakeObservationStore. Since
+  // Observations have already been generated for the
+  // DeviceBoots_UniqueDevices report for |start_day_index - 1|, expect no
+  // Observations for that report.
   expected_obs[1][{kFeaturesActiveMetricReportId, start_day_index}] = {
       {1, {true, false, false}}};
   EXPECT_TRUE(CheckUniqueActivesObservations(
       expected_obs[1], observation_store_.get(), update_recipient_.get()));
   ResetObservationStore();
-  // Advance the day index in local time so that it is equal to the day index in
-  // UTC. Log 1 event for event code 2 for each of the 2 reports, then generate
-  // Observations and garbage-collect for the previous day in each of UTC and
-  // local time.
+  // Advance the day index in local time so that it is equal to the day index
+  // in UTC. Log 1 event for event code 2 for each of the 2 reports, then
+  // generate Observations and garbage-collect for the previous day in each of
+  // UTC and local time.
   LogUniqueActivesEvent(kDeviceBootsMetricReportId, start_day_index + 1, 2u);
   LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 2u);
   GenerateObservations(start_day_index, start_day_index);
   GarbageCollect(start_day_index, start_day_index);
-  // Form the expected contents of the FakeObservationStore. Since Observations
-  // have already been generated for the FeaturesActive_UniqueDevices report for
-  // day |start_day_index|, expect no Observations for that report.
+  // Form the expected contents of the FakeObservationStore. Since
+  // Observations have already been generated for the
+  // FeaturesActive_UniqueDevices report for day |start_day_index|, expect no
+  // Observations for that report.
   expected_obs[2][{kDeviceBootsMetricReportId, start_day_index}] = {
       {1, {true, true, false}}};
   EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1945,9 +2450,10 @@
   LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index, 1u);
   GenerateObservations(start_day_index - 1, start_day_index);
   GarbageCollect(start_day_index - 1, start_day_index);
-  // Form the expected contents of the FakeObservationStore. Since Observations
-  // have already been generated for the FeaturesActive_UniqueDevices report for
-  // |start_day_index - 1|, expect no Observations for that report.
+  // Form the expected contents of the FakeObservationStore. Since
+  // Observations have already been generated for the
+  // FeaturesActive_UniqueDevices report for |start_day_index - 1|, expect no
+  // Observations for that report.
   expected_obs[1][{kDeviceBootsMetricReportId, start_day_index}] = {
       {1, {true, false, false}}};
   EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1961,9 +2467,10 @@
   LogUniqueActivesEvent(kFeaturesActiveMetricReportId, start_day_index + 1, 2u);
   GenerateObservations(start_day_index, start_day_index);
   GarbageCollect(start_day_index, start_day_index);
-  // Form the expected contents of the FakeObservationStore. Since Observations
-  // have already been generated for the DeviceBoots_UniqueDevices report for
-  // day |start_day_index|, expect no Observations for that report.
+  // Form the expected contents of the FakeObservationStore. Since
+  // Observations have already been generated for the
+  // DeviceBoots_UniqueDevices report for day |start_day_index|, expect no
+  // Observations for that report.
   expected_obs[2][{kFeaturesActiveMetricReportId, start_day_index}] = {
       {1, {true, true, false}}};
   EXPECT_TRUE(CheckUniqueActivesObservations(
@@ -1971,8 +2478,8 @@
 }
 
 // Starts the worker thread, and destructs the EventAggregator without
-// explicitly shutting down the worker thread. Checks that the shutdown flag and
-// worker thread are in the expected states before and after the thread is
+// explicitly shutting down the worker thread. Checks that the shutdown flag
+// and worker thread are in the expected states before and after the thread is
 // started.
 TEST_F(EventAggregatorWorkerTest, StartWorkerThread) {
   EXPECT_TRUE(in_shutdown_state());
@@ -2003,54 +2510,34 @@
 // EventAggregator::UpdateAggregationConfigs() on the main thread.
 TEST_F(EventAggregatorWorkerTest, UpdateAggregationConfigs) {
   event_aggregator_->Start();
-  // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
-  auto unique_actives_project_context =
-      MakeProjectContext(kUniqueActivesMetricDefinitions);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
-                     *unique_actives_project_context));
+  // Provide the EventAggregator with |kMetricDefinitions|.
+  auto project_context = MakeProjectContext(kMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of locally aggregated reports in
-  // |kUniqueActivesMetricDefinitions|.
-  EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
-            CopyLocalAggregateStore().aggregates().size());
+  // |kMetricDefinitions|.
+  EXPECT_EQ(kExpectedParams.metric_report_ids.size(),
+            CopyLocalAggregateStore().by_report_key().size());
 }
 
 // Starts the worker thread, provides a ProjectContext, logs some events, and
-// shuts down the worker thread. Checks that the LocalAggregateStore was backed
-// up at least once during the lifetime of the worker thread.
+// shuts down the worker thread. Checks that the LocalAggregateStore was
+// backed up at least once during the lifetime of the worker thread.
 TEST_F(EventAggregatorWorkerTest, LogEvents) {
   auto day_index = CurrentDayIndex();
   event_aggregator_->Start();
-  // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
-  auto unique_actives_project_context =
-      MakeProjectContext(kUniqueActivesMetricDefinitions);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
-                     *unique_actives_project_context));
+  // Provide the EventAggregator with |kMetricDefinitions|.
+  auto project_context = MakeProjectContext(kMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
   // Log some events.
   LoggedActivity logged_activity;
-  EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
-                                       kDeviceBootsMetricReportId, day_index,
-                                       0u, &logged_activity));
-  EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
-                                       kFeaturesActiveMetricReportId, day_index,
-                                       4u, &logged_activity));
-  EXPECT_EQ(kOK, LogUniqueActivesEvent(*unique_actives_project_context,
+  EXPECT_EQ(kOK, LogUniqueActivesEvent(*project_context,
                                        kErrorsOccurredMetricReportId, day_index,
                                        1u, &logged_activity));
-  EXPECT_TRUE(CheckAggregateStore(logged_activity, day_index));
+  EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
   ShutDownWorkerThread();
   EXPECT_GE(local_aggregate_proto_store_->write_count_, 1);
 }
 
-// Tests that GenerateObservationsNoWorker returns the error status kOther if
-// called while the worker thread is running.
-TEST_F(EventAggregatorWorkerTest, GenerateObservationsNoWorker) {
-  event_aggregator_->Start();
-  ASSERT_TRUE(in_run_state());
-  EXPECT_EQ(kOther,
-            event_aggregator_->GenerateObservationsNoWorker(CurrentDayIndex()));
-  ShutDownWorkerThread();
-}
-
 }  // namespace logger
 }  // namespace cobalt
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto
index dbab1b8..28ceb2f 100644
--- a/logger/local_aggregation.proto
+++ b/logger/local_aggregation.proto
@@ -22,15 +22,59 @@
 // logged events.
 message LocalAggregateStore {
   // Keyed by base64-encoded serializations of ReportAggregationKey messages.
-  map<string, ReportAggregates> aggregates = 1;
+  map<string, ReportAggregates> by_report_key = 1;
 }
 
 message ReportAggregates {
-  // Keyed by event code.
-  map<uint32, DailyAggregates> by_event_code = 1;
+  // A collection of aggregates whose form depends on the report type.
+  oneof type {
+    UniqueActivesReportAggregates unique_actives_aggregates = 1;
+    PerDeviceCountReportAggregates count_aggregates = 2;
+  }
   // The configuration for the report represented by the ReportAggregationKey
   // of this ReportAggregates.
-  AggregationConfig aggregation_config = 2;
+  AggregationConfig aggregation_config = 100;
+}
+
+message UniqueActivesReportAggregates {
+  // Keyed by event code.
+  map<uint32, DailyAggregates> by_event_code = 1;
+}
+
+message PerDeviceCountReportAggregates {
+  // Keyed by component string.
+  map<string, EventCodeAggregates> by_component = 1;
+}
+
+message EventCodeAggregates {
+  // Keyed by event code.
+  map<uint32, DailyAggregates> by_event_code = 1;
+}
+
+message DailyAggregates {
+  // Keyed by day index.
+  map<uint32, DailyAggregate> by_day_index = 1;
+}
+
+// A value formed by aggregating the events logged for a single report, event
+// code, and day index.
+message DailyAggregate {
+  oneof type {
+    ActivityDailyAggregate activity_daily_aggregate = 1;
+    CountDailyAggregate count_daily_aggregate = 2;
+  }
+}
+
+// A representation of the occurrence or non-occurrence of an event code on
+// a given day.
+message ActivityDailyAggregate {
+  bool activity_indicator = 1;
+}
+
+// A representation of the number of occurrences of a particular event code,
+// with a particular component label, on a given day.
+message CountDailyAggregate {
+  int64 count = 1;
 }
 
 // A representation of the configuration of a locally aggregated report and
@@ -54,39 +98,25 @@
   ReportDefinition report = 3;
 }
 
-message DailyAggregates {
-  // Keyed by day index.
-  map<uint32, DailyAggregate> by_day_index = 1;
-}
-
-// A value formed by aggregating the events logged for a single report, event
-// code, and day index.
-message DailyAggregate {
-  oneof type {
-    ActivityDailyAggregate activity_daily_aggregate = 1;
-  }
-}
-
-// A representation of the occurrence or non-occurrence of an event code on
-// a given day.
-message ActivityDailyAggregate {
-  bool activity_indicator = 1;
-}
-
 // A container used by the EventAggregator to store the latest day index for
 // which an Observation has been generated for each report, event code, and
 // window size.
-message AggregatedObservationHistory {
+message AggregatedObservationHistoryStore {
   // Keyed by base64-encoded serializations of ReportAggregationKey messages.
-  map<string, HistoryByEventCode> by_report_key = 1;
+  map<string, AggregatedObservationHistory> by_report_key = 1;
 }
 
-message HistoryByEventCode {
+message AggregatedObservationHistory {
+  oneof type {
+    UniqueActivesObservationHistory unique_actives_history = 1;
+  }
+}
+
+message UniqueActivesObservationHistory {
   // Keyed by event code.
   map<uint32, HistoryByWindowSize> by_event_code = 1;
 }
 
-
 message HistoryByWindowSize {
   // Keyed by window size. The value at a window size is the latest day index
   // for which an Observation has been generated for this report, event code,
diff --git a/logger/logger_test.cc b/logger/logger_test.cc
index 4cc2261..4ff2825 100644
--- a/logger/logger_test.cc
+++ b/logger/logger_test.cc
@@ -56,6 +56,8 @@
 using testing::TestUpdateRecipient;
 
 namespace {
+// Number of seconds in a day
+const int kDay = 60 * 60 * 24;
 // Number of seconds in an ideal year
 const int kYear = kDay * 365;
 
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
index 535c1b5..16a25b6 100644
--- a/logger/logger_test_utils.h
+++ b/logger/logger_test_utils.h
@@ -90,23 +90,24 @@
   int write_count_;
 };
 
-// A container for information about the set of all locally aggregated
-// reports in a configuration. This is used by tests to check the output of the
+// A container for information about the locally aggregated Observations which
+// should be generated for a given day, for all locally aggregated reports in a
+// configuration. This is used by tests to check the output of the
 // EventAggregator.
 typedef struct ExpectedAggregationParams {
   // The total number of locally aggregated Observations which should be
-  // generated each day.
+  // generated for the day.
   size_t daily_num_obs = 0;
   // The MetricReportIds of the locally aggregated reports in this
   // configuration.
   std::set<MetricReportId> metric_report_ids;
   // Keys are the MetricReportIds of all locally aggregated reports in a config.
   // The value at a key is the number of Observations which should be generated
-  // each day for that report.
+  // for that report, for the day.
   std::map<MetricReportId, size_t> num_obs_per_report;
-  // Keys are the MetricReportIds of all locally aggregated reports in a config.
-  // The value at a key is the number of event codes for that report's
-  // parent MetricDefinition.
+  // Keys are the MetricReportIds of the UNIQUE_N_DAY_ACTIVES reports in the
+  // config. The value at a MetricReportId is the number of event codes for that
+  // report's parent metric.
   std::map<MetricReportId, size_t> num_event_codes;
   // Keys are the MetricReportIds of all locally aggregated reports in a config.
   // The value at a key is the set of window sizes of that report.