[cobalt][LocalAggregation] Move EventAggregatorManager code to the
appropriate files

- removes thread management and memory management from the
EventAggregator and moves it to the EventAggregatorManager
- Small changes to tests to match this change
- deletes GenerateObservationsNoWorker()
- the AggregateStoreTest is minimally changed as it will be reworked in
a following CL.

Bug: 40853
Change-Id: Id409279bc92edbbe50089a0eaa32bc8e5d5230aa
diff --git a/src/bin/test_app/test_app.cc b/src/bin/test_app/test_app.cc
index 33269cc..a2caedf 100644
--- a/src/bin/test_app/test_app.cc
+++ b/src/bin/test_app/test_app.cc
@@ -418,8 +418,7 @@
 }
 
 bool RealLoggerFactory::GenerateAggregatedObservations(uint32_t day_index) {
-  return kOK == event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->GenerateObservations(
-                    day_index);
+  return kOK == event_aggregator_mgr_->aggregate_store_->GenerateObservations(day_index);
 }
 
 bool RealLoggerFactory::SendAccumulatedObservations() {
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 7875d43..c1c5e16 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -147,7 +147,6 @@
                                       uint32_t final_day_index_local = 0u);
 
  private:
-  friend class EventAggregator;  // used for transition during redesign.
   friend class AggregateStoreTest;
   friend class EventAggregatorTest;
   friend class EventAggregatorManagerTest;
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 5237df2..272e216 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -18,7 +18,7 @@
 #include "src/lib/util/datetime_util.h"
 #include "src/lib/util/proto_util.h"
 #include "src/local_aggregation/aggregation_utils.h"
-#include "src/local_aggregation/event_aggregator.h"
+#include "src/local_aggregation/event_aggregator_mgr.h"
 #include "src/logger/logger_test_utils.h"
 #include "src/logger/testing_constants.h"
 #include "src/pb/event.pb.h"
@@ -204,9 +204,9 @@
   }
 
   void ResetEventAggregator() {
-    event_aggregator_ = std::make_unique<EventAggregator>(encoder_.get(), observation_writer_.get(),
-                                                          local_aggregate_proto_store_.get(),
-                                                          obs_history_proto_store_.get());
+    event_aggregator_mgr_ = std::make_unique<EventAggregatorManager>(
+        encoder_.get(), observation_writer_.get(), local_aggregate_proto_store_.get(),
+        obs_history_proto_store_.get());
     // Pass this clock to the EventAggregator::Start method, if it is called.
     test_clock_ = std::make_unique<IncrementingSystemClock>(std::chrono::system_clock::duration(0));
     // Initilize it to 10 years after the beginning of time.
@@ -215,13 +215,13 @@
     unowned_test_clock_ = test_clock_.get();
     day_store_created_ = CurrentDayIndex();
     test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
-    event_aggregator_->SetSteadyClock(test_steady_clock_);
+    event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
   }
 
   // Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
   // before destructing the objects which the EventAggregator points to but does
   // not own.
-  void TearDown() override { event_aggregator_.reset(); }
+  void TearDown() override { event_aggregator_mgr_.reset(); }
 
   // Advances |test_clock_| by |num_seconds| seconds.
   void AdvanceClock(int num_seconds) {
@@ -236,45 +236,45 @@
                           time_zone);
   }
 
-  size_t GetBackfillDays() { return event_aggregator_->aggregate_store_->backfill_days_; }
+  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
 
   void SetBackfillDays(size_t num_days) {
-    event_aggregator_->aggregate_store_->backfill_days_ = num_days;
+    event_aggregator_mgr_->aggregate_store_->backfill_days_ = num_days;
   }
 
   Status BackUpLocalAggregateStore() {
-    return event_aggregator_->aggregate_store_->BackUpLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_->BackUpLocalAggregateStore();
   }
 
   Status BackUpObservationHistory() {
-    return event_aggregator_->aggregate_store_->BackUpObservationHistory();
+    return event_aggregator_mgr_->aggregate_store_->BackUpObservationHistory();
   }
 
   LocalAggregateStore MakeNewLocalAggregateStore(
       uint32_t version = kCurrentLocalAggregateStoreVersion) {
-    return event_aggregator_->aggregate_store_->MakeNewLocalAggregateStore(version);
+    return event_aggregator_mgr_->aggregate_store_->MakeNewLocalAggregateStore(version);
   }
 
   AggregatedObservationHistoryStore MakeNewObservationHistoryStore(
       uint32_t version = kCurrentObservationHistoryStoreVersion) {
-    return event_aggregator_->aggregate_store_->MakeNewObservationHistoryStore(version);
+    return event_aggregator_mgr_->aggregate_store_->MakeNewObservationHistoryStore(version);
   }
 
   Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store) {
-    return event_aggregator_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
+    return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeLocalAggregateStore(store);
   }
 
   Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store) {
-    return event_aggregator_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
+    return event_aggregator_mgr_->aggregate_store_->MaybeUpgradeObservationHistoryStore(store);
   }
 
   LocalAggregateStore CopyLocalAggregateStore() {
-    return event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
   }
 
   Status GenerateObservations(uint32_t final_day_index_utc, uint32_t final_day_index_local = 0u) {
-    return event_aggregator_->GenerateObservationsNoWorker(final_day_index_utc,
-                                                           final_day_index_local);
+    return event_aggregator_mgr_->aggregate_store_->GenerateObservations(final_day_index_utc,
+                                                                         final_day_index_local);
   }
 
   bool IsReportInStore(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
@@ -287,7 +287,7 @@
     key_data.set_report_id(report_id);
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
     return locked->local_aggregate_store.by_report_key().count(key) > 0;
   }
 
@@ -301,7 +301,7 @@
     key_data.set_report_id(report_id);
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
 
     auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
     if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -333,7 +333,7 @@
     key_data.set_report_id(report_id);
     SerializeToBase64(key_data, &key);
 
-    auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+    auto locked = event_aggregator_mgr_->aggregate_store_->protected_aggregate_store_.lock();
 
     auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
     if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
@@ -358,18 +358,18 @@
     return by_day_index->second.numeric_daily_aggregate().value();
   }
 
-  AggregateStore* GetAggregateStore() { return event_aggregator_->aggregate_store_.get(); }
+  AggregateStore* GetAggregateStore() { return event_aggregator_mgr_->aggregate_store_.get(); }
 
   Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return event_aggregator_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
+    return event_aggregator_mgr_->aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
   }
 
   void DoScheduledTasksNow() {
     // Steady values don't matter, just tell DoScheduledTasks to run everything.
     auto steady_time = std::chrono::steady_clock::now();
-    event_aggregator_->next_generate_obs_ = steady_time;
-    event_aggregator_->next_gc_ = steady_time;
-    event_aggregator_->DoScheduledTasks(unowned_test_clock_->now(), steady_time);
+    event_aggregator_mgr_->next_generate_obs_ = steady_time;
+    event_aggregator_mgr_->next_gc_ = steady_time;
+    event_aggregator_mgr_->DoScheduledTasks(unowned_test_clock_->now(), steady_time);
   }
 
   // Clears the FakeObservationStore and resets the counts of Observations
@@ -392,7 +392,8 @@
     EventRecord event_record(std::move(project_context), metric_report_id.first);
     event_record.event()->set_day_index(day_index);
     event_record.event()->mutable_occurrence_event()->set_event_code(event_code);
-    auto status = event_aggregator_->AddUniqueActivesEvent(metric_report_id.second, event_record);
+    auto status = event_aggregator_mgr_->GetEventAggregator()->AddUniqueActivesEvent(
+        metric_report_id.second, event_record);
     if (logged_activity == nullptr) {
       return status;
     }
@@ -421,7 +422,8 @@
     count_event->set_component(component);
     count_event->add_event_code(event_code);
     count_event->set_count(count);
-    auto status = event_aggregator_->AddCountEvent(metric_report_id.second, event_record);
+    auto status = event_aggregator_mgr_->GetEventAggregator()->AddCountEvent(
+        metric_report_id.second, event_record);
     if (logged_values == nullptr) {
       return status;
     }
@@ -450,7 +452,8 @@
     elapsed_time_event->set_component(component);
     elapsed_time_event->add_event_code(event_code);
     elapsed_time_event->set_elapsed_micros(micros);
-    auto status = event_aggregator_->AddElapsedTimeEvent(metric_report_id.second, event_record);
+    auto status = event_aggregator_mgr_->GetEventAggregator()->AddElapsedTimeEvent(
+        metric_report_id.second, event_record);
     if (logged_values == nullptr) {
       return status;
     }
@@ -480,7 +483,8 @@
     frame_rate_event->add_event_code(event_code);
     int64_t frames_per_1000_seconds = std::round(fps * 1000.0);
     frame_rate_event->set_frames_per_1000_seconds(frames_per_1000_seconds);
-    auto status = event_aggregator_->AddFrameRateEvent(metric_report_id.second, event_record);
+    auto status = event_aggregator_mgr_->GetEventAggregator()->AddFrameRateEvent(
+        metric_report_id.second, event_record);
     if (logged_values == nullptr) {
       return status;
     }
@@ -512,7 +516,8 @@
       memory_usage_event->add_event_code(event_code);
     }
     memory_usage_event->set_bytes(bytes);
-    auto status = event_aggregator_->AddMemoryUsageEvent(metric_report_id.second, event_record);
+    auto status = event_aggregator_mgr_->GetEventAggregator()->AddMemoryUsageEvent(
+        metric_report_id.second, event_record);
     if (logged_values == nullptr) {
       return status;
     }
@@ -539,7 +544,7 @@
   // of reference.
   bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
                                     uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more UniqueActives
     // aggregates than |logged_activity| and |day_last_garbage_collected_|
     // should imply.
@@ -654,7 +659,7 @@
 
   bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
                                        uint32_t /*current_day_index*/) {
-    auto local_aggregate_store = event_aggregator_->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more PerDeviceNumeric
     // aggregates than |logged_values| and |day_last_garbage_collected_| should
     // imply.
@@ -869,7 +874,7 @@
                : day_store_created_ - backfill_days;
   }
 
-  std::unique_ptr<EventAggregator> event_aggregator_;
+  std::unique_ptr<EventAggregatorManager> event_aggregator_mgr_;
   std::unique_ptr<MockConsistentProtoStore> local_aggregate_proto_store_;
   std::unique_ptr<MockConsistentProtoStore> obs_history_proto_store_;
   std::unique_ptr<ObservationWriter> observation_writer_;
@@ -901,7 +906,7 @@
 
   void SetUp() override {
     AggregateStoreTest::SetUp();
-    event_aggregator_->UpdateAggregationConfigs(*project_context_);
+    event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context_);
   }
 
   // Adds an OccurrenceEvent to the local aggregations for the MetricReportId of a locally
@@ -1015,17 +1020,17 @@
  protected:
   void SetUp() override { AggregateStoreTest::SetUp(); }
 
-  void ShutDownWorkerThread() { event_aggregator_->ShutDown(); }
+  void ShutDownWorkerThread() { event_aggregator_mgr_->ShutDown(); }
 
   bool in_shutdown_state() { return (shutdown_flag_set() && !worker_joinable()); }
 
   bool in_run_state() { return (!shutdown_flag_set() && worker_joinable()); }
 
   bool shutdown_flag_set() {
-    return event_aggregator_->protected_worker_thread_controller_.const_lock()->shut_down;
+    return event_aggregator_mgr_->protected_worker_thread_controller_.const_lock()->shut_down;
   }
 
-  bool worker_joinable() { return event_aggregator_->worker_thread_.joinable(); }
+  bool worker_joinable() { return event_aggregator_mgr_->worker_thread_.joinable(); }
 };
 
 // Tests that the Read() method of each ConsistentProtoStore is called once
@@ -1154,7 +1159,8 @@
                                                           kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
 
   EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
                         kTestEventCode, kTestDayIndex));
@@ -1170,7 +1176,8 @@
                                                           kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
   EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
                         kTestEventCode, kTestDayIndex));
 
@@ -1189,7 +1196,8 @@
                                                                  kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
   ASSERT_EQ(kInvalidArguments,
             GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
                                            kTestReportId, kTestEventCode, kTestDayIndex));
@@ -1205,7 +1213,8 @@
   const int64_t kFirstValue = 3;
   const int64_t kSecondValue = 7;
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
 
   EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
                      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1229,7 +1238,8 @@
   const int64_t kFirstValue = 3;
   const int64_t kSecondValue = 7;
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
 
   EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
                      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1252,7 +1262,8 @@
   const int64_t kFirstValue = 3;
   const int64_t kSecondValue = 7;
 
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
 
   EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
                      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
@@ -1284,7 +1295,8 @@
 TEST_F(AggregateStoreTest, GenerateObservationsNoEvents) {
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
   // Generate locally aggregated Observations for the current day index.
   EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
   std::vector<Observation2> observations(0);
@@ -1298,7 +1310,8 @@
 TEST_F(AggregateStoreTest, GenerateObservationsTwice) {
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
   // Check that Observations are generated when GenerateObservations is called
   // for the current day index for the first time.
   auto current_day_index = CurrentDayIndex();
@@ -1338,7 +1351,8 @@
   ResetEventAggregator();
   // Provide the all_report_types test registry to the EventAggregator.
   auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
-  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+  EXPECT_EQ(
+      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
   EXPECT_EQ(kOK, GenerateObservations(CurrentDayIndex()));
   std::vector<Observation2> observations(0);
   EXPECT_TRUE(FetchAggregatedObservations(
diff --git a/src/local_aggregation/event_aggregator.cc b/src/local_aggregation/event_aggregator.cc
index f0a2e38..d54aa98 100644
--- a/src/local_aggregation/event_aggregator.cc
+++ b/src/local_aggregation/event_aggregator.cc
@@ -16,50 +16,14 @@
 
 namespace cobalt::local_aggregation {
 
-using logger::Encoder;
 using logger::EventRecord;
 using logger::kInvalidArguments;
 using logger::kOK;
-using logger::kOther;
-using logger::ObservationWriter;
 using logger::ProjectContext;
 using logger::Status;
-using util::ConsistentProtoStore;
-using util::SteadyClock;
-using util::TimeToDayIndex;
 
-EventAggregator::EventAggregator(const Encoder* encoder,
-                                 const ObservationWriter* observation_writer,
-                                 ConsistentProtoStore* local_aggregate_proto_store,
-                                 ConsistentProtoStore* obs_history_proto_store,
-                                 const size_t backfill_days,
-                                 const std::chrono::seconds aggregate_backup_interval,
-                                 const std::chrono::seconds generate_obs_interval,
-                                 const std::chrono::seconds gc_interval) {
-  CHECK_LE(aggregate_backup_interval.count(), generate_obs_interval.count())
-      << "aggregate_backup_interval must be less than or equal to "
-         "generate_obs_interval";
-  CHECK_LE(aggregate_backup_interval.count(), gc_interval.count())
-      << "aggregate_backup_interval must be less than or equal to gc_interval";
-  aggregate_backup_interval_ = aggregate_backup_interval;
-  generate_obs_interval_ = generate_obs_interval;
-  gc_interval_ = gc_interval;
-
-  aggregate_store_ =
-      std::make_unique<AggregateStore>(encoder, observation_writer, local_aggregate_proto_store,
-                                       obs_history_proto_store, backfill_days);
-
-  steady_clock_ = std::make_unique<SteadyClock>();
-}
-
-void EventAggregator::Start(std::unique_ptr<util::SystemClockInterface> clock) {
-  auto locked = protected_worker_thread_controller_.lock();
-  locked->shut_down = false;
-  std::thread t(std::bind(
-      [this](std::unique_ptr<util::SystemClockInterface>& clock) { this->Run(std::move(clock)); },
-      std::move(clock)));
-  worker_thread_ = std::move(t);
-}
+EventAggregator::EventAggregator(AggregateStore* aggregate_store)
+    : aggregate_store_(aggregate_store) {}
 
 // TODO(pesk): update the EventAggregator's view of a Metric
 // or ReportDefinition when appropriate.
@@ -190,107 +154,4 @@
       memory_usage_event.component(), config::PackEventCodes(memory_usage_event.event_code()),
       event->day_index(), memory_usage_event.bytes());
 }
-
-Status EventAggregator::GenerateObservationsNoWorker(uint32_t final_day_index_utc,
-                                                     uint32_t final_day_index_local) {
-  if (worker_thread_.joinable()) {
-    LOG(ERROR) << "GenerateObservationsNoWorker() was called while "
-                  "worker thread was running.";
-    return kOther;
-  }
-  return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local);
-}
-
-void EventAggregator::ShutDown() {
-  if (worker_thread_.joinable()) {
-    {
-      auto locked = protected_worker_thread_controller_.lock();
-      locked->shut_down = true;
-      locked->shutdown_notifier.notify_all();
-    }
-    worker_thread_.join();
-  } else {
-    protected_worker_thread_controller_.lock()->shut_down = true;
-  }
-}
-
-void EventAggregator::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
-  std::chrono::steady_clock::time_point steady_time = steady_clock_->now();
-  // Schedule Observation generation to happen in the first cycle.
-  next_generate_obs_ = steady_time;
-  // Schedule garbage collection to happen |gc_interval_| seconds from now.
-  next_gc_ = steady_time + gc_interval_;
-  // Acquire the mutex protecting the shutdown flag and condition variable.
-  auto locked = protected_worker_thread_controller_.lock();
-  while (true) {
-    // If shutdown has been requested, back up the LocalAggregateStore and
-    // exit.
-    if (locked->shut_down) {
-      aggregate_store_->BackUpLocalAggregateStore();
-      return;
-    }
-    // Sleep until the next scheduled backup of the LocalAggregateStore or
-    // until notified of shutdown. Back up the LocalAggregateStore after
-    // waking.
-    locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
-      if (locked->immediate_run_trigger) {
-        locked->immediate_run_trigger = false;
-        return true;
-      }
-      return locked->shut_down;
-    });
-    aggregate_store_->BackUpLocalAggregateStore();
-    // If the worker thread was woken up by a shutdown request, exit.
-    // Otherwise, complete any scheduled Observation generation and garbage
-    // collection.
-    if (locked->shut_down) {
-      return;
-    }
-    // Check whether it is time to generate Observations or to garbage-collect
-    // the LocalAggregate store. If so, do that task and schedule the next
-    // occurrence.
-    DoScheduledTasks(system_clock->now(), steady_clock_->now());
-  }
-}
-
-void EventAggregator::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
-                                       std::chrono::steady_clock::time_point steady_time) {
-  auto current_time_t = std::chrono::system_clock::to_time_t(system_time);
-  auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1;
-  auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1;
-
-  // Skip the tasks (but do schedule a retry) if either day index is too small.
-  uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + aggregate_store_->backfill_days_;
-  bool skip_tasks =
-      (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index);
-  if (steady_time >= next_generate_obs_) {
-    next_generate_obs_ += generate_obs_interval_;
-    if (skip_tasks) {
-      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
-                                "current day index is too small.";
-    } else {
-      auto obs_status = aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
-      if (obs_status == kOK) {
-        aggregate_store_->BackUpObservationHistory();
-      } else {
-        LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
-      }
-    }
-  }
-  if (steady_time >= next_gc_) {
-    next_gc_ += gc_interval_;
-    if (skip_tasks) {
-      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
-                                "current day index is too small.";
-    } else {
-      auto gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
-      if (gc_status == kOK) {
-        aggregate_store_->BackUpLocalAggregateStore();
-      } else {
-        LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
-      }
-    }
-  }
-}
-
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator.h b/src/local_aggregation/event_aggregator.h
index 116cfda..d6a887c 100644
--- a/src/local_aggregation/event_aggregator.h
+++ b/src/local_aggregation/event_aggregator.h
@@ -18,19 +18,7 @@
 #include "src/logger/observation_writer.h"
 #include "src/logger/status.h"
 
-
-namespace cobalt {
-
-// Forward declaration used for friend tests. These will be removed once a better solution is
-// designed.
-// TODO(ninai): remove this
-namespace internal {
-
-class RealLoggerFactory;
-
-}  // namespace internal
-
-namespace local_aggregation {
+namespace cobalt::local_aggregation {
 
 // The EventAggregator manages the Loggers' interactions with the local aggregation.
 //
@@ -39,74 +27,13 @@
 // provide the EventAggregator with its view of Cobalt's metric and report
 // registry.
 // (2) When logging an Event for a locally aggregated report, a Logger
-// calls a Log*() method with the EventRecord and the report id for the event.
-//
-// Functionality that the EventAggregator curently has but will be moved to the
-// EventAggregatorManager:
-// A worker thread calls on AggregateStore methods to do the following tasks at intervals
-// specified in the EventAggregator's constructor:
-// (1) Calls BackUp*() to back up the EventAggregator's state to the file system.
-// (2) Calls GenerateObservations() with the previous day's day index to generate all Observations
-// for rolling windows ending on that day index, as well as any missing Observations for a specified
-// number of days in the past.
-// (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute aggregates
-// for any windows of interest in the future.
+// calls an Add*() method with the EventRecord and the report id for the event.
 class EventAggregator {
  public:
   // Constructs an EventAggregator.
   //
-  // encoder: the singleton instance of an Encoder on the system.
-  //
-  // local_aggregate_proto_store: A ConsistentProtoStore to be used by the
-  // AggregateStore to store snapshots of its in-memory store of event
-  // aggregates.
-  //
-  // obs_history_proto_store: A ConsistentProtoStore to be used by the
-  // AggregateStore to store snapshots of its in-memory history of generated
-  // Observations.
-  //
-  // backfill_days: the number of past days for which the AggregateStore
-  // generates and sends Observations, in addition to a requested day index.
-  // See the comment above AggreateStoe::GenerateObservations for more
-  // detail. The constructor CHECK-fails if a value larger than
-  // |kEventAggregatorMaxAllowedBackfillDays| is passed.
-  //
-  // aggregate_backup_interval: the interval in seconds at which a snapshot of
-  // the in-memory store of event aggregates should be written to
-  // |local_aggregate_proto_store|.
-  //
-  // generate_obs_interval: the interval in seconds at which the
-  // EventAggregator should generate Observations.
-  //
-  // gc_interval: the interval in seconds at which the LocalAggregateStore
-  // should be garbage-collected.
-  //
-  // The constructor CHECK-fails if the value of |aggregate_backup_interval| is
-  // larger than either of |generate_obs_interval| or |gc_interval|. In
-  // practice, the value of |aggregate_backup_interval| should be small relative
-  // to the values of |generate_obs_interval| and |gc_interval|, since each of
-  // Observation generation and garbage collection will be done at the smallest
-  // multiple of |aggregate_backup_interval| which is greater than or equal to
-  // its specified interval.
-  EventAggregator(const logger::Encoder* encoder,
-                  const logger::ObservationWriter* observation_writer,
-                  util::ConsistentProtoStore* local_aggregate_proto_store,
-                  util::ConsistentProtoStore* obs_history_proto_store, size_t backfill_days = 0,
-                  std::chrono::seconds aggregate_backup_interval = std::chrono::minutes(1),
-                  std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
-                  std::chrono::seconds gc_interval = kOneDay);
-
-  // Shut down the worker thread before destructing the EventAggregator.
-  ~EventAggregator() { ShutDown(); }
-
-  // Starts the worker thread.
-  //
-  // |clock| The clock that should be used by the worker thread for scheduling
-  //         tasks and determining the current day and hour. On systems on which
-  //         the clock may be initially inaccurate, the caller should wait to
-  //         invoke this method until after it is known that the clock is
-  //         accurate.
-  void Start(std::unique_ptr<util::SystemClockInterface> clock);
+  // aggregate_store: an AggregateStore, which is used to store the local aggregates.
+  explicit EventAggregator(AggregateStore* aggregate_store);
 
   // Updates the EventAggregator's view of the Cobalt metric and report
   // registry.
@@ -166,85 +93,10 @@
   // AddMemoryUsageEvent: |event_record| should wrap a MemoryUsageEvent.
   logger::Status AddMemoryUsageEvent(uint32_t report_id, const logger::EventRecord& event_record);
 
-  // Checks that the worker thread is shut down, and if so, calls
-  // AggregateStore::GenerateObservations() and returns its result. Returns kOther if the
-  // worker thread is joinable. See the documentation on AggregateStore::GenerateObservations()
-  // for a description of the parameters.
-  //
-  // This method is intended for use in tests which require a single thread to
-  // both log events to and generate Observations from an EventAggregator.
-  logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
-                                              uint32_t final_day_index_local = 0u);
-
  private:
-  friend class EventAggregatorManager;  // used for transition during redesign.
-  friend class AggregateStoreTest;
-  friend class AggregateStoreWorkerTest;
-  friend class EventAggregatorTest;
-  friend class EventAggregatorManagerTest;
-  friend class EventAggregatorWorkerTest;
-  friend class TestEventAggregatorManager;
-  friend class internal::RealLoggerFactory;
-
-  // Request that the worker thread shut down and wait for it to exit. The
-  // worker thread backs up the LocalAggregateStore before exiting.
-  void ShutDown();
-
-  // Main loop executed by the worker thread. The thread sleeps for
-  // |aggregate_backup_interval_| seconds or until notified of shutdown, then
-  // calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
-  // DoScheduledTasks() and schedules the next occurrence of any completed
-  // tasks.
-  void Run(std::unique_ptr<util::SystemClockInterface> system_clock);
-
-  // Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|,
-  // calls AggregateStore::GenerateObservations() with the day index of the previous day from
-  // |system_time| in each of UTC and local time, and then backs up the history of generated
-  // Observations. If |next_gc_| is less than or equal to |steady_time|, calls
-  // AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in
-  // each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is
-  // logged and execution continues if the operation fails.
-  void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
-                        std::chrono::steady_clock::time_point steady_time);
-
-  // Sets the EventAggregator's SteadyClockInterface. Only for use in tests.
-  void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
-
-  struct WorkerThreadController {
-    // Setting this value to true requests that the worker thread stop.
-    bool shut_down = true;
-
-    // Setting this value to true requests that the worker thread immediately perform its work
-    // rather than waiting for the next scheduled time to run. After the worker thread has completed
-    // its work, it will reset this value to false.
-    bool immediate_run_trigger = false;
-
-    // Used to wait on to execute periodic EventAggregator tasks.
-    std::condition_variable_any shutdown_notifier;
-  };
-
-  struct ShutDownFlag {
-    // Used to trigger a shutdown of the EventAggregator.
-    bool shut_down = true;
-    // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
-    bool manual_trigger = false;
-    // Used to wait on to execute periodic EventAggregator tasks.
-    std::condition_variable_any shutdown_notifier;
-  };
-
-  std::unique_ptr<AggregateStore> aggregate_store_;
-
-  std::thread worker_thread_;
-  util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
-  std::chrono::seconds aggregate_backup_interval_;
-  std::chrono::seconds generate_obs_interval_;
-  std::chrono::seconds gc_interval_;
-  std::chrono::steady_clock::time_point next_generate_obs_;
-  std::chrono::steady_clock::time_point next_gc_;
-  std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+  AggregateStore* aggregate_store_;  // not owned
 };
 
-}  // namespace local_aggregation
-}  // namespace cobalt
+}  // namespace cobalt::local_aggregation
 
 #endif  // COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_H_
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index dda9219..2418856 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -4,9 +4,14 @@
 
 #include "src/local_aggregation/event_aggregator_mgr.h"
 
+#include "src/lib/util/datetime_util.h"
+
 namespace cobalt::local_aggregation {
 
+using logger::kOK;
+using logger::Status;
 using util::ConsistentProtoStore;
+using util::TimeToDayIndex;
 
 EventAggregatorManager::EventAggregatorManager(const logger::Encoder* encoder,
                                                const logger::ObservationWriter* observation_writer,
@@ -15,14 +20,116 @@
                                                const size_t backfill_days,
                                                const std::chrono::seconds aggregate_backup_interval,
                                                const std::chrono::seconds generate_obs_interval,
-                                               const std::chrono::seconds gc_interval) {
-  event_aggregator_ = std::make_unique<EventAggregator>(
-      encoder, observation_writer, local_aggregate_proto_store, obs_history_proto_store,
-      backfill_days, aggregate_backup_interval, generate_obs_interval, gc_interval);
+                                               const std::chrono::seconds gc_interval)
+    : backfill_days_(backfill_days),
+      aggregate_backup_interval_(aggregate_backup_interval),
+      generate_obs_interval_(generate_obs_interval),
+      gc_interval_(gc_interval) {
+  aggregate_store_ =
+      std::make_unique<AggregateStore>(encoder, observation_writer, local_aggregate_proto_store,
+                                       obs_history_proto_store, backfill_days);
+  event_aggregator_ = std::make_unique<EventAggregator>(aggregate_store_.get());
 }
 
 void EventAggregatorManager::Start(std::unique_ptr<util::SystemClockInterface> clock) {
-  event_aggregator_->Start(std::move(clock));
+  auto locked = protected_worker_thread_controller_.lock();
+  locked->shut_down = false;
+  std::thread t(std::bind(
+      [this](std::unique_ptr<util::SystemClockInterface>& clock) { this->Run(std::move(clock)); },
+      std::move(clock)));
+  worker_thread_ = std::move(t);
+}
+
+void EventAggregatorManager::ShutDown() {
+  if (worker_thread_.joinable()) {
+    {
+      auto locked = protected_worker_thread_controller_.lock();
+      locked->shut_down = true;
+      locked->shutdown_notifier.notify_all();
+    }
+    worker_thread_.join();
+  } else {
+    protected_worker_thread_controller_.lock()->shut_down = true;
+  }
+}
+
+void EventAggregatorManager::Run(std::unique_ptr<util::SystemClockInterface> system_clock) {
+  std::chrono::steady_clock::time_point steady_time = steady_clock_->now();
+  // Schedule Observation generation to happen in the first cycle.
+  next_generate_obs_ = steady_time;
+  // Schedule garbage collection to happen |gc_interval_| seconds from now.
+  next_gc_ = steady_time + gc_interval_;
+  // Acquire the mutex protecting the shutdown flag and condition variable.
+  auto locked = protected_worker_thread_controller_.lock();
+  while (true) {
+    // If shutdown has been requested, back up the LocalAggregateStore and
+    // exit.
+    if (locked->shut_down) {
+      aggregate_store_->BackUpLocalAggregateStore();
+      return;
+    }
+    // Sleep until the next scheduled backup of the LocalAggregateStore or
+    // until notified of shutdown. Back up the LocalAggregateStore after
+    // waking.
+    locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
+      if (locked->immediate_run_trigger) {
+        locked->immediate_run_trigger = false;
+        return true;
+      }
+      return locked->shut_down;
+    });
+    aggregate_store_->BackUpLocalAggregateStore();
+    // If the worker thread was woken up by a shutdown request, exit.
+    // Otherwise, complete any scheduled Observation generation and garbage
+    // collection.
+    if (locked->shut_down) {
+      return;
+    }
+    // Check whether it is time to generate Observations or to garbage-collect
+    // the LocalAggregate store. If so, do that task and schedule the next
+    // occurrence.
+    DoScheduledTasks(system_clock->now(), steady_clock_->now());
+  }
+}
+
+void EventAggregatorManager::DoScheduledTasks(std::chrono::system_clock::time_point system_time,
+                                              std::chrono::steady_clock::time_point steady_time) {
+  auto current_time_t = std::chrono::system_clock::to_time_t(system_time);
+  auto yesterday_utc = TimeToDayIndex(current_time_t, MetricDefinition::UTC) - 1;
+  auto yesterday_local_time = TimeToDayIndex(current_time_t, MetricDefinition::LOCAL) - 1;
+
+  // Skip the tasks (but do schedule a retry) if either day index is too small.
+  uint32_t min_allowed_day_index = kMaxAllowedAggregationDays + backfill_days_;
+  bool skip_tasks =
+      (yesterday_utc < min_allowed_day_index || yesterday_local_time < min_allowed_day_index);
+  if (steady_time >= next_generate_obs_) {
+    next_generate_obs_ += generate_obs_interval_;
+    if (skip_tasks) {
+      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping Observation generation because the "
+                                "current day index is too small.";
+    } else {
+      auto obs_status = aggregate_store_->GenerateObservations(yesterday_utc, yesterday_local_time);
+      if (obs_status == kOK) {
+        aggregate_store_->BackUpObservationHistory();
+      } else {
+        LOG(ERROR) << "GenerateObservations failed with status: " << obs_status;
+      }
+    }
+  }
+  if (steady_time >= next_gc_) {
+    next_gc_ += gc_interval_;
+    if (skip_tasks) {
+      LOG_FIRST_N(ERROR, 10) << "EventAggregator is skipping garbage collection because the "
+                                "current day index is too small.";
+    } else {
+      auto gc_status = aggregate_store_->GarbageCollect(yesterday_utc, yesterday_local_time);
+      if (gc_status == kOK) {
+        aggregate_store_->BackUpLocalAggregateStore();
+      } else {
+        LOG(ERROR) << "GarbageCollect failed with status: " << gc_status;
+      }
+    }
+  }
 }
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 12ea6d1..34b8f22 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -13,16 +13,38 @@
 #include "src/lib/util/clock.h"
 #include "src/lib/util/consistent_proto_store.h"
 #include "src/lib/util/protected_fields.h"
+#include "src/local_aggregation/aggregate_store.h"
 #include "src/local_aggregation/event_aggregator.h"
 #include "src/local_aggregation/local_aggregation.pb.h"
 #include "src/logger/encoder.h"
 #include "src/logger/observation_writer.h"
 #include "src/logger/status.h"
 
-namespace cobalt::local_aggregation {
+namespace cobalt {
 
 constexpr int64_t kHoursInADay = 24;
 
+// Forward declaration used for friend tests. These will be removed once a better solution is
+// designed.
+// TODO(ninai): remove this
+namespace internal {
+
+class RealLoggerFactory;
+
+}  // namespace internal
+
+namespace local_aggregation {
+
+// Class responsible for managing memory, threading, locks, time and the main loop.
+//
+// A worker thread calls on AggregateStore methods to do the following tasks at intervals
+// specified in the EventAggregator's constructor:
+// (1) Calls BackUp*() to back up the EventAggregator's state to the file system.
+// (2) Calls GenerateObservations() with the previous day's day index to generate all Observations
+// for rolling windows ending on that day index, as well as any missing Observations for a specified
+// number of days in the past.
+// (3) Calls GarbageCollect() to delete daily aggregates which are not needed to compute aggregates
+// for any windows of interest in the future.
 class EventAggregatorManager {
  public:
   // Constructs a class to manage local aggregation and provide EventAggregators.
@@ -68,6 +90,9 @@
                          std::chrono::seconds generate_obs_interval = std::chrono::hours(1),
                          std::chrono::seconds gc_interval = std::chrono::hours(kHoursInADay));
 
+  // Shut down the worker thread before destructing the EventAggregatorManager.
+  ~EventAggregatorManager() { ShutDown(); }
+
   // Starts the worker thread.
   //
   // |clock| The clock that should be used by the worker thread for scheduling tasks and determining
@@ -82,15 +107,76 @@
  private:
   friend class TestEventAggregatorManager;
   friend class EventAggregatorManagerTest;
+  friend class AggregateStoreTest;
+  friend class AggregateStoreWorkerTest;
+  friend class EventAggregatorTest;
+  friend class EventAggregatorManagerTest;
+  friend class EventAggregatorWorkerTest;
+  friend class internal::RealLoggerFactory;
 
-  // Sets the EventAggregator's SteadyClockInterface. Only for use in tests.
-  void SetSteadyClock(util::SteadyClockInterface* clock) {
-    event_aggregator_->SetSteadyClock(clock);
-  }
+  // Request that the worker thread shut down and wait for it to exit. The
+  // worker thread backs up the LocalAggregateStore before exiting.
+  void ShutDown();
 
+  // Main loop executed by the worker thread. The thread sleeps for
+  // |aggregate_backup_interval_| seconds or until notified of shutdown, then
+  // calls BackUpLocalAggregateStore(). If not notified of shutdown, calls
+  // DoScheduledTasks() and schedules the next occurrence of any completed
+  // tasks.
+  void Run(std::unique_ptr<util::SystemClockInterface> system_clock);
+
+  // Helper method called by Run(). If |next_generate_obs_| is less than or equal to |steady_time|,
+  // calls AggregateStore::GenerateObservations() with the day index of the previous day from
+  // |system_time| in each of UTC and local time, and then backs up the history of generated
+  // Observations. If |next_gc_| is less than or equal to |steady_time|, calls
+  // AggregateStore::GarbageCollect() with the day index of the previous day from |system_time| in
+  // each of UTC and local time and then backs up the LocalAggregateStore. In each case, an error is
+  // logged and execution continues if the operation fails.
+  void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
+                        std::chrono::steady_clock::time_point steady_time);
+
+  // Sets the EventAggregatorManager's SteadyClockInterface. Only for use in tests.
+  void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }
+
+  struct WorkerThreadController {
+    // Setting this value to true requests that the worker thread stop.
+    bool shut_down = true;
+
+    // Setting this value to true requests that the worker thread immediately perform its work
+    // rather than waiting for the next scheduled time to run. After the worker thread has completed
+    // its work, it will reset this value to false.
+    bool immediate_run_trigger = false;
+
+    // Used to wait on to execute periodic EventAggregator tasks.
+    std::condition_variable_any shutdown_notifier;
+  };
+
+  struct ShutDownFlag {
+    // Used to trigger a shutdown of the EventAggregator.
+    bool shut_down = true;
+    // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
+    bool manual_trigger = false;
+    // Used to wait on to execute periodic EventAggregator tasks.
+    std::condition_variable_any shutdown_notifier;
+  };
+
+  size_t backfill_days_;
+  std::chrono::seconds aggregate_backup_interval_;
+  std::chrono::seconds generate_obs_interval_;
+  std::chrono::seconds gc_interval_;
+
+  std::chrono::steady_clock::time_point next_generate_obs_;
+  std::chrono::steady_clock::time_point next_gc_;
+  std::unique_ptr<util::SteadyClockInterface> steady_clock_;
+
+  std::unique_ptr<AggregateStore> aggregate_store_;
   std::unique_ptr<EventAggregator> event_aggregator_;
+
+  std::thread worker_thread_;
+  util::ProtectedFields<WorkerThreadController> protected_worker_thread_controller_;
 };
 
-}  // namespace cobalt::local_aggregation
+}  // namespace local_aggregation
+}  // namespace cobalt
 
 #endif  // COBALT_SRC_LOCAL_AGGREGATION_EVENT_AGGREGATOR_MGR_H_
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index 308b54d..1df6bf3 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -89,9 +89,7 @@
     return event_aggregator_mgr_;
   }
 
-  void ShutDown(EventAggregatorManager* event_aggregator_mgr) {
-    event_aggregator_mgr->GetEventAggregator()->ShutDown();
-  }
+  void ShutDown(EventAggregatorManager* event_aggregator_mgr) { event_aggregator_mgr->ShutDown(); }
 
   bool IsInShutdownState(EventAggregatorManager* event_aggregator_mgr) {
     return (IsShutdownFlagSet(event_aggregator_mgr) && !IsWorkerJoinable(event_aggregator_mgr));
@@ -102,13 +100,11 @@
   }
 
   bool IsShutdownFlagSet(EventAggregatorManager* event_aggregator_mgr) {
-    return event_aggregator_mgr->GetEventAggregator()
-        ->protected_worker_thread_controller_.const_lock()
-        ->shut_down;
+    return event_aggregator_mgr->protected_worker_thread_controller_.const_lock()->shut_down;
   }
 
   bool IsWorkerJoinable(EventAggregatorManager* event_aggregator_mgr) {
-    return event_aggregator_mgr->GetEventAggregator()->worker_thread_.joinable();
+    return event_aggregator_mgr->worker_thread_.joinable();
   }
 
   // Returns the day index of the current day according to |test_clock_|, in
@@ -121,10 +117,7 @@
   bool BackUpHappened() { return local_aggregate_proto_store_->write_count_ >= 1; }
 
   uint32_t NumberOfKVPairsInStore(EventAggregatorManager* event_aggregator_mgr) {
-    return event_aggregator_mgr->GetEventAggregator()
-        ->aggregate_store_->CopyLocalAggregateStore()
-        .by_report_key()
-        .size();
+    return event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore().by_report_key().size();
   }
 
   // Given a ProjectContext |project_context| and the MetricReportId of a UNIQUE_N_DAY_ACTIVES
@@ -144,8 +137,7 @@
   }
 
   uint32_t GetNumberOfUniqueActivesAggregates(EventAggregatorManager* event_aggregator_mgr) {
-    auto local_aggregate_store =
-        event_aggregator_mgr->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
     uint32_t num_aggregates = 0;
     for (const auto& [report_key, aggregates] : local_aggregate_store.by_report_key()) {
       if (aggregates.type_case() != ReportAggregates::kUniqueActivesAggregates) {
@@ -164,8 +156,7 @@
       EventAggregatorManager* event_aggregator_mgr,
       const std::shared_ptr<const ProjectContext>& project_context,
       const MetricReportId& metric_report_id, uint32_t day_index, uint32_t event_code) {
-    auto local_aggregate_store =
-        event_aggregator_mgr->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr->aggregate_store_->CopyLocalAggregateStore();
     std::string key;
     if (!SerializeToBase64(MakeAggregationKey(*project_context, metric_report_id), &key)) {
       return AssertionFailure() << "Could not serialize key with metric id  "
@@ -206,16 +197,16 @@
     return AssertionSuccess();
   }
 
-  void TriggerAndWaitForDoScheduledTasks(EventAggregator* event_aggregator) {
+  void TriggerAndWaitForDoScheduledTasks(EventAggregatorManager* event_aggregator_mgr) {
     {
       // Acquire the lock to manually trigger the scheduled tasks.
-      auto locked = event_aggregator->protected_worker_thread_controller_.lock();
+      auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
       locked->immediate_run_trigger = true;
       locked->shutdown_notifier.notify_all();
     }
     while (true) {
       // Reacquire the lock to make sure that the scheduled tasks have completed.
-      auto locked = event_aggregator->protected_worker_thread_controller_.lock();
+      auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
       if (!locked->immediate_run_trigger) {
         break;
       }
@@ -368,7 +359,7 @@
   expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
                                             {7, {false, true, true, true, true}}};
 
-  TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr->GetEventAggregator());
+  TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
 
   EXPECT_EQ(kOK,
             event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
@@ -384,7 +375,7 @@
   AdvanceClock(kSecondsInOneDay);
   ResetObservationStore();
 
-  TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr->GetEventAggregator());
+  TriggerAndWaitForDoScheduledTasks(event_aggregator_mgr.get());
 
   EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
                                              update_recipient_.get()));
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index f9478ee..5629e9d 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -37,13 +37,11 @@
 using logger::ObservationWriter;
 using logger::ProjectContext;
 using logger::Status;
-using logger::testing::CheckUniqueActivesObservations;
 using logger::testing::ExpectedUniqueActivesObservations;
 using logger::testing::FakeObservationStore;
 using logger::testing::GetTestProject;
 using logger::testing::MakeAggregationConfig;
 using logger::testing::MakeAggregationKey;
-using logger::testing::MakeNullExpectedUniqueActivesObservations;
 using logger::testing::MockConsistentProtoStore;
 using logger::testing::TestUpdateRecipient;
 using system_data::ClientSecret;
@@ -124,7 +122,7 @@
     unowned_test_clock_ = test_clock_.get();
     day_store_created_ = CurrentDayIndex();
     test_steady_clock_ = new IncrementingSteadyClock(std::chrono::system_clock::duration(0));
-    event_aggregator_mgr_->GetEventAggregator()->SetSteadyClock(test_steady_clock_);
+    event_aggregator_mgr_->SetSteadyClock(test_steady_clock_);
   }
 
   // Destruct the EventAggregator (thus calling EventAggregator::ShutDown())
@@ -145,26 +143,22 @@
                           time_zone);
   }
 
-  size_t GetBackfillDays() {
-    return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->backfill_days_;
-  }
+  size_t GetBackfillDays() { return event_aggregator_mgr_->aggregate_store_->backfill_days_; }
 
   LocalAggregateStore CopyLocalAggregateStore() {
-    return event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+    return event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
   }
 
   void TriggerAndWaitForDoScheduledTasks() {
     {
       // Acquire the lock to manually trigger the scheduled tasks.
-      auto locked =
-          event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
+      auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
       locked->immediate_run_trigger = true;
       locked->shutdown_notifier.notify_all();
     }
     while (true) {
       // Reacquire the lock to make sure that the scheduled tasks have completed.
-      auto locked =
-          event_aggregator_mgr_->GetEventAggregator()->protected_worker_thread_controller_.lock();
+      auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
       if (!locked->immediate_run_trigger) {
         break;
       }
@@ -344,8 +338,7 @@
   // of reference.
   bool CheckUniqueActivesAggregates(const LoggedActivity& logged_activity,
                                     uint32_t /*current_day_index*/) {
-    auto local_aggregate_store =
-        event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more UniqueActives
     // aggregates than |logged_activity| and |day_last_garbage_collected_|
     // should imply.
@@ -460,8 +453,7 @@
 
   bool CheckPerDeviceNumericAggregates(const LoggedValues& logged_values,
                                        uint32_t /*current_day_index*/) {
-    auto local_aggregate_store =
-        event_aggregator_mgr_->GetEventAggregator()->aggregate_store_->CopyLocalAggregateStore();
+    auto local_aggregate_store = event_aggregator_mgr_->aggregate_store_->CopyLocalAggregateStore();
     // Check that the LocalAggregateStore contains no more PerDeviceNumeric
     // aggregates than |logged_values| and |day_last_garbage_collected_| should
     // imply.
@@ -811,21 +803,17 @@
  protected:
   void SetUp() override { EventAggregatorTest::SetUp(); }
 
-  void ShutDownWorkerThread() { event_aggregator_mgr_->GetEventAggregator()->ShutDown(); }
+  void ShutDownWorkerThread() { event_aggregator_mgr_->ShutDown(); }
 
   bool in_shutdown_state() { return (shutdown_flag_set() && !worker_joinable()); }
 
   bool in_run_state() { return (!shutdown_flag_set() && worker_joinable()); }
 
   bool shutdown_flag_set() {
-    return event_aggregator_mgr_->GetEventAggregator()
-        ->protected_worker_thread_controller_.const_lock()
-        ->shut_down;
+    return event_aggregator_mgr_->protected_worker_thread_controller_.const_lock()->shut_down;
   }
 
-  bool worker_joinable() {
-    return event_aggregator_mgr_->GetEventAggregator()->worker_thread_.joinable();
-  }
+  bool worker_joinable() { return event_aggregator_mgr_->worker_thread_.joinable(); }
 };
 
 // Tests that an empty LocalAggregateStore is updated with
@@ -1013,65 +1001,6 @@
   }
 }
 
-// Checks that UniqueActivesObservations with the expected values are
-// generated by the the scheduled Observation generation when some events
-// have been logged for a UNIQUE_N_DAY_ACTIVES.
-// (based on UniqueActivesNoiseFreeEventAggregatorTest::CheckObservationValuesMultiDay)
-//
-// Logging pattern:
-// Logs events for the EventsOccurred_UniqueDevices report (whose parent
-// metric has max_event_code = 4) for event codes 1 through 4.
-//
-// Expected number of Observations:
-// The call to GenerateObservations should generate a number of Observations
-// equal to the daily_num_obs field of
-// |logger::testing::unique_actives_noise_free::kExpectedAggregationParams|.
-//
-// Expected Observation values:
-// The EventsOccurred_UniqueDevices report has window sizes 1 and 7, and
-// the expected activity indicators of Observations for that report are:
-//
-// (window size)            active for event codes
-// ------------------------------------------------------
-// (1)                           1, 2, 3, 4
-// (7)                           1, 2, 3, 4
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(UniqueActivesNoiseFreeEventAggregatorTest, Run) {
-  auto day_index = CurrentDayIndex();
-
-  // Form expected Observations for the 1 day of logging.
-  ExpectedUniqueActivesObservations expected_obs = MakeNullExpectedUniqueActivesObservations(
-      logger::testing::unique_actives_noise_free::kExpectedAggregationParams, day_index);
-  const auto& expected_id =
-      logger::testing::unique_actives_noise_free::kEventsOccurredMetricReportId;
-  expected_obs[{expected_id, day_index}] = {{1, {false, true, true, true, true}},
-                                            {7, {false, true, true, true, true}}};
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  // Trigger the initial call to DoScheduledTasks and wait for it to have completed.
-  TriggerAndWaitForDoScheduledTasks();
-
-  // Generate some events.
-  for (uint32_t event_code = 1;
-       event_code <
-       logger::testing::unique_actives_noise_free::kExpectedAggregationParams.num_event_codes.at(
-           expected_id);
-       event_code++) {
-    EXPECT_EQ(kOK, AddUniqueActivesEvent(expected_id, day_index, event_code));
-  }
-  // Advance |test_clock_| by 1 day.
-  AdvanceClock(kDay);
-  // Clear the FakeObservationStore.
-  ResetObservationStore();
-
-  // Trigger another iteration of the call to DoScheduledTasks and wait for it to have completed.
-  TriggerAndWaitForDoScheduledTasks();
-
-  // Check the generated Observations against the expectation.
-  EXPECT_TRUE(CheckUniqueActivesObservations(expected_obs, observation_store_.get(),
-                                             update_recipient_.get()));
-}
-
 // Tests that the LocalAggregateStore is updated as expected when
 // EventAggregator::AddPerDeviceCountEvent() is called with valid arguments;
 // i.e., with a report ID associated to an existing key of the
@@ -1210,87 +1139,4 @@
   }
 }
 
-// Starts the worker thread, and destructs the EventAggregator without
-// explicitly shutting down the worker thread. Checks that the shutdown flag
-// and worker thread are in the expected states before and after the thread is
-// started.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, StartWorkerThread) {
-  EXPECT_TRUE(in_shutdown_state());
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  EXPECT_TRUE(in_run_state());
-}
-
-// Starts the worker thread, shuts down the worker thread, and destructs the
-// EventAggregator. Checks that the shutdown flag and worker thread are in the
-// expected states.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, StartAndShutDownWorkerThread) {
-  EXPECT_TRUE(in_shutdown_state());
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  EXPECT_TRUE(in_run_state());
-  ShutDownWorkerThread();
-  EXPECT_TRUE(in_shutdown_state());
-}
-
-// Starts the worker thread and immediately shuts it down. Checks that the
-// LocalAggregateStore was backed up during shutdown.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, BackUpBeforeShutdown) {
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  ShutDownWorkerThread();
-  EXPECT_EQ(1, local_aggregate_proto_store_->write_count_);
-}
-
-// Starts the worker thread and calls
-// EventAggregator::UpdateAggregationConfigs() on the main thread.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, UpdateAggregationConfigs) {
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  // Provide the EventAggregator with the all_report_types registry.
-  auto project_context = GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
-  EXPECT_EQ(
-      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
-  // Check that the number of key-value pairs in the LocalAggregateStore is
-  // now equal to the number of locally aggregated reports in the
-  // all_report_types registry.
-  EXPECT_EQ(logger::testing::all_report_types::kExpectedAggregationParams.metric_report_ids.size(),
-            CopyLocalAggregateStore().by_report_key().size());
-}
-
-// Starts the worker thread, provides a ProjectContext, logs some events, and
-// shuts down the worker thread. Checks that the LocalAggregateStore was
-// backed up at least once during the lifetime of the worker thread.
-//
-// TODO(ninai): remove duplicate test when the duplicate functions are removed.
-TEST_F(EventAggregatorWorkerTest, LogEvents) {
-  auto day_index = CurrentDayIndex();
-  event_aggregator_mgr_->Start(std::move(test_clock_));
-  // Provide the EventAggregator with the all_report_types registry.
-  std::shared_ptr<ProjectContext> project_context =
-      GetTestProject(logger::testing::all_report_types::kCobaltRegistryBase64);
-  EXPECT_EQ(
-      kOK, event_aggregator_mgr_->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
-  // // Adds some events to the local aggregations.
-  LoggedActivity logged_activity;
-  EXPECT_EQ(kOK, AddUniqueActivesEvent(
-                     project_context, logger::testing::all_report_types::kDeviceBootsMetricReportId,
-                     day_index, 0u, &logged_activity));
-  EXPECT_EQ(kOK,
-            AddUniqueActivesEvent(project_context,
-                                  logger::testing::all_report_types::kFeaturesActiveMetricReportId,
-                                  day_index, 4u, &logged_activity));
-  EXPECT_EQ(kOK,
-            AddUniqueActivesEvent(project_context,
-                                  logger::testing::all_report_types::kEventsOccurredMetricReportId,
-                                  day_index, 1u, &logged_activity));
-  EXPECT_TRUE(CheckUniqueActivesAggregates(logged_activity, day_index));
-  ShutDownWorkerThread();
-  EXPECT_GE(local_aggregate_proto_store_->write_count_, 1);
-}
-
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
index f3834ed..45d39eb 100644
--- a/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
+++ b/src/local_aggregation/test_utils/test_event_aggregator_mgr.h
@@ -26,23 +26,21 @@
   // Triggers an out of schedule run of GenerateObservations(). This does not change the schedule
   // of future runs.
   logger::Status GenerateObservations(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return EventAggregatorManager::event_aggregator_->aggregate_store_->GenerateObservations(
-        day_index_utc, day_index_local);
+    return EventAggregatorManager::aggregate_store_->GenerateObservations(day_index_utc,
+                                                                          day_index_local);
   }
 
   // Triggers an out of schedule run of GarbageCollect(). This does not change the schedule of
   // future runs.
   logger::Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
-    return EventAggregatorManager::event_aggregator_->aggregate_store_->GarbageCollect(
-        day_index_utc, day_index_local);
+    return EventAggregatorManager::aggregate_store_->GarbageCollect(day_index_utc, day_index_local);
   }
 
   // Returns the number of aggregates of type per_device_numeric_aggregates.
   uint32_t NumPerDeviceNumericAggregatesInStore() {
     int count = 0;
     for (const auto& aggregates :
-         EventAggregatorManager::event_aggregator_->aggregate_store_->protected_aggregate_store_
-             .lock()
+         EventAggregatorManager::aggregate_store_->protected_aggregate_store_.lock()
              ->local_aggregate_store.by_report_key()) {
       if (aggregates.second.has_numeric_aggregates()) {
         count += aggregates.second.numeric_aggregates().by_component().size();