[EventAggregator] Reland EventAggregator

Relanding commit 2d1c59e689c85e769deb6623305464335331bd7a with
additional BUILD.gn files.

Change-Id: Idc77b3ac562fe437045eb5619b99803dca9b47e1
diff --git a/encoder/BUILD.gn b/encoder/BUILD.gn
index 933d215..e53ee05 100644
--- a/encoder/BUILD.gn
+++ b/encoder/BUILD.gn
@@ -17,6 +17,7 @@
     "memory_observation_store.h",
     "observation_store.cc",
     "observation_store.h",
+    "observation_store_update_recipient.h",
     "project_context.cc",
     "project_context.h",
     "send_retryer.cc",
diff --git a/event.proto b/event.proto
index 631001e..23647e6 100644
--- a/event.proto
+++ b/event.proto
@@ -29,7 +29,7 @@
 
   // The three-part unique numerical identifier of the Metric that this
   // Event is associated with.
-  uint32 cutomer_id = 1000;
+  uint32 customer_id = 1000;
   uint32 project_id = 1001;
   uint32 metric_id = 1002;
 
diff --git a/logger/BUILD.gn b/logger/BUILD.gn
index 90cc154..ea6c4f3 100644
--- a/logger/BUILD.gn
+++ b/logger/BUILD.gn
@@ -2,6 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import("//third_party/protobuf/proto_library.gni")
 import("//third_party/cobalt_config/metrics_registry.gni")
 
 metrics_registry("internal_metrics_config") {
@@ -13,12 +14,46 @@
   generate_cc = true
 }
 
+proto_library("cobalt_local_aggregation_proto") {
+  proto_in_dir = "//third_party/cobalt"
+  sources = [
+    "local_aggregation.proto",
+  ]
+  import_dirs = [ "//third_party/protobuf/src" ]
+  generate_python = false
+  cc_generator_options = "lite"
+  deps = [
+    "//third_party/cobalt/config:cobalt_config_proto",
+  ]
+
+  extra_configs = [
+    "//third_party/cobalt:cobalt_config",
+    "//third_party/cobalt/config:proto_config",
+  ]
+}
+
 source_set("status") {
   sources = [
     "status.h",
   ]
 }
 
+source_set("event_record") {
+  sources = [
+    "event_record.h",
+  ]
+
+  public_configs = [
+    "//third_party/cobalt:cobalt_config",
+    "//third_party/cobalt/config:proto_config",
+  ]
+
+  public_deps = [
+    "//third_party/cobalt:cobalt_proto",
+    "//third_party/cobalt/config:cobalt_config_proto",
+  ]
+}
+
 source_set("project_context") {
   sources = [
     "project_context.cc",
@@ -52,6 +87,8 @@
     "//garnet/public/lib/fxl",
     "//third_party/abseil-cpp",
     "//third_party/cobalt/config:cobalt_config_proto",
+    "//third_party/cobalt/encoder",
+    "//third_party/cobalt/util/crypto_util",
   ]
 }
 
@@ -67,6 +104,33 @@
     ":status",
     "//garnet/public/lib/fxl",
     "//third_party/cobalt:cobalt_proto",
+    "//third_party/cobalt/encoder",
+    "//third_party/cobalt/util:encrypted_message_util",
+  ]
+}
+
+source_set("event_aggregator") {
+  sources = [
+    "event_aggregator.cc",
+    "event_aggregator.h",
+  ]
+
+  public_configs = [
+    "//third_party/cobalt:cobalt_config",
+    "//third_party/cobalt/config:proto_config",
+  ]
+
+  public_deps = [
+    ":cobalt_local_aggregation_proto",
+    ":encoder",
+    ":event_record",
+    ":observation_writer",
+    ":status",
+    "//garnet/public/lib/fxl",
+    "//third_party/cobalt/algorithms/rappor:rappor_encoder",
+    "//third_party/cobalt:cobalt_proto",
+    "//third_party/cobalt/config:cobalt_config_proto",
+    "//third_party/cobalt/util:proto_util",
   ]
 }
 
@@ -108,6 +172,8 @@
 
   public_deps = [
     ":encoder",
+    ":event_aggregator",
+    ":event_record",
     ":internal_metrics",
     ":logger_interface",
     ":observation_writer",
diff --git a/logger/CMakeLists.txt b/logger/CMakeLists.txt
index a7c2a7d..ac91521 100644
--- a/logger/CMakeLists.txt
+++ b/logger/CMakeLists.txt
@@ -10,8 +10,16 @@
   OUTPUT_FILE "${INTERNAL_METRICS_CONFIG_H}"
 )
 
+cobalt_make_protobuf_cpp_lib(local_aggregation_proto_lib
+                             LOCAL_AGGREGATION_PROTO_HDRS
+                             false
+                             local_aggregation)
+add_cobalt_dependencies(local_aggregation_proto_lib)
+
 add_library(project_context
             project_context.cc)
+target_link_libraries(project_context
+                      local_aggregation_proto_lib)
 add_cobalt_dependencies(project_context)
 target_link_libraries(project_context project_configs tensorflow_statusor)
 
@@ -24,6 +32,12 @@
             "${INTERNAL_METRICS_CONFIG_H}")
 add_cobalt_dependencies(internal_metrics)
 
+add_library(event_aggregator
+            event_aggregator.cc)
+target_link_libraries(event_aggregator
+                      local_aggregation_proto_lib
+                      proto_util)
+add_cobalt_dependencies(event_aggregator)
 
 add_library(encoder2
             encoder.cc)
@@ -40,6 +54,7 @@
 target_link_libraries(logger
                       encoder2
                       encrypted_message_util
+                      event_aggregator
                       observation_writer
                       project_context
                       rappor_config_helper
@@ -47,11 +62,24 @@
                       config_ids)
 add_cobalt_dependencies(logger)
 
+add_library(logger_test_utils
+            logger_test_utils.cc)
+target_link_libraries(logger_test_utils
+                      encoder2
+                      encoder
+                      logger
+                      local_aggregation_proto_lib
+		      proto_util)
+add_cobalt_dependencies(logger_test_utils)
+
 add_executable(logger_tests
                encoder_test.cc
+               event_aggregator_test.cc
                logger_test.cc)
 target_link_libraries(logger_tests
                       encoder2
-                      logger)
+                      event_aggregator
+                      logger
+                      logger_test_utils)
 add_cobalt_test_dependencies(logger_tests ${DIR_GTESTS})
 add_dependencies(logger_tests build_config_parser)
diff --git a/logger/event_aggregator.cc b/logger/event_aggregator.cc
new file mode 100644
index 0000000..f6aa779
--- /dev/null
+++ b/logger/event_aggregator.cc
@@ -0,0 +1,262 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "logger/event_aggregator.h"
+
+#include <utility>
+
+#include "algorithms/rappor/rappor_config_helper.h"
+#include "logger/project_context.h"
+#include "util/proto_util.h"
+
+namespace cobalt {
+
+using rappor::RapporConfigHelper;
+using util::SerializeToBase64;
+
+namespace logger {
+
+EventAggregator::EventAggregator(const Encoder* encoder,
+                                 const ObservationWriter* observation_writer,
+                                 LocalAggregateStore* local_aggregate_store)
+    : encoder_(encoder),
+      observation_writer_(observation_writer),
+      local_aggregate_store_(local_aggregate_store) {}
+
+Status EventAggregator::UpdateAggregationConfigs(
+    const ProjectContext& project_context) {
+  std::string key;
+  ReportAggregationKey key_data;
+  key_data.set_customer_id(project_context.project().customer_id());
+  key_data.set_project_id(project_context.project().project_id());
+  for (const auto& metric : project_context.metric_definitions()->metric()) {
+    switch (metric.metric_type()) {
+      case MetricDefinition::EVENT_OCCURRED: {
+        key_data.set_metric_id(metric.id());
+        for (const auto& report : metric.reports()) {
+          switch (report.report_type()) {
+            case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+              key_data.set_report_id(report.id());
+              if (!SerializeToBase64(key_data, &key)) {
+                return kInvalidArguments;
+              }
+              // TODO(pesk): update the EventAggregator's view of a Metric
+              // or ReportDefinition when appropriate.
+              if (local_aggregate_store_->aggregates().count(key) == 0) {
+                AggregationConfig aggregation_config;
+                *aggregation_config.mutable_project() =
+                    project_context.project();
+                *aggregation_config.mutable_metric() =
+                    *project_context.GetMetric(metric.id());
+                *aggregation_config.mutable_report() = report;
+                ReportAggregates report_aggregates;
+                *report_aggregates.mutable_aggregation_config() =
+                    aggregation_config;
+                (*local_aggregate_store_->mutable_aggregates())[key] =
+                    report_aggregates;
+              }
+            }
+            default:
+              continue;
+          }
+        }
+      }
+      default:
+        continue;
+    }
+  }
+  return kOK;
+}
+
+Status EventAggregator::LogUniqueActivesEvent(uint32_t report_id,
+                                              EventRecord* event_record) const {
+  if (!event_record->event->has_occurrence_event()) {
+    LOG(ERROR) << "EventAggregator::LogUniqueActivesEvent can only "
+                  "accept OccurrenceEvents.";
+    return kInvalidArguments;
+  }
+  ReportAggregationKey key_data;
+  key_data.set_customer_id(event_record->metric->customer_id());
+  key_data.set_project_id(event_record->metric->project_id());
+  key_data.set_metric_id(event_record->metric->id());
+  key_data.set_report_id(report_id);
+  std::string key;
+  if (!SerializeToBase64(key_data, &key)) {
+    return kInvalidArguments;
+  }
+  auto aggregates = local_aggregate_store_->mutable_aggregates()->find(key);
+  if (aggregates == local_aggregate_store_->mutable_aggregates()->end()) {
+    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
+    return kInvalidArguments;
+  }
+  (*(*aggregates->second.mutable_by_event_code())
+        [event_record->event->occurrence_event().event_code()]
+            .mutable_by_day_index())[event_record->event->day_index()]
+      .mutable_activity_daily_aggregate()
+      ->set_activity_indicator(true);
+  return kOK;
+}
+
+Status EventAggregator::GenerateObservations(uint32_t final_day_index) {
+  for (auto pair : local_aggregate_store_->aggregates()) {
+    const auto& config = pair.second.aggregation_config();
+    switch (config.metric().metric_type()) {
+      case MetricDefinition::EVENT_OCCURRED:
+        switch (config.report().report_type()) {
+          case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+            auto status =
+                GenerateUniqueActivesObservations(pair.second, final_day_index);
+            if (status != kOK) {
+              return status;
+            }
+          }
+          default:
+            continue;
+        }
+      default:
+        continue;
+    }
+  }
+  return kOK;
+}
+
+Status EventAggregator::GarbageCollect(uint32_t day_index) {
+  for (auto pair : local_aggregate_store_->aggregates()) {
+    // Determine the largest window size in the report associated to this
+    // key-value pair.
+    uint32_t max_window_size = 1;
+    for (uint32_t window_size :
+         pair.second.aggregation_config().report().window_size()) {
+      if (window_size > max_window_size) {
+        max_window_size = window_size;
+      }
+    }
+    // For each event code, iterate over the sub-map of local aggregates
+    // keyed by day index. Keep buckets with day indices greater than
+    // |day_index| - |max_window_size|, and remove all buckets with smaller day
+    // indices.
+    for (auto event_code_aggregates : pair.second.by_event_code()) {
+      for (auto day_aggregates : event_code_aggregates.second.by_day_index()) {
+        if (day_aggregates.first <= day_index - max_window_size) {
+          local_aggregate_store_->mutable_aggregates()
+              ->at(pair.first)
+              .mutable_by_event_code()
+              ->at(event_code_aggregates.first)
+              .mutable_by_day_index()
+              ->erase(day_aggregates.first);
+        }
+      }
+      // If the day index map under this event code is empty, remove the event
+      // code from the event code map under this ReportAggregationKey.
+      if (local_aggregate_store_->aggregates()
+              .at(pair.first)
+              .by_event_code()
+              .at(event_code_aggregates.first)
+              .by_day_index()
+              .empty()) {
+        local_aggregate_store_->mutable_aggregates()
+            ->at(pair.first)
+            .mutable_by_event_code()
+            ->erase(event_code_aggregates.first);
+      }
+    }
+  }
+  return kOK;
+}
+
+Status EventAggregator::GenerateUniqueActivesObservations(
+    const ReportAggregates& report_aggregates, uint32_t final_day_index) const {
+  const auto& config = report_aggregates.aggregation_config();
+  auto metric_ref = MetricRef(&config.project(), &config.metric());
+  auto num_event_codes =
+      RapporConfigHelper::BasicRapporNumCategories(config.metric());
+
+  Status status;
+  for (uint32_t event_code = 0; event_code < num_event_codes; event_code++) {
+    // If no occurrences have been logged for this event code, generate
+    // a negative Observation for each window size specified in |report|.
+    if (report_aggregates.by_event_code().count(event_code) == 0) {
+      for (auto window_size : config.report().window_size()) {
+        status = GenerateSingleUniqueActivesObservation(
+            metric_ref, &config.report(), final_day_index, event_code, false,
+            window_size);
+        if (status != kOK) {
+          return status;
+        }
+      }
+    } else {
+      // If an occurrence has been logged for this event code, then
+      // for each window size, check whether a logged occurrence fell within
+      // the window and generate a positive or null Observation
+      // accordingly.
+      const auto& daily_aggregates =
+          report_aggregates.by_event_code().at(event_code).by_day_index();
+      uint32_t max_window_size = 0;
+      for (uint32_t window_size : config.report().window_size()) {
+        if (window_size > max_window_size) {
+          max_window_size = window_size;
+        }
+      }
+      if (max_window_size <= 0) {
+        LOG(ERROR) << "Maximum window size must be positive";
+        return kInvalidConfig;
+      }
+      uint32_t days_since_activity = max_window_size;
+      for (uint32_t day_index = final_day_index;
+           day_index > final_day_index - max_window_size; day_index--) {
+        if (daily_aggregates.count(day_index) > 0 &&
+            daily_aggregates.at(day_index)
+                    .activity_daily_aggregate()
+                    .activity_indicator() == true) {
+          days_since_activity = final_day_index - day_index;
+          break;
+        }
+      }
+      // If the most recent activity falls within a given window, generate an
+      // observation of activity. Otherwise, generate an observation of
+      // inactivity.
+      for (uint32_t window_size : config.report().window_size()) {
+        if (window_size > days_since_activity) {
+          status = GenerateSingleUniqueActivesObservation(
+              metric_ref, &config.report(), final_day_index, event_code,
+              /* was_active */ true, window_size);
+        } else {
+          status = GenerateSingleUniqueActivesObservation(
+              metric_ref, &config.report(), final_day_index, event_code,
+              /* was_active */ false, window_size);
+        }
+      }
+      if (status != kOK) {
+        return status;
+      }
+    }
+  }
+  return kOK;
+}
+
+Status EventAggregator::GenerateSingleUniqueActivesObservation(
+    const MetricRef metric_ref, const ReportDefinition* report,
+    uint32_t final_day_index, uint32_t event_code, bool was_active,
+    uint32_t window_size) const {
+  auto encoder_result = encoder_->EncodeUniqueActivesObservation(
+      metric_ref, report, final_day_index, event_code, was_active, window_size);
+  if (encoder_result.status != kOK) {
+    return encoder_result.status;
+  }
+  if (encoder_result.observation == nullptr ||
+      encoder_result.metadata == nullptr) {
+    LOG(ERROR) << "Failed to encode UniqueActivesObservation";
+    return kOther;
+  }
+
+  auto writer_status = observation_writer_->WriteObservation(
+      *encoder_result.observation, std::move(encoder_result.metadata));
+  if (writer_status != kOK) {
+    return writer_status;
+  }
+  return kOK;
+}
+
+}  // namespace logger
+}  // namespace cobalt
diff --git a/logger/event_aggregator.h b/logger/event_aggregator.h
new file mode 100644
index 0000000..4cadbe2
--- /dev/null
+++ b/logger/event_aggregator.h
@@ -0,0 +1,133 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_LOGGER_EVENT_AGGREGATOR_H_
+#define COBALT_LOGGER_EVENT_AGGREGATOR_H_
+
+#include <string>
+
+#include "./event.pb.h"
+#include "./logging.h"
+#include "./observation2.pb.h"
+#include "config/metric_definition.pb.h"
+#include "config/report_definition.pb.h"
+#include "logger/encoder.h"
+#include "logger/event_record.h"
+#include "logger/local_aggregation.pb.h"
+#include "logger/observation_writer.h"
+#include "logger/status.h"
+
+namespace cobalt {
+namespace logger {
+
+// The EventAggregator class manages an in-memory store of aggregated values of
+// Events logged for locally aggregated report types. For each day, this
+// LocalAggregateStore contains an aggregate of the values of logged Events of a
+// given event code over that day. These daily aggregates are used to produce
+// Observations of values aggregated over a rolling window of size specified in
+// the ReportDefinition.
+//
+// Each Logger interacts with the EventAggregator in the following way:
+// (1) When the Logger is created, it calls UpdateAggregationConfigs() to
+// provide the EventAggregator with the configurations of its locally aggregated
+// reports.
+// (2) When logging an Event for a locally aggregated report, a Logger calls
+// an Update*Aggregation() method with the Event and the ReportAggregationKey of
+// the report.
+//
+// TODO(pesk): Write the following worker thread.
+// In the future, a worker thread will do the following on a daily schedule:
+// (1) Call GenerateObservations() with the previous day's day index to generate
+// all Observations for rolling windows ending on that day index.
+// (2) Call GarbageCollect() to delete daily aggregates which are not needed to
+// compute aggregates for any windows of interest.
+class EventAggregator {
+ public:
+  // Constructs an EventAggregator.
+  //
+  // encoder: the singleton instance of an Encoder on the system.
+  //
+  // observation_writer: the singleton instance of an ObservationWriter on the
+  // system.
+  //
+  // local_aggregate_store: a LocalAggregateStore proto message.
+  EventAggregator(const Encoder* encoder,
+                  const ObservationWriter* observation_writer,
+                  LocalAggregateStore* local_aggregate_store);
+
+  // Updates the EventAggregator's view of the set of locally aggregated report
+  // configurations.
+  //
+  // This method may be called multiple times during the EventAggregator's
+  // lifetime. If the EventAggregator is provided with a report whose tuple
+  // (customer ID, project ID, metric ID, report ID) matches that of a
+  // previously provided report, then the new report is ignored, even if other
+  // properties of the customer, Project, MetricDefinition, or ReportDefinition
+  // differ from those of the existing report.
+  Status UpdateAggregationConfigs(const ProjectContext& project_context);
+
+  // Logs an Event associated to a report of type UNIQUE_N_DAY_ACTIVES to the
+  // EventAggregator.
+  //
+  // report_id: the ID of the report associated to the logged Event.
+  //
+  // event_record: an EventRecord wrapping an Event of type OccurrenceEvent and
+  // the MetricDefinition for which the Event is to be logged.
+  //
+  // Returns kOK if the LocalAggregateStore was successfully updated, and
+  // kInvalidArguments if either a lookup key corresponding to |report_id|  was
+  // not found in the LocalAggregateStore, or if the Event wrapped by
+  // EventRecord is not of type OccurrenceEvent.
+  //
+  // The EventAggregator does not explicitly validate the event code of
+  // the logged Event, and if the event code is larger than the associated
+  // metric's max_event_code then the EventAggregator will form and store an
+  // aggregate map for that event code just as it does for a valid event code.
+  // However, Observations will only be generated for valid event codes, and
+  // aggregates associated with invalid event codes will be garbage-collected
+  // together with valid aggregates when EventAggregator::GarbageCollect() is
+  // called.
+  Status LogUniqueActivesEvent(uint32_t report_id,
+                               EventRecord* event_record) const;
+
+  // Generates an Observation for each window size in each ReportDefinition
+  // known to the EventAggregator, for each event code less than the parent
+  // MetricDefinition's max_event_code, for the rolling window of that size
+  // ending on |day_index|.
+  Status GenerateObservations(uint32_t day_index);
+
+  // Removes from the LocalAggregateStore all daily aggregates which are too old
+  // to contribute to their parent report's largest rolling window on
+  // |day_index|.
+  Status GarbageCollect(uint32_t day_index);
+
+ private:
+  friend class LoggerTest;
+
+  // For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
+  // for each event code of the parent metric, for each window size of the
+  // report, for the window of that size ending on |final_day_index|. Writes the
+  // Observation to an ObservationStore via the ObservationWriter.
+  Status GenerateUniqueActivesObservations(
+      const ReportAggregates& report_aggregates,
+      uint32_t final_day_index) const;
+
+  // Called by GenerateUniqueActivesObservations to generate a single
+  // Observation.
+  Status GenerateSingleUniqueActivesObservation(const MetricRef metric_ref,
+                                                const ReportDefinition* report,
+                                                uint32_t final_day_index,
+                                                uint32_t event_code,
+                                                bool was_active,
+                                                uint32_t window_size) const;
+
+  const Encoder* encoder_;
+  const ObservationWriter* observation_writer_;
+  LocalAggregateStore* local_aggregate_store_;
+};
+
+}  // namespace logger
+}  // namespace cobalt
+
+#endif  // COBALT_LOGGER_EVENT_AGGREGATOR_H_
diff --git a/logger/event_aggregator_test.cc b/logger/event_aggregator_test.cc
new file mode 100644
index 0000000..7d045be
--- /dev/null
+++ b/logger/event_aggregator_test.cc
@@ -0,0 +1,1121 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "logger/event_aggregator.h"
+
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/util/message_differencer.h>
+#include <map>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "./event.pb.h"
+#include "./gtest.h"
+#include "logger/logger_test_utils.h"
+#include "util/proto_util.h"
+
+using ::google::protobuf::util::MessageDifferencer;
+
+namespace cobalt {
+
+using encoder::ClientSecret;
+using encoder::ObservationStoreUpdateRecipient;
+using encoder::ObservationStoreWriterInterface;
+using encoder::SystemDataInterface;
+using util::EncryptedMessageMaker;
+using util::SerializeToBase64;
+
+namespace logger {
+
+using testing::ExpectedActivity;
+using testing::ExpectedAggregationParams;
+using testing::FakeObservationStore;
+using testing::FetchAggregatedObservations;
+using testing::MakeAggregationConfig;
+using testing::MakeAggregationKey;
+using testing::MakeNullExpectedActivity;
+using testing::PopulateMetricDefinitions;
+using testing::TestUpdateRecipient;
+
+namespace {
+
+static const uint32_t kCustomerId = 1;
+static const uint32_t kProjectId = 1;
+static const char kCustomerName[] = "Fuchsia";
+static const char kProjectName[] = "Cobalt";
+static const uint32_t kStartDayIndex = 100;
+
+// Pairs (metric ID, report ID) for the locally aggregated reports defined in
+// |kMetricDefinitions| and |kNoiseFreeUniqueActivesMetricDefinitions|.
+const MetricReportId kDeviceBootsMetricReportId = MetricReportId(10, 101);
+const MetricReportId kFeaturesActiveMetricReportId = MetricReportId(20, 201);
+const MetricReportId kErrorsOccurredMetricReportId = MetricReportId(30, 302);
+const MetricReportId kEventsOccurredMetricReportId = MetricReportId(40, 402);
+
+// A set of metric definitions of type EVENT_OCCURRED, each of which has a
+// UNIQUE_N_DAY_ACTIVES report.
+static const char kUniqueActivesMetricDefinitions[] = R"(
+metric {
+  metric_name: "DeviceBoots"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 10
+  max_event_code: 1
+  reports: {
+    report_name: "DeviceBoots_UniqueDevices"
+    id: 101
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: MEDIUM
+    window_size: 1
+  }
+}
+
+metric {
+  metric_name: "FeaturesActive"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 20
+  max_event_code: 4
+  reports: {
+    report_name: "FeaturesActive_UniqueDevices"
+    id: 201
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: LARGE
+    window_size: 7
+    window_size: 30
+  }
+}
+
+metric {
+  metric_name: "ErrorsOccurred"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 30
+  max_event_code: 2
+  reports: {
+    report_name: "ErrorsOccurred_SimpleCount"
+    id: 301
+    report_type: SIMPLE_OCCURRENCE_COUNT
+    local_privacy_noise_level: NONE
+  }
+  reports: {
+    report_name: "ErrorsOccurred_UniqueDevices"
+    id: 302
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: LARGE
+    window_size: 1
+    window_size: 7
+    window_size: 30
+  }
+}
+
+)";
+
+// Properties of the locally aggregated reports in
+// |kUniqueActivesMetricDefinitions|.
+static const ExpectedAggregationParams kUniqueActivesExpectedParams = {
+    /* The total number of locally aggregated Observations which should be
+       generated for each day index. */
+    21,
+    /* The MetricReportIds of the locally aggregated reports in this
+       configuration. */
+    {kDeviceBootsMetricReportId, kFeaturesActiveMetricReportId,
+     kErrorsOccurredMetricReportId},
+    /* The number of Observations which should be generated for each day
+       index, broken down by MetricReportId. */
+    {{kDeviceBootsMetricReportId, 2},
+     {kFeaturesActiveMetricReportId, 10},
+     {kErrorsOccurredMetricReportId, 9}},
+    /* The number of event codes for each MetricReportId. */
+    {{kDeviceBootsMetricReportId, 2},
+     {kFeaturesActiveMetricReportId, 5},
+     {kErrorsOccurredMetricReportId, 3}},
+    /* The set of window sizes for each MetricReportId. */
+    {{kDeviceBootsMetricReportId, {1}},
+     {kFeaturesActiveMetricReportId, {7, 30}},
+     {kErrorsOccurredMetricReportId, {1, 7, 30}}}};
+
+// A set of MetricDefinitions of type EVENT_OCCURRED, each of which has a
+// UNIQUE_N_DAY_ACTIVES report with local_privacy_noise_level set to NONE.
+static const char kNoiseFreeUniqueActivesMetricDefinitions[] = R"(
+metric {
+  metric_name: "DeviceBoots"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 10
+  max_event_code: 1
+  reports: {
+    report_name: "DeviceBoots_UniqueDevices"
+    id: 101
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+  }
+}
+
+metric {
+  metric_name: "FeaturesActive"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 20
+  max_event_code: 4
+  reports: {
+    report_name: "FeaturesActive_UniqueDevices"
+    id: 201
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+    window_size: 7
+    window_size: 30
+  }
+}
+
+metric {
+  metric_name: "EventsOccurred"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 40
+  max_event_code: 4
+  reports: {
+    report_name: "EventsOccurred_SimpleCount"
+    id: 401
+    report_type: SIMPLE_OCCURRENCE_COUNT
+    local_privacy_noise_level: NONE
+  }
+  reports: {
+    report_name: "EventsOccurred_UniqueDevices"
+    id: 402
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+    window_size: 7
+  }
+}
+)";
+
+// Properties of the locally aggregated reports in
+// |kNoiseFreeUniqueActivesMetricDefinitions|.
+static const ExpectedAggregationParams kNoiseFreeUniqueActivesExpectedParams = {
+    /* The total number of locally aggregated Observations which should be
+       generated for each day index. */
+    27,
+    /* The MetricReportIds of the locally aggregated reports in this
+configuration. */
+    {kDeviceBootsMetricReportId, kFeaturesActiveMetricReportId,
+     kEventsOccurredMetricReportId},
+    /* The number of Observations which should be generated for each day
+       index, broken down by MetricReportId. */
+    {{kDeviceBootsMetricReportId, 2},
+     {kFeaturesActiveMetricReportId, 15},
+     {kEventsOccurredMetricReportId, 10}},
+    /* The number of event codes for each MetricReportId. */
+    {{kDeviceBootsMetricReportId, 2},
+     {kFeaturesActiveMetricReportId, 5},
+     {kEventsOccurredMetricReportId, 5}},
+    /* The set of window sizes for each MetricReportId. */
+    {{kDeviceBootsMetricReportId, {1}},
+     {kFeaturesActiveMetricReportId, {1, 7, 30}},
+     {kEventsOccurredMetricReportId, {1, 7}}}};
+
+// A map keyed by base64-encoded, serialized ReportAggregationKeys. The value at
+// a key is a map of event codes to sets of day indices. Used in tests as
+// a record, external to the LocalAggregateStore, of the activity logged for
+// UNIQUE_N_DAY_ACTIVES reports.
+typedef std::map<std::string, std::map<uint32_t, std::set<uint32_t>>>
+    LoggedActivity;
+
+// Given a string representing a MetricDefinitions proto message, creates a
+// ProjectContext from that MetricDefinitions and returns a unique pointer.
+std::unique_ptr<ProjectContext> MakeProjectContext(const char metric_string[]) {
+  auto metric_definitions = std::make_unique<MetricDefinitions>();
+  if (!PopulateMetricDefinitions(metric_string, metric_definitions.get())) {
+    return nullptr;
+  }
+  auto project_context = std::make_unique<ProjectContext>(
+      kCustomerId, kProjectId, kCustomerName, kProjectName,
+      std::move(metric_definitions));
+  return project_context;
+}
+
+}  // namespace
+
+// EventAggregatorTest creates an EventAggregator which sends its Observations
+// to a FakeObservationStore. The EventAggregator is not pre-populated with
+// aggregation configurations.
+class EventAggregatorTest : public ::testing::Test {
+ protected:
+  void SetUp() {
+    observation_store_.reset(new FakeObservationStore);
+    update_recipient_.reset(new TestUpdateRecipient);
+    observation_encrypter_.reset(
+        new EncryptedMessageMaker("", EncryptedMessage::NONE));
+    observation_writer_.reset(
+        new ObservationWriter(observation_store_.get(), update_recipient_.get(),
+                              observation_encrypter_.get()));
+    encoder_.reset(
+        new Encoder(ClientSecret::GenerateNewSecret(), system_data_.get()));
+    local_aggregate_store_.reset(new LocalAggregateStore);
+    event_aggregator_.reset(new EventAggregator(encoder_.get(),
+                                                observation_writer_.get(),
+                                                local_aggregate_store_.get()));
+  }
+
+  // Clears the FakeObservationStore and resets the TestUpdateRecipient's count
+  // of received observations.
+  void ResetObservationStore() {
+    observation_store_->messages_received.clear();
+    observation_store_->metadata_received.clear();
+    update_recipient_->invocation_count = 0;
+  }
+
+  // Given a ProjectContext |project_context| and the MetricReportId of a
+  // UNIQUE_N_DAY_ACTIVES report in |project_context|, as well as a day index
+  // and an event code, logs a UniqueActivesEvent to the EventAggregator for
+  // that report, day index, and event code. If a non-null LoggedActivity map is
+  // provided, updates the map with information about the logged Event.
+  Status LogUniqueActivesEvent(const ProjectContext& project_context,
+                               const MetricReportId& metric_report_id,
+                               uint32_t day_index, uint32_t event_code,
+                               LoggedActivity* logged_activity = nullptr) {
+    EventRecord event_record;
+    event_record.metric = project_context.GetMetric(metric_report_id.first);
+    event_record.event->set_day_index(day_index);
+    event_record.event->mutable_occurrence_event()->set_event_code(event_code);
+    auto status = event_aggregator_->LogUniqueActivesEvent(
+        metric_report_id.second, &event_record);
+    if (logged_activity == nullptr) {
+      return status;
+    }
+    std::string key;
+    if (!SerializeToBase64(
+            MakeAggregationKey(project_context, metric_report_id), &key)) {
+      return kInvalidArguments;
+    }
+    if (logged_activity->count(key) == 0) {
+      logged_activity->insert(
+          std::make_pair(key, std::map<uint32_t, std::set<uint32_t>>({})));
+    }
+    if (logged_activity->at(key).count(event_code) == 0) {
+      logged_activity->at(key).insert(
+          std::make_pair(event_code, std::set<uint32_t>({})));
+    }
+    (logged_activity->at(key).at(event_code)).insert(day_index);
+    return status;
+  }
+
+  // Given a LoggedActivity map describing the events that have been logged to
+  // the EventAggregator, checks whether the contents of the LocalAggregateStore
+  // are as expected, accounting for any garbage collection.
+  //
+  // logged_activity: a LoggedActivity representing event occurrences
+  // since the LocalAggregateStore was created. All day indices should be
+  // greater than or equal to |kStartDayIndex| and less than or equal to
+  // |current_day_index|.
+  //
+  // current_day_index: The day index of the current day in the test's frame
+  // of reference.
+  bool CheckAggregateStore(const LoggedActivity& logged_activity,
+                           uint32_t current_day_index) {
+    // Check that the LocalAggregateStore contains no more aggregates than
+    // |logged_activity| and |day_last_garbage_collected_| should imply.
+    for (const auto& report_pair : local_aggregate_store_->aggregates()) {
+      const auto& report_key = report_pair.first;
+      const auto& aggregates = report_pair.second;
+      // Check whether this ReportAggregationKey is in |logged_activity|. If
+      // not, expect that its by_event_code map is empty.
+      if (logged_activity.count(report_key) == 0u) {
+        EXPECT_TRUE(aggregates.by_event_code().empty());
+        if (!aggregates.by_event_code().empty()) {
+          return false;
+        }
+        break;
+      }
+      auto expected_events = logged_activity.at(report_key);
+      for (const auto& event_pair : aggregates.by_event_code()) {
+        // Check that this event code is in |logged_activity| under this
+        // ReportAggregationKey.
+        auto event_code = event_pair.first;
+        EXPECT_GT(expected_events.count(event_code), 0u);
+        if (expected_events.count(event_code) == 0u) {
+          return false;
+        }
+        const auto& expected_days = expected_events[event_code];
+        for (const auto& day_pair : event_pair.second.by_day_index()) {
+          // Check that this day index is in |logged_activity| under this
+          // ReportAggregationKey and event code.
+          const auto& day_index = day_pair.first;
+          EXPECT_GT(expected_days.count(day_index), 0u);
+          if (expected_days.count(day_index) == 0u) {
+            return false;
+          }
+          // Check that the day index is no earlier than is implied by the
+          // dates of store creation and garbage collection.
+          EXPECT_GE(day_index,
+                    EarliestAllowedDayIndex(aggregates.aggregation_config()));
+          if (day_index <
+              EarliestAllowedDayIndex(aggregates.aggregation_config())) {
+            return false;
+          }
+        }
+      }
+    }
+
+    // Check that the LocalAggregateStore contains aggregates for all events in
+    // |logged_activity|, as long as they are recent enough to have survived any
+    // garbage collection.
+    for (const auto& logged_pair : logged_activity) {
+      const auto& logged_key = logged_pair.first;
+      const auto& logged_event_map = logged_pair.second;
+      // Check that this ReportAggregationKey is in the LocalAggregateStore.
+      EXPECT_GT(local_aggregate_store_->aggregates().count(logged_key), 0u);
+      if (local_aggregate_store_->aggregates().count(logged_key) == 0u) {
+        return false;
+      }
+      for (const auto& logged_event_pair : logged_event_map) {
+        const auto& logged_event_code = logged_event_pair.first;
+        const auto& logged_days = logged_event_pair.second;
+        auto earliest_allowed =
+            EarliestAllowedDayIndex(local_aggregate_store_->aggregates()
+                                        .at(logged_key)
+                                        .aggregation_config());
+        // Check whether this event code is in the LocalAggregateStore
+        // under this ReportAggregationKey. If not, check that all day indices
+        // for this event code are smaller than the day index of the earliest
+        // allowed aggregate.
+        if (local_aggregate_store_->aggregates()
+                .at(logged_key)
+                .by_event_code()
+                .count(logged_event_code) == 0u) {
+          for (auto day_index : logged_days) {
+            EXPECT_LT(day_index, earliest_allowed);
+            if (day_index >= earliest_allowed) {
+              return false;
+            }
+          }
+          break;
+        }
+        // Check that all of the day indices in |logged_activity| under this
+        // ReportAggregationKey and event code are in the
+        // LocalAggregateStore, as long as they are recent enough to have
+        // survived any garbage collection. Check that each aggregate has its
+        // activity field set to true.
+        for (const auto& logged_day_index : logged_days) {
+          if (logged_day_index >= earliest_allowed) {
+            EXPECT_GT(local_aggregate_store_->aggregates()
+                          .at(logged_key)
+                          .by_event_code()
+                          .at(logged_event_code)
+                          .by_day_index()
+                          .count(logged_day_index),
+                      0u);
+            if (local_aggregate_store_->aggregates()
+                    .at(logged_key)
+                    .by_event_code()
+                    .at(logged_event_code)
+                    .by_day_index()
+                    .count(logged_day_index) == 0u) {
+              return false;
+            }
+            EXPECT_TRUE(local_aggregate_store_->aggregates()
+                            .at(logged_key)
+                            .by_event_code()
+                            .at(logged_event_code)
+                            .by_day_index()
+                            .at(logged_day_index)
+                            .activity_daily_aggregate()
+                            .activity_indicator());
+            if (!local_aggregate_store_->aggregates()
+                     .at(logged_key)
+                     .by_event_code()
+                     .at(logged_event_code)
+                     .by_day_index()
+                     .at(logged_day_index)
+                     .activity_daily_aggregate()
+                     .activity_indicator()) {
+              return false;
+            }
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  // Given the AggregationConfig of a locally aggregated report, returns the
+  // earliest (smallest) day index for which an aggregate may exist in the
+  // LocalAggregateStore for that report, accounting for garbage
+  // collection.
+  uint32_t EarliestAllowedDayIndex(const AggregationConfig& config) {
+    // If the LocalAggregateStore has never been garbage-collected, then the
+    // earliest allowed day index is just the day when the store was created;
+    // i.e., |kStartDayIndex|.
+    if (day_last_garbage_collected_ == 0u) {
+      return kStartDayIndex;
+    } else {
+      // Otherwise, it is the later of:
+      // (a) The day index on which the store was created.
+      // (b) The day index for which the store was last garbage-collected,
+      // minus the largest window size in the report associated to |config|,
+      // plus 1.
+      uint32_t max_window_size = 1;
+      for (uint32_t window_size : config.report().window_size()) {
+        if (window_size > max_window_size) {
+          max_window_size = window_size;
+        }
+      }
+      if (day_last_garbage_collected_ < (max_window_size + 1)) {
+        return kStartDayIndex;
+      }
+      return (kStartDayIndex <
+              (day_last_garbage_collected_ - max_window_size + 1))
+                 ? (day_last_garbage_collected_ - max_window_size + 1)
+                 : kStartDayIndex;
+    }
+  }
+
+  std::unique_ptr<LocalAggregateStore> local_aggregate_store_;
+  std::unique_ptr<EventAggregator> event_aggregator_;
+
+  std::unique_ptr<TestUpdateRecipient> update_recipient_;
+  std::unique_ptr<FakeObservationStore> observation_store_;
+
+  // The day index on which the LocalAggregateStore was last garbage-collected.
+  // A value of 0 indicates that the store has never been garbage-collected.
+  uint32_t day_last_garbage_collected_ = 0u;
+
+  std::unique_ptr<ObservationWriter> observation_writer_;
+  std::unique_ptr<Encoder> encoder_;
+  std::unique_ptr<EncryptedMessageMaker> observation_encrypter_;
+
+ private:
+  std::unique_ptr<SystemDataInterface> system_data_;
+};
+
+// Creates an EventAggregator and provides it with MetricDefinitions from a
+// serialized representation |metric_string|.
+class EventAggregatorTestWithProjectContext : public EventAggregatorTest {
+ protected:
+  explicit EventAggregatorTestWithProjectContext(const char metric_string[]) {
+    project_context_ = MakeProjectContext(metric_string);
+  }
+
+  void SetUp() {
+    EventAggregatorTest::SetUp();
+    event_aggregator_->UpdateAggregationConfigs(*project_context_);
+  }
+
+  // Logs a UniqueActivesEvent for the MetricReportId of a locally aggregated
+  // report in |metric_string|. Overrides the method
+  // EventAggregatorTest::LogUniqueActivesEvent.
+  Status LogUniqueActivesEvent(const MetricReportId& metric_report_id,
+                               uint32_t day_index, uint32_t event_code,
+                               LoggedActivity* logged_activity = nullptr) {
+    return EventAggregatorTest::LogUniqueActivesEvent(
+        *project_context_, metric_report_id, day_index, event_code,
+        logged_activity);
+  }
+
+ private:
+  // A ProjectContext wrapping the MetricDefinitions passed to the constructor
+  // in |metric_string|.
+  std::unique_ptr<ProjectContext> project_context_;
+};
+
+// Creates an EventAggregator and provides it with
+// |kUniqueActivesMetricDefinitions|.
+class UniqueActivesEventAggregatorTest
+    : public EventAggregatorTestWithProjectContext {
+ protected:
+  UniqueActivesEventAggregatorTest()
+      : EventAggregatorTestWithProjectContext(kUniqueActivesMetricDefinitions) {
+  }
+
+  void SetUp() { EventAggregatorTestWithProjectContext::SetUp(); }
+};
+
+// Creates an EventAggregator as in EventAggregatorTest and provides it with
+// |kNoiseFreeUniqueActivesMetricDefinitions|.
+class NoiseFreeUniqueActivesEventAggregatorTest
+    : public EventAggregatorTestWithProjectContext {
+ protected:
+  NoiseFreeUniqueActivesEventAggregatorTest()
+      : EventAggregatorTestWithProjectContext(
+            kNoiseFreeUniqueActivesMetricDefinitions) {}
+
+  void SetUp() { EventAggregatorTestWithProjectContext::SetUp(); }
+};
+
+// Tests that an empty LocalAggregateStore is updated with ReportAggregationKeys
+// and AggregationConfigs as expected when
+// EventAggregator::UpdateAggregationConfigs is called.
+TEST_F(EventAggregatorTest, UpdateAggregationConfigs) {
+  // Check that the LocalAggregateStore is empty.
+  EXPECT_EQ(0u, local_aggregate_store_->aggregates().size());
+  // Provide |kUniqueActivesMetricDefinitions| to the EventAggregator.
+  auto unique_actives_project_context =
+      MakeProjectContext(kUniqueActivesMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
+                     *unique_actives_project_context));
+  // Check that the number of key-value pairs in the LocalAggregateStore is
+  // now equal to the number of locally aggregated reports in
+  // |kUniqueActivesMetricDefinitions|.
+  EXPECT_EQ(kUniqueActivesExpectedParams.metric_report_ids.size(),
+            local_aggregate_store_->aggregates().size());
+  // Check that the LocalAggregateStore contains the expected
+  // ReportAggregationKey and AggregationConfig for each locally aggregated
+  // report in |kUniqueActivesMetricDefinitions|,
+  for (const auto& metric_report_id :
+       kUniqueActivesExpectedParams.metric_report_ids) {
+    std::string key;
+    SerializeToBase64(
+        MakeAggregationKey(*unique_actives_project_context, metric_report_id),
+        &key);
+    auto config = MakeAggregationConfig(*unique_actives_project_context,
+                                        metric_report_id);
+    EXPECT_NE(local_aggregate_store_->aggregates().end(),
+              local_aggregate_store_->aggregates().find(key));
+    EXPECT_TRUE(MessageDifferencer::Equals(
+        config,
+        local_aggregate_store_->aggregates().at(key).aggregation_config()));
+  }
+}
+
+// Tests two assumptions about the behavior of
+// EventAggregator::UpdateAggregationConfigs when two projects with the same
+// customer ID and project ID provide configurations to the EventAggregator.
+// These assumptions are:
+// (1) If the second project provides a report with a
+// ReportAggregationKey which was not provided by the first project, then the
+// EventAggregator accepts the new report.
+// (2) If a report provided by the second project has a ReportAggregationKey
+// which was already provided by the first project, then the EventAggregator
+// rejects the new report, even if its ReportDefinition differs from that of
+// existing report with the same ReportAggregationKey.
+TEST_F(EventAggregatorTest, UpdateAggregationConfigsWithSameKey) {
+  // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
+  auto unique_actives_project_context =
+      MakeProjectContext(kUniqueActivesMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
+                     *unique_actives_project_context));
+  // Check that the number of key-value pairs in the LocalAggregateStore is
+  // now equal to the number of locally aggregated reports in
+  // |kUniqueActivesMetricDefinitions|.
+  EXPECT_EQ(3u, local_aggregate_store_->aggregates().size());
+
+  // Provide the EventAggregator with
+  // |kNoiseFreeUniqueActivesMetricDefinitions|.
+  auto noise_free_unique_actives_project_context =
+      MakeProjectContext(kNoiseFreeUniqueActivesMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
+                     *noise_free_unique_actives_project_context));
+  // Check that the number of key-value pairs in the LocalAggregateStore is
+  // now equal to the number of distinct MetricReportIds of locally aggregated
+  // reports in |kUniqueActivesMetricDefinitions| and
+  // |kNoiseFreeUniqueActivesMetricDefinitions|.
+  EXPECT_EQ(4u, local_aggregate_store_->aggregates().size());
+  // The MetricReportId |kFeaturesActiveMetricReportId| appears in both
+  // |kUniqueActivesMetricDefinitions| and
+  // |kNoiseFreeUniqueActivesMetricDefinitions|. The associated
+  // ReportAggregationKeys are identical, but the AggregationConfigs are
+  // different.
+  //
+  // Check that the AggregationConfig stored in the LocalAggregateStore under
+  // the key associated to |kFeaturesActiveMetricReportId| is the first
+  // AggregationConfig that was provided for that key; i.e., is derived from
+  // |kUniqueActivesMetricDefinitions|.
+  std::string key;
+  EXPECT_TRUE(
+      SerializeToBase64(MakeAggregationKey(*unique_actives_project_context,
+                                           kFeaturesActiveMetricReportId),
+                        &key));
+  auto unique_actives_config = MakeAggregationConfig(
+      *unique_actives_project_context, kFeaturesActiveMetricReportId);
+  EXPECT_NE(local_aggregate_store_->aggregates().end(),
+            local_aggregate_store_->aggregates().find(key));
+  EXPECT_TRUE(MessageDifferencer::Equals(
+      unique_actives_config,
+      local_aggregate_store_->aggregates().at(key).aggregation_config()));
+  auto noise_free_config =
+      MakeAggregationConfig(*noise_free_unique_actives_project_context,
+                            kFeaturesActiveMetricReportId);
+  EXPECT_FALSE(MessageDifferencer::Equals(
+      noise_free_config,
+      local_aggregate_store_->aggregates().at(key).aggregation_config()));
+}
+
+// Tests that EventAggregator::LogUniqueActivesEvent returns
+// |kInvalidArguments| when passed a report ID which is not associated to a
+// key of the LocalAggregateStore, or when passed an EventRecord containing an
+// Event proto message which is not of type OccurrenceEvent.
+TEST_F(EventAggregatorTest, LogBadEvents) {
+  // Provide the EventAggregator with |kUniqueActivesMetricDefinitions|.
+  auto unique_actives_project_context =
+      MakeProjectContext(kUniqueActivesMetricDefinitions);
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(
+                     *unique_actives_project_context));
+  // Attempt to log a UniqueActivesEvent for |kEventsOccurredMetricReportId|,
+  // which is not in |kUniqueActivesMetricDefinitions|. Check that the result
+  // is |kInvalidArguments|.
+  auto noise_free_project_context =
+      MakeProjectContext(kNoiseFreeUniqueActivesMetricDefinitions);
+  EventRecord bad_event_record;
+  bad_event_record.metric = noise_free_project_context->GetMetric(
+      kEventsOccurredMetricReportId.first);
+  bad_event_record.event->set_day_index(kStartDayIndex);
+  bad_event_record.event->mutable_occurrence_event()->set_event_code(0u);
+  EXPECT_EQ(kInvalidArguments,
+            event_aggregator_->LogUniqueActivesEvent(
+                kEventsOccurredMetricReportId.second, &bad_event_record));
+  // Attempt to call LogUniqueActivesEvent() with a valid metric and report
+  // ID, but with an EventRecord wrapping an Event which is not an
+  // OccurrenceEvent. Check that the result is |kInvalidArguments|.
+  bad_event_record.metric = unique_actives_project_context->GetMetric(
+      kFeaturesActiveMetricReportId.first);
+  bad_event_record.event->mutable_count_event();
+  EXPECT_EQ(kInvalidArguments,
+            event_aggregator_->LogUniqueActivesEvent(
+                kFeaturesActiveMetricReportId.second, &bad_event_record));
+}
+
+// Tests that the LocalAggregateStore is updated as expected when
+// EventAggregator::LogUniqueActivesEvent() is called with valid arguments;
+// i.e., with a report ID associated to an existing key of the
+// LocalAggregateStore, and with an EventRecord which wraps an OccurrenceEvent.
+//
+// Logs some valid events each day for 35 days, checking the contents of the
+// LocalAggregateStore each day.
+TEST_F(UniqueActivesEventAggregatorTest, LogUniqueActivesEvents) {
+  LoggedActivity logged_activity;
+  uint32_t num_days = 35;
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    // Log an event for the FeaturesActive_UniqueDevices report of
+    // |kUniqueActivesMetricDefinitions| with event code 0. Check the contents
+    // of the LocalAggregateStore.
+    EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                         kStartDayIndex + offset, 0u,
+                                         &logged_activity));
+    EXPECT_TRUE(CheckAggregateStore(logged_activity, kStartDayIndex));
+    // Log another event for the same report, event code, and day index.
+    // Check the contents of the LocalAggregateStore.
+    EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                         kStartDayIndex + offset, 0u,
+                                         &logged_activity));
+    EXPECT_TRUE(CheckAggregateStore(logged_activity, kStartDayIndex));
+    // Log several more events for various valid reports and event codes. Check
+    // the contents of the LocalAggregateStore.
+    EXPECT_EQ(kOK, LogUniqueActivesEvent(kDeviceBootsMetricReportId,
+                                         kStartDayIndex + offset, 0u,
+                                         &logged_activity));
+    EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                         kStartDayIndex + offset, 4u,
+                                         &logged_activity));
+    EXPECT_EQ(kOK, LogUniqueActivesEvent(kErrorsOccurredMetricReportId,
+                                         kStartDayIndex + offset, 1u,
+                                         &logged_activity));
+    EXPECT_TRUE(CheckAggregateStore(logged_activity, kStartDayIndex + offset));
+  }
+}
+
+// Tests the method EventAggregator::GarbageCollect().
+//
+// For each value of N in the range [0, 34], logs some UniqueActivesEvents each
+// day for N consecutive days and then garbage-collect the LocalAggregateStore.
+// After garbage collection, verifies the contents of the LocalAggregateStore.
+TEST_F(UniqueActivesEventAggregatorTest, GarbageCollect) {
+  uint32_t max_days_before_gc = 35;
+  for (uint32_t days_before_gc = 0; days_before_gc < max_days_before_gc;
+       days_before_gc++) {
+    SetUp();
+    day_last_garbage_collected_ = 0u;
+    LoggedActivity logged_activity;
+    uint32_t end_day_index = kStartDayIndex + days_before_gc;
+    for (uint32_t day_index = kStartDayIndex; day_index < end_day_index;
+         day_index++) {
+      for (const auto& metric_report_id :
+           kUniqueActivesExpectedParams.metric_report_ids) {
+        // Log 2 events with event code 0.
+        EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 0u,
+                                             &logged_activity));
+        EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 0u,
+                                             &logged_activity));
+        // Log 1 event with event code 1.
+        EXPECT_EQ(kOK, LogUniqueActivesEvent(metric_report_id, day_index, 1u,
+                                             &logged_activity));
+      }
+    }
+    EXPECT_EQ(kOK, event_aggregator_->GarbageCollect(end_day_index));
+    day_last_garbage_collected_ = end_day_index;
+    EXPECT_TRUE(CheckAggregateStore(logged_activity, end_day_index));
+    TearDown();
+  }
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive status
+// and that the expected number of Observations is generated when no Events have
+// been logged to the EventAggregator.
+TEST_F(UniqueActivesEventAggregatorTest, GenerateObservationsNoEvents) {
+  EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(kStartDayIndex));
+
+  std::vector<Observation2> observations(0);
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, kUniqueActivesExpectedParams, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that EventAggregator::GenerateObservations() returns a positive status
+// and that the expected number of Observations is generated after some
+// UniqueActivesEvents have been logged, without any garbage collection.
+//
+// For 35 days, logs 2 events each day for the ErrorsOccurred_UniqueDevices
+// report and 2 events for the FeaturesActive_UniqueDevices report, all
+// with event type index 0.
+//
+// Each day following the first day, calls GenerateObservations() with the day
+// index of the previous day. Checks that a positive status is returned and
+// that the FakeObservationStore has received the expected number of new
+// observations for each locally aggregated report ID in
+// |kUniqueActivesMetricDefinitions|.
+TEST_F(UniqueActivesEventAggregatorTest, GenerateObservations) {
+  int num_days = 35;
+  std::vector<Observation2> observations(0);
+  for (uint32_t day_index = kStartDayIndex;
+       day_index < kStartDayIndex + num_days; day_index++) {
+    if (day_index > kStartDayIndex) {
+      observations.clear();
+      ResetObservationStore();
+      EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(day_index - 1));
+      EXPECT_TRUE(FetchAggregatedObservations(
+          &observations, kUniqueActivesExpectedParams, observation_store_.get(),
+          update_recipient_.get()));
+    }
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogUniqueActivesEvent(kErrorsOccurredMetricReportId,
+                                           day_index, 0u));
+      EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                           day_index, 0u));
+    }
+  }
+  observations.clear();
+  ResetObservationStore();
+  EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(kStartDayIndex +
+                                                         num_days - 1));
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, kUniqueActivesExpectedParams, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that GenerateObservations() returns a positive status and that the
+// expected number of Observations is generated after some UniqueActivesEvents
+// have been logged, in the presence of daily garbage collection.
+//
+// For 35 days, logs 2 events each day for the ErrorsOccurred_UniqueDevices
+// report and 2 events for the FeaturesActive_UniqueDevices report, all
+// with event type index 0.
+//
+// Each day following the first day, calls GenerateObservations() and then
+// GarbageCollect() with the day index of the current day. Checks that positive
+// statuses are returned and that the FakeObservationStore has received the
+// expected number of new observations for each locally aggregated report ID in
+// |kUniqueActivesMetricDefinitions|.
+TEST_F(UniqueActivesEventAggregatorTest,
+       GenerateObservationWithGarbageCollection) {
+  int num_days = 35;
+  std::vector<Observation2> observations(0);
+  for (uint32_t day_index = kStartDayIndex;
+       day_index < kStartDayIndex + num_days; day_index++) {
+    if (day_index > kStartDayIndex) {
+      observations.clear();
+      ResetObservationStore();
+      EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(day_index - 1));
+      EXPECT_TRUE(FetchAggregatedObservations(
+          &observations, kUniqueActivesExpectedParams, observation_store_.get(),
+          update_recipient_.get()));
+      EXPECT_EQ(kOK, event_aggregator_->GarbageCollect(day_index));
+    }
+    for (int i = 0; i < 2; i++) {
+      EXPECT_EQ(kOK, LogUniqueActivesEvent(kErrorsOccurredMetricReportId,
+                                           day_index, 0u));
+      EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                           day_index, 0u));
+    }
+  }
+  observations.clear();
+  ResetObservationStore();
+  EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(kStartDayIndex +
+                                                         num_days - 1));
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, kUniqueActivesExpectedParams, observation_store_.get(),
+      update_recipient_.get()));
+  EXPECT_EQ(kOK, event_aggregator_->GarbageCollect(kStartDayIndex + num_days));
+}
+
+// Checks that UniqueActivesObservations with the expected values (i.e.,
+// non-active for all window sizes and event types) are generated when no Events
+// have been logged to the EventAggregator.
+TEST_F(NoiseFreeUniqueActivesEventAggregatorTest,
+       CheckObservationValuesNoEvents) {
+  EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(kStartDayIndex));
+
+  auto expected_activity =
+      MakeNullExpectedActivity(kNoiseFreeUniqueActivesExpectedParams);
+
+  EXPECT_TRUE(CheckUniqueActivesObservations(
+      expected_activity, kNoiseFreeUniqueActivesExpectedParams,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+// Checks that UniqueActivesObservations with the expected values are generated
+// when some events have been logged for a UNIQUE_N_DAY_ACTIVES report for over
+// multiple days, without garbage collection.
+//
+// Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
+// metric has max_event_type_index = 4) for 10 days, according to the following
+// pattern:
+//
+// * Never log event type 0.
+// * On the i-th day (0-indexed) of logging, log an event for event type k,
+// 1 <= k < 5, if 3*k divides i.
+//
+// Each day following the first day, generates Observations for the previous
+// day index and check them against the expected set of Observations. Also
+// generates and check Observations for the last day of logging.
+//
+// The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
+// the expected pattern of those Observations' values on the i-th day is:
+//
+// (i, window size)            true for event types
+// ------------------------------------------------------
+// (0, 1)                           1, 2, 3, 4
+// (0, 7)                           1, 2, 3, 4
+// (1, 1)                          ---
+// (1, 7)                           1, 2, 3, 4
+// (2, 1)                          ---
+// (2, 7)                           1, 2, 3, 4
+// (3, 1)                           1
+// (3, 7)                           1, 2, 3, 4
+// (4, 1)                          ---
+// (4, 7)                           1, 2, 3, 4
+// (5, 1)                          ---
+// (5, 7)                           1, 2, 3, 4
+// (6, 1)                           1, 2
+// (6, 7)                           1, 2, 3, 4
+// (7, 1)                          ---
+// (7, 7)                           1, 2
+// (8, 1)                          ---
+// (8, 7)                           1, 2
+// (9, 1)                           1, 3
+// (9, 7)                           1, 2, 3
+// All Observations for all other locally aggregated reports should be
+// observations of non-occurrence.
+TEST_F(NoiseFreeUniqueActivesEventAggregatorTest,
+       CheckObservationValuesSingleDay) {
+  // Log several events.
+  EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                       kStartDayIndex, 0u));
+  EXPECT_EQ(kOK, LogUniqueActivesEvent(kFeaturesActiveMetricReportId,
+                                       kStartDayIndex, 0u));
+  EXPECT_EQ(kOK, LogUniqueActivesEvent(kEventsOccurredMetricReportId,
+                                       kStartDayIndex, 1u));
+  // Generate locally aggregated Observations.
+  EXPECT_EQ(kOK, event_aggregator_->GenerateObservations(kStartDayIndex));
+
+  // Form the expected activity map.
+  auto expected_activity =
+      MakeNullExpectedActivity(kNoiseFreeUniqueActivesExpectedParams);
+  expected_activity[kFeaturesActiveMetricReportId] = {
+      {1, {true, false, false, false, false}},
+      {7, {true, false, false, false, false}},
+      {30, {true, false, false, false, false}}};
+  expected_activity[kEventsOccurredMetricReportId] = {
+      {1, {false, true, false, false, false}},
+      {7, {false, true, false, false, false}}};
+
+  // Check the contents of the FakeObservationStore.
+  EXPECT_TRUE(CheckUniqueActivesObservations(
+      expected_activity, kNoiseFreeUniqueActivesExpectedParams,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+TEST_F(NoiseFreeUniqueActivesEventAggregatorTest,
+       CheckObservationValuesMultiDay) {
+  // Generate expected activity maps for the 10 days of logging.
+  uint32_t num_days = 10;
+  std::vector<ExpectedActivity> expected_activity(num_days);
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    expected_activity[offset] =
+        MakeNullExpectedActivity(kNoiseFreeUniqueActivesExpectedParams);
+  }
+  expected_activity[0][kEventsOccurredMetricReportId] = {
+      {1, {false, true, true, true, true}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[1][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[2][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[3][kEventsOccurredMetricReportId] = {
+      {1, {false, true, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[4][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[5][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[6][kEventsOccurredMetricReportId] = {
+      {1, {false, true, true, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[7][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[8][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[9][kEventsOccurredMetricReportId] = {
+      {1, {false, true, false, true, false}},
+      {7, {false, true, true, true, false}}};
+
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    for (uint32_t event_code = 1;
+         event_code < kNoiseFreeUniqueActivesExpectedParams.num_event_codes.at(
+                          kEventsOccurredMetricReportId);
+         event_code++) {
+      if (offset % (3 * event_code) == 0) {
+        EXPECT_EQ(kOK,
+                  LogUniqueActivesEvent(kEventsOccurredMetricReportId,
+                                        kStartDayIndex + offset, event_code));
+      }
+    }
+    // Clear the FakeObservationStore.
+    ResetObservationStore();
+    // Generate locally aggregated Observations.
+    EXPECT_EQ(kOK,
+              event_aggregator_->GenerateObservations(kStartDayIndex + offset));
+    // Check the generated Observations against the expectation.
+    EXPECT_TRUE(CheckUniqueActivesObservations(
+        expected_activity[offset], kNoiseFreeUniqueActivesExpectedParams,
+        observation_store_.get(), update_recipient_.get()));
+  }
+}
+
+// Checks that UniqueActivesObservations with the expected values are generated
+// when some events have been logged for a UNIQUE_N_DAY_ACTIVES report for over
+// multiple days, with daily garbage collection.
+//
+// Logs events for the SomeEventsOccurred_UniqueDevices report (whose parent
+// metric has max_event_type_index = 4) for 10 days, according to the following
+// pattern:
+//
+// * Never log event type 0.
+// * On the i-th day (0-indexed) of logging, log an event for event type k,
+// 1 <= k < 5, if 3*k divides i.
+//
+// Each day following the first day, generates Observations for the previous
+// day index and check them against the expected set of Observations. Also
+// generates and check Observations for the last day of logging.
+//
+// The SomeEventsOccurred_UniqueDevices report has window sizes 1 and 7, and
+// the expected pattern of those Observations' values on the i-th day is:
+//
+// (i, window size)            true for event types
+// ------------------------------------------------------
+// (0, 1)                           1, 2, 3, 4
+// (0, 7)                           1, 2, 3, 4
+// (1, 1)                          ---
+// (1, 7)                           1, 2, 3, 4
+// (2, 1)                          ---
+// (2, 7)                           1, 2, 3, 4
+// (3, 1)                           1
+// (3, 7)                           1, 2, 3, 4
+// (4, 1)                          ---
+// (4, 7)                           1, 2, 3, 4
+// (5, 1)                          ---
+// (5, 7)                           1, 2, 3, 4
+// (6, 1)                           1, 2
+// (6, 7)                           1, 2, 3, 4
+// (7, 1)                          ---
+// (7, 7)                           1, 2
+// (8, 1)                          ---
+// (8, 7)                           1, 2
+// (9, 1)                           1, 3
+// (9, 7)                           1, 2, 3
+// All Observations for all other locally aggregated reports should be
+// observations of non-occurrence.
+TEST_F(NoiseFreeUniqueActivesEventAggregatorTest,
+       CheckObservationValuesMultiDayWithGarbageCollection) {
+  // Generate expected activity maps for the 10 days of logging.
+  uint32_t num_days = 10;
+  std::vector<ExpectedActivity> expected_activity(num_days);
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    expected_activity[offset] =
+        MakeNullExpectedActivity(kNoiseFreeUniqueActivesExpectedParams);
+  }
+  expected_activity[0][kEventsOccurredMetricReportId] = {
+      {1, {false, true, true, true, true}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[1][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[2][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[3][kEventsOccurredMetricReportId] = {
+      {1, {false, true, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[4][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[5][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[6][kEventsOccurredMetricReportId] = {
+      {1, {false, true, true, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[7][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[8][kEventsOccurredMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[9][kEventsOccurredMetricReportId] = {
+      {1, {false, true, false, true, false}},
+      {7, {false, true, true, true, false}}};
+
+  for (uint32_t offset = 0; offset < num_days; offset++) {
+    for (uint32_t event_code = 1;
+         event_code < kNoiseFreeUniqueActivesExpectedParams.num_event_codes.at(
+                          kEventsOccurredMetricReportId);
+         event_code++) {
+      if (offset % (3 * event_code) == 0) {
+        EXPECT_EQ(kOK,
+                  LogUniqueActivesEvent(kEventsOccurredMetricReportId,
+                                        kStartDayIndex + offset, event_code));
+      }
+    }
+    // Clear the FakeObservationStore.
+    ResetObservationStore();
+    // Generate locally aggregated Observations.
+    EXPECT_EQ(kOK,
+              event_aggregator_->GenerateObservations(kStartDayIndex + offset));
+    // Check the generated Observations against the expectation.
+    EXPECT_TRUE(CheckUniqueActivesObservations(
+        expected_activity[offset], kNoiseFreeUniqueActivesExpectedParams,
+        observation_store_.get(), update_recipient_.get()));
+    // Garbage collect for the next day index.
+    EXPECT_EQ(kOK,
+              event_aggregator_->GarbageCollect(kStartDayIndex + offset + 1));
+  }
+}
+
+}  // namespace logger
+}  // namespace cobalt
diff --git a/logger/event_record.h b/logger/event_record.h
new file mode 100644
index 0000000..52fb5a6
--- /dev/null
+++ b/logger/event_record.h
@@ -0,0 +1,26 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_LOGGER_EVENT_RECORD_H_
+#define COBALT_LOGGER_EVENT_RECORD_H_
+
+#include <memory>
+
+#include "./event.pb.h"
+#include "config/metric_definition.pb.h"
+
+namespace cobalt {
+namespace logger {
+
+// A container for an Event proto message and the MetricDefinition for which
+// that Event should be logged.
+struct EventRecord {
+  const MetricDefinition* metric;
+  std::unique_ptr<Event> event = std::make_unique<Event>();
+};
+
+}  // namespace logger
+}  // namespace cobalt
+
+#endif  // COBALT_LOGGER_EVENT_RECORD_H_
diff --git a/logger/local_aggregation.proto b/logger/local_aggregation.proto
new file mode 100644
index 0000000..16cfd04
--- /dev/null
+++ b/logger/local_aggregation.proto
@@ -0,0 +1,74 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+syntax = "proto3";
+
+package cobalt;
+
+import "config/metric_definition.proto";
+import "config/report_definition.proto";
+import "config/project.proto";
+
+// An identifier of a locally aggregated report.
+message ReportAggregationKey {
+  uint32 customer_id = 1;
+  uint32 project_id = 2;
+  uint32 metric_id = 3;
+  uint32 report_id = 4;
+}
+
+// A container used by the EventAggregator to store local aggregates of
+// logged events.
+message LocalAggregateStore {
+  // Keyed by base64-encoded serializations of ReportAggregationKey messages.
+  map<string, ReportAggregates> aggregates = 1;
+}
+
+message ReportAggregates {
+  // Keyed by event code.
+  map<uint32, DailyAggregates> by_event_code = 1;
+  // The configuration for the report represented by the ReportAggregationKey
+  // of this ReportAggregates.
+  AggregationConfig aggregation_config = 2;
+}
+
+// A representation of the configuration of a locally aggregated report and
+// of its parent metric.
+message AggregationConfig {
+  // A Project message.
+  Project project = 1;
+  // A MetricDefinition.
+  //
+  // TODO(pesk): When implementing handling of config changes, replace this
+  // field with a MetricDefinition with all ReportDefinitions removed, as well
+  // as all fields which should not affect the EventAggregator's handling
+  // of the MetricDefinition.
+  MetricDefinition metric = 2;
+  // A ReportDefinition message.
+  //
+  // TODO(pesk): When implementing handling of config changes, replace this
+  // field with a ReportDefinition with all WindowSizes removed, as well as
+  // all fields which should not affect the EventAggregator's handling of the
+  // ReportDefinition.
+  ReportDefinition report = 3;
+}
+
+message DailyAggregates {
+  // Keyed by day index.
+  map<uint32, DailyAggregate> by_day_index = 1;
+}
+
+// A value formed by aggregating the events logged for a single report, event
+// code, and day index.
+message DailyAggregate {
+  oneof type {
+    ActivityDailyAggregate activity_daily_aggregate = 1;
+  }
+}
+
+// A representation of the occurrence or non-occurrence of an event code on
+// a given day.
+message ActivityDailyAggregate {
+  bool activity_indicator = 1;
+}
\ No newline at end of file
diff --git a/logger/logger.cc b/logger/logger.cc
index d134b80..b7ba968 100644
--- a/logger/logger.cc
+++ b/logger/logger.cc
@@ -15,8 +15,7 @@
 #include "config/id.h"
 #include "config/metric_definition.pb.h"
 #include "config/report_definition.pb.h"
-#include "logger/project_context.h"
-#include "util/clock.h"
+#include "logger/event_record.h"
 #include "util/datetime_util.h"
 
 namespace cobalt {
@@ -28,15 +27,6 @@
 using ::cobalt::util::TimeToDayIndex;
 using ::google::protobuf::RepeatedPtrField;
 
-namespace {
-
-struct EventRecord {
-  const MetricDefinition* metric;
-  std::unique_ptr<Event> event = std::make_unique<Event>();
-};
-
-}  // namespace
-
 // EventLogger is an abstract interface used internally in logger.cc to
 // dispatch logging logic based on Metric type. Below we create subclasses
 // of EventLogger for each of several Metric types.
@@ -55,6 +45,9 @@
 
  protected:
   const Encoder* encoder() { return logger_->encoder_; }
+  const EventAggregator* event_aggregator() {
+    return logger_->event_aggregator_;
+  }
   const ProjectContext* project_context() { return logger_->project_context_; }
   Encoder::Result BadReportType(const MetricDefinition& metric,
                                 const ReportDefinition& report);
@@ -70,8 +63,8 @@
   // Given an EventRecord and a ReportDefinition, determines whether or not
   // the Event should be used to update a local aggregation and if so passes
   // the Event to the Local Aggregator.
-  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                     EventRecord* event_record);
+  virtual Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
+                                             EventRecord* event_record);
 
   // Given an EventRecord and a ReportDefinition, determines whether or not
   // the Event should be used to generate an immediate Observation and if so
@@ -112,6 +105,8 @@
   Encoder::Result MaybeEncodeImmediateObservation(
       const ReportDefinition& report, bool may_invalidate,
       EventRecord* event_record) override;
+  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
+                                     EventRecord* event_record) override;
 };
 
 // Implementation of EventLogger for metrics of type EVENT_COUNT.
@@ -127,8 +122,8 @@
 };
 
 // Implementation of EventLogger for all of the numerical performance metric
-// types. This is an abstract class. There are subclasses below for each metric
-// type.
+// types. This is an abstract class. There are subclasses below for each
+// metric type.
 class IntegerPerformanceEventLogger : public EventLogger {
  protected:
   explicit IntegerPerformanceEventLogger(Logger* logger)
@@ -223,9 +218,32 @@
 
 //////////////////// Logger method implementations ////////////////////////
 
+Logger::Logger(const Encoder* encoder, EventAggregator* event_aggregator,
+               ObservationWriter* observation_writer,
+               const ProjectContext* project, LoggerInterface* internal_logger)
+    : encoder_(encoder),
+      event_aggregator_(event_aggregator),
+      observation_writer_(observation_writer),
+      project_context_(project),
+      clock_(new SystemClock()) {
+  CHECK(project);
+
+  if (internal_logger) {
+    internal_metrics_.reset(new InternalMetricsImpl(internal_logger));
+  } else {
+    // We were not provided with a metrics logger. We must create one.
+    internal_metrics_.reset(new NoOpInternalMetrics());
+  }
+  if (event_aggregator_->UpdateAggregationConfigs(*project_context_) != kOK) {
+    LOG(ERROR) << "Failed to provide aggregation configurations to the "
+                  "EventAggregator.";
+  }
+}
+
 Logger::Logger(const Encoder* encoder, ObservationWriter* observation_writer,
                const ProjectContext* project, LoggerInterface* internal_logger)
     : encoder_(encoder),
+      event_aggregator_(nullptr),
       observation_writer_(observation_writer),
       project_context_(project),
       clock_(new SystemClock()) {
@@ -364,13 +382,14 @@
       return status;
     }
 
-    // If we are processing the final report, then we set may_invalidate to
-    // true in order to allow data to be moved out of |event_record| instead
-    // of being copied. One example where this is useful is when creating
-    // an immediate Observation of type Histogram. In that case we can move
-    // the histogram from the Event to the Observation and avoid copying.
-    // Since the |event_record| is invalidated, any other operation on the
-    // |event_record| must be performed before this for loop.
+    // If we are processing the final report, then we set may_invalidate
+    // to true in order to allow data to be moved out of |event_record|
+    // instead of being copied. One example where this is useful is when
+    // creating an immediate Observation of type Histogram. In that case
+    // we can move the histogram from the Event to the Observation and
+    // avoid copying. Since the |event_record| is invalidated, any other
+    // operation on the |event_record| must be performed before this for
+    // loop.
     bool may_invalidate = ++report_index == num_reports;
     status =
         MaybeGenerateImmediateObservation(report, may_invalidate, event_record);
@@ -414,9 +433,6 @@
 // and returns OK.
 Status EventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& report,
                                                 EventRecord* event_record) {
-  // TODO(pesk) Implement this method in subclasses of EventLoger
-  // corresponding to Metric types for which there exist Report types for
-  // which which you want to perform local aggregation.
   return kOK;
 }
 
@@ -489,13 +505,41 @@
           occurrence_event.event_code(),
           RapporConfigHelper::BasicRapporNumCategories(metric));
     }
+      // Report type UNIQUE_N_DAY_ACTIVES is valid but should not result in
+      // generation of an immediate observation.
+    case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+      Encoder::Result result;
+      result.status = kOK;
+      result.observation = nullptr;
+      result.metadata = nullptr;
+      return result;
+    }
 
     default:
       return BadReportType(metric, report);
   }
 }
 
-/////////////// CountEventLogger method implementations ////////////////////////
+Status OccurrenceEventLogger::MaybeUpdateLocalAggregation(
+    const ReportDefinition& report, EventRecord* event_record) {
+  // If the Logger was constructed without an EventAggregator, do nothing and
+  // return kOK.
+  // TODO(pesk): remove this clause when the deprecated Logger constructor is
+  // removed.
+  if (event_aggregator() == nullptr) {
+    return kOK;
+  }
+  switch (report.report_type()) {
+    case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
+      return event_aggregator()->LogUniqueActivesEvent(report.id(),
+                                                       event_record);
+    }
+    default:
+      return kOK;
+  }
+}
+
+///////////// CountEventLogger method implementations //////////////////////////
 
 Encoder::Result CountEventLogger::MaybeEncodeImmediateObservation(
     const ReportDefinition& report, bool may_invalidate,
@@ -675,6 +719,7 @@
 }
 
 /////////////// StringUsedEventLogger method implementations ///////////////////
+
 Encoder::Result StringUsedEventLogger::MaybeEncodeImmediateObservation(
     const ReportDefinition& report, bool may_invalidate,
     EventRecord* event_record) {
@@ -702,6 +747,7 @@
 }
 
 /////////////// CustomEventLogger method implementations ///////////////////////
+
 Status CustomEventLogger::ValidateEvent(const EventRecord& event_record) {
   // TODO(ninai) Add proto validation.
   return kOK;
diff --git a/logger/logger.h b/logger/logger.h
index 16c76e9..4d2d97f 100644
--- a/logger/logger.h
+++ b/logger/logger.h
@@ -12,6 +12,7 @@
 
 #include "./observation2.pb.h"
 #include "logger/encoder.h"
+#include "logger/event_aggregator.h"
 #include "logger/internal_metrics.h"
 #include "logger/logger_interface.h"
 #include "logger/observation_writer.h"
@@ -37,6 +38,11 @@
   // valid as long as the Logger is being used. The Logger uses this to
   // encode immediate Observations.
   //
+  // |event_aggregator| The system's singleton instance of EventAggregator.
+  // This must remain valid as long as the Logger is being used. The Logger
+  // uses this to aggregate values derived from Events and to produce locally
+  // aggregated Observations.
+  //
   // |observation_writer| An instance of ObservationWriter, used by the Logger
   // to write immediate Observations to an ObservationStore. Must remain valid
   // as long as the Logger is in use.
@@ -45,8 +51,18 @@
   // Logger will log events.
   //
   // |internal_logger| An instance of LoggerInterface, used internally by the
-  // Logger to send metrics about Cobalt to Cobalt. If nullptr, no such internal
-  // logging will be performed by this Logger.
+  // Logger to send metrics about Cobalt to Cobalt. If nullptr, no such
+  // internal logging will be performed by this Logger.
+  Logger(const Encoder* encoder, EventAggregator* event_aggregator,
+         ObservationWriter* observation_writer, const ProjectContext* project,
+         LoggerInterface* internal_logger = nullptr);
+
+  // Deprecated constructor
+  //
+  // This constructor does not take an EventAggregator. Loggers created with
+  // this constructor do not log Events for locally aggregated reports. This
+  // constructor is currently used by the Cobalt test app in Garnet and will be
+  // removed once those tests are updated.
   Logger(const Encoder* encoder, ObservationWriter* observation_writer,
          const ProjectContext* project,
          LoggerInterface* internal_logger = nullptr);
@@ -80,12 +96,12 @@
 
  private:
   friend class EventLogger;
+  friend class LoggerTest;
 
-  void SetClock(std::unique_ptr<util::ClockInterface> clock) {
-    clock_ = std::move(clock);
-  }
+  void SetClock(util::ClockInterface* clock) { clock_.reset(clock); }
 
   const Encoder* encoder_;
+  EventAggregator* event_aggregator_;
   const ObservationWriter* observation_writer_;
   const ProjectContext* project_context_;
   std::unique_ptr<util::ClockInterface> clock_;
diff --git a/logger/logger_test.cc b/logger/logger_test.cc
index b00f48d..6e804a1 100644
--- a/logger/logger_test.cc
+++ b/logger/logger_test.cc
@@ -7,42 +7,59 @@
 #include <google/protobuf/text_format.h>
 #include <google/protobuf/util/message_differencer.h>
 
+#include <chrono>
 #include <memory>
 #include <string>
-#include <utility>
-#include <vector>
 
 #include "./gtest.h"
 #include "./observation.pb.h"
 #include "./observation2.pb.h"
+#include "algorithms/rappor/rappor_encoder.h"
+#include "config/encodings.pb.h"
+#include "encoder/client_secret.h"
 #include "encoder/encoder.h"
-#include "encoder/memory_observation_store.h"
-#include "encoder/observation_store.h"
-#include "encoder/send_retryer.h"
-#include "encoder/shipping_manager.h"
 #include "logger/encoder.h"
+#include "logger/event_aggregator.h"
+#include "logger/logger_test_utils.h"
 #include "logger/project_context.h"
 #include "logger/status.h"
+#include "util/clock.h"
+#include "util/datetime_util.h"
 #include "util/encrypted_message_util.h"
 
+using ::google::protobuf::RepeatedPtrField;
+using ::google::protobuf::util::MessageDifferencer;
+
 namespace cobalt {
 
 using encoder::ClientSecret;
-using encoder::LegacyShippingManager;
-using encoder::MemoryObservationStore;
-using encoder::ObservationStoreUpdateRecipient;
-using encoder::ObservationStoreWriterInterface;
-using encoder::SendRetryerInterface;
-using encoder::ShippingManager;
 using encoder::SystemDataInterface;
-using google::protobuf::RepeatedPtrField;
-using google::protobuf::util::MessageDifferencer;
+using rappor::BasicRapporEncoder;
 using util::EncryptedMessageMaker;
+using util::IncrementingClock;
 using util::MessageDecrypter;
+using util::TimeToDayIndex;
 
 namespace logger {
 
+using testing::CheckNumericEventObservations;
+using testing::CheckUniqueActivesObservations;
+using testing::ExpectedActivity;
+using testing::ExpectedAggregationParams;
+using testing::FakeObservationStore;
+using testing::FetchAggregatedObservations;
+using testing::FetchObservations;
+using testing::FetchSingleObservation;
+using testing::MakeNullExpectedActivity;
+using testing::PopulateMetricDefinitions;
+using testing::TestUpdateRecipient;
+
 namespace {
+// Number of seconds in an ideal day
+const int kDay = 86400;
+// Number of seconds in an ideal year
+const int kYear = kDay * 365;
+
 static const uint32_t kCustomerId = 1;
 static const uint32_t kProjectId = 1;
 static const char kCustomerName[] = "Fuchsia";
@@ -57,6 +74,19 @@
 const uint32_t kFileSystemWriteTimesMetricId = 6;
 const uint32_t kModuleDownloadsMetricId = 7;
 const uint32_t kModuleInstallsMetricId = 8;
+const uint32_t kDeviceBootsMetricId = 9;
+const uint32_t kFeaturesActiveMetricId = 10;
+const uint32_t kEventsOccurredMetricId = 11;
+
+// MetricReportIds
+const MetricReportId kDeviceBoots_UniqueDevicesMetricReportId =
+    MetricReportId(kDeviceBootsMetricId, 91);
+const MetricReportId kFeaturesActive_UniqueDevicesMetricReportId =
+    MetricReportId(kFeaturesActiveMetricId, 201);
+const MetricReportId kEventsOccurred_SimpleOccurrenceCountMetricReportId =
+    MetricReportId(kEventsOccurredMetricId, 301);
+const MetricReportId kEventsOccurred_UniqueDevicesMetricReportId =
+    MetricReportId(kEventsOccurredMetricId, 401);
 
 static const char kMetricDefinitions[] = R"(
 metric {
@@ -67,7 +97,7 @@
   id: 1
   max_event_code: 100
   reports: {
-    report_name: "ErrorCountsByType"
+    report_name: "ErrorCountsByCode"
     id: 123
     report_type: SIMPLE_OCCURRENCE_COUNT
     local_privacy_noise_level: SMALL
@@ -205,13 +235,179 @@
     report_type: CUSTOM_RAW_DUMP
   }
 }
+
+metric {
+  metric_name: "DeviceBoots"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 9
+  max_event_code: 1
+  reports: {
+    report_name: "DeviceBoots_UniqueDevices"
+    id: 91
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: SMALL
+    window_size: 1
+  }
+}
+
+metric {
+  metric_name: "FeaturesActive"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 10
+  max_event_code: 4
+  reports: {
+    report_name: "FeaturesActive_UniqueDevices"
+    id: 201
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: LARGE
+    window_size: 1
+    window_size: 7
+    window_size: 30
+  }
+}
+
+metric {
+  metric_name: "EventsOccurred"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 11
+  max_event_code: 4
+  reports: {
+    report_name: "SomeEventsOccurred_SimpleCount"
+    id: 301
+    report_type: SIMPLE_OCCURRENCE_COUNT
+    local_privacy_noise_level: MEDIUM
+  }
+  reports: {
+    report_name: "EventsOccurred_UniqueDevices"
+    id: 401
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: MEDIUM
+    window_size: 1
+    window_size: 7
+  }
+}
+
 )";
 
-bool PopulateMetricDefinitions(MetricDefinitions* metric_definitions) {
-  google::protobuf::TextFormat::Parser parser;
-  return parser.ParseFromString(kMetricDefinitions, metric_definitions);
+// Expected parameters of the locally aggregated reports in
+// |kMetricDefinitions|.
+static const ExpectedAggregationParams kExpectedAggregationParams = {
+    /* The total number of locally aggregated Observations which should be
+       generated for each day index. */
+    27,
+    /* The MetricReportIds of the locally aggregated reports in this
+configuration. */
+    {kDeviceBoots_UniqueDevicesMetricReportId,
+     kFeaturesActive_UniqueDevicesMetricReportId,
+     kEventsOccurred_UniqueDevicesMetricReportId},
+    /* The number of Observations which should be generated for each day
+       index, broken down by MetricReportId. */
+    {{kDeviceBoots_UniqueDevicesMetricReportId, 2},
+     {kFeaturesActive_UniqueDevicesMetricReportId, 15},
+     {kEventsOccurred_UniqueDevicesMetricReportId, 10}},
+    /* The number of event codes for each MetricReportId. */
+    {{kDeviceBoots_UniqueDevicesMetricReportId, 2},
+     {kFeaturesActive_UniqueDevicesMetricReportId, 5},
+     {kEventsOccurred_UniqueDevicesMetricReportId, 5}},
+    /* The set of window sizes for each MetricReportId. */
+    {{kDeviceBoots_UniqueDevicesMetricReportId, {1}},
+     {kFeaturesActive_UniqueDevicesMetricReportId, {1, 7, 30}},
+     {kEventsOccurred_UniqueDevicesMetricReportId, {1, 7}}}};
+
+// A set of MetricDefinitions of type EVENT_OCCURRED, each of which has a
+// UNIQUE_N_DAY_ACTIVES report with local_privacy_noise_level set to NONE.
+static const char kNoiseFreeUniqueActivesMetricDefinitions[] = R"(
+metric {
+  metric_name: "DeviceBoots"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 9
+  max_event_code: 1
+  reports: {
+    report_name: "DeviceBoots_UniqueDevices"
+    id: 91
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+  }
 }
 
+metric {
+  metric_name: "FeaturesActive"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 10
+  max_event_code: 4
+  reports: {
+    report_name: "SomeFeaturesActive_UniqueDevices"
+    id: 201
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+    window_size: 7
+    window_size: 30
+  }
+}
+
+metric {
+  metric_name: "EventsOccurred"
+  metric_type: EVENT_OCCURRED
+  customer_id: 1
+  project_id: 1
+  id: 11
+  max_event_code: 4
+  reports: {
+    report_name: "SomeEventsOccurred_SimpleCount"
+    id: 301
+    report_type: SIMPLE_OCCURRENCE_COUNT
+    local_privacy_noise_level: NONE
+  }
+  reports: {
+    report_name: "SomeEventsOccurred_UniqueDevices"
+    id: 401
+    report_type: UNIQUE_N_DAY_ACTIVES
+    local_privacy_noise_level: NONE
+    window_size: 1
+    window_size: 7
+  }
+}
+
+)";
+
+// Expected parameters of the locally aggregated reports in
+// |kNoiseFreeUniqueActivesMetricDefinitions|.
+static const ExpectedAggregationParams
+    kNoiseFreeUniqueActivesExpectedAggregationParams = {
+        /* The total number of locally aggregated Observations which should be
+            generated for each day index. */
+        27,
+        /* The MetricReportIds of the locally aggregated reports in this
+    configuration. */
+        {kDeviceBoots_UniqueDevicesMetricReportId,
+         kFeaturesActive_UniqueDevicesMetricReportId,
+         kEventsOccurred_UniqueDevicesMetricReportId},
+        /* The number of Observations which should be generated for each day
+           index, broken down by MetricReportId. */
+        {{kDeviceBoots_UniqueDevicesMetricReportId, 2},
+         {kFeaturesActive_UniqueDevicesMetricReportId, 15},
+         {kEventsOccurred_UniqueDevicesMetricReportId, 10}},
+        /* The number of event codes for each MetricReportId. */
+        {{kDeviceBoots_UniqueDevicesMetricReportId, 2},
+         {kFeaturesActive_UniqueDevicesMetricReportId, 5},
+         {kEventsOccurred_UniqueDevicesMetricReportId, 5}},
+        /* The set of window sizes for each MetricReportId. */
+        {{kDeviceBoots_UniqueDevicesMetricReportId, {1}},
+         {kFeaturesActive_UniqueDevicesMetricReportId, {1, 7, 30}},
+         {kEventsOccurred_UniqueDevicesMetricReportId, {1, 7}}}};
+
 HistogramPtr NewHistogram(std::vector<uint32_t> indices,
                           std::vector<uint32_t> counts) {
   CHECK(indices.size() == counts.size());
@@ -236,34 +432,22 @@
   return custom_event;
 }
 
-class FakeObservationStore : public ObservationStoreWriterInterface {
- public:
-  StoreStatus AddEncryptedObservation(
-      std::unique_ptr<EncryptedMessage> message,
-      std::unique_ptr<ObservationMetadata> metadata) override {
-    messages_received.emplace_back(std::move(message));
-    metadata_received.emplace_back(std::move(metadata));
-    return kOk;
-  }
-
-  std::vector<std::unique_ptr<EncryptedMessage>> messages_received;
-  std::vector<std::unique_ptr<ObservationMetadata>> metadata_received;
-};
-
-class TestUpdateRecipient : public ObservationStoreUpdateRecipient {
- public:
-  void NotifyObservationsAdded() override { invocation_count++; }
-
-  int invocation_count = 0;
-};
-
 }  // namespace
 
 class LoggerTest : public ::testing::Test {
  protected:
   void SetUp() {
+    SetUpFromMetrics(kMetricDefinitions, kExpectedAggregationParams);
+  }
+
+  // Set up LoggerTest using serialized MetricDefinitions.
+  void SetUpFromMetrics(
+      const char metric_string[],
+      const ExpectedAggregationParams expected_aggregation_params) {
     auto metric_definitions = std::make_unique<MetricDefinitions>();
-    ASSERT_TRUE(PopulateMetricDefinitions(metric_definitions.get()));
+    ASSERT_TRUE(
+        PopulateMetricDefinitions(metric_string, metric_definitions.get()));
+    expected_aggregation_params_ = expected_aggregation_params;
     project_context_.reset(new ProjectContext(kCustomerId, kProjectId,
                                               kCustomerName, kProjectName,
                                               std::move(metric_definitions)));
@@ -276,126 +460,67 @@
                               observation_encrypter_.get()));
     encoder_.reset(
         new Encoder(ClientSecret::GenerateNewSecret(), system_data_.get()));
-    logger_.reset(new Logger(encoder_.get(), observation_writer_.get(),
+    local_aggregate_store_.reset(new LocalAggregateStore);
+    event_aggregator_.reset(new EventAggregator(encoder_.get(),
+                                                observation_writer_.get(),
+                                                local_aggregate_store_.get()));
+    logger_.reset(new Logger(encoder_.get(), event_aggregator_.get(),
+                             observation_writer_.get(),
                              project_context_.get()));
+    // Create a mock clock which does not increment by default when called.
+    // Set the time to 1 year after the start of Unix time so that the start
+    // date of any aggregation window falls after the start of time.
+    mock_clock_ = new IncrementingClock(std::chrono::system_clock::duration(0));
+    mock_clock_->set_time(
+        std::chrono::system_clock::time_point(std::chrono::seconds(kYear)));
+    logger_->SetClock(mock_clock_);
   }
 
-  // Populates |observations| with the contents of the FakeObservationStore.
-  // |observations| should be a vector whose size is equal to the number
-  // of expected observations. Checks the the ObservationStore contains
-  // that number of Observations and that the report_ids of the Observations
-  // are equal to |expected_report_ids|. Returns true iff all checks pass.
-  bool FetchImmediateObservations(
-      std::vector<Observation2>* observations,
-      const std::vector<uint32_t>& expected_report_ids) {
-    CHECK(observations);
-    size_t expected_num_received = observations->size();
-    CHECK(expected_report_ids.size() == expected_num_received);
-    auto num_received = observation_store_->messages_received.size();
-    EXPECT_EQ(num_received, observation_store_->metadata_received.size());
-    if (num_received != observation_store_->metadata_received.size()) {
-      return false;
-    }
-    EXPECT_EQ(num_received, expected_num_received);
-    if (num_received != expected_num_received) {
-      return false;
-    }
-    num_received = update_recipient_->invocation_count;
-    EXPECT_EQ(num_received, expected_num_received);
-    if (num_received != expected_num_received) {
-      return false;
-    }
-    MessageDecrypter message_decrypter("");
-    for (auto i = 0u; i < expected_num_received; i++) {
-      bool isNull = (observation_store_->metadata_received[i].get() == nullptr);
-      EXPECT_FALSE(isNull);
-      if (isNull) {
-        return false;
-      }
-      EXPECT_EQ(observation_store_->metadata_received[i]->report_id(),
-                expected_report_ids[i])
-          << "i=" << i;
-      isNull = (observation_store_->messages_received[i].get() == nullptr);
-      EXPECT_FALSE(isNull);
-      if (isNull) {
-        return false;
-      }
-      bool successfullyDeserialized = message_decrypter.DecryptMessage(
-          *(observation_store_->messages_received[i]), &(observations->at(i)));
-      EXPECT_TRUE(successfullyDeserialized);
-      if (!successfullyDeserialized) {
-        return false;
-      }
-      bool has_random_id = !(observations->at(i).random_id().empty());
-      EXPECT_TRUE(has_random_id);
-      if (!successfullyDeserialized) {
-        return false;
-      }
-    }
-
-    return true;
+  // Returns the day index of the current day according to |mock_clock_|, in
+  // |time_zone|, without incrementing the clock.
+  uint32_t CurrentDayIndex(MetricDefinition::TimeZonePolicy time_zone) {
+    return TimeToDayIndex(
+        std::chrono::system_clock::to_time_t(mock_clock_->peek_now()),
+        time_zone);
   }
 
-  // Populates |observation| with the contents of the FakeObservationStore,
-  // which is expected to contain a single Observation with a report_id
-  // of |expected_report_id|. Returns true iff all checks pass.
-  bool FetchSingleImmediateObservation(Observation2* observation,
-                                       uint32_t expected_report_id) {
-    std::vector<Observation2> observations(1);
-    std::vector<uint32_t> expected_report_ids;
-    expected_report_ids.push_back(expected_report_id);
-    if (!FetchImmediateObservations(&observations, expected_report_ids)) {
-      return false;
-    }
-    *observation = observations[0];
-    return true;
+  // Advances |mock_clock_| by |num_days| days.
+  void AdvanceDay(int num_days) {
+    mock_clock_->increment_by(std::chrono::seconds(kDay) * num_days);
   }
 
-  // Checks that the contents of the FakeObservationStore is a sequence of
-  // IntegerEventObservations specified by the various parameters. Returns
-  // true if all checks pass.
-  bool CheckNumericEventObservations(
-      const std::vector<uint32_t>& expected_report_ids,
-      uint32_t expected_event_code, const std::string expected_component_name,
-      int64_t expected_int_value) {
-    size_t expected_num_observations = expected_report_ids.size();
-    std::vector<Observation2> observations(expected_num_observations);
-    if (!FetchImmediateObservations(&observations, expected_report_ids)) {
-      return false;
-    }
-    for (auto i = 0u; i < expected_num_observations; i++) {
-      const auto& numeric_event = observations[i].numeric_event();
-      EXPECT_EQ(expected_event_code, numeric_event.event_code());
-      if (expected_event_code != numeric_event.event_code()) {
-        return false;
-      }
-      if (expected_component_name.empty()) {
-        EXPECT_TRUE(numeric_event.component_name_hash().empty());
-        if (!numeric_event.component_name_hash().empty()) {
-          return false;
-        }
-      } else {
-        EXPECT_EQ(numeric_event.component_name_hash().size(), 32u);
-        if (numeric_event.component_name_hash().size() != 32u) {
-          return false;
-        }
-      }
-      EXPECT_EQ(expected_int_value, numeric_event.value());
-      if (expected_int_value != numeric_event.value()) {
-        return false;
-      }
-    }
-    return true;
+  // Clears the FakeObservationStore and resets the TestUpdateRecipient's
+  // count.
+  void ResetObservationStore() {
+    observation_store_->messages_received.clear();
+    observation_store_->metadata_received.clear();
+    update_recipient_->invocation_count = 0;
   }
 
   std::unique_ptr<Encoder> encoder_;
   std::unique_ptr<Logger> logger_;
+  std::unique_ptr<EventAggregator> event_aggregator_;
   std::unique_ptr<ObservationWriter> observation_writer_;
+  std::unique_ptr<LocalAggregateStore> local_aggregate_store_;
   std::unique_ptr<FakeObservationStore> observation_store_;
   std::unique_ptr<TestUpdateRecipient> update_recipient_;
   std::unique_ptr<EncryptedMessageMaker> observation_encrypter_;
   std::unique_ptr<ProjectContext> project_context_;
   std::unique_ptr<SystemDataInterface> system_data_;
+  IncrementingClock* mock_clock_;
+  ExpectedAggregationParams expected_aggregation_params_;
+};
+
+// Creates a Logger whose ProjectContext contains only EVENT_OCCURRED metrics,
+// each of which has a report of type UNIQUE_N_DAY_ACTIVES with
+// local_privacy_noise_level set to NONE. Used to test the values of
+// UniqueActivesObservations generated by the EventAggregator.
+class UniqueActivesLoggerTest : public LoggerTest {
+ protected:
+  void SetUp() {
+    SetUpFromMetrics(kNoiseFreeUniqueActivesMetricDefinitions,
+                     kNoiseFreeUniqueActivesExpectedAggregationParams);
+  }
 };
 
 // Tests the method LogEvent().
@@ -403,8 +528,9 @@
   ASSERT_EQ(kOK, logger_->LogEvent(kErrorOccurredMetricId, 42));
   Observation2 observation;
   uint32_t expected_report_id = 123;
-  ASSERT_TRUE(
-      FetchSingleImmediateObservation(&observation, expected_report_id));
+  ASSERT_TRUE(FetchSingleObservation(&observation, expected_report_id,
+                                     observation_store_.get(),
+                                     update_recipient_.get()));
   ASSERT_TRUE(observation.has_basic_rappor());
   EXPECT_FALSE(observation.basic_rappor().data().empty());
 }
@@ -414,8 +540,9 @@
   std::vector<uint32_t> expected_report_ids = {111};
   ASSERT_EQ(kOK, logger_->LogEventCount(kReadCacheHitsMetricId, 43,
                                         "component2", 1, 303));
-  EXPECT_TRUE(CheckNumericEventObservations(expected_report_ids, 43u,
-                                            "component2", 303));
+  EXPECT_TRUE(CheckNumericEventObservations(
+      expected_report_ids, 43u, "component2", 303, observation_store_.get(),
+      update_recipient_.get()));
 }
 
 // Tests the method LogElapsedTime().
@@ -423,8 +550,9 @@
   std::vector<uint32_t> expected_report_ids = {121, 221, 321};
   ASSERT_EQ(kOK, logger_->LogElapsedTime(kModuleLoadTimeMetricId, 44,
                                          "component4", 4004));
-  EXPECT_TRUE(CheckNumericEventObservations(expected_report_ids, 44u,
-                                            "component4", 4004));
+  EXPECT_TRUE(CheckNumericEventObservations(
+      expected_report_ids, 44u, "component4", 4004, observation_store_.get(),
+      update_recipient_.get()));
 }
 
 // Tests the method LogFrameRate().
@@ -432,8 +560,9 @@
   std::vector<uint32_t> expected_report_ids = {131, 231, 331};
   ASSERT_EQ(kOK, logger_->LogFrameRate(kLoginModuleFrameRateMetricId, 45,
                                        "component5", 5.123));
-  EXPECT_TRUE(CheckNumericEventObservations(expected_report_ids, 45u,
-                                            "component5", 5123));
+  EXPECT_TRUE(CheckNumericEventObservations(
+      expected_report_ids, 45u, "component5", 5123, observation_store_.get(),
+      update_recipient_.get()));
 }
 
 // Tests the method LogMemoryUsage().
@@ -441,8 +570,9 @@
   std::vector<uint32_t> expected_report_ids = {141, 241};
   ASSERT_EQ(kOK, logger_->LogMemoryUsage(kLedgerMemoryUsageMetricId, 46,
                                          "component6", 606));
-  EXPECT_TRUE(CheckNumericEventObservations(expected_report_ids, 46u,
-                                            "component6", 606));
+  EXPECT_TRUE(CheckNumericEventObservations(
+      expected_report_ids, 46u, "component6", 606, observation_store_.get(),
+      update_recipient_.get()));
 }
 
 // Tests the method LogIntHistogram().
@@ -454,8 +584,9 @@
                                           "component7", std::move(histogram)));
   Observation2 observation;
   uint32_t expected_report_id = 151;
-  ASSERT_TRUE(
-      FetchSingleImmediateObservation(&observation, expected_report_id));
+  ASSERT_TRUE(FetchSingleObservation(&observation, expected_report_id,
+                                     observation_store_.get(),
+                                     update_recipient_.get()));
   ASSERT_TRUE(observation.has_histogram());
   auto histogram_observation = observation.histogram();
   EXPECT_EQ(47u, histogram_observation.event_code());
@@ -475,7 +606,9 @@
             logger_->LogString(kModuleDownloadsMetricId, "www.mymodule.com"));
   std::vector<Observation2> observations(2);
   std::vector<uint32_t> expected_report_ids = {161, 261};
-  ASSERT_TRUE(FetchImmediateObservations(&observations, expected_report_ids));
+  ASSERT_TRUE(FetchObservations(&observations, expected_report_ids,
+                                observation_store_.get(),
+                                update_recipient_.get()));
 
   ASSERT_TRUE(observations[0].has_string_rappor());
   EXPECT_FALSE(observations[0].string_rappor().data().empty());
@@ -496,8 +629,9 @@
                                          std::move(custom_event)));
   Observation2 observation;
   uint32_t expected_report_id = 125;
-  ASSERT_TRUE(
-      FetchSingleImmediateObservation(&observation, expected_report_id));
+  ASSERT_TRUE(FetchSingleObservation(&observation, expected_report_id,
+                                     observation_store_.get(),
+                                     update_recipient_.get()));
   ASSERT_TRUE(observation.has_custom());
   const CustomObservation& custom_observation = observation.custom();
   for (auto i = 0u; i < values.size(); i++) {
@@ -506,5 +640,246 @@
   }
 }
 
+// Tests that the expected number of locally aggregated Observations are
+// generated when no events have been logged.
+TEST_F(LoggerTest, CheckNumAggregatedObsNoEvents) {
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  std::vector<Observation2> observations;
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, expected_aggregation_params_, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that the expected number of locally aggregated Observations are
+// generated when one Event has been logged for a locally aggregated report.
+TEST_F(LoggerTest, CheckNumAggregatedObsOneEvent) {
+  // Log 1 occurrence of event code 0 for the DeviceBoots metric, which has no
+  // immediate reports.
+  ASSERT_EQ(kOK, logger_->LogEvent(kDeviceBootsMetricId, 0));
+  // Check that no immediate Observation was generated.
+  std::vector<Observation2> immediate_observations(0);
+  std::vector<uint32_t> expected_immediate_report_ids = {};
+  ASSERT_TRUE(
+      FetchObservations(&immediate_observations, expected_immediate_report_ids,
+                        observation_store_.get(), update_recipient_.get()));
+  // Generate locally aggregated observations for the current day index.
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  // Check that the expected numbers of aggregated observations were generated.
+  std::vector<Observation2> aggregated_observations;
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &aggregated_observations, expected_aggregation_params_,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+// Tests that the expected number of locally aggregated Observations are
+// generated when multiple Events have been logged for locally aggregated
+// reports.
+TEST_F(LoggerTest, CheckNumAggregatedObsMultipleEvents) {
+  // Log 2 occurrences of event code 0 for the DeviceBoots metric, which has 1
+  // locally aggregated report and no immediate reports.
+  ASSERT_EQ(kOK, logger_->LogEvent(kDeviceBootsMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kDeviceBootsMetricId, 0));
+  // Log 2 occurrences of event codes for the FeaturesActive metric, which
+  // has 1 locally aggregated report and no immediate reports.
+  ASSERT_EQ(kOK, logger_->LogEvent(kFeaturesActiveMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kFeaturesActiveMetricId, 1));
+  // Check that no immediate Observations were generated.
+  std::vector<Observation2> immediate_observations(0);
+  std::vector<uint32_t> expected_immediate_report_ids = {};
+  ASSERT_TRUE(
+      FetchObservations(&immediate_observations, expected_immediate_report_ids,
+                        observation_store_.get(), update_recipient_.get()));
+  // Generate locally aggregated observations for the current day index.
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  // Check that the expected numbers of aggregated observations were generated.
+  std::vector<Observation2> aggregated_observations;
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &aggregated_observations, expected_aggregation_params_,
+      observation_store_.get(), update_recipient_.get()));
+}
+
+// Tests that the expected number of locally aggregated Observations are
+// generated when multiple Events have been logged for locally aggregated
+// and immediate reports.
+TEST_F(LoggerTest, CheckNumAggregatedObsImmediateAndAggregatedEvents) {
+  // Log 3 occurrences of event codes for the EventsOccurred metric, which
+  // has 1 locally aggregated report and 1 immediate report.
+  ASSERT_EQ(kOK, logger_->LogEvent(kEventsOccurredMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kEventsOccurredMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kEventsOccurredMetricId, 2));
+  // Check that each of the 3 logged events resulted in an immediate
+  // Observation.
+  std::vector<Observation2> immediate_observations(3);
+  std::vector<uint32_t> expected_immediate_report_ids(
+      3, kEventsOccurred_SimpleOccurrenceCountMetricReportId.second);
+  ASSERT_TRUE(
+      FetchObservations(&immediate_observations, expected_immediate_report_ids,
+                        observation_store_.get(), update_recipient_.get()));
+  // Clear the FakeObservationStore.
+  ResetObservationStore();
+  // Generate locally aggregated observations for the current day index.
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  // Check that the expected aggregated observations were generated.
+  std::vector<Observation2> observations;
+  EXPECT_TRUE(FetchAggregatedObservations(
+      &observations, expected_aggregation_params_, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that UniqueActivesObservations with the expected values are generated
+// when no events have been logged.
+TEST_F(UniqueActivesLoggerTest, CheckUniqueActivesObsValuesNoEvents) {
+  // Generate locally aggregated Observations without logging any events.
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  // Check that all generated Observations are of non-activity.
+  auto expected_activity =
+      MakeNullExpectedActivity(expected_aggregation_params_);
+  EXPECT_TRUE(CheckUniqueActivesObservations(
+      expected_activity, expected_aggregation_params_, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that UniqueActivesObservations with the expected values are generated
+// when events have been logged for UNIQUE_N_DAY_ACTIVES reports on a single
+// day.
+TEST_F(UniqueActivesLoggerTest, CheckUniqueActivesObsValuesSingleDay) {
+  // Log 2 occurrences of event code 0 for the DeviceBoots metric, which has 1
+  // locally aggregated report and no immediate reports.
+  ASSERT_EQ(kOK, logger_->LogEvent(kDeviceBootsMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kDeviceBootsMetricId, 0));
+  // Log 2 occurrences of event codes for the SomeFeaturesActive metric, which
+  // has 1 locally aggregated report and no immediate reports.
+  ASSERT_EQ(kOK, logger_->LogEvent(kFeaturesActiveMetricId, 0));
+  ASSERT_EQ(kOK, logger_->LogEvent(kFeaturesActiveMetricId, 1));
+  // Generate locally aggregated observations for the current day index.
+  ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                     CurrentDayIndex(MetricDefinition::UTC)));
+  // Form the expected activity map for the current day index.
+  auto expected_activity =
+      MakeNullExpectedActivity(expected_aggregation_params_);
+  expected_activity[kDeviceBoots_UniqueDevicesMetricReportId] = {
+      {1, {true, false}}};
+  expected_activity[kFeaturesActive_UniqueDevicesMetricReportId] = {
+      {1, {true, true, false, false, false}},
+      {7, {true, true, false, false, false}},
+      {30, {true, true, false, false, false}}};
+  // Check that the expected aggregated observations were generated.
+  EXPECT_TRUE(CheckUniqueActivesObservations(
+      expected_activity, expected_aggregation_params_, observation_store_.get(),
+      update_recipient_.get()));
+}
+
+// Tests that UniqueActivesObservations with the expected values are generated
+// when events have been logged for a UniqueActives report over multiple days.
+//
+// Logs events for the EventsOccurred_UniqueDevices report (whose parent
+// metric has max_event_code = 5, and whose report ID is 401) for 10
+// days, according to the following pattern:
+//
+// * Never log event code 0.
+// * On the i-th day (0-indexed) of logging, log an event for event code k,
+// 1 <= k < 5, if 3*k divides i.
+//
+// Each day following the first day, generates Observations for the previous
+// day index and checks them against the expected set of Observations. Each day,
+// garbage-collects the LocalAggregateStore for the current day index. Finally,
+// generates and check Observations for the last day of logging.
+//
+// The EventsOccurred_UniqueDevices report has window sizes 1 and 7, and
+// the expected pattern of those Observations' values on the i-th day is:
+//
+// (i, window size)            true for event codes
+// ------------------------------------------------------
+// (0, 1)                           1, 2, 3, 4
+// (0, 7)                           1, 2, 3, 4
+// (1, 1)                          ---
+// (1, 7)                           1, 2, 3, 4
+// (2, 1)                          ---
+// (2, 7)                           1, 2, 3, 4
+// (3, 1)                           1
+// (3, 7)                           1, 2, 3, 4
+// (4, 1)                          ---
+// (4, 7)                           1, 2, 3, 4
+// (5, 1)                          ---
+// (5, 7)                           1, 2, 3, 4
+// (6, 1)                           1, 2
+// (6, 7)                           1, 2, 3, 4
+// (7, 1)                          ---
+// (7, 7)                           1, 2
+// (8, 1)                          ---
+// (8, 7)                           1, 2
+// (9, 1)                           1, 3
+// (9, 7)                           1, 2, 3
+// All Observations for all other locally aggregated reports should be
+// observations of non-occurrence.
+TEST_F(UniqueActivesLoggerTest, CheckUniqueActivesObservationValues) {
+  // Form expected activity maps.
+  std::vector<ExpectedActivity> expected_activity(10);
+  for (uint32_t day = 0; day < expected_activity.size(); day++) {
+    expected_activity[day] =
+        MakeNullExpectedActivity(expected_aggregation_params_);
+  }
+  expected_activity[0][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, true, true, true, true}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[1][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[2][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[3][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, true, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[4][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[5][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[6][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, true, true, false, false}},
+      {7, {false, true, true, true, true}}};
+  expected_activity[7][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[8][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, false, false, false, false}},
+      {7, {false, true, true, false, false}}};
+  expected_activity[9][kEventsOccurred_UniqueDevicesMetricReportId] = {
+      {1, {false, true, false, true, false}},
+      {7, {false, true, true, true, false}}};
+
+  for (uint32_t i = 0; i < 10; i++) {
+    if (i < 10) {
+      for (uint32_t event_code = 1; event_code < 5; event_code++) {
+        if (i % (3 * event_code) == 0) {
+          ASSERT_EQ(kOK,
+                    logger_->LogEvent(kEventsOccurredMetricId, event_code));
+        }
+      }
+    }
+    // Clear the FakeObservationStore.
+    ResetObservationStore();
+    // Advance the Logger's clock by 1 day.
+    AdvanceDay(1);
+    // Generate locally aggregated Observations for the previous day index.
+    ASSERT_EQ(kOK, event_aggregator_->GenerateObservations(
+                       CurrentDayIndex(MetricDefinition::UTC) - 1));
+    // Check the generated Observations against the expectation.
+    EXPECT_TRUE(CheckUniqueActivesObservations(
+        expected_activity[i], expected_aggregation_params_,
+        observation_store_.get(), update_recipient_.get()));
+    // Garbage-collect the LocalAggregateStore for the current day index.
+    ASSERT_EQ(kOK, event_aggregator_->GarbageCollect(
+                       CurrentDayIndex(MetricDefinition::UTC)));
+  }
+}
+
 }  // namespace logger
 }  // namespace cobalt
diff --git a/logger/logger_test_utils.cc b/logger/logger_test_utils.cc
new file mode 100644
index 0000000..53cf734
--- /dev/null
+++ b/logger/logger_test_utils.cc
@@ -0,0 +1,343 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "logger/logger_test_utils.h"
+
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/util/message_differencer.h>
+#include <utility>
+
+#include "./observation.pb.h"
+#include "./observation2.pb.h"
+#include "algorithms/rappor/rappor_config_helper.h"
+#include "algorithms/rappor/rappor_encoder.h"
+#include "config/encodings.pb.h"
+#include "encoder/client_secret.h"
+#include "encoder/encoder.h"
+#include "encoder/send_retryer.h"
+#include "encoder/shipping_manager.h"
+#include "logger/encoder.h"
+#include "util/encrypted_message_util.h"
+
+using ::google::protobuf::util::MessageDifferencer;
+
+namespace cobalt {
+
+using encoder::ClientSecret;
+using rappor::BasicRapporEncoder;
+using rappor::RapporConfigHelper;
+using util::EncryptedMessageMaker;
+using util::MessageDecrypter;
+
+namespace logger {
+namespace testing {
+
+bool PopulateMetricDefinitions(const char metric_string[],
+                               MetricDefinitions* metric_definitions) {
+  google::protobuf::TextFormat::Parser parser;
+  return parser.ParseFromString(metric_string, metric_definitions);
+}
+
+ReportAggregationKey MakeAggregationKey(
+    const ProjectContext& project_context,
+    const MetricReportId& metric_report_id) {
+  ReportAggregationKey key;
+  key.set_customer_id(project_context.project().customer_id());
+  key.set_project_id(project_context.project().project_id());
+  key.set_metric_id(metric_report_id.first);
+  key.set_report_id(metric_report_id.second);
+  return key;
+}
+
+AggregationConfig MakeAggregationConfig(
+    const ProjectContext& project_context,
+    const MetricReportId& metric_report_id) {
+  AggregationConfig config;
+  const auto& metric = project_context.GetMetric(metric_report_id.first);
+  bool found_report_id;
+  for (const auto& report : metric->reports()) {
+    if (metric_report_id.second == report.id()) {
+      found_report_id = true;
+      *config.mutable_project() = project_context.project();
+      *config.mutable_metric() = *metric;
+      *config.mutable_report() = report;
+      break;
+    }
+  }
+  if (!found_report_id) {
+    LOG(ERROR) << "Report ID " << metric_report_id.second << " not found.\n";
+  }
+  return config;
+}
+
+ExpectedActivity MakeNullExpectedActivity(
+    const ExpectedAggregationParams& expected_params) {
+  ExpectedActivity expected_activity;
+  for (const auto& report_pair : expected_params.window_sizes) {
+    for (const auto& window_size : report_pair.second) {
+      for (uint32_t event_code = 0;
+           event_code < expected_params.num_event_codes.at(report_pair.first);
+           event_code++) {
+        expected_activity[report_pair.first][window_size].push_back(false);
+      }
+    }
+  }
+  return expected_activity;
+}
+
+bool FetchObservations(std::vector<Observation2>* observations,
+                       const std::vector<uint32_t>& expected_report_ids,
+                       FakeObservationStore* observation_store,
+                       TestUpdateRecipient* update_recipient) {
+  CHECK(observations);
+  size_t expected_num_received = observations->size();
+  CHECK(expected_report_ids.size() == expected_num_received);
+  auto num_received = observation_store->messages_received.size();
+  EXPECT_EQ(num_received, observation_store->metadata_received.size());
+  if (num_received != observation_store->metadata_received.size()) {
+    return false;
+  }
+  EXPECT_EQ(num_received, expected_num_received);
+  if (num_received != expected_num_received) {
+    return false;
+  }
+  num_received = update_recipient->invocation_count;
+  EXPECT_EQ(num_received, expected_num_received);
+  if (num_received != expected_num_received) {
+    return false;
+  }
+  MessageDecrypter message_decrypter("");
+
+  for (auto i = 0u; i < expected_num_received; i++) {
+    bool isNull = (observation_store->metadata_received[i].get() == nullptr);
+    EXPECT_FALSE(isNull);
+    if (isNull) {
+      return false;
+    }
+    EXPECT_EQ(observation_store->metadata_received[i]->report_id(),
+              expected_report_ids[i])
+        << "i=" << i;
+    isNull = (observation_store->messages_received[i].get() == nullptr);
+    EXPECT_FALSE(isNull);
+    if (isNull) {
+      return false;
+    }
+    bool successfullyDeserialized = message_decrypter.DecryptMessage(
+        *(observation_store->messages_received[i]), &(observations->at(i)));
+    EXPECT_TRUE(successfullyDeserialized);
+    if (!successfullyDeserialized) {
+      return false;
+    }
+    bool has_random_id = !(observations->at(i).random_id().empty());
+    EXPECT_TRUE(has_random_id);
+    if (!successfullyDeserialized) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool FetchSingleObservation(Observation2* observation,
+                            uint32_t expected_report_id,
+                            FakeObservationStore* observation_store,
+                            TestUpdateRecipient* update_recipient) {
+  std::vector<Observation2> observations(1);
+  std::vector<uint32_t> expected_report_ids;
+  expected_report_ids.push_back(expected_report_id);
+  if (!FetchObservations(&observations, expected_report_ids, observation_store,
+                         update_recipient)) {
+    return false;
+  }
+  *observation = observations[0];
+  return true;
+}
+
+bool FetchAggregatedObservations(
+    std::vector<Observation2>* observations,
+    const ExpectedAggregationParams& expected_params,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient) {
+  auto num_received = observation_store->messages_received.size();
+  EXPECT_EQ(num_received, observation_store->metadata_received.size());
+  if (num_received != observation_store->metadata_received.size()) {
+    return false;
+  }
+  EXPECT_EQ(expected_params.daily_num_obs, num_received);
+  if (expected_params.daily_num_obs != num_received) {
+    return false;
+  }
+  num_received = update_recipient->invocation_count;
+  EXPECT_EQ(expected_params.daily_num_obs, num_received);
+  if (expected_params.daily_num_obs != num_received) {
+    return false;
+  }
+  observations->resize(expected_params.daily_num_obs);
+  MessageDecrypter message_decrypter("");
+  // Get the expected number of Observations for each report ID.
+  // Decrement the expected number as received Observations are counted.
+  auto expected_num_obs_by_id = expected_params.num_obs_per_report;
+  for (auto i = 0u; i < expected_params.daily_num_obs; i++) {
+    bool isNull = (observation_store->metadata_received[i].get() == nullptr);
+    EXPECT_FALSE(isNull);
+    if (isNull) {
+      return false;
+    }
+    auto metric_report_id =
+        MetricReportId(observation_store->metadata_received[i]->metric_id(),
+                       observation_store->metadata_received[i]->report_id());
+    EXPECT_GE(expected_num_obs_by_id[metric_report_id], 1u) << "i=" << i;
+    expected_num_obs_by_id[metric_report_id]--;
+
+    isNull = (observation_store->messages_received[i].get() == nullptr);
+    EXPECT_FALSE(isNull);
+    if (isNull) {
+      return false;
+    }
+    bool successfullyDeserialized = message_decrypter.DecryptMessage(
+        *(observation_store->messages_received[i]), &(observations->at(i)));
+    EXPECT_TRUE(successfullyDeserialized);
+    if (!successfullyDeserialized) {
+      return false;
+    }
+    bool has_random_id = !(observations->at(i).random_id().empty());
+    EXPECT_TRUE(has_random_id);
+    if (!successfullyDeserialized) {
+      return false;
+    }
+  }
+  // Check that all expected Observations have been found.
+  for (auto iter = expected_num_obs_by_id.begin();
+       iter != expected_num_obs_by_id.end(); iter++) {
+    EXPECT_EQ(0u, iter->second);
+  }
+  return true;
+}
+
+bool CheckNumericEventObservations(
+    const std::vector<uint32_t>& expected_report_ids,
+    uint32_t expected_event_code, const std::string expected_component_name,
+    int64_t expected_int_value, FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient) {
+  size_t expected_num_observations = expected_report_ids.size();
+  std::vector<Observation2> observations(expected_num_observations);
+  if (!FetchObservations(&observations, expected_report_ids, observation_store,
+                         update_recipient)) {
+    return false;
+  }
+  for (auto i = 0u; i < expected_num_observations; i++) {
+    const auto& numeric_event = observations[i].numeric_event();
+    EXPECT_EQ(expected_event_code, numeric_event.event_code());
+    if (expected_event_code != numeric_event.event_code()) {
+      return false;
+    }
+    if (expected_component_name.empty()) {
+      EXPECT_TRUE(numeric_event.component_name_hash().empty());
+      if (!numeric_event.component_name_hash().empty()) {
+        return false;
+      }
+    } else {
+      EXPECT_EQ(numeric_event.component_name_hash().size(), 32u);
+      if (numeric_event.component_name_hash().size() != 32u) {
+        return false;
+      }
+    }
+    EXPECT_EQ(expected_int_value, numeric_event.value());
+    if (expected_int_value != numeric_event.value()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool CheckUniqueActivesObservations(
+    const ExpectedActivity& expected_activity,
+    const ExpectedAggregationParams& expected_params,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient) {
+  // A container for the strings expected to appear in the |data| field of the
+  // BasicRapporObservation wrapped by the UniqueActivesObservation for a given
+  // MetricReportId, window size, and event code.
+  std::map<MetricReportId, std::map<uint32_t, std::map<uint32_t, std::string>>>
+      expected_values;
+  // Form Basic RAPPOR-encoded bits from the expected activity values and
+  // populate |expected_values|.
+  BasicRapporConfig basic_rappor_config;
+  basic_rappor_config.set_prob_0_becomes_1(0.0);
+  basic_rappor_config.set_prob_1_stays_1(1.0);
+  basic_rappor_config.mutable_indexed_categories()->set_num_categories(1u);
+  std::unique_ptr<BasicRapporEncoder> encoder(new BasicRapporEncoder(
+      basic_rappor_config, ClientSecret::GenerateNewSecret()));
+  for (const auto& id_pair : expected_activity) {
+    expected_values[id_pair.first] = {};
+    for (const auto& window_pair : id_pair.second) {
+      expected_values[id_pair.first][window_pair.first] = {};
+      for (uint32_t event_code = 0; event_code < window_pair.second.size();
+           event_code++) {
+        BasicRapporObservation basic_rappor_obs;
+        if (window_pair.second[event_code]) {
+          ValuePart value;
+          value.set_index_value(0u);
+          encoder->Encode(value, &basic_rappor_obs);
+        } else {
+          encoder->EncodeNullObservation(&basic_rappor_obs);
+        }
+        expected_values[id_pair.first][window_pair.first][event_code] =
+            basic_rappor_obs.data();
+      }
+    }
+  }
+  // Fetch the contents of the ObservationStore and check that each
+  // received Observation corresponds to an element of |expected_values|.
+  std::vector<Observation2> observations;
+  if (!FetchAggregatedObservations(&observations, expected_params,
+                                   observation_store, update_recipient)) {
+    return false;
+  }
+
+  for (size_t i = 0; i < observations.size(); i++) {
+    if (!observations.at(i).has_unique_actives()) {
+      return false;
+    }
+    auto obs_id =
+        MetricReportId(observation_store->metadata_received[i]->metric_id(),
+                       observation_store->metadata_received[i]->report_id());
+    if (expected_values.count(obs_id) == 0) {
+      return false;
+    }
+    uint32_t obs_window_size =
+        observations.at(i).unique_actives().window_size();
+    if (expected_values.at(obs_id).count(obs_window_size) == 0) {
+      return false;
+    }
+    uint32_t obs_event_code = observations.at(i).unique_actives().event_code();
+    if (expected_values.at(obs_id).at(obs_window_size).count(obs_event_code) ==
+        0) {
+      return false;
+    }
+    std::string obs_data =
+        observations.at(i).unique_actives().basic_rappor_obs().data();
+    if (expected_values.at(obs_id).at(obs_window_size).at(obs_event_code) !=
+        obs_data) {
+      return false;
+    }
+    // Remove the bucket of |expected_values| corresponding to the
+    // received Observation.
+    expected_values[obs_id][obs_window_size].erase(obs_event_code);
+    if (expected_values[obs_id][obs_window_size].empty()) {
+      expected_values[obs_id].erase(obs_window_size);
+    }
+    if (expected_values[obs_id].empty()) {
+      expected_values.erase(obs_id);
+    }
+  }
+  // Check that every expected Observation has been received.
+  if (!expected_values.empty()) {
+    return false;
+  }
+  return true;
+}
+
+}  // namespace testing
+}  // namespace logger
+}  // namespace cobalt
diff --git a/logger/logger_test_utils.h b/logger/logger_test_utils.h
new file mode 100644
index 0000000..d2cd102
--- /dev/null
+++ b/logger/logger_test_utils.h
@@ -0,0 +1,162 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_LOGGER_LOGGER_TEST_UTILS_H_
+#define COBALT_LOGGER_LOGGER_TEST_UTILS_H_
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "./gtest.h"
+#include "encoder/shipping_manager.h"
+#include "logger/encoder.h"
+#include "logger/local_aggregation.pb.h"
+
+namespace cobalt {
+namespace logger {
+namespace testing {
+
+// A mock ObservationStore.
+class FakeObservationStore
+    : public ::cobalt::encoder::ObservationStoreWriterInterface {
+ public:
+  StoreStatus AddEncryptedObservation(
+      std::unique_ptr<EncryptedMessage> message,
+      std::unique_ptr<ObservationMetadata> metadata) override {
+    messages_received.emplace_back(std::move(message));
+    metadata_received.emplace_back(std::move(metadata));
+    return kOk;
+  }
+
+  std::vector<std::unique_ptr<EncryptedMessage>> messages_received;
+  std::vector<std::unique_ptr<ObservationMetadata>> metadata_received;
+};
+
+// A mock ObservationStoreUpdateRecipient.
+class TestUpdateRecipient
+    : public ::cobalt::encoder::ObservationStoreUpdateRecipient {
+ public:
+  void NotifyObservationsAdded() override { invocation_count++; }
+
+  int invocation_count = 0;
+};
+
+// A container for information about the set of all locally aggregated
+// reports in a configuration. This is used by tests to check the output of the
+// EventAggregator.
+typedef struct ExpectedAggregationParams {
+  // The total number of locally aggregated Observations which should be
+  // generated each day.
+  size_t daily_num_obs;
+  // The MetricReportIds of the locally aggregated reports in this
+  // configuration.
+  std::set<MetricReportId> metric_report_ids;
+  // Keys are the MetricReportIds of all locally aggregated reports in a config.
+  // The value at a key is the number of Observations which should be generated
+  // each day for that report.
+  std::map<MetricReportId, size_t> num_obs_per_report;
+  // Keys are the MetricReportIds of all locally aggregated reports in a config.
+  // The value at a key is the number of event codes for that report's
+  // parent MetricDefinition.
+  std::map<MetricReportId, size_t> num_event_codes;
+  // Keys are the MetricReportIds of all locally aggregated reports in a config.
+  // The value at a key is the set of window sizes of that report.
+  std::map<MetricReportId, std::set<uint32_t>> window_sizes;
+} ExpectedAggregationParams;
+
+// A representation of a set of expected UniqueActivesObservations for a fixed
+// day index. Used to check the values of UniqueActivesObservations generated by
+// the EventAggregator.
+//
+// The outer map is keyed by MetricReportId, and the value at an ID is
+// a map keyed by window size. The value of the inner map at a window size is a
+// vector of size equal to the number of event codes for the parent metric of
+// the report, and the i-th element of the vector is |true| if the i-th event
+// code occurred on the device during the window of that size, or |false| if
+// not.
+typedef std::map<MetricReportId, std::map<uint32_t, std::vector<bool>>>
+    ExpectedActivity;
+
+// Populates a MetricDefinitions proto message from a serialized representation.
+bool PopulateMetricDefinitions(const char metric_string[],
+                               MetricDefinitions* metric_definitions);
+
+// Returns the ReportAggregationKey associated to a report, given a
+// ProjectContext containing the report and the report's MetricReportId.
+ReportAggregationKey MakeAggregationKey(const ProjectContext& project_context,
+                                        const MetricReportId& metric_report_id);
+
+// Returns the AggregationConfig associated to a report, given a ProjectContext
+// containing the report and the report's MetricReportId.
+AggregationConfig MakeAggregationConfig(const ProjectContext& project_context,
+                                        const MetricReportId& metric_report_id);
+
+// Given an ExpectedAggregationParams struct populated with information about
+// the locally aggregated reports in a config, return an ExpectedActivity map
+// initialized with that config's MetricReportIds and window sizes, with all
+// activity indicators set to false.
+ExpectedActivity MakeNullExpectedActivity(
+    const ExpectedAggregationParams& expected_params);
+
+// Populates |observations| with the contents of a FakeObservationStore.
+// |observations| should be a vector whose size is equal to the number
+// of expected observations. Checks the the ObservationStore contains
+// the expected number of Observations and that the report_ids of the
+// Observations are equal to |expected_report_ids|. Returns true iff all checks
+// pass.
+bool FetchObservations(std::vector<Observation2>* observations,
+                       const std::vector<uint32_t>& expected_report_ids,
+                       FakeObservationStore* observation_store,
+                       TestUpdateRecipient* update_recipient);
+
+// Populates |observation| with the contents of a FakeObservationStore,
+// which is expected to contain a single Observation with a report_id
+// of |expected_report_id|. Returns true iff all checks pass.
+bool FetchSingleObservation(Observation2* observation,
+                            uint32_t expected_report_id,
+                            FakeObservationStore* observation_store,
+                            TestUpdateRecipient* update_recipient);
+
+// Given an ExpectedAggregationParams containing information about the set of
+// locally aggregated reports in a config, populates a vector |observations|
+// with the contents of a FakeObservationStore and checks that the vector
+// contains exactly the number of Observations that the EventAggregator should
+// generate for a single day index, for each locally aggregated report in that
+// config. Does not assume that the contents of the FakeObservationStore have a
+// particular order. The size of |observations| is ignored, and can be 0.
+bool FetchAggregatedObservations(
+    std::vector<Observation2>* observations,
+    const ExpectedAggregationParams& expected_params,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient);
+
+// Checks that the contents of a FakeObservationStore is a sequence of
+// IntegerEventObservations specified by the various parameters. Returns
+// true if all checks pass.
+bool CheckNumericEventObservations(
+    const std::vector<uint32_t>& expected_report_ids,
+    uint32_t expected_event_code, const std::string expected_component_name,
+    int64_t expected_int_value, FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient);
+
+// Checks that the Observations contained in a FakeObservationStore are exactly
+// the UniqueActivesObservations that should be generated for a single day index
+// given a representation of the expected activity indicators for that day, for
+// each UniqueActives report, for each window size and event code, for a config
+// whose locally aggregated reports are all of type UNIQUE_N_DAY_ACTIVES.
+bool CheckUniqueActivesObservations(
+    const ExpectedActivity& expected_activity,
+    const ExpectedAggregationParams& expected_params,
+    FakeObservationStore* observation_store,
+    TestUpdateRecipient* update_recipient);
+
+}  // namespace testing
+}  // namespace logger
+}  // namespace cobalt
+
+#endif  //  COBALT_LOGGER_LOGGER_TEST_UTILS_H_
diff --git a/logger/project_context.h b/logger/project_context.h
index 80c6964..13331fb 100644
--- a/logger/project_context.h
+++ b/logger/project_context.h
@@ -19,6 +19,9 @@
 namespace cobalt {
 namespace logger {
 
+// A pair (metric ID, report ID).
+typedef std::pair<uint32_t, uint32_t> MetricReportId;
+
 std::string MetricDebugString(const MetricDefinition& metric);
 
 // A reference object that gives access to the names and IDs of a Metric and
@@ -69,6 +72,7 @@
 
   const MetricDefinition* GetMetric(const std::string& metric_name) const;
   const MetricDefinition* GetMetric(const uint32_t metric_id) const;
+
   // Makes a MetricRef that wraps this ProjectContext's Project and the given
   // metric_definition (which should have been obtained via GetMetric()).
   // The Project and MetricDefinition must remain valid as long as the returned
@@ -77,6 +81,10 @@
 
   const Project& project() const { return project_; }
 
+  const MetricDefinitions* metric_definitions() const {
+    return metric_definitions_.get();
+  }
+
   const std::string DebugString() const;
 
  private:
diff --git a/logger/status.h b/logger/status.h
index 0430948..c2ab508 100644
--- a/logger/status.h
+++ b/logger/status.h
@@ -19,7 +19,7 @@
   // The Cobalt config is invalid.
   kInvalidConfig = 2,
 
-  // The size of the proviced data is too large.
+  // The size of the provided data is too large.
   kTooBig = 3,
 
   // The repository being written to is full.
diff --git a/tools/test_app2/test_app.cc b/tools/test_app2/test_app.cc
index 41bcbea..42375fe 100644
--- a/tools/test_app2/test_app.cc
+++ b/tools/test_app2/test_app.cc
@@ -23,6 +23,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 #include "logger/encoder.h"
+#include "logger/event_aggregator.h"
 #include "logger/project_context.h"
 #include "logger/status.h"
 #include "util/clearcut/curl_http_client.h"
@@ -39,6 +40,7 @@
 using encoder::SystemData;
 using encoder::SystemDataInterface;
 using logger::Encoder;
+using logger::EventAggregator;
 using logger::EventValuesPtr;
 using logger::kOK;
 using logger::kOther;
@@ -247,6 +249,7 @@
       std::unique_ptr<ProjectContext> project_context,
       std::unique_ptr<MemoryObservationStore> observation_store,
       std::unique_ptr<ClearcutV1ShippingManager> shipping_manager,
+      std::unique_ptr<LocalAggregateStore> local_aggregate_store,
       std::unique_ptr<SystemDataInterface> system_data);
 
   std::unique_ptr<LoggerInterface> NewLogger() override;
@@ -261,9 +264,11 @@
   std::unique_ptr<ProjectContext> project_context_;
   std::unique_ptr<MemoryObservationStore> observation_store_;
   std::unique_ptr<ClearcutV1ShippingManager> shipping_manager_;
+  std::unique_ptr<LocalAggregateStore> local_aggregate_store_;
   std::unique_ptr<SystemDataInterface> system_data_;
   std::unique_ptr<Encoder> encoder_;
   std::unique_ptr<ObservationWriter> observation_writer_;
+  std::unique_ptr<EventAggregator> event_aggregator_;
 };
 
 RealLoggerFactory::RealLoggerFactory(
@@ -272,12 +277,14 @@
     std::unique_ptr<ProjectContext> project_context,
     std::unique_ptr<MemoryObservationStore> observation_store,
     std::unique_ptr<ClearcutV1ShippingManager> shipping_manager,
+    std::unique_ptr<LocalAggregateStore> local_aggregate_store,
     std::unique_ptr<SystemDataInterface> system_data)
     : observation_encrypter_(std::move(observation_encrypter)),
       envelope_encrypter_(std::move(envelope_encrypter)),
       project_context_(std::move(project_context)),
       observation_store_(std::move(observation_store)),
       shipping_manager_(std::move(shipping_manager)),
+      local_aggregate_store_(std::move(local_aggregate_store)),
       system_data_(std::move(system_data)) {}
 
 std::unique_ptr<LoggerInterface> RealLoggerFactory::NewLogger() {
@@ -286,8 +293,11 @@
   observation_writer_.reset(
       new ObservationWriter(observation_store_.get(), shipping_manager_.get(),
                             observation_encrypter_.get()));
-  return std::unique_ptr<LoggerInterface>(new Logger(
-      encoder_.get(), observation_writer_.get(), project_context_.get()));
+  event_aggregator_.reset(new EventAggregator(
+      encoder_.get(), observation_writer_.get(), local_aggregate_store_.get()));
+  return std::unique_ptr<LoggerInterface>(
+      new Logger(encoder_.get(), event_aggregator_.get(),
+                 observation_writer_.get(), project_context_.get()));
 }
 
 bool RealLoggerFactory::SendAccumulatedObservations() {
@@ -337,6 +347,7 @@
       shuffler_public_key_pem, shuffler_encryption_scheme);
   auto observation_store = std::make_unique<MemoryObservationStore>(
       kMaxBytesPerObservation, kMaxBytesPerEnvelope, kMaxBytesTotal);
+  auto local_aggregate_store = std::make_unique<LocalAggregateStore>();
 
   // By using (kMaxSeconds, 0) here we are effectively putting the
   // ShippingDispatcher in manual mode. It will never send
@@ -360,7 +371,8 @@
   std::unique_ptr<LoggerFactory> logger_factory(new RealLoggerFactory(
       std::move(observation_encrypter), std::move(envelope_encrypter),
       std::move(project_context), std::move(observation_store),
-      std::move(shipping_manager), std::move(system_data)));
+      std::move(shipping_manager), std::move(local_aggregate_store),
+      std::move(system_data)));
 
   std::unique_ptr<TestApp> test_app(new TestApp(
       std::move(logger_factory), FLAGS_metric_name, mode, &std::cout));
@@ -749,7 +761,7 @@
         *ostream_ << "Expected non-negative integer instead of " << str << "."
                   << std::endl;
       } else {
-        LOG(ERROR) << "Expected non-negativea integer instead of " << str;
+        LOG(ERROR) << "Expected non-negative integer instead of " << str;
       }
     }
     return false;
diff --git a/util/BUILD.gn b/util/BUILD.gn
index 82ba84d..5ff1588 100644
--- a/util/BUILD.gn
+++ b/util/BUILD.gn
@@ -72,3 +72,16 @@
     ":status",
   ]
 }
+
+static_library("proto_util") {
+  sources = [
+    "proto_util.cc",
+    "proto_util.h",
+  ]
+  configs += [ "//third_party/cobalt:cobalt_config" ]
+  deps = [
+    "//garnet/public/lib/fxl",
+    "//third_party/cobalt/util/crypto_util",
+    "//third_party/protobuf:protobuf_lite",
+  ]
+}
diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt
index 096d9dd..9766584 100644
--- a/util/CMakeLists.txt
+++ b/util/CMakeLists.txt
@@ -21,6 +21,12 @@
                       cobalt_crypto)
 add_cobalt_dependencies(pem_util)
 
+add_library(proto_util
+            proto_util.cc)
+target_link_libraries(proto_util
+	    cobalt_crypto)
+add_cobalt_dependencies(proto_util)
+
 add_executable(util_tests
                datetime_util_test.cc
                encrypted_message_util_test.cc
diff --git a/util/clock.h b/util/clock.h
index 1c25e40..485030d 100644
--- a/util/clock.h
+++ b/util/clock.h
@@ -6,6 +6,9 @@
 #define COBALT_UTIL_CLOCK_H_
 
 #include <chrono>
+#include <iomanip>
+#include <sstream>
+#include <string>
 
 namespace cobalt {
 namespace util {
@@ -31,6 +34,12 @@
 // clock ticks.
 class IncrementingClock : public ClockInterface {
  public:
+  IncrementingClock() {}
+  // Constructs an IncrementingClock which increments by |increment| seconds
+  // each time it is called.
+  explicit IncrementingClock(std::chrono::system_clock::duration increment)
+      : increment_(increment) {}
+
   std::chrono::system_clock::time_point now() override {
     time_ += increment_;
     if (callback_) {
@@ -42,10 +51,19 @@
   // Return the current value of time_ without advancing time.
   std::chrono::system_clock::time_point peek_now() { return time_; }
 
+  // Set the value by which the clock is incremented each time it is called.
   void set_increment(std::chrono::system_clock::duration increment) {
     increment_ = increment;
   }
 
+  // Increment the clock's current time once.
+  void increment_by(std::chrono::system_clock::duration increment) {
+    time_ += increment;
+    if (callback_) {
+      callback_(time_);
+    }
+  }
+
   void set_time(std::chrono::system_clock::time_point t) { time_ = t; }
 
   void set_callback(
@@ -53,6 +71,13 @@
     callback_ = c;
   }
 
+  std::string DebugString() {
+    std::ostringstream stream;
+    std::time_t now = std::chrono::system_clock::to_time_t(time_);
+    stream << "time: " << std::put_time(std::localtime(&now), "%F %T") << "\n";
+    return stream.str();
+  }
+
  private:
   std::chrono::system_clock::time_point time_ =
       std::chrono::system_clock::time_point(
diff --git a/util/crypto_util/base64.cc b/util/crypto_util/base64.cc
index cd7d0b0..fcf733a 100644
--- a/util/crypto_util/base64.cc
+++ b/util/crypto_util/base64.cc
@@ -15,7 +15,6 @@
 #include "util/crypto_util/base64.h"
 
 #include <openssl/base64.h>
-
 #include <algorithm>
 
 namespace cobalt {
diff --git a/util/proto_util.cc b/util/proto_util.cc
new file mode 100644
index 0000000..124cec3
--- /dev/null
+++ b/util/proto_util.cc
@@ -0,0 +1,30 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "util/proto_util.h"
+
+using ::google::protobuf::MessageLite;
+
+namespace cobalt {
+
+using crypto::Base64Encode;
+
+namespace util {
+
+bool SerializeToBase64(const MessageLite& message,
+                       std::string* encoded_message) {
+  std::string serialized_message;
+  if (!message.SerializeToString(&serialized_message)) {
+    LOG(ERROR) << "Failed to serialize proto message.";
+    return false;
+  }
+  if (!Base64Encode(serialized_message, encoded_message)) {
+    LOG(ERROR) << "Failed to base64-encode serialized message.";
+    return false;
+  }
+  return true;
+}
+
+}  // namespace util
+}  // namespace cobalt
diff --git a/util/proto_util.h b/util/proto_util.h
new file mode 100644
index 0000000..27719e3
--- /dev/null
+++ b/util/proto_util.h
@@ -0,0 +1,26 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_UTIL_PROTO_UTIL_H_
+#define COBALT_UTIL_PROTO_UTIL_H_
+
+#include <google/protobuf/message_lite.h>
+#include <string>
+
+#include "./logging.h"
+#include "util/crypto_util/base64.h"
+
+namespace cobalt {
+namespace util {
+
+// Given a proto message, populates a string with the base64 encoding of the
+// serialized message and returns |true| on success. Logs an error and returns
+// |false| if either serialization or base64-encoding fails.
+bool SerializeToBase64(const ::google::protobuf::MessageLite& message,
+                       std::string* encoded_message);
+
+}  // namespace util
+}  // namespace cobalt
+
+#endif  // COBALT_UTIL_PROTO_UTIL_H_