[cobalt][LocalAggregation] Remove direct store access from
AddNumericEvent()

- adds a function to aggregation_utils to handle the logic of updating
a known aggregate. This function will later be used by
GenerateObservations as well.
- currently the EventAggregation directly locks and and modifies the
AggregateStore's internal state. This CL moves the responsibility of
locking and setting the correct state of the store to the Aggregate Store.
- for more information see go/cobalt-local-aggregation-redesign

Bug: 40853
Change-Id: Ic7c1e504ceac70100cb8318b51de4995b5eb0899
diff --git a/src/local_aggregation/BUILD.gn b/src/local_aggregation/BUILD.gn
index 4266f01..8a9db2a 100644
--- a/src/local_aggregation/BUILD.gn
+++ b/src/local_aggregation/BUILD.gn
@@ -29,6 +29,8 @@
   ]
 
   public_deps = [
+    "$cobalt_root/src:logging",
+    "$cobalt_root/src/logger:status",
     "$cobalt_root/src/registry:cobalt_registry_proto",
   ]
 
diff --git a/src/local_aggregation/aggregate_store.cc b/src/local_aggregation/aggregate_store.cc
index 01aa746..c9dc50e 100644
--- a/src/local_aggregation/aggregate_store.cc
+++ b/src/local_aggregation/aggregate_store.cc
@@ -287,6 +287,45 @@
   return kOK;
 }
 
+Status AggregateStore::UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id,
+                                              uint32_t metric_id, uint32_t report_id,
+                                              const std::string& component, uint64_t event_code,
+                                              uint32_t day_index, int64_t value) {
+  std::string report_key;
+  if (!PopulateReportKey(customer_id, project_id, metric_id, report_id, &report_key)) {
+    return kInvalidArguments;
+  }
+
+  auto locked = protected_aggregate_store_.lock();
+  auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(report_key);
+  if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) {
+    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
+    return kInvalidArguments;
+  }
+  if (!aggregates->second.has_numeric_aggregates()) {
+    LOG(ERROR) << "The local aggregates for this report key are not of a "
+                  "compatible type.";
+    return kInvalidArguments;
+  }
+
+  auto aggregates_by_day =
+      (*(*aggregates->second.mutable_numeric_aggregates()->mutable_by_component())[component]
+            .mutable_by_event_code())[event_code]
+          .mutable_by_day_index();
+  bool has_stored_aggregate = ((*aggregates_by_day).find(day_index) != aggregates_by_day->end());
+  auto day_aggregate = (*aggregates_by_day)[day_index].mutable_numeric_daily_aggregate();
+
+  auto [status, updated_value] = GetUpdatedAggregate(
+      aggregates->second.aggregation_config().report().aggregation_type(),
+      has_stored_aggregate ? std::optional<int64_t>{day_aggregate->value()} : std::nullopt, value);
+  if (status == kOK) {
+    day_aggregate->set_value(updated_value);
+    return kOK;
+  }
+
+  return status;
+}
+
 RepeatedField<uint32_t> UnpackEventCodesProto(uint64_t packed_event_codes) {
   RepeatedField<uint32_t> fields;
   for (auto code : config::UnpackEventCodes(packed_event_codes)) {
diff --git a/src/local_aggregation/aggregate_store.h b/src/local_aggregation/aggregate_store.h
index 5166e34..d3222d2 100644
--- a/src/local_aggregation/aggregate_store.h
+++ b/src/local_aggregation/aggregate_store.h
@@ -95,6 +95,15 @@
   logger::Status SetActive(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
                            uint32_t report_id, uint64_t event_code, uint32_t day_index);
 
+  // Updates the LocalAggregateStore by adding |value| to the current daily aggregate in the bucket
+  // indexed by |customer_id|, |project_id|, |metric_id|, |report_id|, |component|, |event_code| and
+  // |day_index|. Expects that MaybeInsertReportConfig() has been called previously for the ids
+  // being passed. Returns kInvalidArguments if the operation fails, and kOK otherwise.
+  logger::Status UpdateNumericAggregate(uint32_t customer_id, uint32_t project_id,
+                                        uint32_t metric_id, uint32_t report_id,
+                                        const std::string& component, uint64_t event_code,
+                                        uint32_t day_index, int64_t value);
+
   // Writes a snapshot of the LocalAggregateStore to
   // |local_aggregate_proto_store_|.
   logger::Status BackUpLocalAggregateStore();
diff --git a/src/local_aggregation/aggregate_store_test.cc b/src/local_aggregation/aggregate_store_test.cc
index a074d7b..5237df2 100644
--- a/src/local_aggregation/aggregate_store_test.cc
+++ b/src/local_aggregation/aggregate_store_test.cc
@@ -87,6 +87,8 @@
 constexpr uint32_t kTestProjectId = 123;
 constexpr uint32_t kTestMetricId = 1;
 constexpr uint32_t kTestReportId = 3;
+constexpr uint64_t kTestEventCode = 1;
+constexpr uint32_t kTestDayIndex = 56;
 
 // A map keyed by base64-encoded, serialized ReportAggregationKeys. The value at
 // a key is a map of event codes to sets of day indices. Used in tests as
@@ -112,17 +114,29 @@
                                           std::move(project_config));
 }
 
-MetricDefinition GetEventOccurredMetric(uint32_t customer_id, uint32_t project_id,
-                                        uint32_t metric_id) {
+MetricDefinition GetMetricWithId(uint32_t customer_id, uint32_t project_id, uint32_t metric_id) {
   MetricDefinition metric_definition;
   metric_definition.set_metric_name("test_metric");
   metric_definition.set_id(metric_id);
   metric_definition.set_customer_id(customer_id);
   metric_definition.set_project_id(project_id);
+  return metric_definition;
+}
+
+MetricDefinition GetEventOccurredMetric(uint32_t customer_id, uint32_t project_id,
+                                        uint32_t metric_id) {
+  MetricDefinition metric_definition = GetMetricWithId(customer_id, project_id, metric_id);
   metric_definition.set_metric_type(MetricDefinition::EVENT_OCCURRED);
   return metric_definition;
 }
 
+MetricDefinition GetEventCountMetric(uint32_t customer_id, uint32_t project_id,
+                                     uint32_t metric_id) {
+  MetricDefinition metric_definition = GetMetricWithId(customer_id, project_id, metric_id);
+  metric_definition.set_metric_type(MetricDefinition::EVENT_COUNT);
+  return metric_definition;
+}
+
 std::tuple<MetricDefinition, ReportDefinition> GetUniqueActivesMetricAndReport(uint32_t customer_id,
                                                                                uint32_t project_id,
                                                                                uint32_t metric_id,
@@ -139,6 +153,22 @@
   return std::make_tuple(metric_definition, report_definition);
 }
 
+std::tuple<MetricDefinition, ReportDefinition> GetPerDeviceNumericStatsMetricAndReport(
+    uint32_t customer_id, uint32_t project_id, uint32_t metric_id, uint32_t report_id,
+    ReportDefinition::OnDeviceAggregationType aggregation_type) {
+  ReportDefinition report_definition;
+  report_definition.set_report_name("test_per_device_numeric_stats_report");
+  report_definition.set_id(report_id);
+  report_definition.set_report_type(ReportDefinition::PER_DEVICE_NUMERIC_STATS);
+  report_definition.set_aggregation_type(aggregation_type);
+  *report_definition.add_aggregation_window() = MakeDayWindow(1);
+
+  MetricDefinition metric_definition = GetEventCountMetric(customer_id, project_id, metric_id);
+  *metric_definition.add_reports() = report_definition;
+
+  return std::make_tuple(metric_definition, report_definition);
+}
+
 std::tuple<MetricDefinition, ReportDefinition> GetNotLocallyAggregatedMetricAndReport(
     uint32_t customer_id, uint32_t project_id, uint32_t metric_id, uint32_t report_id) {
   ReportDefinition report_definition;
@@ -292,6 +322,42 @@
     return by_day_index->second.activity_daily_aggregate().activity_indicator();
   }
 
+  std::optional<int64_t> GetValue(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
+                                  uint32_t report_id, const std::string& component,
+                                  uint64_t event_code, uint32_t day_index) {
+    std::string key;
+    ReportAggregationKey key_data;
+    key_data.set_customer_id(customer_id);
+    key_data.set_project_id(project_id);
+    key_data.set_metric_id(metric_id);
+    key_data.set_report_id(report_id);
+    SerializeToBase64(key_data, &key);
+
+    auto locked = event_aggregator_->aggregate_store_->protected_aggregate_store_.lock();
+
+    auto aggregates = locked->local_aggregate_store.by_report_key().find(key);
+    if (aggregates == locked->local_aggregate_store.by_report_key().end()) {
+      return std::nullopt;
+    }
+
+    auto by_component = aggregates->second.numeric_aggregates().by_component().find(component);
+    if (by_component == aggregates->second.numeric_aggregates().by_component().end()) {
+      return std::nullopt;
+    }
+
+    auto by_event_code = by_component->second.by_event_code().find(event_code);
+    if (by_event_code == by_component->second.by_event_code().end()) {
+      return std::nullopt;
+    }
+
+    auto by_day_index = by_event_code->second.by_day_index().find(day_index);
+    if (by_day_index == by_event_code->second.by_day_index().end()) {
+      return std::nullopt;
+    }
+
+    return by_day_index->second.numeric_daily_aggregate().value();
+  }
+
   AggregateStore* GetAggregateStore() { return event_aggregator_->aggregate_store_.get(); }
 
   Status GarbageCollect(uint32_t day_index_utc, uint32_t day_index_local = 0u) {
@@ -1087,17 +1153,15 @@
   auto [metric, report] = GetUniqueActivesMetricAndReport(kTestCustomerId, kTestProjectId,
                                                           kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
-  const uint64_t kEventCode = 1;
-  const uint32_t kDayIndex = 56;
 
   EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
 
-  EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, kEventCode,
-                        kDayIndex));
+  EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                        kTestEventCode, kTestDayIndex));
   ASSERT_EQ(kOK, GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
-                                                kTestReportId, kEventCode, kDayIndex));
-  EXPECT_TRUE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, kEventCode,
-                       kDayIndex));
+                                                kTestReportId, kTestEventCode, kTestDayIndex));
+  EXPECT_TRUE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                       kTestEventCode, kTestDayIndex));
 }
 
 // SetActive returns kOK when the same config is inserted twice.
@@ -1105,19 +1169,17 @@
   auto [metric, report] = GetUniqueActivesMetricAndReport(kTestCustomerId, kTestProjectId,
                                                           kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
-  const uint64_t kEventCode = 1;
-  const uint32_t kDayIndex = 56;
 
   EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
-  EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, kEventCode,
-                        kDayIndex));
+  EXPECT_FALSE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                        kTestEventCode, kTestDayIndex));
 
   ASSERT_EQ(kOK, GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
-                                                kTestReportId, kEventCode, kDayIndex));
+                                                kTestReportId, kTestEventCode, kTestDayIndex));
   ASSERT_EQ(kOK, GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
-                                                kTestReportId, kEventCode, kDayIndex));
-  EXPECT_TRUE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, kEventCode,
-                       kDayIndex));
+                                                kTestReportId, kTestEventCode, kTestDayIndex));
+  EXPECT_TRUE(IsActive(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                       kTestEventCode, kTestDayIndex));
 }
 
 // SetActive fails due to the fact that UpdateAggregationConfigs was not called for the given report
@@ -1126,13 +1188,94 @@
   auto [metric, report] = GetNotLocallyAggregatedMetricAndReport(kTestCustomerId, kTestProjectId,
                                                                  kTestMetricId, kTestReportId);
   auto project_context = GetProjectContextFor(metric);
-  const uint64_t kEventCode = 1;
-  const uint32_t kDayIndex = 56;
 
   EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
   ASSERT_EQ(kInvalidArguments,
             GetAggregateStore()->SetActive(kTestCustomerId, kTestProjectId, kTestMetricId,
-                                           kTestReportId, kEventCode, kDayIndex));
+                                           kTestReportId, kTestEventCode, kTestDayIndex));
+}
+
+// The Aggregate store updates the local store with the sum of two values when the aggregation is of
+// type SUM.
+TEST_F(AggregateStoreTest, UpdateAggregationSUM) {
+  auto [metric, report] = GetPerDeviceNumericStatsMetricAndReport(
+      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, ReportDefinition::SUM);
+  auto project_context = GetProjectContextFor(metric);
+
+  const int64_t kFirstValue = 3;
+  const int64_t kSecondValue = 7;
+
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kFirstValue));
+  EXPECT_EQ(kFirstValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                                  kTestEventCode, kTestDayIndex));
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kSecondValue));
+  EXPECT_EQ(kSecondValue + kFirstValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId,
+                                                 kTestReportId, "", kTestEventCode, kTestDayIndex));
+}
+
+// The Aggregate store updates the local store with the sum of two values when the aggregation is of
+// type MAX.
+TEST_F(AggregateStoreTest, UpdateAggregationMAX) {
+  auto [metric, report] = GetPerDeviceNumericStatsMetricAndReport(
+      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, ReportDefinition::MAX);
+  auto project_context = GetProjectContextFor(metric);
+
+  const int64_t kFirstValue = 3;
+  const int64_t kSecondValue = 7;
+
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kFirstValue));
+  EXPECT_EQ(kFirstValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                                  kTestEventCode, kTestDayIndex));
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kSecondValue));
+  EXPECT_EQ(kSecondValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                                   "", kTestEventCode, kTestDayIndex));
+}
+// The Aggregate store updates the local store with the sum of two values when the aggregation is of
+// type MIN.
+TEST_F(AggregateStoreTest, UpdateAggregationMIN) {
+  auto [metric, report] = GetPerDeviceNumericStatsMetricAndReport(
+      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, ReportDefinition::MIN);
+  auto project_context = GetProjectContextFor(metric);
+
+  const int64_t kFirstValue = 3;
+  const int64_t kSecondValue = 7;
+
+  EXPECT_EQ(kOK, event_aggregator_->UpdateAggregationConfigs(*project_context));
+
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kFirstValue));
+  EXPECT_EQ(kFirstValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                                  kTestEventCode, kTestDayIndex));
+  EXPECT_EQ(kOK, GetAggregateStore()->UpdateNumericAggregate(
+                     kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                     kTestEventCode, kTestDayIndex, kSecondValue));
+  EXPECT_EQ(kFirstValue, GetValue(kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, "",
+                                  kTestEventCode, kTestDayIndex));
+}
+
+// UpdateAggregationConfigs fails due to the fact that UpdateAggregationConfigs was not called for
+// the given report.
+TEST_F(AggregateStoreTest, UpdateAggregationFail) {
+  auto [metric, report] = GetPerDeviceNumericStatsMetricAndReport(
+      kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId, ReportDefinition::SUM);
+  auto project_context = GetProjectContextFor(metric);
+
+  ASSERT_EQ(kInvalidArguments, GetAggregateStore()->UpdateNumericAggregate(
+                                   kTestCustomerId, kTestProjectId, kTestMetricId, kTestReportId,
+                                   "", kTestEventCode, kTestDayIndex, /*value*/ 4));
 }
 
 // Tests that EventAggregator::GenerateObservations() returns a positive
diff --git a/src/local_aggregation/aggregation_utils.cc b/src/local_aggregation/aggregation_utils.cc
index 3cedb76..4dc4103 100644
--- a/src/local_aggregation/aggregation_utils.cc
+++ b/src/local_aggregation/aggregation_utils.cc
@@ -3,8 +3,15 @@
 // found in the LICENSE file.
 #include "src/local_aggregation/aggregation_utils.h"
 
+#include <optional>
+
+#include "src/logging.h"
+
 namespace cobalt::local_aggregation {
 
+using logger::kInvalidArguments;
+using logger::kOK;
+
 OnDeviceAggregationWindow MakeDayWindow(int num_days) {
   OnDeviceAggregationWindow window;
   window.set_days(AggregationDays(num_days));
@@ -17,4 +24,23 @@
   return window;
 }
 
+std::tuple<logger::Status, int64_t> GetUpdatedAggregate(
+    ReportDefinition::OnDeviceAggregationType aggregation_type,
+    std::optional<int64_t> stored_aggregate, int64_t new_value) {
+  if (!stored_aggregate.has_value()) {
+    return std::make_tuple(kOK, new_value);
+  }
+
+  switch (aggregation_type) {
+    case ReportDefinition::SUM:
+      return std::make_tuple(kOK, new_value + stored_aggregate.value());
+    case ReportDefinition::MAX:
+      return std::make_tuple(kOK, std::max(new_value, stored_aggregate.value()));
+    case ReportDefinition::MIN:
+      return std::make_tuple(kOK, std::min(new_value, stored_aggregate.value()));
+    default:
+      LOG(ERROR) << "Unexpected aggregation type " << aggregation_type;
+      return std::make_tuple(kInvalidArguments, 0);
+  }
+}
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/aggregation_utils.h b/src/local_aggregation/aggregation_utils.h
index 7442beb..1e4ff3a 100644
--- a/src/local_aggregation/aggregation_utils.h
+++ b/src/local_aggregation/aggregation_utils.h
@@ -4,7 +4,11 @@
 #ifndef COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_UTILS_H_
 #define COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_UTILS_H_
 
+#include <optional>
+
+#include "src/logger/status.h"
 #include "src/registry/aggregation_window.pb.h"
+#include "src/registry/report_definition.pb.h"
 
 namespace cobalt::local_aggregation {
 
@@ -14,6 +18,19 @@
 // Creates and returns an OnDeviceAggregationWindow of |num_hours| hours.
 OnDeviceAggregationWindow MakeHourWindow(int num_hours);
 
+// Encapsulates the logic for how a stored numeric aggregate should be updated given the
+// |aggregation_type|. Returns an updated value if the status is kOK. Returns an error and 0
+// otherwise.
+//
+// aggregation_type: the aggregation_type found in the ReportDefinition of the event aggregate being
+//                   updated.
+// stored_aggregate: the value stored in the AggregateStore. Should not contain a value if no
+//                   aggregate exists.
+// new_value: the value with which the aggregate should be updated.
+std::tuple<logger::Status, int64_t> GetUpdatedAggregate(
+    ReportDefinition::OnDeviceAggregationType aggregation_type,
+    std::optional<int64_t> stored_aggregate, int64_t new_value);
+
 }  // namespace cobalt::local_aggregation
 
 #endif  // COBALT_SRC_LOCAL_AGGREGATION_AGGREGATION_UTILS_H_
diff --git a/src/local_aggregation/aggregation_utils_test.cc b/src/local_aggregation/aggregation_utils_test.cc
index ed9a316..fccbe7c 100644
--- a/src/local_aggregation/aggregation_utils_test.cc
+++ b/src/local_aggregation/aggregation_utils_test.cc
@@ -3,10 +3,14 @@
 // found in the LICENSE file.
 #include "src/local_aggregation/aggregation_utils.h"
 
+#include <optional>
+
 #include "third_party/googletest/googletest/include/gtest/gtest.h"
 
 namespace cobalt::local_aggregation {
 
+using logger::kOK;
+
 TEST(AggregationUtilsTest, MakeDayWindow) {
   const int kSevenDays = 7;
   auto window = MakeDayWindow(kSevenDays);
@@ -21,4 +25,93 @@
   EXPECT_EQ(kOneHour, window.hours());
 }
 
+/*************************************GetUpdatedAggregate*****************************************/
+
+/*** SUM tests ***/
+
+TEST(AggregationUtilsTest, GetUpdatedAggregateSumNoPriorValue) {
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::SUM, std::nullopt, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue, updated_value);
+}
+
+TEST(AggregationUtilsTest, GetUpdatedAggregateSumWithPriorValue) {
+  const int64_t kStoredAggregate = 45;
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::SUM, kStoredAggregate, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue + kStoredAggregate, updated_value);
+}
+
+/*** MAX tests ***/
+
+TEST(AggregationUtilsTest, GetUpdatedAggregateMaxNoPriorValue) {
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MAX, std::nullopt, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue, updated_value);
+}
+
+// Check that the function does not update the aggregate if it is larger than the new value.
+TEST(AggregationUtilsTest, GetUpdatedAggregateMaxWithLargerPriorValue) {
+  const int64_t kStoredAggregate = 45;
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MAX, kStoredAggregate, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kStoredAggregate, updated_value);
+}
+
+// Check that the function updates the aggregate if it is smaller than the new value.
+TEST(AggregationUtilsTest, GetUpdatedAggregateMaxWithSmallerPriorValue) {
+  const int64_t kStoredAggregate = 2;
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MAX, kStoredAggregate, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue, updated_value);
+}
+
+/*** MIN tests ***/
+
+TEST(AggregationUtilsTest, GetUpdatedAggregateMinNoPriorValue) {
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MIN, std::nullopt, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue, updated_value);
+}
+
+// Check that the function does not update the aggregate if it is smaller than the new value.
+TEST(AggregationUtilsTest, GetUpdatedAggregateMinWithSmallerPriorValue) {
+  const int64_t kStoredAggregate = 2;
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MIN, kStoredAggregate, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kStoredAggregate, updated_value);
+}
+
+// Check that the function updates the aggregate if it is larger than the new value.
+TEST(AggregationUtilsTest, GetUpdatedAggregateMinWithLargerPriorValue) {
+  const int64_t kStoredAggregate = 45;
+  const int64_t kNewValue = 4;
+  auto [status, updated_value] =
+      GetUpdatedAggregate(ReportDefinition::MIN, kStoredAggregate, kNewValue);
+
+  EXPECT_EQ(kOK, status);
+  EXPECT_EQ(kNewValue, updated_value);
+}
+
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation/event_aggregator.cc b/src/local_aggregation/event_aggregator.cc
index 1bd30f6..f0a2e38 100644
--- a/src/local_aggregation/event_aggregator.cc
+++ b/src/local_aggregation/event_aggregator.cc
@@ -25,26 +25,9 @@
 using logger::ProjectContext;
 using logger::Status;
 using util::ConsistentProtoStore;
-using util::SerializeToBase64;
 using util::SteadyClock;
 using util::TimeToDayIndex;
 
-namespace {
-
-// Populates a ReportAggregationKey proto message and then populates a string
-// with the base64 encoding of the serialized proto.
-bool PopulateReportKey(uint32_t customer_id, uint32_t project_id, uint32_t metric_id,
-                       uint32_t report_id, std::string* key) {
-  ReportAggregationKey key_data;
-  key_data.set_customer_id(customer_id);
-  key_data.set_project_id(project_id);
-  key_data.set_metric_id(metric_id);
-  key_data.set_report_id(report_id);
-  return SerializeToBase64(key_data, key);
-}
-
-}  // namespace
-
 EventAggregator::EventAggregator(const Encoder* encoder,
                                  const ObservationWriter* observation_writer,
                                  ConsistentProtoStore* local_aggregate_proto_store,
@@ -155,15 +138,13 @@
   if (!ValidateEventType(Event::kCountEvent, *event)) {
     return kInvalidArguments;
   }
+
   auto* metric = event_record.metric();
-  std::string key;
-  if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id,
-                         &key)) {
-    return kInvalidArguments;
-  }
   const CountEvent& count_event = event->count_event();
-  return AddNumericEvent(key, event->day_index(), count_event.component(),
-                         config::PackEventCodes(count_event.event_code()), count_event.count());
+
+  return aggregate_store_->UpdateNumericAggregate(
+      metric->customer_id(), metric->project_id(), metric->id(), report_id, count_event.component(),
+      config::PackEventCodes(count_event.event_code()), event->day_index(), count_event.count());
 }
 
 Status EventAggregator::AddElapsedTimeEvent(uint32_t report_id, const EventRecord& event_record) {
@@ -171,16 +152,14 @@
   if (!ValidateEventType(Event::kElapsedTimeEvent, *event)) {
     return kInvalidArguments;
   }
-  std::string key;
+
   auto* metric = event_record.metric();
-  if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id,
-                         &key)) {
-    return kInvalidArguments;
-  }
   const ElapsedTimeEvent& elapsed_time_event = event->elapsed_time_event();
-  return AddNumericEvent(key, event->day_index(), elapsed_time_event.component(),
-                         config::PackEventCodes(elapsed_time_event.event_code()),
-                         elapsed_time_event.elapsed_micros());
+
+  return aggregate_store_->UpdateNumericAggregate(
+      metric->customer_id(), metric->project_id(), metric->id(), report_id,
+      elapsed_time_event.component(), config::PackEventCodes(elapsed_time_event.event_code()),
+      event->day_index(), elapsed_time_event.elapsed_micros());
 }
 
 Status EventAggregator::AddFrameRateEvent(uint32_t report_id, const EventRecord& event_record) {
@@ -188,16 +167,13 @@
   if (!ValidateEventType(Event::kFrameRateEvent, *event)) {
     return kInvalidArguments;
   }
-  std::string key;
   auto* metric = event_record.metric();
-  if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id,
-                         &key)) {
-    return kInvalidArguments;
-  }
   const FrameRateEvent& frame_rate_event = event->frame_rate_event();
-  return AddNumericEvent(key, event->day_index(), frame_rate_event.component(),
-                         config::PackEventCodes(frame_rate_event.event_code()),
-                         frame_rate_event.frames_per_1000_seconds());
+
+  return aggregate_store_->UpdateNumericAggregate(
+      metric->customer_id(), metric->project_id(), metric->id(), report_id,
+      frame_rate_event.component(), config::PackEventCodes(frame_rate_event.event_code()),
+      event->day_index(), frame_rate_event.frames_per_1000_seconds());
 }
 
 Status EventAggregator::AddMemoryUsageEvent(uint32_t report_id, const EventRecord& event_record) {
@@ -205,58 +181,14 @@
   if (!ValidateEventType(Event::kMemoryUsageEvent, *event)) {
     return kInvalidArguments;
   }
-  std::string key;
-  auto* metric = event_record.metric();
-  if (!PopulateReportKey(metric->customer_id(), metric->project_id(), metric->id(), report_id,
-                         &key)) {
-    return kInvalidArguments;
-  }
-  const MemoryUsageEvent& memory_usage_event = event->memory_usage_event();
-  return AddNumericEvent(key, event->day_index(), memory_usage_event.component(),
-                         config::PackEventCodes(memory_usage_event.event_code()),
-                         memory_usage_event.bytes());
-}
 
-Status EventAggregator::AddNumericEvent(const std::string& report_key, uint32_t day_index,
-                                        const std::string& component, uint64_t event_code,
-                                        int64_t value) {
-  auto locked = aggregate_store_->protected_aggregate_store_.lock();
-  auto aggregates = locked->local_aggregate_store.mutable_by_report_key()->find(report_key);
-  if (aggregates == locked->local_aggregate_store.mutable_by_report_key()->end()) {
-    LOG(ERROR) << "The Local Aggregate Store received an unexpected key.";
-    return kInvalidArguments;
-  }
-  if (!aggregates->second.has_numeric_aggregates()) {
-    LOG(ERROR) << "The local aggregates for this report key are not of a "
-                  "compatible type.";
-    return kInvalidArguments;
-  }
-  auto aggregates_by_day =
-      (*(*aggregates->second.mutable_numeric_aggregates()->mutable_by_component())[component]
-            .mutable_by_event_code())[event_code]
-          .mutable_by_day_index();
-  bool first_event_today = ((*aggregates_by_day).find(day_index) == aggregates_by_day->end());
-  auto day_aggregate = (*aggregates_by_day)[day_index].mutable_numeric_daily_aggregate();
-  const auto& aggregation_type =
-      aggregates->second.aggregation_config().report().aggregation_type();
-  switch (aggregation_type) {
-    case ReportDefinition::SUM:
-      day_aggregate->set_value(value + day_aggregate->value());
-      return kOK;
-    case ReportDefinition::MAX:
-      day_aggregate->set_value(std::max(value, day_aggregate->value()));
-      return kOK;
-    case ReportDefinition::MIN:
-      if (first_event_today) {
-        day_aggregate->set_value(value);
-      } else {
-        day_aggregate->set_value(std::min(value, day_aggregate->value()));
-      }
-      return kOK;
-    default:
-      LOG(ERROR) << "Unexpected aggregation type " << aggregation_type;
-      return kInvalidArguments;
-  }
+  auto* metric = event_record.metric();
+  const MemoryUsageEvent& memory_usage_event = event->memory_usage_event();
+
+  return aggregate_store_->UpdateNumericAggregate(
+      metric->customer_id(), metric->project_id(), metric->id(), report_id,
+      memory_usage_event.component(), config::PackEventCodes(memory_usage_event.event_code()),
+      event->day_index(), memory_usage_event.bytes());
 }
 
 Status EventAggregator::GenerateObservationsNoWorker(uint32_t final_day_index_utc,
diff --git a/src/local_aggregation/event_aggregator.h b/src/local_aggregation/event_aggregator.h
index fb0d10c..43fff36 100644
--- a/src/local_aggregation/event_aggregator.h
+++ b/src/local_aggregation/event_aggregator.h
@@ -207,13 +207,6 @@
   void DoScheduledTasks(std::chrono::system_clock::time_point system_time,
                         std::chrono::steady_clock::time_point steady_time);
 
-  // Adds a numeric value to the LocalAggregateStore by adding |value| to the
-  // current daily aggregate in the bucket indexed by |report_key|, |day_index|,
-  // |component|, and |event_code|. This is a helper method called by
-  // AddCountEvent andAddElapsedTimeEvent.
-  logger::Status AddNumericEvent(const std::string& report_key, uint32_t day_index,
-                                 const std::string& component, uint64_t event_code, int64_t value);
-
   // Sets the EventAggregator's SteadyClockInterface. Only for use in tests.
   void SetSteadyClock(util::SteadyClockInterface* clock) { steady_clock_.reset(clock); }