[NoCheck] Avoid pointers in EventAggregatorMgr

- Removes 2 CHECK calls
- Makes AggregateStore/EventAggregator move-assignable (so they no
  longer need to be pointers)
  - Adds move constructor for ProtectedFields.
- Creates 'NotNullUniquePtr' and 'PinnedUniquePtr' that guarantees that
  the held object can never be nullptr.
  - Used for steady_clock_ which must be a pointer, but should never be
    null.

Change-Id: I2ea45ceef6cb8814cf471071b410942e861f3e50
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/659347
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/bin/test_app/test_app.cc b/src/bin/test_app/test_app.cc
index eaa7bad..12db433 100644
--- a/src/bin/test_app/test_app.cc
+++ b/src/bin/test_app/test_app.cc
@@ -355,7 +355,7 @@
 
 bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
   return cobalt_service_->event_aggregator_manager()
-      ->aggregate_store_->GenerateObservations(day_index)
+      ->aggregate_store_.GenerateObservations(day_index)
       .ok();
 }
 
diff --git a/src/lib/util/BUILD.gn b/src/lib/util/BUILD.gn
index 6a55766..dfe1277 100644
--- a/src/lib/util/BUILD.gn
+++ b/src/lib/util/BUILD.gn
@@ -309,6 +309,11 @@
   sources = [ "named_type.h" ]
 }
 
+source_set("not_null") {
+  sources = [ "not_null.h" ]
+  public_deps = [ "$cobalt_root/src:logging" ]
+}
+
 group("tests") {
   testonly = true
   deps = [
diff --git a/src/lib/util/not_null.h b/src/lib/util/not_null.h
new file mode 100644
index 0000000..cc675ba
--- /dev/null
+++ b/src/lib/util/not_null.h
@@ -0,0 +1,183 @@
+// Copyright 2022 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COBALT_SRC_LIB_UTIL_NOT_NULL_H_
+#define COBALT_SRC_LIB_UTIL_NOT_NULL_H_
+
+#include <memory>
+#include <ostream>
+#include <type_traits>
+
+#include "src/logging.h"
+
+namespace cobalt::util {
+
+template <class T>
+class PinnedUniquePtr;
+
+// NotNullUniquePtr is a wrapper around std::unique_ptr<T> that guarantees that the contained type
+// is non-null at construction time.
+//
+// It does this by:
+//   1) Only being constructible by using MakeNotNullUniquePtr or WrapNotNullUniquePtrOrDefault
+//      which both guarantee that the contained type cannot be null.
+//   2) Not allowing any access to the contained type.
+//
+// NotNullUniquePtr is a type that is only used in *transit*. It works as a return type to
+// functions, and it can be transparently converted into a PinnedUniquePtr (see below).
+template <class T>
+class NotNullUniquePtr {
+ private:
+  template <typename U, class... Args>
+  friend constexpr NotNullUniquePtr<U> MakeNotNullUniquePtr(Args&&... args);
+
+  template <typename U, typename V, class... Args>
+  friend constexpr NotNullUniquePtr<V> WrapNotNullUniquePtrOrDefault(std::unique_ptr<V>&& other,
+                                                                     Args&&... args);
+
+  template <typename U>
+  friend constexpr NotNullUniquePtr<U> TESTONLY_TakeRawPointer(U* other);
+
+  // PinnedUniquePtr is move-constructed from a NotNullUniquePtr, and thus needs to access the
+  // contained ptr_.
+  template <class U>
+  friend class PinnedUniquePtr;
+
+  template <class U>
+  friend class NotNullUniquePtr;
+
+  // Private constructor. ptr *must* be non null.
+  template <typename U>
+  explicit NotNullUniquePtr(std::unique_ptr<U> ptr) : ptr_(std::move(ptr)) {
+    // Invariants that must be true if we are to trust this class.
+    static_assert(sizeof(std::unique_ptr<T>) == sizeof(NotNullUniquePtr<T>),
+                  "NotNullUniquePtr must have zero space overhead.");
+    static_assert(!std::is_copy_constructible<NotNullUniquePtr<T>>::value,
+                  "NotNullUniquePtr must not be copy constructible.");
+    static_assert(!std::is_copy_assignable<NotNullUniquePtr<T>>::value,
+                  "NotNullUniquePtr must not be copy assignable.");
+    static_assert(!std::is_default_constructible<NotNullUniquePtr<T>>::value,
+                  "NotNullUniquePtr must have no default constructor.");
+    static_assert(std::is_move_constructible<NotNullUniquePtr<T>>::value,
+                  "NotNullUniquePtr must be move constructible.");
+    static_assert(std::is_move_assignable<NotNullUniquePtr<T>>::value,
+                  "NotNullUniquePtr must be move assignable.");
+  }
+
+ public:
+  // Move constructor. Source NotNullUniquePtr will be invalidated.
+  template <typename U>
+  NotNullUniquePtr(NotNullUniquePtr<U>&& other)  // NOLINT(google-explicit-constructor)
+      : ptr_(std::move(other.ptr_)) {}
+
+ private:
+  std::unique_ptr<T> ptr_;
+};
+
+// MakeNotNullUniquePtr is equivalent to std::make_unique, and constructs a NotNullUniquePtr that is
+// guaranteed to not be null.
+template <typename T, class... Args>
+constexpr NotNullUniquePtr<T> MakeNotNullUniquePtr(Args&&... args) {
+  return NotNullUniquePtr<T>(std::make_unique<T>(std::forward<Args>(args)...));
+}
+
+// WrapNotNullUniquePtrOrDefault allows converting std::unique_ptr into a NotNullUniquePtr. It does
+// this by taking arguments for a fallback constructor after the std::unique_ptr argument. If the
+// provided std::unique_ptr happens to be null, the fallback constructor will be used. This way the
+// pointer is guaranteed never to be null.
+template <typename T, typename U, class... Args>
+constexpr NotNullUniquePtr<U> WrapNotNullUniquePtrOrDefault(std::unique_ptr<U>&& other,
+                                                            Args&&... args) {
+  if (other) {
+    return NotNullUniquePtr<U>(std::move(other));
+  }
+  return NotNullUniquePtr<U>(std::make_unique<T>(std::forward<Args>(args)...));
+}
+
+// TESTONLY_TakeRawPointer is a hack to allow a test to keep a reference to a pointer while passing
+// a NotNullUniquePtr to another class. It is currently only used in testing Cobalt 1.0 local
+// aggregation and is not the preferred solution.
+template <typename T>
+constexpr NotNullUniquePtr<T> TESTONLY_TakeRawPointer(T* other) {
+  CHECK(other) << "Expected non-null other in TESTONLY_TakeRawPointer";
+  return NotNullUniquePtr<T>(std::unique_ptr<T>(other));
+}
+
+// PinnedUniquePtr is a wrapper around std::unique_ptr<T> that guarantees that the held pointer is
+// never null.
+//
+// It does this by:
+//   1) Only being constructible by a move-constructor from NotNullUniquePtr
+//   2) Being immovable (since a move can invalidate the source ptr)
+//
+// PinnedUniquePtr is the actually usable part of the pair of classes in this file, allowing get,
+// operator->, operator*, as well as swap() access. No other methods of the underlying unique_ptr
+// are allowed, as they may cause the held pointer to become null.
+template <class T>
+class PinnedUniquePtr {
+ public:
+  // Swaps the managed objects and associated deleters of *this and another PinnedUniquePtr object
+  // *other*.
+  template <class U>
+  void swap(PinnedUniquePtr<U>& other) noexcept {
+    other.ptr_.swap(ptr_);
+  }
+
+  // Swaps the managed objects and associated deleters of *this and a NotNullUniquePtr object.
+  template <class U>
+  void swap(NotNullUniquePtr<U>& other) noexcept {
+    other.ptr_.swap(ptr_);
+  }
+
+  // Swaps the managed objects and associated deleters of *this and a NotNullUniquePtr object (by
+  // rvalue).
+  template <class U>
+  void swap(NotNullUniquePtr<U>&& other) noexcept {
+    other.ptr_.swap(ptr_);
+  }
+
+  // Construct the PinnedUniquePtr from an existing NotNullUniquePtr. This invalidates the source
+  // ptr, and pins the managed object in place.
+  template <typename U>
+  explicit PinnedUniquePtr(NotNullUniquePtr<U>&& other) : ptr_(std::move(other.ptr_)) {
+    // Invariants that must be true if we are to trust this class.
+    static_assert(sizeof(std::unique_ptr<T>) == sizeof(PinnedUniquePtr<T>),
+                  "PinnedUniquePtr must have zero space overhead.");
+    static_assert(sizeof(std::unique_ptr<T>) == sizeof(PinnedUniquePtr<T>),
+                  "PinnedUniquePtr must have zero space overhead.");
+    static_assert(!std::is_copy_constructible<PinnedUniquePtr<T>>::value,
+                  "PinnedUniquePtr must not be copy constructible.");
+    static_assert(!std::is_copy_assignable<PinnedUniquePtr<T>>::value,
+                  "PinnedUniquePtr must not be copy assignable.");
+    static_assert(!std::is_default_constructible<PinnedUniquePtr<T>>::value,
+                  "PinnedUniquePtr must have no default constructor.");
+    static_assert(!std::is_move_constructible<PinnedUniquePtr<T>>::value,
+                  "PinnedUniquePtr must not be move constructible.");
+    static_assert(!std::is_move_assignable<PinnedUniquePtr<T>>::value,
+                  "PinnedUniquePtr must not be move assignable.");
+  }
+
+  template <typename U>
+  PinnedUniquePtr& operator=(NotNullUniquePtr<U>&& other) {
+    ptr_ = std::move(other.ptr_);
+    return *this;
+  }
+
+  // Access the contained ptr. Guaranteed never to return null.
+  [[nodiscard]] T* get() const noexcept { return ptr_.get(); }
+  T* operator->() { return ptr_.operator->(); }
+  T& operator*() { return ptr_.operator*(); }
+
+ private:
+  std::unique_ptr<T> ptr_;
+
+  // Disable copy and move
+  PinnedUniquePtr() = delete;
+  PinnedUniquePtr(PinnedUniquePtr const&) = delete;
+  PinnedUniquePtr(PinnedUniquePtr&&) = delete;
+};
+
+}  // namespace cobalt::util
+
+#endif  // COBALT_SRC_LIB_UTIL_NOT_NULL_H_
diff --git a/src/lib/util/protected_fields.h b/src/lib/util/protected_fields.h
index 301283a..62df00b 100644
--- a/src/lib/util/protected_fields.h
+++ b/src/lib/util/protected_fields.h
@@ -167,6 +167,14 @@
   BaseProtectedFields& operator=(const BaseProtectedFields&) = delete;
   BaseProtectedFields(const BaseProtectedFields&) = delete;
   BaseProtectedFields() = default;
+
+  // Make BaseProtectedFields Move constructible/assignable
+  BaseProtectedFields& operator=(BaseProtectedFields&& other) noexcept {
+    std::unique_lock<Mutex> lock(other.mutex_);
+    fields_ = std::move(other.fields_);
+    return *this;
+  }
+  BaseProtectedFields(BaseProtectedFields&& other) noexcept { *this = other; }
 };
 
 // The default implementation of ProtectedFields. Uses standard mutexes.
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 5a993e4..6704a13 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -58,6 +58,7 @@
     "$cobalt_root/src/lib/util:consistent_proto_store",
     "$cobalt_root/src/lib/util:datetime_util",
     "$cobalt_root/src/lib/util:file_system",
+    "$cobalt_root/src/lib/util:not_null",
     "$cobalt_root/src/logger:encoder",
     "$cobalt_root/src/logger:observation_writer",
     "$cobalt_root/src/public:cobalt_config",
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 606878f..842770c 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -169,8 +169,8 @@
 AggregateStore::AggregateStore(
     const Encoder& encoder, const ObservationWriter* observation_writer,
     // TODO(fxbug.dev/85571): NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
-    ConsistentProtoStore* local_aggregate_proto_store,
-    ConsistentProtoStore* obs_history_proto_store, const size_t backfill_days)
+    ConsistentProtoStore& local_aggregate_proto_store,
+    ConsistentProtoStore& obs_history_proto_store, const size_t backfill_days)
     : encoder_(encoder),
       observation_writer_(observation_writer),
       local_aggregate_proto_store_(local_aggregate_proto_store),
@@ -181,7 +181,7 @@
   auto locked_store = protected_aggregate_store_.lock();
   locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore();
   auto restore_aggregates_status =
-      local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
+      local_aggregate_proto_store_.get().Read(&(locked_store->local_aggregate_store));
   switch (restore_aggregates_status.error_code()) {
     case StatusCode::OK: {
       VLOG(4) << "Read LocalAggregateStore from disk.";
@@ -211,7 +211,8 @@
   }
 
   auto locked_obs_history = protected_obs_history_.lock();
-  auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history);
+  auto restore_history_status =
+      obs_history_proto_store_.get().Read(&locked_obs_history->obs_history);
   switch (restore_history_status.error_code()) {
     case StatusCode::OK: {
       VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
@@ -371,7 +372,7 @@
   // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
   internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::AggregateStore,
                                     static_cast<int64_t>(store_size));
-  auto status = local_aggregate_proto_store_->Write(local_aggregate_store);
+  auto status = local_aggregate_proto_store_.get().Write(local_aggregate_store);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to back up the LocalAggregateStore with error " << status;
     return status;
@@ -385,7 +386,7 @@
   // Cannot cause a loop, since TrackDiskUsage does not use any Cobalt 1.0 metrics
   internal_metrics_->TrackDiskUsage(logger::InternalMetrics::StorageClass::ObservationHistory,
                                     static_cast<int64_t>(history_size));
-  auto status = obs_history_proto_store_->Write(obs_history);
+  auto status = obs_history_proto_store_.get().Write(obs_history);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. " << status;
     return status;
@@ -687,8 +688,8 @@
 Status AggregateStore::GenerateSingleUniqueActivesObservation(
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const {
-  auto encoder_result = encoder_.EncodeUniqueActivesObservation(metric_ref, report, obs_day_index,
-                                                                event_code, was_active, window);
+  auto encoder_result = encoder_.get().EncodeUniqueActivesObservation(
+      metric_ref, report, obs_day_index, event_code, was_active, window);
   if (!encoder_result.status.ok()) {
     return encoder_result.status;
   }
@@ -858,9 +859,9 @@
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
     int64_t value) const {
-  Encoder::Result encoder_result =
-      encoder_.EncodePerDeviceNumericObservation(metric_ref, report, obs_day_index, component,
-                                                 UnpackEventCodesProto(event_code), value, window);
+  Encoder::Result encoder_result = encoder_.get().EncodePerDeviceNumericObservation(
+      metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value,
+      window);
   if (!encoder_result.status.ok()) {
     return encoder_result.status;
   }
@@ -879,7 +880,7 @@
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
     int64_t value) const {
-  Encoder::Result encoder_result = encoder_.EncodePerDeviceHistogramObservation(
+  Encoder::Result encoder_result = encoder_.get().EncodePerDeviceHistogramObservation(
       metric_ref, report, obs_day_index, component, UnpackEventCodesProto(event_code), value,
       window);
 
@@ -902,7 +903,7 @@
                                                                     const ReportDefinition* report,
                                                                     uint32_t obs_day_index) const {
   auto encoder_result =
-      encoder_.EncodeReportParticipationObservation(metric_ref, report, obs_day_index);
+      encoder_.get().EncodeReportParticipationObservation(metric_ref, report, obs_day_index);
   if (!encoder_result.status.ok()) {
     return encoder_result.status;
   }
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 9f9a709..9a0aca1 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -70,8 +70,11 @@
   // value larger than |kMaxAllowedBackfillDays| is passed.
   AggregateStore(const logger::Encoder& encoder,
                  const logger::ObservationWriter* observation_writer,
-                 util::ConsistentProtoStore* local_aggregate_proto_store,
-                 util::ConsistentProtoStore* obs_history_proto_store, size_t backfill_days = 0);
+                 util::ConsistentProtoStore& local_aggregate_proto_store,
+                 util::ConsistentProtoStore& obs_history_proto_store, size_t backfill_days = 0);
+
+  AggregateStore(AggregateStore&&) = default;
+  AggregateStore& operator=(AggregateStore&&) = default;
 
   // Given a ProjectContext, MetricDefinition, and ReportDefinition checks whether a key with the
   // same customer, project, metric, and report ID already exists in the LocalAggregateStore. If
@@ -339,12 +342,12 @@
   size_t backfill_days_ = 0;
 
   // Objects used to generate observations.
-  const logger::Encoder& encoder_;                       // not owned
-  const logger::ObservationWriter* observation_writer_;  // not owned
+  std::reference_wrapper<const logger::Encoder> encoder_;  // not owned
+  const logger::ObservationWriter* observation_writer_;    // not owned
 
   // Used for loading and backing up the proto stores to disk.
-  util::ConsistentProtoStore* local_aggregate_proto_store_;  // not owned
-  util::ConsistentProtoStore* obs_history_proto_store_;      // not owned
+  std::reference_wrapper<util::ConsistentProtoStore> local_aggregate_proto_store_;  // not owned
+  std::reference_wrapper<util::ConsistentProtoStore> obs_history_proto_store_;      // not owned
   logger::InternalMetricsPtr internal_metrics_;
 
   // In memory store of local aggregations and data needed to derive them.
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 897c731..873530b 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -17,6 +17,7 @@
 
 #include "src/lib/util/clock.h"
 #include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
 #include "src/lib/util/proto_util.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation/aggregation_utils.h"
@@ -223,7 +224,7 @@
         cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
     day_store_created_ = CurrentDayIndex();
     test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
-    event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
+    event_aggregator_mgr_->SetSteadyClock(util::TESTONLY_TakeRawPointer(test_steady_clock_));
   }
 
   // Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -244,45 +245,45 @@
                           time_zone);
   }
 
-  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
+  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_.backfill_days_; }
 
   void SetBackfillDays(size_t num_days) {
-    event_aggregator_mgr_->aggregate_store_->backfill_days_ = num_days;
+    event_aggregator_mgr_->aggregate_store_.backfill_days_ = num_days;
   }
 
   Status BackUpLocalAggregateStore() {
-    return event_aggregator_mgr_->aggregate_store_->BackUpLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_.BackUpLocalAggregateStore();
   }
 
   Status BackUpObservationHistory() {
-    return event_aggregator_mgr_->aggregate_store_->BackUpObservationHistory();
+    return event_aggregator_mgr_->aggregate_store_.BackUpObservationHistory();
   }
 
-  LocalAggregateStore MakeNewLocalAggregateStore(
+  static LocalAggregateStore MakeNewLocalAggregateStore(
       uint32_t version = kCurrentLocalAggregateStoreVersion) {
-    return event_aggregator_mgr_->aggregate_store_->MakeNewLocalAggregateStore(version);
+    return AggregateStore::MakeNewLocalAggregateStore(version);
   }
 
-  AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
+  static AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
       uint32_t version = kCurrentObservationHistoryStoreVersion) {
-    return event_aggregator_mgr_->aggregate_store_->MakeNewObservationHistoryStore(version);
+    return AggregateStore::MakeNewObservationHistoryStore(version);
   }
 
-  Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
-    return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
+  static Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
+    return AggregateStore::MaybeUpgradeLocalAggregateStore(store);
   }
 
-  Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
-    return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
+  static Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
+    return AggregateStore::MaybeUpgradeObservationHistoryStore(store);
   }
 
   LocalAggregateStore CopyLocalAggregateStore() {
-    return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
   }
 
   Status GenerateObservations(uint32_t final_day_index_utc, uint32_t final_day_index_local = 0u) {
-    return event_aggregator_mgr_->aggregate_store_->GenerateObservations(final_day_index_utc,
-                                                                         final_day_index_local);
+    return event_aggregator_mgr_->aggregate_store_.GenerateObservations(final_day_index_utc,
+                                                                        final_day_index_local);
   }
 
   bool IsReportInStore(lib::ReportIdentifier report) {
@@ -294,7 +295,7 @@
     key_data.set_report_id(report.report_id());
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
     return locked->local_aggregate_store.by_report_key().count(key) > 0;
   }
 
@@ -307,7 +308,7 @@
     key_data.set_report_id(report.report_id());
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
 
     auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
     if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -338,7 +339,7 @@
     key_data.set_report_id(report.report_id());
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_.protected_aggregate_store_.lock();
 
     auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
     if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -363,10 +364,10 @@
     return by_day_index->second.numeric_daily_aggregate().value();
   }
 
-  AggregateStore* GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_.get(); }
+  AggregateStore& GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_; }
 
   Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return event_aggregator_mgr_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+    return event_aggregator_mgr_->aggregate_store_.GarbageCollect(day_index_utc, day_index_local);
   }
 
   void DoScheduledTasksNow() {
@@ -399,7 +400,7 @@
             .ValueOrDie();
     event_record->event()->set_day_index(day_index);
     event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddUniqueActivesEvent(
         metric_report_id.second, *event_record);
     if (logged_activity == nullptr) {
       return status;
@@ -431,7 +432,7 @@
     event_count_event->set_component(component);
     event_count_event->add_event_code(event_code);
     event_count_event->set_count(count);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddEventCountEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddEventCountEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -463,7 +464,7 @@
     elapsed_time_event->set_component(component);
     elapsed_time_event->add_event_code(event_code);
     elapsed_time_event->set_elapsed_micros(micros);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddElapsedTimeEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -496,7 +497,7 @@
     frame_rate_event->add_event_code(event_code);
     int64_t frames_per_1000_seconds = std::lround(fps * 1000.0);
     frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddFrameRateEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -531,7 +532,7 @@
       memory_usage_event->add_event_code(event_code);
     }
     memory_usage_event->set_bytes(bytes);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddMemoryUsageEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -560,7 +561,7 @@
   // of reference.
   bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
                                     uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more UniqueActives
     // aggregates than |logged_activity| and |day_last_garbage_collected_|
     // should imply.
@@ -675,7 +676,7 @@
 
   bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
                                        uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more PerDeviceNumeric
     // aggregates than |logged_values| and |day_last_garbage_collected_| should
     // imply.
@@ -922,7 +923,7 @@
 
   void SetUp() override {
     AggregateStoreTest::SetUp();
-    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
   }
 
   // Adds an EventOccurredEvent to the local aggregations for the MetricReportId of a locally
@@ -1142,7 +1143,7 @@
   EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
   ASSERT_EQ(
       StatusCode::OK,
-      GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+      GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
   EXPECT_TRUE(IsReportInStore(kTestReportIdentifier));
 }
 
@@ -1154,10 +1155,10 @@
   EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
   ASSERT_EQ(
       StatusCode::OK,
-      GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+      GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
   ASSERT_EQ(
       StatusCode::OK,
-      GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+      GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
   EXPECT_TRUE(IsReportInStore(kTestReportIdentifier));
 }
 
@@ -1170,7 +1171,7 @@
   EXPECT_FALSE(IsReportInStore(kTestReportIdentifier));
   ASSERT_EQ(
       StatusCode::INVALID_ARGUMENT,
-      GetAggregateStore()->MaybeInsertReportConfig(*project_context, metric, report).error_code());
+      GetAggregateStore().MaybeInsertReportConfig(*project_context, metric, report).error_code());
 }
 
 // The Aggregate store sets the record for an event to active using SetActive.
@@ -1179,12 +1180,12 @@
   auto project_context = GetProjectContextFor(metric);
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_FALSE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
   ASSERT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+                                .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
                                 .error_code());
   EXPECT_TRUE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
 }
@@ -1195,15 +1196,15 @@
   auto project_context = GetProjectContextFor(metric);
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   EXPECT_FALSE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
 
   ASSERT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+                                .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
                                 .error_code());
   ASSERT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+                                .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
                                 .error_code());
   EXPECT_TRUE(IsActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex));
 }
@@ -1215,11 +1216,11 @@
   auto project_context = GetProjectContextFor(metric);
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   ASSERT_EQ(StatusCode::INVALID_ARGUMENT,
             GetAggregateStore()
-                ->SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
+                .SetActive(kTestReportIdentifier, kTestEventCode, kTestDayIndex)
                 .error_code());
 }
 
@@ -1234,17 +1235,17 @@
   const int64_t kSecondValue = 7;
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kFirstValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kFirstValue)
                                 .error_code());
   EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kSecondValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kSecondValue)
                                 .error_code());
   EXPECT_EQ(kSecondValue + kFirstValue,
             GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
@@ -1261,17 +1262,17 @@
   const int64_t kSecondValue = 7;
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kFirstValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kFirstValue)
                                 .error_code());
   EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kSecondValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kSecondValue)
                                 .error_code());
   EXPECT_EQ(kSecondValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
 }
@@ -1286,17 +1287,17 @@
   const int64_t kSecondValue = 7;
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kFirstValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kFirstValue)
                                 .error_code());
   EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
   EXPECT_EQ(StatusCode::OK, GetAggregateStore()
-                                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
-                                                         kTestDayIndex, kSecondValue)
+                                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode,
+                                                        kTestDayIndex, kSecondValue)
                                 .error_code());
   EXPECT_EQ(kFirstValue, GetValue(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex));
 }
@@ -1310,8 +1311,8 @@
 
   ASSERT_EQ(StatusCode::INVALID_ARGUMENT,
             GetAggregateStore()
-                ->UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex,
-                                         /*value*/ 4)
+                .UpdateNumericAggregate(kTestReportIdentifier, "", kTestEventCode, kTestDayIndex,
+                                        /*value*/ 4)
                 .error_code());
 }
 
@@ -1320,25 +1321,24 @@
   const int64_t kFirstValue = 3;
 
   EXPECT_EQ(0u,
-            GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
-                                                                       /*aggregation_days*/ 1));
-  GetAggregateStore()->SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
-                                                             /*aggregation_days*/ 1, kFirstValue);
+            GetAggregateStore().GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                                      /*aggregation_days*/ 1));
+  GetAggregateStore().SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                            /*aggregation_days*/ 1, kFirstValue);
   EXPECT_EQ(kFirstValue,
-            GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
-                                                                       /*aggregation_days*/ 1));
+            GetAggregateStore().GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                                      /*aggregation_days*/ 1));
 }
 TEST_F(AggregateStoreTest, SetPerDeviceNumericLastGeneratedDayIndex) {
   const std::string kReportKey = "test_key";
   const int64_t kFirstValue = 3;
 
-  EXPECT_EQ(0u, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+  EXPECT_EQ(0u, GetAggregateStore().GetPerDeviceNumericLastGeneratedDayIndex(
                     kReportKey, "", kTestEventCode,
                     /*aggregation_days*/ 1));
-  GetAggregateStore()->SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
-                                                                /*aggregation_days*/ 1,
-                                                                kFirstValue);
-  EXPECT_EQ(kFirstValue, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+  GetAggregateStore().SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
+                                                               /*aggregation_days*/ 1, kFirstValue);
+  EXPECT_EQ(kFirstValue, GetAggregateStore().GetPerDeviceNumericLastGeneratedDayIndex(
                              kReportKey, "", kTestEventCode,
                              /*aggregation_days*/ 1));
 }
@@ -1346,10 +1346,10 @@
   const std::string kReportKey = "test_key";
   const int64_t kFirstValue = 3;
 
-  EXPECT_EQ(0u, GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
-  GetAggregateStore()->SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
+  EXPECT_EQ(0u, GetAggregateStore().GetReportParticipationLastGeneratedDayIndex(kReportKey));
+  GetAggregateStore().SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
   EXPECT_EQ(kFirstValue,
-            GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+            GetAggregateStore().GetReportParticipationLastGeneratedDayIndex(kReportKey));
 }
 
 // Tests that EventAggregator::GenerateObservations() returns a positive
@@ -1359,7 +1359,7 @@
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   // Generate locally aggregated Observations for the current day index.
   EXPECT_EQ(StatusCode::OK, GenerateObservations(CurrentDayIndex()).error_code());
@@ -1375,7 +1375,7 @@
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   // Check that Observations are generated when GenerateObservations is called
   // for the current day index for the first time.
@@ -1419,7 +1419,7 @@
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr_->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   EXPECT_EQ(StatusCode::OK, GenerateObservations(CurrentDayIndex()).error_code());
   std::vector<Observation> observations(0);
diff --git a/src/local_aggregation/event_aggregator.cc b/src/local_aggregation/event_aggregator.cc
index 833edcf..c668941 100644
--- a/src/local_aggregation/event_aggregator.cc
+++ b/src/local_aggregation/event_aggregator.cc
@@ -21,7 +21,7 @@
 using logger::EventRecord;
 using logger::ProjectContext;
 
-EventAggregator::EventAggregator(AggregateStore* aggregate_store)
+EventAggregator::EventAggregator(AggregateStore& aggregate_store)
     : aggregate_store_(aggregate_store) {}
 
 Status EventAggregator::UpdateAggregationConfigs(const ProjectContext& project_context) {
@@ -32,7 +32,8 @@
         for (const auto& report : metric.reports()) {
           switch (report.report_type()) {
             case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
-              status = aggregate_store_->MaybeInsertReportConfig(project_context, metric, report);
+              status =
+                  aggregate_store_.get().MaybeInsertReportConfig(project_context, metric, report);
               if (!status.ok()) {
                 return status;
               }
@@ -50,7 +51,8 @@
           switch (report.report_type()) {
             case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
             case ReportDefinition::PER_DEVICE_HISTOGRAM: {
-              status = aggregate_store_->MaybeInsertReportConfig(project_context, metric, report);
+              status =
+                  aggregate_store_.get().MaybeInsertReportConfig(project_context, metric, report);
               if (!status.ok()) {
                 return status;
               }
@@ -91,9 +93,9 @@
         .WithContexts(event_record)
         .Build();
   }
-  return aggregate_store_->SetActive(event_record.MetricIdentifier().ForReport(report_id),
-                                     event->event_occurred_event().event_code(),
-                                     event->day_index());
+  return aggregate_store_.get().SetActive(event_record.MetricIdentifier().ForReport(report_id),
+                                          event->event_occurred_event().event_code(),
+                                          event->day_index());
 }
 
 Status EventAggregator::AddEventCountEvent(uint32_t report_id, const EventRecord& event_record) {
@@ -107,7 +109,7 @@
 
   const EventCountEvent& event_count_event = event->event_count_event();
 
-  return aggregate_store_->UpdateNumericAggregate(
+  return aggregate_store_.get().UpdateNumericAggregate(
       event_record.MetricIdentifier().ForReport(report_id), event_count_event.component(),
       config::PackEventCodes(event_count_event.event_code()), event->day_index(),
       event_count_event.count());
@@ -124,7 +126,7 @@
 
   const ElapsedTimeEvent& elapsed_time_event = event->elapsed_time_event();
 
-  return aggregate_store_->UpdateNumericAggregate(
+  return aggregate_store_.get().UpdateNumericAggregate(
       event_record.MetricIdentifier().ForReport(report_id), elapsed_time_event.component(),
       config::PackEventCodes(elapsed_time_event.event_code()), event->day_index(),
       elapsed_time_event.elapsed_micros());
@@ -140,7 +142,7 @@
   }
   const FrameRateEvent& frame_rate_event = event->frame_rate_event();
 
-  return aggregate_store_->UpdateNumericAggregate(
+  return aggregate_store_.get().UpdateNumericAggregate(
       event_record.MetricIdentifier().ForReport(report_id), frame_rate_event.component(),
       config::PackEventCodes(frame_rate_event.event_code()), event->day_index(),
       frame_rate_event.frames_per_1000_seconds());
@@ -157,7 +159,7 @@
 
   const MemoryUsageEvent& memory_usage_event = event->memory_usage_event();
 
-  return aggregate_store_->UpdateNumericAggregate(
+  return aggregate_store_.get().UpdateNumericAggregate(
       event_record.MetricIdentifier().ForReport(report_id), memory_usage_event.component(),
       config::PackEventCodes(memory_usage_event.event_code()), event->day_index(),
       memory_usage_event.bytes());
diff --git a/src/local_aggregation/event_aggregator.h b/src/local_aggregation/event_aggregator.h
index 97b5d8f..a2b3bf0 100644
--- a/src/local_aggregation/event_aggregator.h
+++ b/src/local_aggregation/event_aggregator.h
@@ -33,7 +33,12 @@
   // Constructs an EventAggregator.
   //
   // aggregate_store: an AggregateStore, which is used to store the local aggregates.
-  explicit EventAggregator(AggregateStore* aggregate_store);
+  explicit EventAggregator(AggregateStore& aggregate_store);
+
+  // EventAggregator should be move-only.
+  EventAggregator(EventAggregator const&) = delete;
+  EventAggregator& operator=(EventAggregator const&) = delete;
+  EventAggregator& operator=(EventAggregator&&) = default;
 
   // Updates the EventAggregator's view of the Cobalt metric and report
   // registry.
@@ -94,7 +99,7 @@
   Status AddMemoryUsageEvent(uint32_t report_id, const logger::EventRecord& event_record);
 
  private:
-  AggregateStore* aggregate_store_;  // not owned
+  std::reference_wrapper<AggregateStore> aggregate_store_;  // not owned
 };
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index b02c0a4..c9593b9 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -9,14 +9,13 @@
 #include "src/lib/util/consistent_proto_store.h"
 #include "src/lib/util/datetime_util.h"
 #include "src/lib/util/file_system.h"
+#include "src/lib/util/not_null.h"
 #include "src/lib/util/status_builder.h"
 #include "src/public/lib/statusor/status_macros.h"
 #include "src/registry/window_size.pb.h"
 
 namespace cobalt::local_aggregation {
 
-using util::ConsistentProtoStore;
-using util::SteadyClock;
 using util::TimeToDayIndex;
 
 const std::chrono::seconds EventAggregatorManager::kDefaultAggregateBackupInterval =
@@ -36,12 +35,12 @@
       aggregate_backup_interval_(kDefaultAggregateBackupInterval),
       generate_obs_interval_(kDefaultGenerateObsInterval),
       gc_interval_(kDefaultGCInterval),
-      owned_local_aggregate_proto_store_(
-          new ConsistentProtoStore(cfg.local_aggregate_proto_store_path, fs)),
-      owned_obs_history_proto_store_(
-          new ConsistentProtoStore(cfg.obs_history_proto_store_path, fs)) {
-  Reset();
-}
+      steady_clock_(util::MakeNotNullUniquePtr<util::SteadyClock>()),
+      owned_local_aggregate_proto_store_(cfg.local_aggregate_proto_store_path, fs),
+      owned_obs_history_proto_store_(cfg.obs_history_proto_store_path, fs),
+      aggregate_store_(encoder_, observation_writer, owned_local_aggregate_proto_store_,
+                       owned_obs_history_proto_store_, backfill_days_),
+      event_aggregator_(aggregate_store_) {}
 
 void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
   auto locked = protected_worker_thread_controller_.lock();
@@ -77,7 +76,7 @@
     // If shutdown has been requested, back up the LocalAggregateStore and
     // exit.
     if (locked->shut_down) {
-      aggregate_store_->BackUpLocalAggregateStore();
+      aggregate_store_.BackUpLocalAggregateStore();
       return;
     }
     // Sleep until the next scheduled backup of the LocalAggregateStore or
@@ -90,10 +89,10 @@
       }
       return locked->shut_down || locked->back_up_now;
     });
-    aggregate_store_->BackUpLocalAggregateStore();
+    aggregate_store_.BackUpLocalAggregateStore();
     if (locked->back_up_now) {
       locked->back_up_now = false;
-      aggregate_store_->BackUpObservationHistory();
+      aggregate_store_.BackUpObservationHistory();
     }
     // If the worker thread was woken up by a shutdown request, exit.
     // Otherwise, complete any scheduled Observation generation and garbage
@@ -125,9 +124,9 @@
                                 "current day index is too small.";
     } else {
       Status obs_status =
-          aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
+          aggregate_store_.GenerateObservations(yesterday_utc, yesterday_local_time);
       if (obs_status.ok()) {
-        aggregate_store_->BackUpObservationHistory();
+        aggregate_store_.BackUpObservationHistory();
 
         // We can guarantee that we will never need a metadata if it is older than:
         // BackfillManagerDays (currently 3 days) + WindowSize_MAX (currently 30 days).
@@ -145,9 +144,9 @@
       LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
                                 "current day index is too small.";
     } else {
-      Status gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
+      Status gc_status = aggregate_store_.GarbageCollect(yesterday_utc, yesterday_local_time);
       if (gc_status.ok()) {
-        aggregate_store_->BackUpLocalAggregateStore();
+        aggregate_store_.BackUpLocalAggregateStore();
       } else {
         LOG(ERROR) << "GarbageCollect failed with status: " << gc_status.error_message();
       }
@@ -166,7 +165,7 @@
   }
 
   CB_RETURN_IF_ERROR(
-      aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local));
+      aggregate_store_.GenerateObservations(final_day_index_utc, final_day_index_local));
 
   // We can guarantee that we will never need a metadata if it is older than:
   // BackfillManagerDays (currently 3 days) + WindowSize_MAX (currently 30 days).
@@ -184,13 +183,13 @@
 }
 
 void EventAggregatorManager::Reset() {
-  aggregate_store_ = std::make_unique<AggregateStore>(
-      encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
-      owned_obs_history_proto_store_.get(), backfill_days_);
-  aggregate_store_->ResetInternalMetrics(internal_metrics_);
+  aggregate_store_ =
+      AggregateStore(encoder_, observation_writer_, owned_local_aggregate_proto_store_,
+                     owned_obs_history_proto_store_, backfill_days_);
+  aggregate_store_.ResetInternalMetrics(internal_metrics_);
 
-  event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
-  steady_clock_ = std::make_unique<SteadyClock>();
+  event_aggregator_ = EventAggregator(aggregate_store_);
+  steady_clock_ = util::MakeNotNullUniquePtr<util::SteadyClock>();
 }
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index d2e1bc1..1f884bd 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -13,6 +13,7 @@
 
 #include "src/lib/util/clock.h"
 #include "src/lib/util/consistent_proto_store.h"
+#include "src/lib/util/not_null.h"
 #include "src/lib/util/protected_fields.h"
 #include "src/local_aggregation/aggregate_store.h"
 #include "src/local_aggregation/event_aggregator.h"
@@ -81,7 +82,7 @@
   void Reset();
 
   // Returns a pointer to an EventAggregator to be used for logging.
-  EventAggregator* GetEventAggregator() { return event_aggregator_.get(); }
+  EventAggregator& GetEventAggregator() { return event_aggregator_; }
 
   // Checks that the worker thread is shut down, and if so, triggers an out of schedule
   // AggregateStore::GenerateObservations() and returns its result. Returns kOther if the
@@ -97,15 +98,15 @@
   // only useful in testing to verify that the worker thread is not running too frequently.
   uint64_t num_runs() const { return num_runs_; }
 
-  void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); }
+  void Disable(bool is_disabled) { aggregate_store_.Disable(is_disabled); }
   void DeleteData() {
-    aggregate_store_->DeleteData();
+    aggregate_store_.DeleteData();
     TriggerBackups();
   }
 
   void ResetInternalMetrics(logger::InternalMetrics* internal_metrics) {
     internal_metrics_ = internal_metrics;
-    aggregate_store_->ResetInternalMetrics(internal_metrics_);
+    aggregate_store_.ResetInternalMetrics(internal_metrics_);
   }
 
  private:
@@ -172,12 +173,12 @@
 
   std::chrono::steady_clock::time_point next_generate_obs_;
   std::chrono::steady_clock::time_point next_gc_;
-  std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+  util::PinnedUniquePtr<util::SteadyClockInterface> steady_clock_;
 
-  std::unique_ptr<AggregateStore> aggregate_store_;
-  std::unique_ptr<util::ConsistentProtoStore> owned_local_aggregate_proto_store_;
-  std::unique_ptr<util::ConsistentProtoStore> owned_obs_history_proto_store_;
-  std::unique_ptr<EventAggregator> event_aggregator_;
+  util::ConsistentProtoStore owned_local_aggregate_proto_store_;
+  util::ConsistentProtoStore owned_obs_history_proto_store_;
+  AggregateStore aggregate_store_;
+  EventAggregator event_aggregator_;
 
   static const std::chrono::seconds kDefaultAggregateBackupInterval;
   static const std::chrono::seconds kDefaultGenerateObsInterval;
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index c2e6d61..e2b488d 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -8,6 +8,7 @@
 
 #include "src/lib/util/clock.h"
 #include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
 #include "src/lib/util/proto_util.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation/test_utils/test_event_aggregator_mgr.h"
@@ -78,13 +79,13 @@
     return test_clock;
   }
 
-  IncrementingSteadyClock* GetTestSteadyClock() {
+  util::NotNullUniquePtr<IncrementingSteadyClock> GetTestSteadyClock() {
     test_steady_clock_ptr_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
-    return test_steady_clock_ptr_;
+    return util::TESTONLY_TakeRawPointer(test_steady_clock_ptr_);
   }
 
   std::unique_ptr<TestEventAggregatorManager> GetEventAggregatorManager(
-      IncrementingSteadyClock* steady_clock) {
+      util::NotNullUniquePtr<IncrementingSteadyClock> steady_clock) {
     CobaltConfig cfg = {.client_secret = system_data::ClientSecret::GenerateNewSecret()};
 
     cfg.local_aggregation_backfill_days = 0;
@@ -93,7 +94,7 @@
 
     auto event_aggregator_mgr = std::make_unique<TestEventAggregatorManager>(
         cfg, fs_, *encoder_, observation_writer_.get(), *metadata_builder_);
-    event_aggregator_mgr->SetSteadyClock(steady_clock);
+    event_aggregator_mgr->SetSteadyClock(std::move(steady_clock));
     return event_aggregator_mgr;
   }
 
@@ -127,7 +128,7 @@
   bool BackUpHappened() { return fs_.FileExists(aggregate_store_path()); }
 
   static uint32_t NumberOfKVPairsInStore(EventAggregatorManager* event_aggregator_mgr) {
-    return event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore().by_report_key().size();
+    return event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore().by_report_key().size();
   }
 
   // Given a ProjectContext |project_context| and the MetricReportId of a UNIQUE_N_DAY_ACTIVES
@@ -144,12 +145,12 @@
             .ValueOrDie();
     event_record->event()->set_day_index(day_index);
     event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
-    return event_aggregator_mgr->GetEventAggregator()->AddUniqueActivesEvent(
-        metric_report_id.second, *event_record);
+    return event_aggregator_mgr->GetEventAggregator().AddUniqueActivesEvent(metric_report_id.second,
+                                                                            *event_record);
   }
 
   static uint32_t GetNumberOfUniqueActivesAggregates(EventAggregatorManager* event_aggregator_mgr) {
-    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore();
     uint32_t num_aggregates = 0;
     for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
       if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
@@ -168,7 +169,7 @@
       EventAggregatorManager* event_aggregator_mgr,
       const std::shared_ptr<const ProjectContext>& project_context,
       const MetricReportId& metric_report_id, uint32_t day_index, uint32_t event_code) {
-    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_.CopyLocalAggregateStore();
     std::string key;
     if (!SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id), &key)) {
       return AssertionFailure() << "Could not serialize key with metric id  "
@@ -293,7 +294,7 @@
   // Provide the EventAggregator with the all_report_types registry.
   auto project_context = GetTestProject(kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of locally aggregated reports in the
@@ -310,7 +311,7 @@
   // Provide the EventAggregator with the all_report_types registry.
   std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK,
@@ -352,7 +353,7 @@
   // Provide the EventAggregator with the all_report_types registry.
   std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK,
@@ -382,7 +383,7 @@
   // Provide the EventAggregator with the all_report_types registry.
   std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   event_aggregator_mgr->Disable(true);
@@ -429,7 +430,7 @@
   // Provide the EventAggregator with the all_report_types registry.
   std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
 
   EXPECT_EQ(StatusCode::OK,
@@ -501,7 +502,7 @@
   TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
 
   EXPECT_EQ(StatusCode::OK, event_aggregator_mgr->GetEventAggregator()
-                                ->UpdateAggregationConfigs(*project_context)
+                                .UpdateAggregationConfigs(*project_context)
                                 .error_code());
   EXPECT_EQ(StatusCode::OK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
                                                   expected_id, day_index, /*event_code*/ 1)
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index 52b012f..f6b8110 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -17,6 +17,7 @@
 
 #include "src/lib/util/clock.h"
 #include "src/lib/util/datetime_util.h"
+#include "src/lib/util/not_null.h"
 #include "src/lib/util/proto_util.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation/aggregation_utils.h"
@@ -121,7 +122,7 @@
     unowned_test_clock_ = test_clock_.get();
     day_store_created_ = CurrentDayIndex();
     test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
-    event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
+    event_aggregator_mgr_->SetSteadyClock(util::TESTONLY_TakeRawPointer(test_steady_clock_));
   }
 
   // Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -142,10 +143,10 @@
                           time_zone);
   }
 
-  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
+  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_.backfill_days_; }
 
   LocalAggregateStore CopyLocalAggregateStore() {
-    return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
   }
 
   void TriggerAndWaitForDoScheduledTasks() {
@@ -187,7 +188,7 @@
             .ValueOrDie();
     event_record->event()->set_day_index(day_index);
     event_record->event()->mutable_event_occurred_event()->set_event_code(event_code);
-    auto status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+    auto status = event_aggregator_mgr_->GetEventAggregator().AddUniqueActivesEvent(
         metric_report_id.second, *event_record);
     if (logged_activity == nullptr) {
       return status;
@@ -219,7 +220,7 @@
     event_count_event->set_component(component);
     event_count_event->add_event_code(event_code);
     event_count_event->set_count(count);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddEventCountEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddEventCountEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -251,7 +252,7 @@
     elapsed_time_event->set_component(component);
     elapsed_time_event->add_event_code(event_code);
     elapsed_time_event->set_elapsed_micros(micros);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddElapsedTimeEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -284,7 +285,7 @@
     frame_rate_event->add_event_code(event_code);
     int64_t frames_per_1000_seconds = std::lround(fps * 1000.0);
     frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddFrameRateEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -319,7 +320,7 @@
       memory_usage_event->add_event_code(event_code);
     }
     memory_usage_event->set_bytes(bytes);
-    Status status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+    Status status = event_aggregator_mgr_->GetEventAggregator().AddMemoryUsageEvent(
         metric_report_id.second, *event_record);
     if (logged_values == nullptr) {
       return status;
@@ -347,7 +348,7 @@
   // of reference.
   bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
                                     uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more UniqueActives
     // aggregates than |logged_activity| and |day_last_garbage_collected_|
     // should imply.
@@ -462,7 +463,7 @@
 
   bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
                                        uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_.CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more PerDeviceNumeric
     // aggregates than |logged_values| and |day_last_garbage_collected_| should
     // imply.
@@ -708,7 +709,7 @@
 
   void SetUp() override {
     EventAggregatorTest::SetUp();
-    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
   }
 
   // Adds an EventOccurredEvent to the local aggregations for the MetricReportId of a locally
@@ -836,7 +837,7 @@
   auto unique_actives_project_context =
       GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
   EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
-                  ->UpdateAggregationConfigs(*unique_actives_project_context)
+                  .UpdateAggregationConfigs(*unique_actives_project_context)
                   .ok());
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of locally aggregated reports in the unique_actives
@@ -875,7 +876,7 @@
   auto unique_actives_project_context =
       GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
   EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
-                  ->UpdateAggregationConfigs(*unique_actives_project_context)
+                  .UpdateAggregationConfigs(*unique_actives_project_context)
                   .ok());
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of locally aggregated reports in the unique_actives
@@ -886,7 +887,7 @@
   auto unique_actives_noise_free_project_context =
       GetTestProject(logger::testing::unique_actives_noise_free::kCobaltRegistryBase64);
   EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
-                  ->UpdateAggregationConfigs(*unique_actives_noise_free_project_context)
+                  .UpdateAggregationConfigs(*unique_actives_noise_free_project_context)
                   .ok());
   // Check that the number of key-value pairs in the LocalAggregateStore is
   // now equal to the number of distinct MetricReportIds of locally
@@ -931,7 +932,7 @@
   std::shared_ptr<ProjectContext> unique_actives_project_context =
       GetTestProject(logger::testing::unique_actives::kCobaltRegistryBase64);
   EXPECT_TRUE(event_aggregator_mgr_->GetEventAggregator()
-                  ->UpdateAggregationConfigs(*unique_actives_project_context)
+                  .UpdateAggregationConfigs(*unique_actives_project_context)
                   .ok());
   // Attempt to log a UniqueActivesEvent for
   // |kEventsOccurredMetricReportId|, which is not in the unique_actives
@@ -946,9 +947,9 @@
   bad_event_record->event()->mutable_event_occurred_event()->set_event_code(0u);
   EXPECT_EQ(StatusCode::INVALID_ARGUMENT,
             event_aggregator_mgr_->GetEventAggregator()
-                ->AddUniqueActivesEvent(logger::testing::unique_actives_noise_free::
-                                            kEventsOccurredEventsOccurredUniqueDevicesReportId,
-                                        *bad_event_record)
+                .AddUniqueActivesEvent(logger::testing::unique_actives_noise_free::
+                                           kEventsOccurredEventsOccurredUniqueDevicesReportId,
+                                       *bad_event_record)
                 .error_code());
   // Attempt to call AddUniqueActivesEvent() with a valid metric and report
   // ID, but with an EventRecord wrapping an Event which is not an
@@ -961,7 +962,7 @@
   EXPECT_EQ(
       StatusCode::INVALID_ARGUMENT,
       event_aggregator_mgr_->GetEventAggregator()
-          ->AddUniqueActivesEvent(
+          .AddUniqueActivesEvent(
               logger::testing::unique_actives::kFeaturesActiveFeaturesActiveUniqueDevicesReportId,
               *bad_event_record2)
           .error_code());
@@ -974,13 +975,12 @@
           logger::testing::per_device_numeric_stats::kConnectionFailuresMetricReportId.first)
           .ValueOrDie();
   bad_event_record3->event()->mutable_event_occurred_event();
-  EXPECT_EQ(
-      StatusCode::INVALID_ARGUMENT,
-      event_aggregator_mgr_->GetEventAggregator()
-          ->AddEventCountEvent(logger::testing::per_device_numeric_stats::
-                                   kConnectionFailuresConnectionFailuresPerDeviceCountReportId,
-                               *bad_event_record3)
-          .error_code());
+  EXPECT_EQ(StatusCode::INVALID_ARGUMENT,
+            event_aggregator_mgr_->GetEventAggregator()
+                .AddEventCountEvent(logger::testing::per_device_numeric_stats::
+                                        kConnectionFailuresConnectionFailuresPerDeviceCountReportId,
+                                    *bad_event_record3)
+                .error_code());
 }
 
 // Tests that the LocalAggregateStore is updated as expected when
diff --git a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
index 0021a10..209c83c 100644
--- a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
+++ b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
@@ -5,6 +5,7 @@
 #ifndef COBALT_SRC_LOCAL_AGGREGATION_TEST_UTILS_TEST_EVENT_AGGREGATOR_MGR_H_
 #define COBALT_SRC_LOCAL_AGGREGATION_TEST_UTILS_TEST_EVENT_AGGREGATOR_MGR_H_
 
+#include "src/lib/util/not_null.h"
 #include "src/local_aggregation/event_aggregator_mgr.h"
 
 namespace cobalt::local_aggregation {
@@ -21,21 +22,21 @@
   // Triggers an out of schedule run of GenerateObservations(). This does not change the schedule
   // of future runs.
   Status GenerateObservations(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return EventAggregatorManager::aggregate_store_->GenerateObservations(day_index_utc,
-                                                                          day_index_local);
+    return EventAggregatorManager::aggregate_store_.GenerateObservations(day_index_utc,
+                                                                         day_index_local);
   }
 
   // Triggers an out of schedule run of GarbageCollect(). This does not change the schedule of
   // future runs.
   Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return EventAggregatorManager::aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+    return EventAggregatorManager::aggregate_store_.GarbageCollect(day_index_utc, day_index_local);
   }
 
   // Returns the number of aggregates of type per_device_numeric_aggregates.
   size_t NumPerDeviceNumericAggregatesInStore() {
     size_t count = 0;
     for (const auto& aggregates :
-         EventAggregatorManager::aggregate_store_->protected_aggregate_store_.lock()
+         EventAggregatorManager::aggregate_store_.protected_aggregate_store_.lock()
              ->local_aggregate_store.by_report_key()) {
       if (aggregates.second.has_numeric_aggregates()) {
         count += aggregates.second.numeric_aggregates().by_component().size();
@@ -45,7 +46,9 @@
   }
 
   // Sets the EventAggregatorManager's SteadyClockInterface.
-  void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
+  void SetSteadyClock(util::NotNullUniquePtr<util::SteadyClockInterface> clock) {
+    steady_clock_ = std::move(clock);
+  }
 };
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/logger/event_loggers.cc b/src/logger/event_loggers.cc
index 4e95319..0c56cf7 100644
--- a/src/logger/event_loggers.cc
+++ b/src/logger/event_loggers.cc
@@ -33,7 +33,7 @@
 
 std::unique_ptr<EventLogger> EventLogger::Create(
     MetricDefinition::MetricType metric_type, const Encoder& encoder,
-    EventAggregator* event_aggregator, local_aggregation::LocalAggregation* local_aggregation,
+    EventAggregator& event_aggregator, local_aggregation::LocalAggregation* local_aggregation,
     const ObservationWriter* observation_writer,
     const system_data::SystemDataInterface* system_data,
     util::CivilTimeConverterInterface* civil_time_converter) {
@@ -566,7 +566,7 @@
                                                              const EventRecord& event_record) {
   switch (report.report_type()) {
     case ReportDefinition::UNIQUE_N_DAY_ACTIVES: {
-      return event_aggregator()->AddUniqueActivesEvent(report.id(), event_record);
+      return event_aggregator().AddUniqueActivesEvent(report.id(), event_record);
     }
     default:
       return Status::OkStatus();
@@ -620,7 +620,7 @@
   switch (report.report_type()) {
     case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
     case ReportDefinition::PER_DEVICE_HISTOGRAM: {
-      return event_aggregator()->AddEventCountEvent(report.id(), event_record);
+      return event_aggregator().AddEventCountEvent(report.id(), event_record);
     }
     default:
       return Status::OkStatus();
@@ -688,7 +688,7 @@
   switch (report.report_type()) {
     case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
     case ReportDefinition::PER_DEVICE_HISTOGRAM: {
-      return event_aggregator()->AddElapsedTimeEvent(report.id(), event_record);
+      return event_aggregator().AddElapsedTimeEvent(report.id(), event_record);
     }
     default:
       return Status::OkStatus();
@@ -724,7 +724,7 @@
   switch (report.report_type()) {
     case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
     case ReportDefinition::PER_DEVICE_HISTOGRAM: {
-      return event_aggregator()->AddFrameRateEvent(report.id(), event_record);
+      return event_aggregator().AddFrameRateEvent(report.id(), event_record);
     }
     default:
       return Status::OkStatus();
@@ -759,7 +759,7 @@
   switch (report.report_type()) {
     case ReportDefinition::PER_DEVICE_NUMERIC_STATS:
     case ReportDefinition::PER_DEVICE_HISTOGRAM: {
-      return event_aggregator()->AddMemoryUsageEvent(report.id(), event_record);
+      return event_aggregator().AddMemoryUsageEvent(report.id(), event_record);
     }
     default:
       return Status::OkStatus();
diff --git a/src/logger/event_loggers.h b/src/logger/event_loggers.h
index a87a175..432c15e 100644
--- a/src/logger/event_loggers.h
+++ b/src/logger/event_loggers.h
@@ -33,7 +33,7 @@
 // of EventLogger for each of several Metric types.
 class EventLogger {
  public:
-  EventLogger(const Encoder& encoder, local_aggregation::EventAggregator* event_aggregator,
+  EventLogger(const Encoder& encoder, local_aggregation::EventAggregator& event_aggregator,
               local_aggregation::LocalAggregation* local_aggregation,
               const ObservationWriter* observation_writer,
               const system_data::SystemDataInterface* system_data,
@@ -54,7 +54,7 @@
   // The remaining parameters are passed to the EventLogger constructor.
   static std::unique_ptr<EventLogger> Create(
       MetricDefinition::MetricType metric_type, const Encoder& encoder,
-      local_aggregation::EventAggregator* event_aggregator,
+      local_aggregation::EventAggregator& event_aggregator,
       local_aggregation::LocalAggregation* local_aggregation,
       const ObservationWriter* observation_writer,
       const system_data::SystemDataInterface* system_data,
@@ -73,7 +73,7 @@
 
  protected:
   const Encoder& encoder() { return encoder_; }
-  local_aggregation::EventAggregator* event_aggregator() { return event_aggregator_; }
+  local_aggregation::EventAggregator& event_aggregator() { return event_aggregator_; }
   local_aggregation::LocalAggregation* local_aggregation() { return local_aggregation_; }
 
   static Encoder::Result BadReportType(const std::string& full_metric_name,
@@ -152,7 +152,7 @@
   static void TraceLogSuccess(const EventRecord& event_record, const std::string& trace);
 
   const Encoder& encoder_;
-  local_aggregation::EventAggregator* event_aggregator_;
+  local_aggregation::EventAggregator& event_aggregator_;
   local_aggregation::LocalAggregation* local_aggregation_;
   const ObservationWriter* observation_writer_;
   const system_data::SystemDataInterface* system_data_;
diff --git a/src/logger/event_loggers_test.cc b/src/logger/event_loggers_test.cc
index ac5b46d..d71be4c 100644
--- a/src/logger/event_loggers_test.cc
+++ b/src/logger/event_loggers_test.cc
@@ -123,7 +123,7 @@
         cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
         observation_writer_.get(), civil_time_converter_.get());
     project_context_ = GetTestProject(registry_base64);
-    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
     logger_ = std::make_unique<EventLoggerClass>(
         *encoder_, event_aggregator_mgr_->GetEventAggregator(), local_aggregation_.get(),
         observation_writer_.get(), &system_data_, civil_time_converter_.get());
@@ -1508,7 +1508,7 @@
 
     event_aggregator_mgr_ = std::make_unique<TestEventAggregatorManager>(
         cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
-    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
     local_aggregation_ = std::make_unique<LocalAggregation>(
         cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
         observation_writer_.get(), civil_time_converter_.get());
@@ -1752,7 +1752,7 @@
 
     event_aggregator_mgr_ = std::make_unique<TestEventAggregatorManager>(
         cfg, fs(), *encoder_, observation_writer_.get(), *metadata_builder_);
-    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator().UpdateAggregationConfigs(*project_context_);
     local_aggregation_ = std::make_unique<LocalAggregation>(
         cfg, project_context_factory_.get(), &system_data_, *metadata_builder_, fs(),
         observation_writer_.get(), civil_time_converter_.get());
diff --git a/src/logger/logger.cc b/src/logger/logger.cc
index b33ff6f..279775e 100644
--- a/src/logger/logger.cc
+++ b/src/logger/logger.cc
@@ -41,7 +41,7 @@
 }  // namespace
 
 Logger::Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
-               EventAggregator* event_aggregator,
+               EventAggregator& event_aggregator,
                local_aggregation::LocalAggregation* local_aggregation,
                ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
                util::CivilTimeConverterInterface* civil_time_converter,
@@ -53,7 +53,7 @@
              enable_replacement_metrics, internal_metrics) {}
 
 Logger::Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
-               EventAggregator* event_aggregator,
+               EventAggregator& event_aggregator,
                local_aggregation::LocalAggregation* local_aggregation,
                ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
                util::ValidatedClockInterface* validated_clock,
@@ -74,7 +74,6 @@
       enable_replacement_metrics_(enable_replacement_metrics),
       internal_metrics_(internal_metrics) {
   CHECK(project_context_);
-  CHECK(event_aggregator_);
   CHECK(local_aggregation_);
   CHECK(observation_writer_);
   CHECK(civil_time_converter_);
@@ -85,7 +84,7 @@
   }
   // If we were passed an internal Logger then we are not an internal Logger.
   is_internal_logger_ = !internal_metrics;
-  Status status = event_aggregator_->UpdateAggregationConfigs(*project_context_);
+  Status status = event_aggregator_.UpdateAggregationConfigs(*project_context_);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to provide aggregation configurations to the "
                   "EventAggregator. "
diff --git a/src/logger/logger.h b/src/logger/logger.h
index fc10bcc..09e4d77 100644
--- a/src/logger/logger.h
+++ b/src/logger/logger.h
@@ -76,7 +76,7 @@
   // Logger to send metrics about Cobalt to Cobalt. If nullptr, no such internal
   // logging will be performed by this Logger.
   Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
-         local_aggregation::EventAggregator* event_aggregator,
+         local_aggregation::EventAggregator& event_aggregator,
          local_aggregation::LocalAggregation* local_aggregation,
          ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
          util::CivilTimeConverterInterface* civil_time_converter,
@@ -125,7 +125,7 @@
   // Logger to send metrics about Cobalt to Cobalt. If nullptr, no such internal
   // logging will be performed by this Logger.
   Logger(std::unique_ptr<ProjectContext> project_context, const Encoder& encoder,
-         local_aggregation::EventAggregator* event_aggregator,
+         local_aggregation::EventAggregator& event_aggregator,
          local_aggregation::LocalAggregation* local_aggregation,
          ObservationWriter* observation_writer, system_data::SystemDataInterface* system_data,
          util::ValidatedClockInterface* validated_clock,
@@ -202,7 +202,7 @@
   const std::shared_ptr<const ProjectContext> project_context_;
 
   const Encoder& encoder_;
-  local_aggregation::EventAggregator* event_aggregator_;
+  local_aggregation::EventAggregator& event_aggregator_;
   local_aggregation::LocalAggregation* local_aggregation_;
   const ObservationWriter* observation_writer_;
   const system_data::SystemDataInterface* system_data_;
diff --git a/src/logger/undated_event_manager.cc b/src/logger/undated_event_manager.cc
index 3085deb..e6b026e 100644
--- a/src/logger/undated_event_manager.cc
+++ b/src/logger/undated_event_manager.cc
@@ -19,7 +19,7 @@
 
 using local_aggregation::EventAggregator;
 
-UndatedEventManager::UndatedEventManager(const Encoder& encoder, EventAggregator* event_aggregator,
+UndatedEventManager::UndatedEventManager(const Encoder& encoder, EventAggregator& event_aggregator,
                                          local_aggregation::LocalAggregation* local_aggregation,
                                          ObservationWriter* observation_writer,
                                          system_data::SystemDataInterface* system_data,
@@ -32,7 +32,6 @@
       system_data_(system_data),
       civil_time_converter_(civil_time_converter),
       max_saved_events_(max_saved_events) {
-  CHECK(event_aggregator_);
   CHECK(local_aggregation_);
   CHECK(observation_writer_);
   CHECK(civil_time_converter_);
diff --git a/src/logger/undated_event_manager.h b/src/logger/undated_event_manager.h
index 1cd0d91..a7b8ae9 100644
--- a/src/logger/undated_event_manager.h
+++ b/src/logger/undated_event_manager.h
@@ -51,7 +51,7 @@
   //
   // |max_saved_events| The maximum number of saved events. When this limit is reached, old events
   // are dropped to make room for new events.
-  UndatedEventManager(const Encoder& encoder, local_aggregation::EventAggregator* event_aggregator,
+  UndatedEventManager(const Encoder& encoder, local_aggregation::EventAggregator& event_aggregator,
                       local_aggregation::LocalAggregation* local_aggregation,
                       ObservationWriter* observation_writer,
                       system_data::SystemDataInterface* system_data,
@@ -85,7 +85,7 @@
 
   // Used only to construct EventLogger instances.
   const Encoder& encoder_;
-  local_aggregation::EventAggregator* event_aggregator_;
+  local_aggregation::EventAggregator& event_aggregator_;
   local_aggregation::LocalAggregation* local_aggregation_;
   const ObservationWriter* observation_writer_;
   const system_data::SystemDataInterface* system_data_;