Migration of aggregate data when REPORT_ALL changes to SELECT_FIRST/LAST.

Implement the Merge methods for the local aggregation procedures.
Use them to migrate REPORT_ALL aggregate data to SELECT_FIRST/LAST.

Bug: 91520
Change-Id: I22fc9c8a2482d4213dac73a5ea4acd9959d4d089
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/644182
Reviewed-by: Zach Bush <zmbush@google.com>
Commit-Queue: Cameron Dale <camrdale@google.com>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
index 31c37d4..1739024 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
@@ -175,26 +175,33 @@
       system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
     }
 
-    for (EventCodesAggregateData &aggregate_data :
-         *system_profile_aggregate->mutable_by_event_code()) {
-      // Find the event codes that match the event's.
-      if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
-                     event_codes.begin(), event_codes.end())) {
-        return aggregate_data.mutable_data();
-      }
-    }
-    // Event codes were not found, so add them as a new entry.
-    if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
-      EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
-      aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
-      return aggregate_data->mutable_data();
-    }
+    return GetAggregateData(system_profile_aggregate, event_codes);
   }
   // TODO(fxbug.dev/85440): add support for finding or adding the system profile for REPORT_ALL.
 
   return nullptr;
 }
 
+AggregateData *AggregationProcedure::GetAggregateData(
+    SystemProfileAggregate *system_profile_aggregate,
+    google::protobuf::RepeatedField<uint32_t> event_codes) const {
+  for (EventCodesAggregateData &aggregate_data :
+       *system_profile_aggregate->mutable_by_event_code()) {
+    // Find the event codes that match the event's.
+    if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
+                   event_codes.begin(), event_codes.end())) {
+      return aggregate_data.mutable_data();
+    }
+  }
+  // Event codes were not found, so add them as a new entry.
+  if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
+    EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
+    aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
+    return aggregate_data->mutable_data();
+  }
+  return nullptr;
+}
+
 void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
                                            ReportAggregate *aggregate, uint64_t system_profile_hash,
                                            std::chrono::system_clock::time_point system_time) {
@@ -218,6 +225,36 @@
   }
 }
 
+void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+                                                        const SystemProfileAggregate &aggregate) {
+  if (system_profile_selection_policy_ == SELECT_FIRST) {
+    if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
+      merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
+      merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+    }
+    if (aggregate.last_seen_timestamp() > merged_aggregate->last_seen_timestamp()) {
+      merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+    }
+  } else {  // SELECT_LAST or SELECT_DEFAULT
+    if (aggregate.last_seen_timestamp() >= merged_aggregate->last_seen_timestamp()) {
+      merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
+      merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+    }
+    if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
+      merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+    }
+  }
+
+  for (const EventCodesAggregateData &aggregate_data : aggregate.by_event_code()) {
+    // Find or create the corresponding AggregateData in the merged system profile aggregate.
+    AggregateData *data = GetAggregateData(merged_aggregate, aggregate_data.event_codes());
+    // nullptr means there is no room in event_vector_buffer_max_ for more event codes.
+    if (data != nullptr) {
+      MergeAggregateData(data, aggregate_data.data());
+    }
+  }
+}
+
 util::TimeInfo AggregationProcedure::GetStartTimeInfo(
     const util::TimeInfo &current_time_info) const {
   if (IsDaily()) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
index 9267897..ce1ca12 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
@@ -81,6 +81,15 @@
                        uint64_t system_profile_hash,
                        std::chrono::system_clock::time_point system_time);
 
+  // Merge two instances of SystemProfileAggregate according to this aggregation procedure.
+  //
+  // The event codes and their data from |aggregate| are merged into the event codes and data in
+  // |merged_aggregate|. Both system profile aggregates must be included in the same
+  // AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
+  // merging the AggregateData in each bucket.
+  void MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+                                    const SystemProfileAggregate &aggregate);
+
   // GenerateObservations is the public interface for generating observations. It handles
   // reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
   // passing all that information down to GenerateSingleObservation.
@@ -140,15 +149,11 @@
 
   // Merge two instances of the aggregate data for this procedure.
   //
-  // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Any fields
-  // in the |bucket| (which contains |aggregate_data|) are also merged into |merged_bucket| (which
-  // contains |merged_aggregate_data|).
-  //
-  // TODO(fxbug.dev/91520): implement this in all subclasses and make it pure virtual.
+  // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
+  // AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
+  // bucket to be preserved accurately.
   virtual void MergeAggregateData(AggregateData *merged_aggregate_data,
-                                  AggregationPeriodBucket *merged_bucket,
-                                  const AggregateData &aggregate_data,
-                                  const AggregationPeriodBucket &bucket) {}
+                                  const AggregateData &aggregate_data) = 0;
 
   // GenerateSingleObservation generates an observation for the given report at the given time.
   //
@@ -205,6 +210,14 @@
                                   AggregationPeriodBucket *bucket, uint64_t system_profile_hash,
                                   std::chrono::system_clock::time_point system_time) const;
 
+  // Returns a pointer to the AggregateData of |system_profile_aggregate| that data for
+  // |event_codes| should be added to, if |system_profile_aggregate| has capacity for the event
+  // vector of |event_codes|. If it is not present, adds the event vector of |event_codes| to
+  // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a null pointer
+  // if |system_profile_aggregate| does not have capacity for new event codes.
+  AggregateData *GetAggregateData(SystemProfileAggregate *system_profile_aggregate,
+                                  google::protobuf::RepeatedField<uint32_t> event_codes) const;
+
   // Returns the starting TimeInfo to use aggregate data from for this report, when processing the
   // current_time_info.
   util::TimeInfo GetStartTimeInfo(const util::TimeInfo &current_time_info) const;
@@ -228,6 +241,11 @@
     LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
   }
 
+  void MergeAggregateData(AggregateData * /*merged_aggregate_data*/,
+                          const AggregateData & /*aggregate_data*/) override {
+    LOG(ERROR) << "MergeAggregateData is UNIMPLEMENTED for " << DebugString();
+  }
+
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
       const std::vector<AggregateDataToGenerate> & /* aggregates */,
       const std::set<std::vector<uint32_t>> & /*event_vectors*/,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
index 2815581..e675c1a 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
@@ -48,7 +48,7 @@
   ASSERT_GE(event_vector_buffer_max, 0u);
 
   uint32_t kHourId = 10000;
-  uint64_t system_profile_hash = 213ULL;
+  uint64_t system_profile_hash = uint64_t{213};
   ReportAggregate aggregate;
 
   uint64_t num_events = event_vector_buffer_max + 1;
@@ -78,14 +78,14 @@
   ReportAggregate aggregate;
 
   int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
                              count_aggregation_procedure.get(), &aggregate,
                              util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 1000;
-  uint64_t second_system_profile_hash = 324ULL;
+  uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
                              count_aggregation_procedure.get(), &aggregate,
                              util::FromUnixSeconds(second_event_time));
@@ -115,14 +115,14 @@
   ReportAggregate aggregate;
 
   int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
                              count_aggregation_procedure.get(), &aggregate,
                              util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 1000;
-  uint64_t second_system_profile_hash = 324ULL;
+  uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
                              count_aggregation_procedure.get(), &aggregate,
                              util::FromUnixSeconds(second_event_time));
@@ -149,7 +149,7 @@
   ASSERT_GE(event_vector_buffer_max, 0u);
 
   const uint32_t kDayIndex = 10000;
-  uint64_t system_profile_hash = 213ULL;
+  uint64_t system_profile_hash = uint64_t{213};
   ReportAggregate aggregate;
 
   uint64_t num_events = event_vector_buffer_max + 1;
@@ -180,13 +180,13 @@
   uint64_t num_events = event_vector_buffer_max + 1;
 
   int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
                             at_least_once_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 7200;
-  uint64_t second_system_profile_hash = 324ULL;
+  uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
                             at_least_once_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(second_event_time));
@@ -218,14 +218,14 @@
   uint32_t kNumEventCodes = 2;
 
   int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   // Add events for 2 different event vectors: {1} and {2}.
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
                             select_first_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 7200;
-  uint64_t second_system_profile_hash = 324ULL;
+  uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
                             select_first_aggregation_procedure.get(), &aggregate,
                             util::FromUnixSeconds(second_event_time));
@@ -242,6 +242,130 @@
   EXPECT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
 }
 
+TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectLast) {
+  const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
+  ReportDefinition report =
+      metric.reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+
+  uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
+  ASSERT_GE(event_vector_buffer_max, 0u);
+
+  const uint32_t kDayIndex = 10000;
+  ReportAggregate aggregate;
+
+  // Manually create some events for the report as they would be with a REPORT_ALL policy.
+  // TODO(fxbug.dev/87403): use AddOccurrenceEventsForDay once REPORT_ALL is supported.
+
+  int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
+  uint64_t first_system_profile_hash = uint64_t{213};
+  AggregationPeriodBucket& bucket = (*aggregate.mutable_daily()->mutable_by_day_index())[kDayIndex];
+  SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
+  system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
+  system_profile_aggregate->set_first_seen_timestamp(first_event_time);
+  system_profile_aggregate->set_last_seen_timestamp(first_event_time);
+  for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(i);
+    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+  }
+
+  int64_t second_event_time = first_event_time + 7200;
+  uint64_t second_system_profile_hash = uint64_t{324};
+  system_profile_aggregate = bucket.add_system_profile_aggregates();
+  system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
+  system_profile_aggregate->set_first_seen_timestamp(second_event_time);
+  system_profile_aggregate->set_last_seen_timestamp(second_event_time);
+  // Add more event_codes than can be supported in a single aggregate.
+  for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(i);
+    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+  }
+
+  // Merge the second system profile aggregates into the first one.
+  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
+      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+  ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
+  bucket = aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
+  ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
+  SystemProfileAggregate* merged_system_profile_aggregate =
+      bucket.mutable_system_profile_aggregates(0);
+  at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
+      merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+
+  EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
+  EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
+  EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
+  ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
+  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
+    EXPECT_TRUE(
+        merged_system_profile_aggregate->by_event_code(i).data().at_least_once().at_least_once());
+  }
+}
+
+TEST_F(AggregationProcedureTest, MergeSystemProfileAggregatesForSelectFirst) {
+  const MetricDefinition& metric = GetMetricDef(kOccurrenceMetricMetricId);
+  ReportDefinition report = metric.reports(kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+
+  uint64_t event_vector_buffer_max = report.event_vector_buffer_max();
+  ASSERT_GE(event_vector_buffer_max, 0u);
+
+  uint32_t kHourId = 10000;
+  ReportAggregate aggregate;
+
+  // Manually create some events for the report as they would be with a REPORT_ALL policy.
+  // TODO(fxbug.dev/87403): use AddOccurrenceEventsForHour once REPORT_ALL is supported.
+
+  int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
+  uint64_t first_system_profile_hash = uint64_t{213};
+  AggregationPeriodBucket& bucket = (*aggregate.mutable_hourly()->mutable_by_hour_id())[kHourId];
+  SystemProfileAggregate* system_profile_aggregate = bucket.add_system_profile_aggregates();
+  system_profile_aggregate->set_system_profile_hash(first_system_profile_hash);
+  system_profile_aggregate->set_first_seen_timestamp(first_event_time);
+  system_profile_aggregate->set_last_seen_timestamp(first_event_time);
+  for (uint32_t i = 1; i <= event_vector_buffer_max / 2; i++) {
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(i);
+    data->mutable_data()->set_count(i);
+  }
+
+  int64_t second_event_time = first_event_time + 1000;
+  uint64_t second_system_profile_hash = uint64_t{324};
+  system_profile_aggregate = bucket.add_system_profile_aggregates();
+  system_profile_aggregate->set_system_profile_hash(second_system_profile_hash);
+  system_profile_aggregate->set_first_seen_timestamp(second_event_time);
+  system_profile_aggregate->set_last_seen_timestamp(second_event_time);
+  // Add more event_codes than can be supported in a single aggregate.
+  for (uint32_t i = event_vector_buffer_max / 2; i <= event_vector_buffer_max * 3 / 2; i++) {
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(i);
+    data->mutable_data()->set_count(i);
+  }
+
+  // Merge the second system profile aggregates into the first one.
+  std::unique_ptr<AggregationProcedure> at_least_once_aggregation_procedure = GetProcedureFor(
+      kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
+  bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
+  ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
+  SystemProfileAggregate* merged_system_profile_aggregate =
+      bucket.mutable_system_profile_aggregates(0);
+  at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
+      merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+
+  EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
+  EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
+  EXPECT_EQ(merged_system_profile_aggregate->last_seen_timestamp(), second_event_time);
+  ASSERT_EQ(merged_system_profile_aggregate->by_event_code_size(), event_vector_buffer_max);
+  for (uint32_t i = 0; i < event_vector_buffer_max; ++i) {
+    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).event_codes(0), i + 1);
+    // Middle event code is double due to merge from both system profiles.
+    EXPECT_EQ(merged_system_profile_aggregate->by_event_code(i).data().count(),
+              i + 1 == event_vector_buffer_max / 2 ? 2 * (i + 1) : i + 1);
+  }
+}
+
 TEST_F(AggregationProcedureTest, GenerateHourlyObservation) {
   std::unique_ptr<AggregationProcedure> count_aggregation_procedure = GetProcedureFor(
       kOccurrenceMetricMetricId, kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex);
@@ -250,7 +374,7 @@
   ASSERT_GE(event_vector_buffer_max, 0u);
 
   uint32_t kHourId = 10000;
-  uint64_t system_profile_hash = 213ULL;
+  uint64_t system_profile_hash = uint64_t{213};
   ReportAggregate aggregate;
 
   uint64_t num_events = event_vector_buffer_max + 1;
@@ -284,7 +408,7 @@
 
   uint32_t kFinalDayIndex = 10000;
   uint32_t kFirstDayIndex = 10000 - 6;
-  uint64_t system_profile_hash = 213ULL;
+  uint64_t system_profile_hash = uint64_t{213};
   ReportAggregate aggregate;
 
   uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
@@ -322,12 +446,12 @@
   uint32_t kFirstDayIndex = 10000 - 6;
   ReportAggregate aggregate;
 
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
   AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
                             at_least_once_aggregation_procedure.get(), &aggregate);
 
-  uint64_t second_system_profile_hash = 426ULL;
+  uint64_t second_system_profile_hash = uint64_t{426};
   uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
                             at_least_once_aggregation_procedure.get(), &aggregate);
@@ -360,12 +484,12 @@
   uint32_t kFirstDayIndex = 10000 - 6;
   ReportAggregate aggregate;
 
-  uint64_t first_system_profile_hash = 213ULL;
+  uint64_t first_system_profile_hash = uint64_t{213};
   uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
   AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
                             select_first_aggregation_procedure.get(), &aggregate);
 
-  uint64_t second_system_profile_hash = 426ULL;
+  uint64_t second_system_profile_hash = uint64_t{426};
   uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
                             select_first_aggregation_procedure.get(), &aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
index ab2159b..8472172 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
@@ -21,6 +21,18 @@
   aggregate_data->mutable_at_least_once()->set_at_least_once(true);
 }
 
+void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                         const AggregateData &aggregate_data) {
+  if (aggregate_data.at_least_once().at_least_once()) {
+    merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+    if (aggregate_data.at_least_once().last_day_index() >
+        merged_aggregate_data->at_least_once().last_day_index()) {
+      merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+          aggregate_data.at_least_once().last_day_index());
+    }
+  }
+}
+
 std::string AtLeastOnceAggregationProcedure::DebugString() const { return "AT_LEAST_ONCE"; }
 
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
index 3168fe9..5acf645 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
@@ -21,6 +21,10 @@
   void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
                            AggregateData *aggregate_data,
                            AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
   // Call observation generation more frequently so that data can be sent as soon as possible.
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
index 5e69bd7..77bc7a1 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
@@ -25,7 +25,8 @@
     return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
   }
 
-  std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+  std::unique_ptr<AtLeastOnceAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                int report_index) {
     return std::make_unique<AtLeastOnceAggregationProcedure>(GetMetricDef(metric_id),
                                                              GetReportDef(metric_id, report_index));
   }
@@ -39,7 +40,7 @@
   std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
 
   const uint32_t kDayIndex = 10000;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
   uint32_t kNumEventCodes = 100;
   ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
 
@@ -58,6 +59,63 @@
   }
 }
 
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_at_least_once()->set_at_least_once(true);
+  data.mutable_at_least_once()->set_last_day_index(101);
+  AggregateData merged_data;
+  merged_data.mutable_at_least_once()->set_at_least_once(true);
+  merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+  std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_at_least_once());
+  EXPECT_FALSE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 0);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_at_least_once()->set_at_least_once(true);
+  data.mutable_at_least_once()->set_last_day_index(101);
+  AggregateData merged_data;
+
+  std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(AtLeastOnceAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_at_least_once()->set_at_least_once(true);
+  merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+  std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
+      kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
+}
+
 TEST_F(AtLeastOnceAggregationProcedureTest, GenerateObservation1DayReport) {
   uint32_t metric_id = kOccurrenceMetricMetricId;
   int report_index = kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex;
@@ -66,7 +124,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
 
   const uint32_t kNumEventCodes = 100;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -117,7 +175,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
 
   const uint32_t kNumEventCodes = 7;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -190,7 +248,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
 
   const uint32_t kNumEventCodes = 7;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -267,7 +325,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
 
   const uint32_t kNumEventCodes = 7;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -348,7 +406,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 777ULL;
+  const uint64_t system_profile_hash = uint64_t{777};
   const uint32_t num_events = 1;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), num_events);
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
index 301fa6f..961ade1 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
@@ -39,6 +39,11 @@
   aggregate_data->set_count(aggregate_data->count() + occurrence_count);
 }
 
+void CountAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                   const AggregateData &aggregate_data) {
+  merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+}
+
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
 CountAggregationProcedure::GenerateSingleObservation(
     const std::vector<AggregateDataToGenerate> &buckets,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
index 6450932..155b201 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
@@ -22,6 +22,9 @@
   void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
                            AggregationPeriodBucket * /*bucket*/) override;
 
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
       const std::vector<AggregateDataToGenerate> &buckets,
       const std::set<std::vector<uint32_t>> &event_vectors,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
index 2f26277..7f7d17d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h"
+
 #include <gtest/gtest.h>
 
 #include "src/lib/util/datetime_util.h"
@@ -12,7 +14,13 @@
 
 namespace cobalt::local_aggregation {
 
-class CountAggregationProcedureTest : public testing::TestAggregationProcedure {};
+class CountAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<CountAggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+    return std::make_unique<CountAggregationProcedure>(GetMetricDef(metric_id),
+                                                       GetReportDef(metric_id, report_index));
+  }
+};
 
 TEST_F(CountAggregationProcedureTest, UpdateAggregateWorks) {
   uint32_t metric_id = kOccurrenceMetricMetricId;
@@ -21,7 +29,7 @@
 
   ReportAggregate aggregate;
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 56789ULL;
+  const uint64_t system_profile_hash = uint64_t{56789};
   const uint32_t kNumEventCodes = 100;
   AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, procedure.get(),
                              &aggregate);
@@ -45,7 +53,7 @@
 
   ReportAggregate aggregate;
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 56789ULL;
+  const uint64_t system_profile_hash = uint64_t{56789};
   const uint32_t kNumEventCodes = 100;
   AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash,
                                       procedure.get(), &aggregate);
@@ -62,6 +70,54 @@
   }
 }
 
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.set_count(10);
+  AggregateData merged_data;
+  merged_data.set_count(20);
+
+  std::unique_ptr<CountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 30);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<CountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 0);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.set_count(10);
+  AggregateData merged_data;
+
+  std::unique_ptr<CountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 10);
+}
+
+TEST_F(CountAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.set_count(20);
+
+  std::unique_ptr<CountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 20);
+}
+
 TEST_F(CountAggregationProcedureTest, GenerateObservationWorks) {
   uint32_t metric_id = kOccurrenceMetricMetricId;
   int report_index = kOccurrenceMetricFleetwideOccurrenceCountsReportReportIndex;
@@ -72,7 +128,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 56789ULL;
+  const uint64_t system_profile_hash = uint64_t{56789};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
                                &aggregate);
@@ -109,7 +165,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 56789ULL;
+  const uint64_t system_profile_hash = uint64_t{56789};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
                                &aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
index 754b139..d1ea447 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
@@ -53,6 +53,13 @@
   }
 }
 
+void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                              const AggregateData &aggregate_data) {
+  for (const auto &[index, count] : aggregate_data.integer_histogram().histogram()) {
+    (*merged_aggregate_data->mutable_integer_histogram()->mutable_histogram())[index] += count;
+  }
+}
+
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
 IntegerHistogramAggregationProcedure::GenerateHourlyObservation(
     const AggregateDataToGenerate &bucket) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
index ce6f006..6112ded 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
@@ -21,6 +21,9 @@
   void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
                            AggregationPeriodBucket * /*bucket*/) override;
 
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
       const AggregateDataToGenerate &bucket) override;
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
index 5535308..581e08b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h"
+
 #include <gtest/gtest.h>
 
 #include "src/lib/util/datetime_util.h"
@@ -47,6 +49,12 @@
                                  util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
     }
   }
+
+  std::unique_ptr<IntegerHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                     int report_index) {
+    return std::make_unique<IntegerHistogramAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
 };
 
 TEST_F(IntegerHistogramAggregationProcedureTest, UpdateAggregateWorksInteger) {
@@ -59,7 +67,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 1867ULL;
+  const uint64_t system_profile_hash = uint64_t{1867};
   LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
@@ -80,7 +88,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 1867ULL;
+  const uint64_t system_profile_hash = uint64_t{1867};
   LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}},
                             system_profile_hash, procedure.get(), &aggregate);
 
@@ -92,6 +100,73 @@
   EXPECT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
 }
 
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
+  data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
+  AggregateData merged_data;
+  merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
+  merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
+
+  std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 50);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_integer_histogram());
+  EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_integer_histogram()->mutable_histogram()->insert({0, 10});
+  data.mutable_integer_histogram()->mutable_histogram()->insert({1, 20});
+  AggregateData merged_data;
+
+  std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(0), 10);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 20);
+}
+
+TEST_F(IntegerHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_integer_histogram()->mutable_histogram()->insert({1, 30});
+  merged_data.mutable_integer_histogram()->mutable_histogram()->insert({2, 40});
+
+  std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(1), 30);
+  ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(2));
+  EXPECT_EQ(merged_data.integer_histogram().histogram().at(2), 40);
+}
+
 TEST_F(IntegerHistogramAggregationProcedureTest, GenerateObservationWorksInteger) {
   uint32_t metric_id = kIntegerMetricMetricId;
   int report_index = kIntegerMetricFleetwideHistogramsReportIndex;
@@ -102,7 +177,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 1867ULL;
+  const uint64_t system_profile_hash = uint64_t{1867};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
   }
@@ -139,7 +214,7 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 1867ULL;
+  const uint64_t system_profile_hash = uint64_t{1867};
   const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash,
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
index 8c26130..30ef332 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
@@ -94,6 +94,13 @@
   aggregate_data->set_integer_value(aggregate_data->integer_value() +
                                     event_record.event()->integer_event().value());
 }
+
+void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                            const AggregateData &aggregate_data) {
+  merged_aggregate_data->set_integer_value(merged_aggregate_data->integer_value() +
+                                           aggregate_data.integer_value());
+}
+
 int64_t SumNumericStatAggregationProcedure::CollectValue(
     const std::vector<const AggregateData *> &aggregates) {
   int64_t value = 0;
@@ -113,6 +120,19 @@
     aggregate_data->set_integer_value(event_record.event()->integer_event().value());
   }
 }
+
+void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                            const AggregateData &aggregate_data) {
+  if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+    if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
+      merged_aggregate_data->set_integer_value(
+          std::min(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+    } else {
+      merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+    }
+  }
+}
+
 int64_t MinNumericStatAggregationProcedure::CollectValue(
     const std::vector<const AggregateData *> &aggregates) {
   int64_t value = std::numeric_limits<int64_t>::max();
@@ -132,6 +152,19 @@
     aggregate_data->set_integer_value(event_record.event()->integer_event().value());
   }
 }
+
+void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                            const AggregateData &aggregate_data) {
+  if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+    if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
+      merged_aggregate_data->set_integer_value(
+          std::max(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+    } else {
+      merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+    }
+  }
+}
+
 int64_t MaxNumericStatAggregationProcedure::CollectValue(
     const std::vector<const AggregateData *> &aggregates) {
   int64_t value = std::numeric_limits<int64_t>::min();
@@ -148,6 +181,14 @@
   sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
   sum_and_count->set_count(sum_and_count->count() + 1);
 }
+
+void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                             const AggregateData &aggregate_data) {
+  SumAndCount *sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+  sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
+  sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
+}
+
 int64_t MeanNumericStatAggregationProcedure::CollectValue(
     const std::vector<const AggregateData *> &aggregates) {
   int64_t sum = 0;
@@ -166,6 +207,11 @@
       event_record.event()->integer_event().value());
 }
 
+void MedianNumericStatAggregationProcedure::MergeAggregateData(
+    AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
+  merged_aggregate_data->mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
+}
+
 namespace {
 std::vector<int64_t> CollectValues(const std::vector<const AggregateData *> &aggregates) {
   std::vector<int64_t> values;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
index 71a0fa0..9ebcd06 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
@@ -36,28 +36,95 @@
   virtual int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) = 0;
 };
 
-#define PROCEDURE(type)                                                                      \
-  class type##NumericStatAggregationProcedure : public NumericStatAggregationProcedure {     \
-   public:                                                                                   \
-    explicit type##NumericStatAggregationProcedure(const MetricDefinition &metric,           \
-                                                   const ReportDefinition &report)           \
-        : NumericStatAggregationProcedure(metric, report) {}                                 \
-                                                                                             \
-    void UpdateAggregateData(const logger::EventRecord &event_record,                        \
-                             AggregateData *aggregate_data,                                  \
-                             AggregationPeriodBucket * /*bucket*/) override;                 \
-                                                                                             \
-    [[nodiscard]] std::string DebugString() const override { return #type " NUMERIC_STAT"; } \
-                                                                                             \
-   private:                                                                                  \
-    int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;     \
-  }
+class SumNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+  explicit SumNumericStatAggregationProcedure(const MetricDefinition &metric,
+                                              const ReportDefinition &report)
+      : NumericStatAggregationProcedure(metric, report) {}
 
-PROCEDURE(Sum);
-PROCEDURE(Min);
-PROCEDURE(Max);
-PROCEDURE(Mean);
-PROCEDURE(Median);
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
+  [[nodiscard]] std::string DebugString() const override { return "SUM_NUMERIC_STAT"; }
+
+ private:
+  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MinNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+  explicit MinNumericStatAggregationProcedure(const MetricDefinition &metric,
+                                              const ReportDefinition &report)
+      : NumericStatAggregationProcedure(metric, report) {}
+
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
+  [[nodiscard]] std::string DebugString() const override { return "MIN_NUMERIC_STAT"; }
+
+ private:
+  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MaxNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+  explicit MaxNumericStatAggregationProcedure(const MetricDefinition &metric,
+                                              const ReportDefinition &report)
+      : NumericStatAggregationProcedure(metric, report) {}
+
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
+  [[nodiscard]] std::string DebugString() const override { return "MAX_NUMERIC_STAT"; }
+
+ private:
+  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MeanNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+  explicit MeanNumericStatAggregationProcedure(const MetricDefinition &metric,
+                                               const ReportDefinition &report)
+      : NumericStatAggregationProcedure(metric, report) {}
+
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
+  [[nodiscard]] std::string DebugString() const override { return "MEAN_NUMERIC_STAT"; }
+
+ private:
+  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
+
+class MedianNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
+ public:
+  explicit MedianNumericStatAggregationProcedure(const MetricDefinition &metric,
+                                                 const ReportDefinition &report)
+      : NumericStatAggregationProcedure(metric, report) {}
+
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
+  [[nodiscard]] std::string DebugString() const override { return "MEDIAN_NUMERIC_STAT"; }
+
+ private:
+  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+};
 
 class PercentileNNumericStatAggregationProcedure : public MedianNumericStatAggregationProcedure {
  public:
@@ -66,7 +133,7 @@
       : MedianNumericStatAggregationProcedure(metric, report),
         percentile_n_(report.local_aggregation_procedure_percentile_n()) {}
 
-  [[nodiscard]] std::string DebugString() const override { return "PercentileN NUMERIC_STAT"; }
+  [[nodiscard]] std::string DebugString() const override { return "PERCENTILE_N_NUMERIC_STAT"; }
 
  private:
   int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
index a3ac06e..9e096ef 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
@@ -7,6 +7,7 @@
 #include <limits>
 #include <memory>
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "src/lib/util/datetime_util.h"
@@ -22,6 +23,8 @@
 namespace cobalt::local_aggregation {
 
 using TimeInfo = util::TimeInfo;
+using ::testing::IsEmpty;
+using ::testing::UnorderedElementsAre;
 
 class NumericStatAggregationProcedureTest
     : public testing::TestAggregationProcedure,
@@ -130,23 +133,23 @@
 void CheckDebugString(int64_t daily_report_type, AggregationProcedure *procedure) {
   switch (daily_report_type) {
     case kIntegerMetricUniqueDeviceNumericStatsReport7DaySumReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "Sum NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "SUM_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMinReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "Min NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "MIN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMaxReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "Max NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "MAX_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMeanReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "Mean NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "MEAN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMedianReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "Median NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "MEDIAN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7Day75thPercentileReportIndex:
     case kIntegerMetricUniqueDeviceNumericStatsReport7Day99thPercentileReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "PercentileN NUMERIC_STAT");
+      ASSERT_EQ(procedure->DebugString(), "PERCENTILE_N_NUMERIC_STAT");
       break;
   }
 }
@@ -163,7 +166,7 @@
 
   ReportAggregate report_aggregate;
   const uint32_t kDayIndex = 7;
-  const uint64_t system_profile_hash = 987ULL;
+  const uint64_t system_profile_hash = uint64_t{987};
 
   int64_t window_size = static_cast<int64_t>(integer_sequence.size()) / kDayIndex;
   std::vector<int64_t> window;
@@ -209,7 +212,7 @@
 
   ReportAggregate report_aggregate;
   const uint32_t kHourId = 20;
-  const uint64_t system_profile_hash = 987ULL;
+  const uint64_t system_profile_hash = uint64_t{987};
 
   for (uint32_t hour = 0; hour <= kHourId; hour += 2) {
     LogIntegerSequence(integer_sequence, TimeInfo::FromHourId(hour), system_profile_hash,
@@ -284,4 +287,370 @@
                          }),
                          TestName);
 
+class SumNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<SumNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                   int report_index) {
+    return std::make_unique<SumNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.set_integer_value(200);
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_value(), 300);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.set_integer_value(200);
+  AggregateData merged_data;
+
+  std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_value(), 200);
+}
+
+TEST_F(SumNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MinNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<MinNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                   int report_index) {
+    return std::make_unique<MinNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.set_integer_value(-15);
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.set_integer_value(-15);
+  AggregateData merged_data;
+
+  std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MinNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MaxNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<MaxNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                   int report_index) {
+    return std::make_unique<MaxNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.set_integer_value(-15);
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), 0);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.set_integer_value(-15);
+  AggregateData merged_data;
+
+  std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), -15);
+}
+
+TEST_F(MaxNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.set_integer_value(100);
+
+  std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.has_integer_value());
+  EXPECT_EQ(merged_data.integer_value(), 100);
+}
+
+class MeanNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<MeanNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                    int report_index) {
+    return std::make_unique<MeanNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_sum_and_count()->set_count(10);
+  data.mutable_sum_and_count()->set_sum(200);
+  AggregateData merged_data;
+  merged_data.mutable_sum_and_count()->set_count(20);
+  merged_data.mutable_sum_and_count()->set_sum(100);
+
+  std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 30);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 0);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_sum_and_count()->set_count(10);
+  data.mutable_sum_and_count()->set_sum(200);
+  AggregateData merged_data;
+
+  std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 10);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
+}
+
+TEST_F(MeanNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_sum_and_count()->set_count(20);
+  merged_data.mutable_sum_and_count()->set_sum(100);
+
+  std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 20);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
+}
+
+class MedianNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<MedianNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                      int report_index) {
+    return std::make_unique<MedianNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_integer_values()->add_value(-15);
+  data.mutable_integer_values()->add_value(150);
+  AggregateData merged_data;
+  merged_data.mutable_integer_values()->add_value(100);
+  merged_data.mutable_integer_values()->add_value(10);
+
+  std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_integer_values()->add_value(-15);
+  data.mutable_integer_values()->add_value(150);
+  AggregateData merged_data;
+
+  std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
+}
+
+TEST_F(MedianNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_integer_values()->add_value(100);
+  merged_data.mutable_integer_values()->add_value(10);
+
+  std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
+      GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
+}
+
+class PercentileNNumericStatAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<PercentileNNumericStatAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                           int report_index) {
+    return std::make_unique<PercentileNNumericStatAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
+};
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_integer_values()->add_value(-15);
+  data.mutable_integer_values()->add_value(150);
+  AggregateData merged_data;
+  merged_data.mutable_integer_values()->add_value(100);
+  merged_data.mutable_integer_values()->add_value(10);
+
+  std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+      kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+      kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_integer_values()->add_value(-15);
+  data.mutable_integer_values()->add_value(150);
+  AggregateData merged_data;
+
+  std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+      kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
+}
+
+TEST_F(PercentileNNumericStatAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_integer_values()->add_value(100);
+  merged_data.mutable_integer_values()->add_value(10);
+
+  std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
+      kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
+}
+
 }  // namespace cobalt::local_aggregation
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
index cef9424..23138ea 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
@@ -23,6 +23,18 @@
   aggregate_data->mutable_at_least_once()->set_at_least_once(true);
 }
 
+void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                         const AggregateData &aggregate_data) {
+  if (aggregate_data.at_least_once().at_least_once()) {
+    merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+    if (aggregate_data.at_least_once().last_day_index() >
+        merged_aggregate_data->at_least_once().last_day_index()) {
+      merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+          aggregate_data.at_least_once().last_day_index());
+    }
+  }
+}
+
 std::string SelectFirstAggregationProcedure::DebugString() const { return "SELECT_FIRST"; }
 
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
index 814cee1..0a540a7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
@@ -18,6 +18,10 @@
   void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
                            AggregateData *aggregate_data,
                            AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
   // Call observation generation more frequently so that data can be sent as soon as possible.
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
index 75ec93c..1f6ee10 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
@@ -23,7 +23,8 @@
     return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
   }
 
-  std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+  std::unique_ptr<SelectFirstAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                int report_index) {
     return std::make_unique<SelectFirstAggregationProcedure>(GetMetricDef(metric_id),
                                                              GetReportDef(metric_id, report_index));
   }
@@ -36,7 +37,7 @@
   std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
 
   const uint32_t kDayIndex = 10000;
-  const uint64_t system_profile_hash = 999ULL;
+  const uint64_t system_profile_hash = uint64_t{999};
   std::unique_ptr<logger::EventRecord> record = GetEventRecord(metric_id);
   record->event()->set_day_index(kDayIndex);
 
@@ -57,6 +58,67 @@
   ASSERT_TRUE(system_profile_agg.by_event_code(0).data().at_least_once().at_least_once());
 }
 
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_at_least_once()->set_at_least_once(true);
+  data.mutable_at_least_once()->set_last_day_index(101);
+  AggregateData merged_data;
+  merged_data.mutable_at_least_once()->set_at_least_once(true);
+  merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+  std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_at_least_once());
+  EXPECT_FALSE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 0);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_at_least_once()->set_at_least_once(true);
+  data.mutable_at_least_once()->set_last_day_index(101);
+  AggregateData merged_data;
+
+  std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
+}
+
+TEST_F(SelectFirstAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_at_least_once()->set_at_least_once(true);
+  merged_data.mutable_at_least_once()->set_last_day_index(100);
+
+  std::unique_ptr<SelectFirstAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_TRUE(merged_data.at_least_once().at_least_once());
+  EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
+}
+
 TEST_F(SelectFirstAggregationProcedureTest, GenerateObservation1DayReport) {
   uint32_t metric_id = kOccurrenceMetricMetricId;
   int report_index = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex;
@@ -65,7 +127,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 999ULL;
+  const uint64_t system_profile_hash = uint64_t{999};
 
   const uint32_t kNumEventCodes = 2;
 
@@ -108,7 +170,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 999ULL;
+  const uint64_t system_profile_hash = uint64_t{999};
 
   const uint32_t kNumEventCodes = 2;
 
@@ -170,7 +232,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 999ULL;
+  const uint64_t system_profile_hash = uint64_t{999};
 
   const uint32_t kNumEventCodes = 2;
 
@@ -237,7 +299,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 999ULL;
+  const uint64_t system_profile_hash = uint64_t{999};
 
   const uint32_t kNumEventCodes = 2;
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
index f276749..75d7e4d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
@@ -22,6 +22,11 @@
                             event_record.event()->occurrence_event().count());
 }
 
+void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                              const AggregateData &aggregate_data) {
+  merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+}
+
 std::string SelectMostCommonAggregationProcedure::DebugString() const {
   return "SELECT_MOST_COMMON";
 }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
index e4f67a4..e2430ff 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
@@ -20,6 +20,10 @@
 
   void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
                            AggregationPeriodBucket * /*bucket*/) override;
+
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
index cf1a45d..67ae97e 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
@@ -23,7 +23,8 @@
     return logger::EventRecord::MakeEventRecord(GetProjectContext(), metric_id).ValueOrDie();
   }
 
-  std::unique_ptr<AggregationProcedure> GetProcedure(uint32_t metric_id, int report_index) {
+  std::unique_ptr<SelectMostCommonAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                     int report_index) {
     return std::make_unique<SelectMostCommonAggregationProcedure>(
         GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
   }
@@ -37,7 +38,7 @@
   std::unique_ptr<AggregationProcedure> procedure = GetProcedure(metric_id, report_index);
 
   const uint32_t kDayIndex = 10000;
-  const uint64_t system_profile_hash = 1234568ULL;
+  const uint64_t system_profile_hash = uint64_t{1234568};
   uint32_t kNumEventCodes = 100;
   ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
 
@@ -56,6 +57,58 @@
   }
 }
 
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.set_count(100);
+  AggregateData merged_data;
+  merged_data.set_count(121);
+
+  std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 221);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 0);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.set_count(100);
+  AggregateData merged_data;
+
+  std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 100);
+}
+
+TEST_F(SelectMostCommonAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.set_count(121);
+
+  std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId,
+                   kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.count(), 121);
+}
+
 TEST_F(SelectMostCommonAggregationProcedureTest, GenerateObservation1DayReportNoEvents) {
   uint32_t metric_id = kOccurrenceMetricMetricId;
   int report_index = kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex;
@@ -83,7 +136,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 1234568ULL;
+  const uint64_t system_profile_hash = uint64_t{1234568};
 
   const uint32_t kNumEventCodes = 100;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
@@ -122,7 +175,7 @@
   const uint32_t kDayIndex = 10000;
   util::TimeInfo time_info;
   time_info.day_index = kDayIndex;
-  const uint64_t system_profile_hash = 1234568ULL;
+  const uint64_t system_profile_hash = uint64_t{1234568};
 
   const uint32_t kNumEventCodes = 7;
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
index bf4cd9b..6f232e3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
@@ -37,6 +37,18 @@
   }
 }
 
+void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+                                                             const AggregateData &aggregate_data) {
+  // Merge in the aggregate data's mapping of indexes to their count.
+  // This only works correctly if the AggregateData objects are both part of the same
+  // AggregationPeriodBucket, such that their string_index values both refer to the same repeated
+  // string_hashes field in the bucket.
+  for (const auto &[string_index, count] : aggregate_data.string_histogram().histogram()) {
+    (*merged_aggregate_data->mutable_string_histogram()->mutable_histogram())[string_index] +=
+        count;
+  }
+}
+
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
 StringHistogramAggregationProcedure::GenerateHourlyObservation(
     const AggregateDataToGenerate &bucket) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
index 60edd72..aa67ed8 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
@@ -21,6 +21,9 @@
   void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
                            AggregationPeriodBucket *bucket) override;
 
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
       const AggregateDataToGenerate &bucket) override;
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
index d370ce4..6f358e6 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -17,7 +19,7 @@
 using ::testing::Contains;
 
 class StringHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
- public:
+ protected:
   void LogStringEvents(uint32_t hour_id, uint32_t num_event_codes,
                        const std::vector<std::string>& strings, uint64_t system_profile_hash,
                        AggregationProcedure* procedure, ReportAggregate* aggregate) {
@@ -34,6 +36,12 @@
       }
     }
   }
+
+  std::unique_ptr<StringHistogramAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                    int report_index) {
+    return std::make_unique<StringHistogramAggregationProcedure>(
+        GetMetricDef(metric_id), GetReportDef(metric_id, report_index));
+  }
 };
 
 TEST_F(StringHistogramAggregationProcedureTest, UpdateAggregateWorks) {
@@ -43,7 +51,7 @@
   ReportAggregate aggregate;
   const uint32_t kNumEventCodes = 100;
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 111111ULL;
+  const uint64_t system_profile_hash = uint64_t{111111};
   const std::vector<std::string> kTestStrings = {
       "Nunc dictum justo ac arcu.",
       "Suspendisse ullamcorper mi vel pulvinar dictum.",
@@ -62,6 +70,73 @@
   ASSERT_EQ(system_profile_agg.by_event_code_size(), kNumEventCodes);
 }
 
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_string_histogram()->mutable_histogram()->insert({0, 10});
+  data.mutable_string_histogram()->mutable_histogram()->insert({1, 20});
+  AggregateData merged_data;
+  merged_data.mutable_string_histogram()->mutable_histogram()->insert({1, 30});
+  merged_data.mutable_string_histogram()->mutable_histogram()->insert({2, 40});
+
+  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 3);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(0), 10);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 50);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(2));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(2), 40);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_FALSE(merged_data.has_string_histogram());
+  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 0);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_string_histogram()->mutable_histogram()->insert({0, 10});
+  data.mutable_string_histogram()->mutable_histogram()->insert({1, 20});
+  AggregateData merged_data;
+
+  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(0), 10);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 20);
+}
+
+TEST_F(StringHistogramAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_string_histogram()->mutable_histogram()->insert({1, 30});
+  merged_data.mutable_string_histogram()->mutable_histogram()->insert({2, 40});
+
+  std::unique_ptr<StringHistogramAggregationProcedure> procedure =
+      GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(1), 30);
+  ASSERT_TRUE(merged_data.string_histogram().histogram().contains(2));
+  EXPECT_EQ(merged_data.string_histogram().histogram().at(2), 40);
+}
+
 TEST_F(StringHistogramAggregationProcedureTest, GenerateObservationWorks) {
   std::unique_ptr<AggregationProcedure> procedure =
       GetProcedureFor(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
@@ -69,7 +144,7 @@
   ReportAggregate aggregate;
   const uint32_t kNumEventCodes = 10;
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 111111ULL;
+  const uint64_t system_profile_hash = uint64_t{111111};
   const std::vector<std::string> kTestStrings = {
       "Nunc dictum justo ac arcu.",
       "Suspendisse ullamcorper mi vel pulvinar dictum.",
@@ -122,7 +197,7 @@
   ReportAggregate aggregate;
   const uint32_t kNumEventCodes = 10;
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 111111ULL;
+  const uint64_t system_profile_hash = uint64_t{111111};
   // The string_buffer_max is 5, this is too many
   const std::vector<std::string> kTestStrings = {
       "Nunc dictum justo ac arcu.",
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
index ab6dc27..a7572dd 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
@@ -22,6 +22,13 @@
   sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
 }
 
+void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData* merged_aggregate_data,
+                                                         const AggregateData& aggregate_data) {
+  SumAndCount* sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+  sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
+  sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
+}
+
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
 SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate& bucket) {
   std::vector<std::tuple<std::vector<uint32_t>, int64_t, uint32_t>> data;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
index 62cd875..fbc28b3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
@@ -20,6 +20,9 @@
   void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
                            AggregationPeriodBucket *bucket) override;
 
+  void MergeAggregateData(AggregateData *merged_aggregate_data,
+                          const AggregateData &aggregate_data) override;
+
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
       const AggregateDataToGenerate &bucket) override;
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
index 15d8876..0bfed7b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
@@ -2,6 +2,8 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include "src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h"
+
 #include <gtest/gtest.h>
 
 #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
@@ -11,7 +13,14 @@
 
 namespace cobalt::local_aggregation {
 
-class SumAndCountAggregationProcedureTest : public testing::TestAggregationProcedure {};
+class SumAndCountAggregationProcedureTest : public testing::TestAggregationProcedure {
+ protected:
+  std::unique_ptr<SumAndCountAggregationProcedure> GetProcedure(uint32_t metric_id,
+                                                                int report_index) {
+    return std::make_unique<SumAndCountAggregationProcedure>(GetMetricDef(metric_id),
+                                                             GetReportDef(metric_id, report_index));
+  }
+};
 
 TEST_F(SumAndCountAggregationProcedureTest, UpdateAggregateWorks) {
   uint32_t metric_id = kIntegerMetricMetricId;
@@ -24,7 +33,7 @@
 
   const int64_t kValue = 42;
   const uint32_t kHourId = 1;
-  const uint64_t system_profile_hash = 634354ULL;
+  const uint64_t system_profile_hash = uint64_t{634354};
   AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, procedure.get(),
                    &aggregate);
 
@@ -41,6 +50,62 @@
   }
 }
 
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataBothSet) {
+  AggregateData data;
+  data.mutable_sum_and_count()->set_count(10);
+  data.mutable_sum_and_count()->set_sum(200);
+  AggregateData merged_data;
+  merged_data.mutable_sum_and_count()->set_count(20);
+  merged_data.mutable_sum_and_count()->set_sum(100);
+
+  std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 30);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataNeitherSet) {
+  AggregateData data;
+  AggregateData merged_data;
+
+  std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 0);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataFromSet) {
+  AggregateData data;
+  data.mutable_sum_and_count()->set_count(10);
+  data.mutable_sum_and_count()->set_sum(200);
+  AggregateData merged_data;
+
+  std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 10);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
+}
+
+TEST_F(SumAndCountAggregationProcedureTest, MergeAggregateDataToSet) {
+  AggregateData data;
+  AggregateData merged_data;
+  merged_data.mutable_sum_and_count()->set_count(20);
+  merged_data.mutable_sum_and_count()->set_sum(100);
+
+  std::unique_ptr<SumAndCountAggregationProcedure> procedure =
+      GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
+  procedure->MergeAggregateData(&merged_data, data);
+
+  EXPECT_EQ(merged_data.sum_and_count().count(), 20);
+  EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
+}
+
 TEST_F(SumAndCountAggregationProcedureTest, GenerateObservationWorks) {
   uint32_t metric_id = kIntegerMetricMetricId;
   int report_index = kIntegerMetricFleetwideMeansReportReportIndex;
@@ -52,7 +117,7 @@
 
   const int64_t kValue = 42;
   const uint32_t kEndHourId = 11;
-  const uint64_t system_profile_hash = 634354ULL;
+  const uint64_t system_profile_hash = uint64_t{634354};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, procedure.get(),
                      &aggregate);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
index cd13e23..194d2a7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
@@ -48,8 +48,8 @@
     current_metric_id_ = metric_id;
     current_report_index_ = report_index;
 
-    const auto& metric = GetMetricDef(current_metric_id_);
-    const auto& report = metric.reports(current_report_index_);
+    const MetricDefinition& metric = GetMetricDef(current_metric_id_);
+    const ReportDefinition& report = metric.reports(current_report_index_);
     current_event_vector_buffer_max_ = report.event_vector_buffer_max();
 
     auto agg_or = AggregationProcedure::Get(metric, report);
@@ -83,7 +83,7 @@
       AggregationProcedure* procedure, ReportAggregate* aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     auto record = MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
-    auto* event = record->event()->mutable_occurrence_event();
+    OccurrenceEvent* event = record->event()->mutable_occurrence_event();
     event->add_event_code(0);
     event->set_count(count);
 
@@ -110,7 +110,7 @@
       AggregationProcedure* procedure, ReportAggregate* aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
-    auto* event = record->event()->mutable_occurrence_event();
+    OccurrenceEvent* event = record->event()->mutable_occurrence_event();
     event->add_event_code(0);
     event->set_count(count);
 
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
index 2c1b08a..68b17a3 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
+++ b/src/local_aggregation_1_1/local_aggregate_storage/BUILD.gn
@@ -23,12 +23,14 @@
     "$cobalt_root/src/lib/util:hash",
     "$cobalt_root/src/lib/util:protected_fields",
     "$cobalt_root/src/local_aggregation_1_1:proto",
+    "$cobalt_root/src/local_aggregation_1_1/aggregation_procedures",
     "$cobalt_root/src/logger:internal_metrics",
     "$cobalt_root/src/logger:logger_interface",
     "$cobalt_root/src/logger:project_context_factory",
     "$cobalt_root/src/pb:metadata_builder",
     "$cobalt_root/src/public/lib:registry_identifiers",
     "$cobalt_root/src/public/lib:status",
+    "$cobalt_root/src/public/lib/statusor",
     "$cobalt_root/src/registry:cobalt_registry_proto",
   ]
   configs += [ "$cobalt_root:cobalt_config" ]
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
index 9e56cdf..a03fcf0 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage_test.cc
@@ -101,7 +101,7 @@
         storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
     ASSERT_TRUE(agg_or.ok());
     MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
-    agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
+    agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile());
     agg.Save();
   }
 
@@ -430,7 +430,7 @@
         storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
     ASSERT_TRUE(agg_or.ok());
     MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
-    agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
+    agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile());
     agg.Save();
   }
 
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
index bbc2eaa..99d6fad 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
@@ -8,10 +8,12 @@
 
 #include "src/lib/util/datetime_util.h"
 #include "src/lib/util/hash.h"
+#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
 #include "src/logging.h"
 #include "src/pb/metadata_builder.h"
+#include "src/public/lib/statusor/statusor.h"
 
 namespace cobalt::local_aggregation {
 
@@ -76,6 +78,40 @@
   StoreFilteredSystemProfile(system_profile_hash, filtered_system_profile);
 }
 
+bool LocalAggregateStorage::MigrateReportAllData(AggregationPeriodBucket& bucket,
+                                                 const MetricDefinition& metric_definition,
+                                                 const ReportDefinition& report_def) {
+  lib::statusor::StatusOr<std::unique_ptr<AggregationProcedure>> procedure_or =
+      AggregationProcedure::Get(metric_definition, report_def);
+  if (!procedure_or.ok()) {
+    LOG(ERROR) << "Failed to create aggregation procedure to migrate REPORT_ALL data for report "
+               << metric_definition.id() << "-" << report_def.id() << ": " << procedure_or.status();
+    return false;
+  }
+  std::unique_ptr<AggregationProcedure> procedure = procedure_or.ConsumeValueOrDie();
+  if (procedure == nullptr) {
+    // This is a non cobalt 1.1 report type, should be silently ignored.
+    return false;
+  }
+
+  google::protobuf::RepeatedPtrField<SystemProfileAggregate>* system_profile_aggregates =
+      bucket.mutable_system_profile_aggregates();
+  auto system_profile_aggregates_it = system_profile_aggregates->begin();
+
+  // Merge all the other aggregates into the first entry.
+  SystemProfileAggregate& merged_system_profile_aggregate = *system_profile_aggregates_it;
+
+  ++system_profile_aggregates_it;
+  while (system_profile_aggregates_it != system_profile_aggregates->end()) {
+    procedure->MergeSystemProfileAggregates(&merged_system_profile_aggregate,
+                                            *system_profile_aggregates_it);
+
+    // After merging, remove the old aggregate from the repeated field.
+    system_profile_aggregates_it = system_profile_aggregates->erase(system_profile_aggregates_it);
+  }
+  return true;
+}
+
 bool LocalAggregateStorage::MigrateStoredData(MetricAggregate& metric,
                                               const MetricDefinition& metric_definition,
                                               const MetadataBuilder* metadata_builder) {
@@ -121,6 +157,15 @@
                          "for report: "
                       << report_id << " hour id: " << hour_id;
           }
+          if (report_def->system_profile_selection() != REPORT_ALL &&
+              bucket.system_profile_aggregates_size() > 1) {
+            if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
+              changed = true;
+              LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
+                        << SystemProfileSelectionPolicy_Name(
+                               report_def->system_profile_selection());
+            }
+          }
         }
         break;
       case ReportAggregate::kDaily:
@@ -134,6 +179,15 @@
                          "for report: "
                       << report_id << " day index: " << day_index;
           }
+          if (report_def->system_profile_selection() != REPORT_ALL &&
+              bucket.system_profile_aggregates_size() > 1) {
+            if (MigrateReportAllData(bucket, metric_definition, *report_def)) {
+              changed = true;
+              LOG(INFO) << "Migrated the previously REPORT_ALL report " << report_id << " to: "
+                        << SystemProfileSelectionPolicy_Name(
+                               report_def->system_profile_selection());
+            }
+          }
         }
         break;
       default:
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
index 196ca33..0b0e916 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h
@@ -193,6 +193,10 @@
                                       const ReportDefinition &report_def,
                                       const MetadataBuilder *metadata_builder);
 
+  static bool MigrateReportAllData(AggregationPeriodBucket &bucket,
+                                   const MetricDefinition &metric_definition,
+                                   const ReportDefinition &report_def);
+
   const int64_t per_project_reserved_bytes_;
   struct ByteTracking {
     // The total number of bytes used across all projects.
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
index 70b59dd..99d0bdd 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage_test.cc
@@ -4,16 +4,22 @@
 
 #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "absl/strings/escaping.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation_1_1/testing/test_registry.cb.h"
 #include "src/logging.h"
+#include "src/public/lib/statusor/statusor.h"
 #include "src/system_data/fake_system_data.h"
 
 namespace cobalt::local_aggregation {
 
+using lib::statusor::StatusOr;
+using ::testing::Contains;
+using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
+
 namespace {
 
 std::unique_ptr<CobaltRegistry> GetRegistry() {
@@ -86,4 +92,155 @@
   EXPECT_EQ(storage_->AmountStored(proj), 0);
 }
 
+TEST_F(LocalAggregateStorageTest, MigrateDailyReportAllToSelectLast) {
+  // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for
+  // a REPORT_ALL metric.
+  uint64_t system_profile_hash_1 = uint64_t{1234};
+  uint64_t system_profile_hash_2 = uint64_t{5678};
+  uint32_t day_index = 1000;
+  {
+    StatusOr<MetricAggregateRef> agg_or =
+        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+                                         .ForProject(kProjectId)
+                                         .ForMetric(kOccurrenceMetricMetricId));
+    ASSERT_TRUE(agg_or.ok());
+    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
+
+    ReportAggregate& report_agg =
+        (*agg.aggregate()
+              ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
+    report_agg.mutable_daily()->set_last_day_index(day_index);
+    AggregationPeriodBucket& mutable_bucket =
+        (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
+
+    SystemProfileAggregate* system_profile_aggregate =
+        mutable_bucket.add_system_profile_aggregates();
+    system_profile_aggregate->set_system_profile_hash(system_profile_hash_1);
+    system_profile_aggregate->set_first_seen_timestamp(9000);
+    system_profile_aggregate->set_last_seen_timestamp(9001);
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(1);
+    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+
+    system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
+    system_profile_aggregate->set_system_profile_hash(system_profile_hash_2);
+    system_profile_aggregate->set_first_seen_timestamp(10000);
+    system_profile_aggregate->set_last_seen_timestamp(10001);
+    data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(2);
+    data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
+
+    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
+  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
+
+  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
+                  .ConsumeValueOrDie(),
+              Contains("1"));
+
+  // Reload the storage, triggering a migration.
+  ReplaceRegistry();
+
+  StatusOr<MetricAggregateRef> agg_or =
+      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+                                       .ForProject(kProjectId)
+                                       .ForMetric(kOccurrenceMetricMetricId));
+  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
+  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();
+  const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at(
+      kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);
+  ASSERT_TRUE(report_agg.daily().by_day_index().contains(day_index));
+  const AggregationPeriodBucket& bucket = report_agg.daily().by_day_index().at(day_index);
+
+  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
+  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
+  EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_2);
+  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000);
+  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001);
+
+  ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2);
+  const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0);
+  ASSERT_EQ(data.event_codes_size(), 1);
+  EXPECT_EQ(data.event_codes(0), 1);
+  EXPECT_TRUE(data.data().at_least_once().at_least_once());
+  const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1);
+  ASSERT_EQ(data2.event_codes_size(), 1);
+  EXPECT_EQ(data2.event_codes(0), 2);
+  EXPECT_TRUE(data2.data().at_least_once().at_least_once());
+}
+
+TEST_F(LocalAggregateStorageTest, MigrateHourlyReportAllToSelectFirst) {
+  // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for
+  // a REPORT_ALL metric.
+  uint64_t system_profile_hash_1 = uint64_t{1234};
+  uint64_t system_profile_hash_2 = uint64_t{5678};
+  uint32_t hour_id = 100001;
+  {
+    StatusOr<MetricAggregateRef> agg_or =
+        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+                                         .ForProject(kProjectId)
+                                         .ForMetric(kOccurrenceMetricMetricId));
+    ASSERT_TRUE(agg_or.ok());
+    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
+
+    ReportAggregate& report_agg =
+        (*agg.aggregate()->mutable_by_report_id())[kOccurrenceMetricHourlyDeviceHistogramsReportId];
+    report_agg.mutable_hourly()->set_last_hour_id(hour_id);
+    AggregationPeriodBucket& mutable_bucket =
+        (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id];
+
+    SystemProfileAggregate* system_profile_aggregate =
+        mutable_bucket.add_system_profile_aggregates();
+    system_profile_aggregate->set_system_profile_hash(system_profile_hash_1);
+    system_profile_aggregate->set_first_seen_timestamp(9000);
+    system_profile_aggregate->set_last_seen_timestamp(9001);
+    EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(1);
+    data->mutable_data()->set_count(3);
+
+    system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
+    system_profile_aggregate->set_system_profile_hash(system_profile_hash_2);
+    system_profile_aggregate->set_first_seen_timestamp(10000);
+    system_profile_aggregate->set_last_seen_timestamp(10001);
+    data = system_profile_aggregate->add_by_event_code();
+    data->add_event_codes(2);
+    data->mutable_data()->set_count(5);
+
+    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
+  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
+
+  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
+                  .ConsumeValueOrDie(),
+              Contains("1"));
+
+  // Reload the storage, triggering a migration.
+  ReplaceRegistry();
+
+  StatusOr<MetricAggregateRef> agg_or =
+      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
+                                       .ForProject(kProjectId)
+                                       .ForMetric(kOccurrenceMetricMetricId));
+  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
+  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();
+  const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at(
+      kOccurrenceMetricHourlyDeviceHistogramsReportId);
+  ASSERT_TRUE(report_agg.hourly().by_hour_id().contains(hour_id));
+  const AggregationPeriodBucket& bucket = report_agg.hourly().by_hour_id().at(hour_id);
+
+  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
+  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
+  EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_1);
+  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000);
+  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001);
+
+  ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2);
+  const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0);
+  ASSERT_EQ(data.event_codes_size(), 1);
+  EXPECT_EQ(data.event_codes(0), 1);
+  EXPECT_EQ(data.data().count(), 3);
+  const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1);
+  ASSERT_EQ(data2.event_codes_size(), 1);
+  EXPECT_EQ(data2.event_codes(0), 2);
+  EXPECT_EQ(data2.data().count(), 5);
+}
+
 }  // namespace cobalt::local_aggregation