[privacy/LocalAggregation] Add Disable/DeleteData

- Disable allows disabling local aggregation, which means that the aggregate
  store will ignore all incoming events.
- DeleteData will delete all device-specific data from the aggregate
  store. The only data that will remain is data that is derived from the
  Cobalt registry.

Bug: 37244
Change-Id: Ibd0eb29eaa3456b782c384bbb252037a7dc7a9ef
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 994b569..34da7c3 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -183,6 +183,7 @@
       << "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
   backfill_days_ = backfill_days;
   auto locked_store = protected_aggregate_store_.lock();
+  locked_store->empty_local_aggregate_store = MakeNewLocalAggregateStore();
   auto restore_aggregates_status =
       local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
   switch (restore_aggregates_status.error_code()) {
@@ -261,11 +262,23 @@
     }
     (*locked->local_aggregate_store.mutable_by_report_key())[key] = report_aggregates;
   }
+
+  // Make sure that the 'empty' store has the key as well.
+  ReportAggregates empty_report_aggregates;
+  if (locked->empty_local_aggregate_store.by_report_key().count(key) == 0) {
+    if (!PopulateReportAggregates(project_context, metric, report, &empty_report_aggregates)) {
+      return kInvalidArguments;
+    }
+    (*locked->empty_local_aggregate_store.mutable_by_report_key())[key] = empty_report_aggregates;
+  }
   return kOK;
 }
 
 Status AggregateStore::SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
                                  uint32_t report_id, uint64_t event_code, uint32_t day_index) {
+  if (is_disabled_) {
+    return kOK;
+  }
   std::string key;
   if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &key)) {
     return kInvalidArguments;
@@ -293,6 +306,9 @@
                                               uint32_t metric_id, uint32_t report_id,
                                               const std::string& component, uint64_t event_code,
                                               uint32_t day_index, int64_t value) {
+  if (is_disabled_) {
+    return kOK;
+  }
   std::string report_key;
   if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &report_key)) {
     return kInvalidArguments;
@@ -806,6 +822,22 @@
       ->set_last_generated(value);
 }
 
+void AggregateStore::DeleteData() {
+  LOG(INFO) << "AggregateStore: Deleting stored data";
+
+  {
+    auto locked = protected_aggregate_store_.lock();
+    locked->local_aggregate_store = locked->empty_local_aggregate_store;
+  }
+  protected_obs_history_.lock()->obs_history = MakeNewObservationHistoryStore();
+}
+
+void AggregateStore::Disable(bool is_disabled) {
+  LOG(INFO) << "AggregateStore: " << (is_disabled ? "Disabling" : "Enabling")
+            << " event aggregate storage.";
+  is_disabled_ = is_disabled;
+}
+
 Status AggregateStore::GenerateSinglePerDeviceNumericObservation(
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 729566f..d59c891 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -82,6 +82,9 @@
   // event with the given |customer_id|, |project_id|, |metric_id|, |report_id| and |event_code|.
   // Expects that MaybeInsertReportConfig() has been called previously for the ids being passed.
   // Returns kInvalidArguments if the operation fails, and kOK otherwise.
+  //
+  // N.B. If the AggregateStore has been disabled (is_disabled_ == true), this method will do
+  // nothing, and will always return kOK.
   logger::Status SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
                            uint32_t report_id, uint64_t event_code, uint32_t day_index);
 
@@ -89,6 +92,9 @@
   // indexed by |customer_id|, |project_id|, |metric_id|, |report_id|, |component|, |event_code| and
   // |day_index|. Expects that MaybeInsertReportConfig() has been called previously for the ids
   // being passed. Returns kInvalidArguments if the operation fails, and kOK otherwise.
+  //
+  // N.B. If the AggregateStore has been disabled (is_disabled_ == true), this method will do
+  // nothing, and will always return kOK.
   logger::Status UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id,
                                         uint32_t metric_id, uint32_t report_id,
                                         const std::string& component, uint64_t event_code,
@@ -185,6 +191,19 @@
   // given report to |value|, according to |protected_obs_history|
   void SetReportParticipationLastGeneratedDayIndex(const std::string& report_key, uint32_t value);
 
+  // DeleteData removes all device-specific information from the LocalAggregateStore and the
+  // AggregatedObservationHistoryStore. The only data that remains is the data derived from the
+  // Metrics Registry in `MaybeInsertReportConfig`.
+  void DeleteData();
+
+  // Disable allows enabling/disabling the AggregateStore. When the store is disabled, the following
+  // will happen:
+  //
+  // 1. Calls to SetActive and UpdateNumericAggregate will do nothing and immediately return kOK.
+  // 2. Calls to MaybeInsertReportConfig will continue to function, but since this only stores
+  //    information derived from the Metrics Registry, this is not a problem.
+  void Disable(bool is_disabled);
+
  private:
   friend class AggregateStoreTest;
   friend class EventAggregatorTest;
@@ -297,12 +316,23 @@
 
   struct AggregateStoreFields {
     LocalAggregateStore local_aggregate_store;
+
+    // When clients connect to Cobalt, their ProjectContext is supplied to the AggregateStore in
+    // MaybeInsertReportConfig. This creates structures in the LocalAggregateStore that are required
+    // for SetActive and UpdateNumericAggregate to function. In order to allow deleting the data
+    // from the AggregateStore without needing to restart Cobalt, we store a copy of all these
+    // report configs here without any device-specific information. This way, when the DeleteData
+    // method is called, we can replace local_aggregate_store with empty_local_aggregate_store, and
+    // both SetActive and UpdateNumericAggregate will continue to function as expected.
+    LocalAggregateStore empty_local_aggregate_store;
   };
 
   struct AggregatedObservationHistoryStoreFields {
     AggregatedObservationHistoryStore obs_history;
   };
 
+  bool is_disabled_ = false;
+
   // The number of past days for which the AggregateStore generates and sends Observations, in
   // addition to a requested day index.
   size_t backfill_days_ = 0;
diff --git a/src/local_aggregation/event_aggregator_mgr.cc b/src/local_aggregation/event_aggregator_mgr.cc
index 6310e5d..4d7bb1a 100644
--- a/src/local_aggregation/event_aggregator_mgr.cc
+++ b/src/local_aggregation/event_aggregator_mgr.cc
@@ -71,7 +71,7 @@
     {
       auto locked = protected_worker_thread_controller_.lock();
       locked->shut_down = true;
-      locked->shutdown_notifier.notify_all();
+      locked->wakeup_notifier.notify_all();
     }
     worker_thread_.join();
   } else {
@@ -97,14 +97,18 @@
     // Sleep until the next scheduled backup of the LocalAggregateStore or
     // until notified of shutdown. Back up the LocalAggregateStore after
     // waking.
-    locked->shutdown_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
+    locked->wakeup_notifier.wait_for(locked, aggregate_backup_interval_, [&locked]() {
       if (locked->immediate_run_trigger) {
         locked->immediate_run_trigger = false;
         return true;
       }
-      return locked->shut_down;
+      return locked->shut_down || locked->back_up_now;
     });
     aggregate_store_->BackUpLocalAggregateStore();
+    if (locked->back_up_now) {
+      locked->back_up_now = false;
+      aggregate_store_->BackUpObservationHistory();
+    }
     // If the worker thread was woken up by a shutdown request, exit.
     // Otherwise, complete any scheduled Observation generation and garbage
     // collection.
@@ -168,6 +172,12 @@
   return aggregate_store_->GenerateObservations(final_day_index_utc, final_day_index_local);
 }
 
+void EventAggregatorManager::TriggerBackups() {
+  auto locked = protected_worker_thread_controller_.lock();
+  locked->back_up_now = true;
+  locked->wakeup_notifier.notify_all();
+}
+
 void EventAggregatorManager::Reset() {
   aggregate_store_ = std::make_unique<AggregateStore>(
       encoder_, observation_writer_, owned_local_aggregate_proto_store_.get(),
diff --git a/src/local_aggregation/event_aggregator_mgr.h b/src/local_aggregation/event_aggregator_mgr.h
index 90dd243..0267f66 100644
--- a/src/local_aggregation/event_aggregator_mgr.h
+++ b/src/local_aggregation/event_aggregator_mgr.h
@@ -125,6 +125,12 @@
   logger::Status GenerateObservationsNoWorker(uint32_t final_day_index_utc,
                                               uint32_t final_day_index_local = 0u);
 
+  void Disable(bool is_disabled) { aggregate_store_->Disable(is_disabled); }
+  void DeleteData() {
+    aggregate_store_->DeleteData();
+    TriggerBackups();
+  }
+
  private:
   friend class TestEventAggregatorManager;
   friend class EventAggregatorManagerTest;
@@ -157,6 +163,12 @@
   void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
                         std::chrono::steady_clock::time_point steady_time);
 
+  // Triggers the work thread to wake up and back up the LocalAggregateStore and the
+  // ObservationHistory.
+  //
+  // TODO(zmbush): Rename "backup" nomenclature to "checkpoint".
+  void TriggerBackups();
+
   struct WorkerThreadController {
     // Setting this value to true requests that the worker thread stop.
     bool shut_down = true;
@@ -166,17 +178,12 @@
     // its work, it will reset this value to false.
     bool immediate_run_trigger = false;
 
-    // Used to wait on to execute periodic EventAggregator tasks.
-    std::condition_variable_any shutdown_notifier;
-  };
+    // Setting this value to true requests that the worker thread wake up and back up the aggregate
+    // store and the observation history, before going back to sleep.
+    bool back_up_now = false;
 
-  struct ShutDownFlag {
-    // Used to trigger a shutdown of the EventAggregator.
-    bool shut_down = true;
-    // Used in tests to manually trigger a run of the EventAggregator's scheduled tasks.
-    bool manual_trigger = false;
     // Used to wait on to execute periodic EventAggregator tasks.
-    std::condition_variable_any shutdown_notifier;
+    std::condition_variable_any wakeup_notifier;
   };
 
   const logger::Encoder* encoder_;
diff --git a/src/local_aggregation/event_aggregator_mgr_test.cc b/src/local_aggregation/event_aggregator_mgr_test.cc
index 104824e..53d78ab 100644
--- a/src/local_aggregation/event_aggregator_mgr_test.cc
+++ b/src/local_aggregation/event_aggregator_mgr_test.cc
@@ -203,7 +203,7 @@
       // Acquire the lock to manually trigger the scheduled tasks.
       auto locked = event_aggregator_mgr->protected_worker_thread_controller_.lock();
       locked->immediate_run_trigger = true;
-      locked->shutdown_notifier.notify_all();
+      locked->wakeup_notifier.notify_all();
     }
     while (true) {
       // Reacquire the lock to make sure that the scheduled tasks have completed.
@@ -288,6 +288,106 @@
             NumberOfKVPairsInStore(event_aggregator_mgr.get()));
 }
 
+TEST_F(EventAggregatorManagerTest, LogEventsAfterDelete) {
+  auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+  event_aggregator_mgr->Start(GetTestSystemClock());
+
+  auto day_index = CurrentDayIndex();
+  // Provide the EventAggregator with the all_report_types registry.
+  std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+  EXPECT_EQ(kOK,
+            event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+  EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                       kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+  event_aggregator_mgr->DeleteData();
+
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+  EXPECT_EQ(2, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+  EXPECT_FALSE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+                                                   kDeviceBootsMetricReportId, day_index,
+                                                   /*event_code*/ 0u));
+  EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+                                                  kFeaturesActiveMetricReportId, day_index,
+                                                  /*event_code*/ 4u));
+  EXPECT_TRUE(ContainsValidUniqueActivesAggregate(event_aggregator_mgr.get(), project_context,
+                                                  kEventsOccurredMetricReportId, day_index,
+                                                  /*event_code*/ 1u));
+
+  ShutDown(event_aggregator_mgr.get());
+  EXPECT_TRUE(BackUpHappened());
+}
+
+TEST_F(EventAggregatorManagerTest, DeleteData) {
+  auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+  event_aggregator_mgr->Start(GetTestSystemClock());
+
+  auto day_index = CurrentDayIndex();
+  // Provide the EventAggregator with the all_report_types registry.
+  std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+  EXPECT_EQ(kOK,
+            event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+  EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                       kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+  EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+  event_aggregator_mgr->DeleteData();
+  EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+}
+
+TEST_F(EventAggregatorManagerTest, Disable) {
+  auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
+  event_aggregator_mgr->Start(GetTestSystemClock());
+
+  auto day_index = CurrentDayIndex();
+  // Provide the EventAggregator with the all_report_types registry.
+  std::shared_ptr<ProjectContext> project_context = GetTestProject(kCobaltRegistryBase64);
+  EXPECT_EQ(kOK,
+            event_aggregator_mgr->GetEventAggregator()->UpdateAggregationConfigs(*project_context));
+
+  event_aggregator_mgr->Disable(true);
+
+  EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                       kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+
+  EXPECT_EQ(0, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+
+  event_aggregator_mgr->Disable(false);
+
+  EXPECT_EQ(kOK, AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                       kDeviceBootsMetricReportId, day_index, /*event_code*/ 0u));
+
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kFeaturesActiveMetricReportId, day_index, /*event_code*/ 4u));
+  EXPECT_EQ(kOK,
+            AddUniqueActivesEvent(event_aggregator_mgr.get(), project_context,
+                                  kEventsOccurredMetricReportId, day_index, /*event_code*/ 1u));
+  EXPECT_EQ(3, GetNumberOfUniqueActivesAggregates(event_aggregator_mgr.get()));
+}
+
 TEST_F(EventAggregatorManagerTest, LogEvents) {
   auto event_aggregator_mgr = GetEventAggregatorManager(GetTestSteadyClock());
   event_aggregator_mgr->Start(GetTestSystemClock());
diff --git a/src/local_aggregation/event_aggregator_test.cc b/src/local_aggregation/event_aggregator_test.cc
index c0ddbc8..73911b2 100644
--- a/src/local_aggregation/event_aggregator_test.cc
+++ b/src/local_aggregation/event_aggregator_test.cc
@@ -155,7 +155,7 @@
       // Acquire the lock to manually trigger the scheduled tasks.
       auto locked = event_aggregator_mgr_->protected_worker_thread_controller_.lock();
       locked->immediate_run_trigger = true;
-      locked->shutdown_notifier.notify_all();
+      locked->wakeup_notifier.notify_all();
     }
     while (true) {
       // Reacquire the lock to make sure that the scheduled tasks have completed.