[cobalt[LocalAggregation] Make Observation History a protected field.

- moves obs_history in a ProtectedField<>
- Defines Get/Set methods for the *LastGeneratedDayIndex, so that it
separates accessing the underlying data structures from observation
generation
- Adds tests for Get/Set methods
- makes the methods pulic
- nit organization of the AggregateStore's private fields

Bug: 40853

Change-Id: I364b31f79c9c24d5784a77ed4d16fe26f63d9b57
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index c9dc50e..994b569 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -182,9 +182,9 @@
   CHECK_LE(backfill_days, kMaxAllowedBackfillDays)
       << "backfill_days must be less than or equal to " << kMaxAllowedBackfillDays;
   backfill_days_ = backfill_days;
-  auto locked = protected_aggregate_store_.lock();
+  auto locked_store = protected_aggregate_store_.lock();
   auto restore_aggregates_status =
-      local_aggregate_proto_store_->Read(&(locked->local_aggregate_store));
+      local_aggregate_proto_store_->Read(&(locked_store->local_aggregate_store));
   switch (restore_aggregates_status.error_code()) {
     case StatusCode::OK: {
       VLOG(4) << "Read LocalAggregateStore from disk.";
@@ -194,7 +194,7 @@
       VLOG(4) << "No file found for local_aggregate_proto_store. Proceeding "
                  "with empty LocalAggregateStore. File will be created on "
                  "first snapshot of the LocalAggregateStore.";
-      locked->local_aggregate_store = MakeNewLocalAggregateStore();
+      locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
       break;
     }
     default: {
@@ -203,18 +203,19 @@
                  << "\nError message: " << restore_aggregates_status.error_message()
                  << "\nError details: " << restore_aggregates_status.error_details()
                  << "\nProceeding with empty LocalAggregateStore.";
-      locked->local_aggregate_store = MakeNewLocalAggregateStore();
+      locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
     }
   }
-  if (auto status = MaybeUpgradeLocalAggregateStore(&(locked->local_aggregate_store));
+  if (auto status = MaybeUpgradeLocalAggregateStore(&(locked_store->local_aggregate_store));
       status != kOK) {
     LOG(ERROR) << "Failed to upgrade LocalAggregateStore to current version with status " << status
                << ".\nProceeding with empty "
                   "LocalAggregateStore.";
-    locked->local_aggregate_store = MakeNewLocalAggregateStore();
+    locked_store->local_aggregate_store = MakeNewLocalAggregateStore();
   }
 
-  auto restore_history_status = obs_history_proto_store_->Read(&obs_history_);
+  auto locked_obs_history = protected_obs_history_.lock();
+  auto restore_history_status = obs_history_proto_store_->Read(&locked_obs_history->obs_history);
   switch (restore_history_status.error_code()) {
     case StatusCode::OK: {
       VLOG(4) << "Read AggregatedObservationHistoryStore from disk.";
@@ -232,14 +233,15 @@
                  << "\nError message: " << restore_history_status.error_message()
                  << "\nError details: " << restore_history_status.error_details()
                  << "\nProceeding with empty AggregatedObservationHistoryStore.";
-      obs_history_ = MakeNewObservationHistoryStore();
+      locked_obs_history->obs_history = MakeNewObservationHistoryStore();
     }
   }
-  if (auto status = MaybeUpgradeObservationHistoryStore(&obs_history_); status != kOK) {
+  if (auto status = MaybeUpgradeObservationHistoryStore(&locked_obs_history->obs_history);
+      status != kOK) {
     LOG(ERROR)
         << "Failed to upgrade AggregatedObservationHistoryStore to current version with status "
         << status << ".\nProceeding with empty AggregatedObservationHistoryStore.";
-    obs_history_ = MakeNewObservationHistoryStore();
+    locked_obs_history->obs_history = MakeNewObservationHistoryStore();
   }
 }
 
@@ -349,7 +351,8 @@
 }
 
 Status AggregateStore::BackUpObservationHistory() {
-  auto status = obs_history_proto_store_->Write(obs_history_);
+  auto obs_history = protected_obs_history_.lock()->obs_history;
+  auto status = obs_history_proto_store_->Write(obs_history);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to back up the AggregatedObservationHistoryStore. "
                   "::cobalt::util::Status error code: "
@@ -614,11 +617,12 @@
 
 }  // namespace
 
-uint32_t AggregateStore::UniqueActivesLastGeneratedDayIndex(const std::string& report_key,
-                                                            uint32_t event_code,
-                                                            uint32_t aggregation_days) const {
-  auto report_history = obs_history_.by_report_key().find(report_key);
-  if (report_history == obs_history_.by_report_key().end()) {
+uint32_t AggregateStore::GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+                                                               uint32_t event_code,
+                                                               uint32_t aggregation_days) const {
+  auto obs_history = protected_obs_history_.const_lock()->obs_history;
+  auto report_history = obs_history.by_report_key().find(report_key);
+  if (report_history == obs_history.by_report_key().end()) {
     return 0u;
   }
   auto event_code_history =
@@ -633,6 +637,17 @@
   return window_history->second;
 }
 
+void AggregateStore::SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key,
+                                                           uint32_t event_code,
+                                                           uint32_t aggregation_days,
+                                                           uint32_t value) {
+  auto locked = protected_obs_history_.lock();
+  (*(*(*locked->obs_history.mutable_by_report_key())[report_key]
+          .mutable_unique_actives_history()
+          ->mutable_by_event_code())[event_code]
+        .mutable_by_window_size())[aggregation_days] = value;
+}
+
 Status AggregateStore::GenerateSingleUniqueActivesObservation(
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     uint32_t event_code, const OnDeviceAggregationWindow& window, bool was_active) const {
@@ -688,7 +703,7 @@
       // been generated for this report, event code, and window size. If
       // that day index is later than |final_day_index|, no Observation is
       // generated on this invocation.
-      auto last_gen = UniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days());
+      auto last_gen = GetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days());
       auto first_day_index = std::max(last_gen + 1, backfill_period_start);
       // The latest day index on which |event_type| is known to have
       // occurred, so far. This value will be updated as we search
@@ -721,12 +736,8 @@
         if (status != kOK) {
           return status;
         }
-        // Update |obs_history_| with the latest date of Observation
-        // generation for this report, event code, and window size.
-        (*(*(*obs_history_.mutable_by_report_key())[report_key]
-                .mutable_unique_actives_history()
-                ->mutable_by_event_code())[event_code]
-              .mutable_by_window_size())[window.days()] = obs_day_index;
+
+        SetUniqueActivesLastGeneratedDayIndex(report_key, event_code, window.days(), obs_day_index);
       }
     }
   }
@@ -735,12 +746,13 @@
 
 ////////// GenerateObsFromNumericAggregates and helper methods /////////////
 
-uint32_t AggregateStore::PerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
-                                                               const std::string& component,
-                                                               uint32_t event_code,
-                                                               uint32_t aggregation_days) const {
-  const auto& report_history = obs_history_.by_report_key().find(report_key);
-  if (report_history == obs_history_.by_report_key().end()) {
+uint32_t AggregateStore::GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+                                                                  const std::string& component,
+                                                                  uint32_t event_code,
+                                                                  uint32_t aggregation_days) const {
+  auto obs_history = protected_obs_history_.const_lock()->obs_history;
+  const auto& report_history = obs_history.by_report_key().find(report_key);
+  if (report_history == obs_history.by_report_key().end()) {
     return 0u;
   }
   if (!report_history->second.has_per_device_numeric_history()) {
@@ -763,15 +775,37 @@
   return window_history->second;
 }
 
-uint32_t AggregateStore::ReportParticipationLastGeneratedDayIndex(
+void AggregateStore::SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+                                                              const std::string& component,
+                                                              uint32_t event_code,
+                                                              uint32_t aggregation_days,
+                                                              uint32_t value) {
+  auto locked = protected_obs_history_.lock();
+  (*(*(*(*locked->obs_history.mutable_by_report_key())[report_key]
+            .mutable_per_device_numeric_history()
+            ->mutable_by_component())[component]
+          .mutable_by_event_code())[event_code]
+        .mutable_by_window_size())[aggregation_days] = value;
+}
+
+uint32_t AggregateStore::GetReportParticipationLastGeneratedDayIndex(
     const std::string& report_key) const {
-  const auto& report_history = obs_history_.by_report_key().find(report_key);
-  if (report_history == obs_history_.by_report_key().end()) {
+  auto obs_history = protected_obs_history_.const_lock()->obs_history;
+  const auto& report_history = obs_history.by_report_key().find(report_key);
+  if (report_history == obs_history.by_report_key().end()) {
     return 0u;
   }
   return report_history->second.report_participation_history().last_generated();
 }
 
+void AggregateStore::SetReportParticipationLastGeneratedDayIndex(const std::string& report_key,
+                                                                 uint32_t value) {
+  auto locked = protected_obs_history_.lock();
+  (*locked->obs_history.mutable_by_report_key())[report_key]
+      .mutable_report_participation_history()
+      ->set_last_generated(value);
+}
+
 Status AggregateStore::GenerateSinglePerDeviceNumericObservation(
     const MetricRef metric_ref, const ReportDefinition* report, uint32_t obs_day_index,
     const std::string& component, uint32_t event_code, const OnDeviceAggregationWindow& window,
@@ -862,8 +896,8 @@
           LOG(INFO) << "Skipping unsupported aggregation window.";
           continue;
         }
-        auto last_gen =
-            PerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days());
+        auto last_gen = GetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code,
+                                                                 window.days());
         auto first_day_index = std::max(last_gen + 1, backfill_period_start);
         for (auto obs_day_index = first_day_index; obs_day_index <= final_day_index;
              obs_day_index++) {
@@ -956,27 +990,21 @@
                 return kInvalidArguments;
             }
           }
-          // Update |obs_history_| with the latest date of Observation
-          // generation for this report, component, event code, and window.
-          (*(*(*(*obs_history_.mutable_by_report_key())[report_key]
-                    .mutable_per_device_numeric_history()
-                    ->mutable_by_component())[component]
-                  .mutable_by_event_code())[event_code]
-                .mutable_by_window_size())[window.days()] = obs_day_index;
+
+          SetPerDeviceNumericLastGeneratedDayIndex(report_key, component, event_code, window.days(),
+                                                   obs_day_index);
         }
       }
     }
   }
   // Generate any necessary ReportParticipationObservations for this report.
-  auto participation_last_gen = ReportParticipationLastGeneratedDayIndex(report_key);
+  auto participation_last_gen = GetReportParticipationLastGeneratedDayIndex(report_key);
   auto participation_first_day_index = std::max(participation_last_gen + 1, backfill_period_start);
   for (auto obs_day_index = participation_first_day_index; obs_day_index <= final_day_index;
        obs_day_index++) {
     GenerateSingleReportParticipationObservation(
         metric_ref, &report_aggregates.aggregation_config().report(), obs_day_index);
-    (*obs_history_.mutable_by_report_key())[report_key]
-        .mutable_report_participation_history()
-        ->set_last_generated(obs_day_index);
+    SetReportParticipationLastGeneratedDayIndex(report_key, obs_day_index);
   }
   return kOK;
 }
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 7875d43..2cd0929 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -146,6 +146,45 @@
   logger::Status GenerateObservations(uint32_t final_day_index_utc,
                                       uint32_t final_day_index_local = 0u);
 
+  // Returns the most recent day index for which an Observation was generated
+  // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
+  // according to |protected_obs_history|. Returns 0 if no Observation has been generated
+  // for the given arguments.
+  uint32_t GetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
+                                                 uint32_t aggregation_days) const;
+
+  // Sets the most recent day index for which an Observation was generated
+  // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
+  // to |value| in |protected_obs_history|
+  void SetUniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
+                                             uint32_t aggregation_days, uint32_t value);
+
+  // Returns the most recent day index for which an Observation was generated for a given
+  // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
+  // aggregation window, according to |protected_obs_history|. Returns 0 if no Observation has been
+  // generated for the given arguments.
+  uint32_t GetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+                                                    const std::string& component,
+                                                    uint32_t event_code,
+                                                    uint32_t aggregation_days) const;
+
+  // Sets the most recent day index for which an Observation was generated for a given
+  // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
+  // aggregation window, according to |protected_obs_history|.
+  void SetPerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
+                                                const std::string& component, uint32_t event_code,
+                                                uint32_t aggregation_days, uint32_t value);
+
+  // Returns the most recent day index for which a
+  // ReportParticipationObservation was generated for a given report, according
+  // to |obs_history_|. Returns 0 if no Observation has been generated for the
+  // given arguments.
+  uint32_t GetReportParticipationLastGeneratedDayIndex(const std::string& report_key) const;
+
+  // Set the most recent day index for which a ReportParticipationObservation was generated for a
+  // given report to |value|, according to |protected_obs_history|
+  void SetReportParticipationLastGeneratedDayIndex(const std::string& report_key, uint32_t value);
+
  private:
   friend class EventAggregator;  // used for transition during redesign.
   friend class AggregateStoreTest;
@@ -185,27 +224,6 @@
   logger::Status MaybeUpgradeLocalAggregateStore(LocalAggregateStore* store);
   logger::Status MaybeUpgradeObservationHistoryStore(AggregatedObservationHistoryStore* store);
 
-  // Returns the most recent day index for which an Observation was generated
-  // for a given UNIQUE_N_DAY_ACTIVES report, event code, and day-based aggregation window,
-  // according to |obs_history_|. Returns 0 if no Observation has been generated
-  // for the given arguments.
-  uint32_t UniqueActivesLastGeneratedDayIndex(const std::string& report_key, uint32_t event_code,
-                                              uint32_t aggregation_days) const;
-
-  // Returns the most recent day index for which an Observation was generated for a given
-  // PER_DEVICE_NUMERIC_STATS or PER_DEVICE_HISTOGRAM report, component, event code, and day-based
-  // aggregation window, according to |obs_history_|. Returns 0 if no Observation has been generated
-  // for the given arguments.
-  uint32_t PerDeviceNumericLastGeneratedDayIndex(const std::string& report_key,
-                                                 const std::string& component, uint32_t event_code,
-                                                 uint32_t aggregation_days) const;
-
-  // Returns the most recent day index for which a
-  // ReportParticipationObservation was generated for a given report, according
-  // to |obs_history_|. Returns 0 if no Observation has been generated for the
-  // given arguments.
-  uint32_t ReportParticipationLastGeneratedDayIndex(const std::string& report_key) const;
-
   // For a fixed report of type UNIQUE_N_DAY_ACTIVES, generates an Observation
   // for each event code of the parent metric, for each day-based aggregation window of the
   // report ending on |final_day_index|, unless an Observation with those parameters was generated
@@ -282,14 +300,25 @@
     LocalAggregateStore local_aggregate_store;
   };
 
-  const logger::Encoder* encoder_;
-  const logger::ObservationWriter* observation_writer_;
-  util::ConsistentProtoStore* local_aggregate_proto_store_;
-  util::ConsistentProtoStore* obs_history_proto_store_;
-  util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
-  // Not protected by a mutex. Should only be accessed by the Event Aggregator's |worker_thread_|.
-  AggregatedObservationHistoryStore obs_history_;
+  struct AggregatedObservationHistoryStoreFields {
+    AggregatedObservationHistoryStore obs_history;
+  };
+
+  // The number of past days for which the AggregateStore generates and sends Observations, in
+  // addition to a requested day index.
   size_t backfill_days_ = 0;
+
+  // Objects used to generate observations.
+  const logger::Encoder* encoder_;                       // not owned
+  const logger::ObservationWriter* observation_writer_;  // not owned
+
+  // Used for loading and backing up the proto stores to disk.
+  util::ConsistentProtoStore* local_aggregate_proto_store_;  // not owned
+  util::ConsistentProtoStore* obs_history_proto_store_;      // not owned
+
+  // In memory store of local aggregations and data needed to derive them.
+  util::ProtectedFields<AggregateStoreFields> protected_aggregate_store_;
+  util::ProtectedFields<AggregatedObservationHistoryStoreFields> protected_obs_history_;
 };
 
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index 5237df2..62d1482 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -1278,6 +1278,43 @@
                                    "", kTestEventCode, kTestDayIndex, /*value*/ 4));
 }
 
+TEST_F(AggregateStoreTest, SetUniqueActivesLastGeneratedDayIndex) {
+  const std::string kReportKey = "test_key";
+  const int64_t kFirstValue = 3;
+
+  EXPECT_EQ(0u,
+            GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                                       /*aggregation_days*/ 1));
+  GetAggregateStore()->SetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                             /*aggregation_days*/ 1, kFirstValue);
+  EXPECT_EQ(kFirstValue,
+            GetAggregateStore()->GetUniqueActivesLastGeneratedDayIndex(kReportKey, kTestEventCode,
+                                                                       /*aggregation_days*/ 1));
+}
+TEST_F(AggregateStoreTest, SetPerDeviceNumericLastGeneratedDayIndex) {
+  const std::string kReportKey = "test_key";
+  const int64_t kFirstValue = 3;
+
+  EXPECT_EQ(0u, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+                    kReportKey, "", kTestEventCode,
+                    /*aggregation_days*/ 1));
+  GetAggregateStore()->SetPerDeviceNumericLastGeneratedDayIndex(kReportKey, "", kTestEventCode,
+                                                                /*aggregation_days*/ 1,
+                                                                kFirstValue);
+  EXPECT_EQ(kFirstValue, GetAggregateStore()->GetPerDeviceNumericLastGeneratedDayIndex(
+                             kReportKey, "", kTestEventCode,
+                             /*aggregation_days*/ 1));
+}
+TEST_F(AggregateStoreTest, SetReportParticipationLastGeneratedDayIndex) {
+  const std::string kReportKey = "test_key";
+  const int64_t kFirstValue = 3;
+
+  EXPECT_EQ(0u, GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+  GetAggregateStore()->SetReportParticipationLastGeneratedDayIndex(kReportKey, kFirstValue);
+  EXPECT_EQ(kFirstValue,
+            GetAggregateStore()->GetReportParticipationLastGeneratedDayIndex(kReportKey));
+}
+
 // Tests that EventAggregator::GenerateObservations() returns a positive
 // status and that the expected number of Observations is generated when no
 // Events have been logged to the EventAggregator.