[NoCheck] Use references in AggregationProcedure

Plain references should be used instead of pointers when the parameter is required

- Removes 1 CHECK call
- Makes AggregateDataToGenerate move only

Change-Id: I9a19b1ab44e53aae0b34a1bd42a8d5fe67035560
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/658845
Fuchsia-Auto-Submit: Zach Bush <zmbush@google.com>
Reviewed-by: Cameron Dale <camrdale@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/src/lib/util/status_builder.h b/src/lib/util/status_builder.h
index cf08df8..e59e712 100644
--- a/src/lib/util/status_builder.h
+++ b/src/lib/util/status_builder.h
@@ -239,6 +239,7 @@
 PB_ENUM_FORMATTER(MetricDefinition::MetricType);
 PB_ENUM_FORMATTER(ReportDefinition::OnDeviceAggregationType);
 PB_ENUM_FORMATTER(ReportDefinition::ReportType);
+PB_ENUM_FORMATTER(SystemProfileSelectionPolicy);
 
 #undef PB_ENUM_FORMATTER
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
index 6e2f8b9..ee94f35 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.cc
@@ -4,6 +4,7 @@
 
 #include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
 
+#include <functional>
 #include <memory>
 
 #include "src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h"
@@ -102,12 +103,12 @@
   window_.set_days(static_cast<AggregationDays>(report.local_aggregation_period()));
 }
 
-AggregationPeriodBucket *AggregationProcedure::GetAggregationPeriodBucket(
-    uint32_t time, ReportAggregate *aggregate) const {
+AggregationPeriodBucket &AggregationProcedure::GetAggregationPeriodBucket(
+    const util::TimeInfo &time, ReportAggregate &aggregate) const {
   if (IsDaily()) {
-    return &(*aggregate->mutable_daily()->mutable_by_day_index())[time];
+    return (*aggregate.mutable_daily()->mutable_by_day_index())[time.day_index];
   }
-  return &(*aggregate->mutable_hourly()->mutable_by_hour_id())[time];
+  return (*aggregate.mutable_hourly()->mutable_by_hour_id())[time.hour_id];
 }
 
 bool AggregationProcedure::IsValidEventType(Event::TypeCase type) const {
@@ -127,9 +128,11 @@
   }
 }
 
-AggregateData *AggregationProcedure::GetAggregateData(
-    const logger::EventRecord &event_record, AggregationPeriodBucket *bucket,
-    uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const {
+lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
+AggregationProcedure::GetAggregateData(const logger::EventRecord &event_record,
+                                       AggregationPeriodBucket &bucket,
+                                       uint64_t system_profile_hash,
+                                       std::chrono::system_clock::time_point system_time) const {
   google::protobuf::RepeatedField<uint32_t> event_codes;
 
   switch (metric_type_) {
@@ -146,9 +149,13 @@
       event_codes = event_record.event()->string_event().event_code();
       break;
     default:
-      LOG(ERROR) << DebugString() << ": Metric of type " << metric_type_
-                 << " does not appear to be a Cobalt 1.1 metric";
-      return nullptr;
+      return util::StatusBuilder(StatusCode::INVALID_ARGUMENT)
+          .AppendMsg(DebugString())
+          .AppendMsg(": Metric of type")
+          .AppendMsg(metric_type_)
+          .AppendMsg(" does not appear to be a Cobalt 1.1 metric")
+          .LogError()
+          .Build();
   }
 
   int64_t system_timestamp = util::ToUnixSeconds(system_time);
@@ -157,14 +164,14 @@
       system_profile_selection_policy_ == SELECT_LAST ||
       system_profile_selection_policy_ == SELECT_DEFAULT) {
     SystemProfileAggregate *system_profile_aggregate;
-    if (bucket->system_profile_aggregates_size() >= 1) {
-      if (bucket->system_profile_aggregates_size() > 1) {
-        LOG(ERROR) << "There are " << bucket->system_profile_aggregates_size()
+    if (bucket.system_profile_aggregates_size() >= 1) {
+      if (bucket.system_profile_aggregates_size() > 1) {
+        LOG(ERROR) << "There are " << bucket.system_profile_aggregates_size()
                    << " system profile aggregates for a report with system_profile_selection "
                    << system_profile_selection_policy_ << ". There should be only one.";
         // Use the first aggregate anyway.
       }
-      system_profile_aggregate = bucket->mutable_system_profile_aggregates(0);
+      system_profile_aggregate = bucket.mutable_system_profile_aggregates(0);
       if ((system_profile_selection_policy_ == SELECT_LAST ||
            system_profile_selection_policy_ == SELECT_DEFAULT) &&
           system_profile_aggregate->system_profile_hash() != system_profile_hash) {
@@ -172,59 +179,65 @@
       }
       system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
     } else {
-      system_profile_aggregate = bucket->add_system_profile_aggregates();
+      system_profile_aggregate = bucket.add_system_profile_aggregates();
       system_profile_aggregate->set_system_profile_hash(system_profile_hash);
       system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
       system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
     }
 
-    return GetAggregateData(system_profile_aggregate, event_codes);
+    return GetAggregateData(*system_profile_aggregate, event_codes);
   }
 
   if (system_profile_selection_policy_ == REPORT_ALL) {
     SystemProfileAggregate *system_profile_aggregate = nullptr;
-    for (SystemProfileAggregate &aggregate : *(bucket->mutable_system_profile_aggregates())) {
+    for (SystemProfileAggregate &aggregate : *(bucket.mutable_system_profile_aggregates())) {
       if (aggregate.system_profile_hash() == system_profile_hash) {
         system_profile_aggregate = &aggregate;
       }
     }
     if (system_profile_aggregate == nullptr) {
-      system_profile_aggregate = bucket->add_system_profile_aggregates();
+      system_profile_aggregate = bucket.add_system_profile_aggregates();
       system_profile_aggregate->set_system_profile_hash(system_profile_hash);
       system_profile_aggregate->set_first_seen_timestamp(system_timestamp);
     }
     system_profile_aggregate->set_last_seen_timestamp(system_timestamp);
 
-    return GetAggregateData(system_profile_aggregate, event_codes);
+    return GetAggregateData(*system_profile_aggregate, event_codes);
   }
 
-  return nullptr;
+  return util::StatusBuilder(StatusCode::NOT_FOUND, "Unknown system_profile_selection_policy ")
+      .AppendMsg(system_profile_selection_policy_)
+      .LogError()
+      .Build();
 }
 
-AggregateData *AggregationProcedure::GetAggregateData(
-    SystemProfileAggregate *system_profile_aggregate,
+lib::statusor::StatusOr<std::reference_wrapper<AggregateData>>
+AggregationProcedure::GetAggregateData(
+    SystemProfileAggregate &system_profile_aggregate,
     google::protobuf::RepeatedField<uint32_t> event_codes) const {
-  CHECK(system_profile_aggregate != nullptr)
-      << "Null SystemProfileAggregate passed to GetAggregateData";
   for (EventCodesAggregateData &aggregate_data :
-       *system_profile_aggregate->mutable_by_event_code()) {
+       *system_profile_aggregate.mutable_by_event_code()) {
     // Find the event codes that match the event's.
     if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
                    event_codes.begin(), event_codes.end())) {
-      return aggregate_data.mutable_data();
+      return std::ref(*aggregate_data.mutable_data());
     }
   }
   // Event codes were not found, so add them as a new entry.
-  if (system_profile_aggregate->by_event_code_size() < event_vector_buffer_max_) {
-    EventCodesAggregateData *aggregate_data = system_profile_aggregate->add_by_event_code();
+  if (system_profile_aggregate.by_event_code_size() < event_vector_buffer_max_) {
+    EventCodesAggregateData *aggregate_data = system_profile_aggregate.add_by_event_code();
     aggregate_data->mutable_event_codes()->CopyFrom(event_codes);
-    return aggregate_data->mutable_data();
+    return std::ref(*aggregate_data->mutable_data());
   }
-  return nullptr;
+
+  return util::StatusBuilder(StatusCode::RESOURCE_EXHAUSTED,
+                             "SystemProfileAggregate has reached event_vector_buffer_max")
+      .WithContext("event_vector_buffer_max", event_vector_buffer_max_)
+      .Build();
 }
 
 void AggregationProcedure::UpdateAggregate(const logger::EventRecord &event_record,
-                                           ReportAggregate *aggregate, uint64_t system_profile_hash,
+                                           ReportAggregate &aggregate, uint64_t system_profile_hash,
                                            std::chrono::system_clock::time_point system_time) {
   if (!IsValidEventType(event_record.event()->type_case())) {
     LOG(ERROR) << "Unable to log event of type " << event_record.event()->type_case()
@@ -232,46 +245,44 @@
     return;
   }
 
-  AggregationPeriodBucket *bucket;
-  if (IsDaily()) {
-    bucket = GetAggregationPeriodBucket(event_record.event()->day_index(), aggregate);
-  } else {
-    bucket = GetAggregationPeriodBucket(event_record.event()->hour_id(), aggregate);
-  }
+  std::reference_wrapper<AggregationPeriodBucket> bucket =
+      GetAggregationPeriodBucket(event_record.GetTimeInfo(), aggregate);
 
-  if (AggregateData *aggregate_data =
+  if (lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> aggregate_data =
           GetAggregateData(event_record, bucket, system_profile_hash, system_time);
-      aggregate_data != nullptr) {
-    UpdateAggregateData(event_record, aggregate_data, bucket);
+      aggregate_data.ok()) {
+    UpdateAggregateData(event_record, aggregate_data.ConsumeValueOrDie(), bucket);
   }
 }
 
-void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+void AggregationProcedure::MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
                                                         const SystemProfileAggregate &aggregate) {
   if (system_profile_selection_policy_ == SELECT_FIRST) {
-    if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
-      merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
-      merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+    if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) {
+      merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
+      merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
     }
-    if (aggregate.last_seen_timestamp() > merged_aggregate->last_seen_timestamp()) {
-      merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+    if (aggregate.last_seen_timestamp() > merged_aggregate.last_seen_timestamp()) {
+      merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
     }
   } else {  // SELECT_LAST or SELECT_DEFAULT
-    if (aggregate.last_seen_timestamp() >= merged_aggregate->last_seen_timestamp()) {
-      merged_aggregate->set_system_profile_hash(aggregate.system_profile_hash());
-      merged_aggregate->set_last_seen_timestamp(aggregate.last_seen_timestamp());
+    if (aggregate.last_seen_timestamp() >= merged_aggregate.last_seen_timestamp()) {
+      merged_aggregate.set_system_profile_hash(aggregate.system_profile_hash());
+      merged_aggregate.set_last_seen_timestamp(aggregate.last_seen_timestamp());
     }
-    if (aggregate.first_seen_timestamp() < merged_aggregate->first_seen_timestamp()) {
-      merged_aggregate->set_first_seen_timestamp(aggregate.first_seen_timestamp());
+    if (aggregate.first_seen_timestamp() < merged_aggregate.first_seen_timestamp()) {
+      merged_aggregate.set_first_seen_timestamp(aggregate.first_seen_timestamp());
     }
   }
 
   for (const EventCodesAggregateData &aggregate_data : aggregate.by_event_code()) {
     // Find or create the corresponding AggregateData in the merged system profile aggregate.
-    AggregateData *data = GetAggregateData(merged_aggregate, aggregate_data.event_codes());
-    // nullptr means there is no room in event_vector_buffer_max_ for more event codes.
-    if (data != nullptr) {
-      MergeAggregateData(data, aggregate_data.data());
+    lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> data =
+        GetAggregateData(merged_aggregate, aggregate_data.event_codes());
+    // A non-OK status here means that there is no room in event_vector_buffer_max_ for the event
+    // codes.
+    if (data.ok()) {
+      MergeAggregateData(data.ValueOrDie(), aggregate_data.data());
     }
   }
 }
@@ -291,7 +302,7 @@
 
 std::map<uint64_t, std::vector<AggregateDataToGenerate>>
 AggregationProcedure::GetAggregateDataToGenerate(const util::TimeInfo &time_info,
-                                                 ReportAggregate *aggregate) const {
+                                                 ReportAggregate &aggregate) const {
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate;
 
   util::TimeInfo start_time_info = GetStartTimeInfo(time_info);
@@ -304,10 +315,10 @@
       uint32_t first_seen_timestamp = UINT32_MAX;
       std::vector<AggregateDataToGenerate> data_to_generate_for_system_profile;
       for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
-        if (!aggregate->daily().by_day_index().contains(i)) {
+        if (!aggregate.daily().by_day_index().contains(i)) {
           continue;
         }
-        AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+        AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i];
         AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
         for (SystemProfileAggregate &system_profile_aggregate :
              *agg->mutable_system_profile_aggregates()) {
@@ -325,7 +336,7 @@
             first_seen_timestamp = system_profile_aggregate.first_seen_timestamp();
           }
           for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
-            agg_to_generate.aggregate_data.push_back(&data);
+            agg_to_generate.aggregate_data.push_back(data);
           }
         }
         if (!agg_to_generate.aggregate_data.empty()) {
@@ -337,15 +348,15 @@
       }
     } else if (system_profile_selection_policy_ == REPORT_ALL) {
       for (uint32_t i = time_info.day_index; i >= start_time_info.day_index; i--) {
-        if (!aggregate->daily().by_day_index().contains(i)) {
+        if (!aggregate.daily().by_day_index().contains(i)) {
           continue;
         }
-        AggregationPeriodBucket *agg = &(*aggregate->mutable_daily()->mutable_by_day_index())[i];
+        AggregationPeriodBucket *agg = &(*aggregate.mutable_daily()->mutable_by_day_index())[i];
         for (SystemProfileAggregate &system_profile_aggregate :
              *agg->mutable_system_profile_aggregates()) {
           AggregateDataToGenerate agg_to_generate = {.string_hashes = agg->string_hashes()};
           for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
-            agg_to_generate.aggregate_data.push_back(&data);
+            agg_to_generate.aggregate_data.push_back(data);
           }
           data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
               std::move(agg_to_generate));
@@ -353,14 +364,14 @@
       }
     }
   } else {
-    if (aggregate->hourly().by_hour_id().contains(start_time_info.hour_id)) {
+    if (aggregate.hourly().by_hour_id().contains(start_time_info.hour_id)) {
       AggregationPeriodBucket *agg =
-          &(*aggregate->mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id];
+          &(*aggregate.mutable_hourly()->mutable_by_hour_id())[start_time_info.hour_id];
       for (SystemProfileAggregate &system_profile_aggregate :
            *agg->mutable_system_profile_aggregates()) {
         AggregateDataToGenerate agg_to_generate{.string_hashes = agg->string_hashes()};
         for (EventCodesAggregateData &data : *system_profile_aggregate.mutable_by_event_code()) {
-          agg_to_generate.aggregate_data.push_back(&data);
+          agg_to_generate.aggregate_data.push_back(data);
         }
         data_to_generate[system_profile_aggregate.system_profile_hash()].emplace_back(
             std::move(agg_to_generate));
@@ -372,7 +383,7 @@
 
 lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>>
 AggregationProcedure::GenerateObservations(const util::TimeInfo &time_info,
-                                           ReportAggregate *aggregate) {
+                                           ReportAggregate &aggregate) {
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
       GetAggregateDataToGenerate(time_info, aggregate);
 
@@ -388,7 +399,7 @@
   return observations;
 }
 
-void AggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void AggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
                                                  util::TimeInfo time_info,
                                                  uint64_t system_profile_hash) const {
   util::TimeInfo delete_before = GetStartTimeInfo(time_info);
@@ -396,32 +407,32 @@
   // Clean up aggregates that will never be used again.
   if (IsDaily()) {
     std::vector<uint32_t> days_to_delete;
-    for (const auto &day : aggregate->daily().by_day_index()) {
+    for (const auto &day : aggregate.daily().by_day_index()) {
       if (day.first <= delete_before.day_index) {
         days_to_delete.push_back(day.first);
       }
     }
 
     for (auto day : days_to_delete) {
-      aggregate->mutable_daily()->mutable_by_day_index()->erase(day);
+      aggregate.mutable_daily()->mutable_by_day_index()->erase(day);
     }
   } else {
     std::vector<uint32_t> hours_to_delete;
-    for (const auto &hour : aggregate->hourly().by_hour_id()) {
+    for (const auto &hour : aggregate.hourly().by_hour_id()) {
       if (hour.first <= delete_before.hour_id) {
         hours_to_delete.push_back(hour.first);
       }
     }
 
     for (auto hour : hours_to_delete) {
-      aggregate->mutable_hourly()->mutable_by_hour_id()->erase(hour);
+      aggregate.mutable_hourly()->mutable_by_hour_id()->erase(hour);
     }
   }
 
   if (IsDaily()) {
-    aggregate->mutable_daily()->set_last_day_index(time_info.day_index);
+    aggregate.mutable_daily()->set_last_day_index(time_info.day_index);
   } else {
-    aggregate->mutable_hourly()->set_last_hour_id(time_info.hour_id);
+    aggregate.mutable_hourly()->set_last_hour_id(time_info.hour_id);
   }
 }
 
@@ -429,12 +440,12 @@
     const std::vector<AggregateDataToGenerate> &buckets) const {
   std::set<std::vector<uint32_t>> event_vectors;
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
       if (event_vectors.size() == event_vector_buffer_max_) {
         return event_vectors;
       }
       event_vectors.insert(
-          {aggregate_data->event_codes().begin(), aggregate_data->event_codes().end()});
+          {aggregate_data.event_codes().begin(), aggregate_data.event_codes().end()});
     }
   }
   return event_vectors;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
index ce1ca12..cc7bf0d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h
@@ -31,8 +31,13 @@
 // generating observations, all AggregateDataToGenerate and all the aggregate_data they contain must
 // be for the same system profile.
 struct AggregateDataToGenerate {
-  std::vector<EventCodesAggregateData *> aggregate_data;
+  std::vector<std::reference_wrapper<EventCodesAggregateData>> aggregate_data;
   const google::protobuf::RepeatedPtrField<std::string> &string_hashes;
+
+  // Make the struct move only
+  AggregateDataToGenerate(AggregateDataToGenerate const &) = delete;
+  AggregateDataToGenerate &operator=(AggregateDataToGenerate const &) = delete;
+  AggregateDataToGenerate(AggregateDataToGenerate &&) = default;
 };
 
 // A container for the observation, with the hash of the system profile that it is for.
@@ -77,7 +82,7 @@
   // |aggregate|: A mutable ReportAggregate object.
   // |system_profile_hash|: The hash of the filtered system profile this event is for.
   // |system_time|: The system time that the event occurred at.
-  void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate *aggregate,
+  void UpdateAggregate(const logger::EventRecord &event_record, ReportAggregate &aggregate,
                        uint64_t system_profile_hash,
                        std::chrono::system_clock::time_point system_time);
 
@@ -87,7 +92,7 @@
   // |merged_aggregate|. Both system profile aggregates must be included in the same
   // AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
   // merging the AggregateData in each bucket.
-  void MergeSystemProfileAggregates(SystemProfileAggregate *merged_aggregate,
+  void MergeSystemProfileAggregates(SystemProfileAggregate &merged_aggregate,
                                     const SystemProfileAggregate &aggregate);
 
   // GenerateObservations is the public interface for generating observations. It handles
@@ -101,7 +106,7 @@
   // and the system profile hash to use when generating their ObservationMetadata.
   //
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations(
-      const util::TimeInfo &time_info, ReportAggregate *aggregate);
+      const util::TimeInfo &time_info, ReportAggregate &aggregate);
 
   [[nodiscard]] virtual std::string DebugString() const = 0;
 
@@ -132,7 +137,7 @@
   // observations, they should be deleted. If an hour_id or a day_index is passed to this function,
   // an earlier hour_id or day_index will never be passed and the data associated with that can be
   // deleted.
-  virtual void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo time_info,
+  virtual void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo time_info,
                                      uint64_t system_profile_hash) const;
 
  protected:
@@ -144,15 +149,15 @@
   // |aggregate_data|: A mutable AggregateData object.
   // |bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|.
   virtual void UpdateAggregateData(const logger::EventRecord &event_record,
-                                   AggregateData *aggregate_data,
-                                   AggregationPeriodBucket *bucket) = 0;
+                                   AggregateData &aggregate_data,
+                                   AggregationPeriodBucket &bucket) = 0;
 
   // Merge two instances of the aggregate data for this procedure.
   //
   // The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
   // AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
   // bucket to be preserved accurately.
-  virtual void MergeAggregateData(AggregateData *merged_aggregate_data,
+  virtual void MergeAggregateData(AggregateData &merged_aggregate_data,
                                   const AggregateData &aggregate_data) = 0;
 
   // GenerateSingleObservation generates an observation for the given report at the given time.
@@ -181,7 +186,7 @@
   // The return value is a map from the system_profile_hash, to the data to use to generate the
   // observation for that system profile.
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate(
-      const util::TimeInfo &time_info, ReportAggregate *aggregate) const;
+      const util::TimeInfo &time_info, ReportAggregate &aggregate) const;
 
   // Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged
   // for |aggregates|. The |buckets| should be ordered from earliest to latest aggregation period.
@@ -196,27 +201,28 @@
   // Returns a mutable reference to the AggregationPeriodBucket associated with the given time
   // specifier.
   //
-  // |time|: Either a day_index (for daily metrics) or an hour_id (for hourly metrics)
+  // |time|: util::TimeInfo to fetch the bucket for.
   // |aggregate|: A mutable reference to a ReportAggregate object.
-  AggregationPeriodBucket *GetAggregationPeriodBucket(uint32_t time,
-                                                      ReportAggregate *aggregate) const;
+  AggregationPeriodBucket &GetAggregationPeriodBucket(const util::TimeInfo &time,
+                                                      ReportAggregate &aggregate) const;
 
   // Returns a pointer to the AggregateData of |bucket| that |event_record| should be added to,
   // if |bucket| has capacity for the event vector of |event_record|. In addition, adds
   // the event vector of |event_record| to |bucket|'s event_vectors buffer if it has capacity and
-  // the event vector is not yet present. Returns a null pointer if |bucket| does not have
+  // the event vector is not yet present. Returns a non-OK status if |bucket| does not have
   // capacity.
-  AggregateData *GetAggregateData(const logger::EventRecord &event_record,
-                                  AggregationPeriodBucket *bucket, uint64_t system_profile_hash,
-                                  std::chrono::system_clock::time_point system_time) const;
+  lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
+      const logger::EventRecord &event_record, AggregationPeriodBucket &bucket,
+      uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time) const;
 
   // Returns a pointer to the AggregateData of |system_profile_aggregate| that data for
   // |event_codes| should be added to, if |system_profile_aggregate| has capacity for the event
   // vector of |event_codes|. If it is not present, adds the event vector of |event_codes| to
-  // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a null pointer
+  // |system_profile_aggregate|'s event_vectors buffer, if it has capacity. Returns a non-OK status
   // if |system_profile_aggregate| does not have capacity for new event codes.
-  AggregateData *GetAggregateData(SystemProfileAggregate *system_profile_aggregate,
-                                  google::protobuf::RepeatedField<uint32_t> event_codes) const;
+  lib::statusor::StatusOr<std::reference_wrapper<AggregateData>> GetAggregateData(
+      SystemProfileAggregate &system_profile_aggregate,
+      google::protobuf::RepeatedField<uint32_t> event_codes) const;
 
   // Returns the starting TimeInfo to use aggregate data from for this report, when processing the
   // current_time_info.
@@ -236,12 +242,12 @@
   ~UnimplementedAggregationProcedure() override = default;
 
   void UpdateAggregateData(const logger::EventRecord & /* event_record */,
-                           AggregateData * /*aggregate_data*/,
-                           AggregationPeriodBucket * /* bucket */) override {
+                           AggregateData & /*aggregate_data*/,
+                           AggregationPeriodBucket & /* bucket */) override {
     LOG(ERROR) << "UpdateAggregateData is UNIMPLEMENTED for " << DebugString();
   }
 
-  void MergeAggregateData(AggregateData * /*merged_aggregate_data*/,
+  void MergeAggregateData(AggregateData & /*merged_aggregate_data*/,
                           const AggregateData & /*aggregate_data*/) override {
     LOG(ERROR) << "MergeAggregateData is UNIMPLEMENTED for " << DebugString();
   }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
index fc858b4..54ad1fc 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure_test.cc
@@ -60,8 +60,8 @@
   ReportAggregate aggregate;
 
   uint64_t num_events = event_vector_buffer_max + 1;
-  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate);
+  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
+                             aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -89,13 +89,13 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 1000;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(second_event_time));
 
   ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -126,13 +126,13 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 1000;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(second_event_time));
 
   ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -163,13 +163,13 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId, first_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 1000;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(num_events, kHourId, second_system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate,
+                             *count_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(second_event_time));
 
   ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
@@ -217,7 +217,7 @@
 
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate);
+                            *at_least_once_aggregation_procedure, aggregate);
 
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
   ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -245,13 +245,13 @@
   int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
   uint64_t first_system_profile_hash = uint64_t{213};
   AddOccurrenceEventsForDay(num_events, kDayIndex, first_system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate,
+                            *at_least_once_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 7200;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(num_events, kDayIndex, second_system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate,
+                            *at_least_once_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(second_event_time));
 
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -284,13 +284,13 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   // Add events for 2 different event vectors: {1} and {2}.
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate,
+                            *select_first_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 7200;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate,
+                            *select_first_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(second_event_time));
 
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -320,13 +320,13 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   // Add events for 2 different event vectors: {1} and {2}.
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, first_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate,
+                            *select_first_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(first_event_time));
 
   int64_t second_event_time = first_event_time + 7200;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, second_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate,
+                            *select_first_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(second_event_time));
 
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -374,7 +374,7 @@
   int64_t first_event_time = util::DayIndexToUnixSeconds(kDayIndex) + 4800;
   uint64_t first_system_profile_hash = uint64_t{213};
   AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, first_system_profile_hash,
-                            report_all_aggregation_procedure.get(), &aggregate,
+                            *report_all_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(first_event_time));
   AggregationPeriodBucket& bucket =
       aggregate.mutable_daily()->mutable_by_day_index()->at(kDayIndex);
@@ -383,7 +383,7 @@
   int64_t second_event_time = first_event_time + 7200;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForDay(event_vector_buffer_max, kDayIndex, second_system_profile_hash,
-                            report_all_aggregation_procedure.get(), &aggregate,
+                            *report_all_aggregation_procedure, aggregate,
                             util::FromUnixSeconds(second_event_time));
 
   // Merge the second system profile aggregates into the first one.
@@ -395,7 +395,7 @@
   SystemProfileAggregate* merged_system_profile_aggregate =
       bucket.mutable_system_profile_aggregates(0);
   at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
-      merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+      *merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
 
   EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), second_system_profile_hash);
   EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
@@ -424,7 +424,7 @@
   int64_t first_event_time = util::HourIdToUnixSeconds(kHourId);
   uint64_t first_system_profile_hash = uint64_t{213};
   AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, first_system_profile_hash,
-                             report_all_aggregation_procedure.get(), &aggregate,
+                             *report_all_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(first_event_time));
   AggregationPeriodBucket& bucket = aggregate.mutable_hourly()->mutable_by_hour_id()->at(kHourId);
   ASSERT_EQ(bucket.system_profile_aggregates_size(), 1u);
@@ -433,7 +433,7 @@
   int64_t second_event_time = first_event_time + 1000;
   uint64_t second_system_profile_hash = uint64_t{324};
   AddOccurrenceEventsForHour(event_vector_buffer_max, kHourId, second_system_profile_hash,
-                             report_all_aggregation_procedure.get(), &aggregate,
+                             *report_all_aggregation_procedure, aggregate,
                              util::FromUnixSeconds(second_event_time));
   ASSERT_EQ(bucket.system_profile_aggregates_size(), 2u);
   ASSERT_EQ(bucket.system_profile_aggregates(1).by_event_code_size(), event_vector_buffer_max);
@@ -447,7 +447,7 @@
   SystemProfileAggregate* merged_system_profile_aggregate =
       bucket.mutable_system_profile_aggregates(0);
   at_least_once_aggregation_procedure->MergeSystemProfileAggregates(
-      merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
+      *merged_system_profile_aggregate, bucket.system_profile_aggregates(1));
 
   EXPECT_EQ(merged_system_profile_aggregate->system_profile_hash(), first_system_profile_hash);
   EXPECT_EQ(merged_system_profile_aggregate->first_seen_timestamp(), first_event_time);
@@ -473,13 +473,13 @@
 
   uint64_t num_events = event_vector_buffer_max + 1;
   AddOccurrenceEventsForHour(num_events, kHourId - 1, system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate);
-  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash,
-                             count_aggregation_procedure.get(), &aggregate);
+                             *count_aggregation_procedure, aggregate);
+  AddOccurrenceEventsForHour(num_events, kHourId, system_profile_hash, *count_aggregation_procedure,
+                             aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
       count_aggregation_procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId),
-                                                        &aggregate);
+                                                        aggregate);
 
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -507,15 +507,15 @@
 
   uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
   AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate);
+                            *at_least_once_aggregation_procedure, aggregate);
 
   uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate);
+                            *at_least_once_aggregation_procedure, aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
       at_least_once_aggregation_procedure->GenerateObservations(
-          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+          util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
 
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -543,16 +543,16 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   uint32_t kFirstDayNumEvents = event_vector_buffer_max - 1;
   AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate);
+                            *at_least_once_aggregation_procedure, aggregate);
 
   uint64_t second_system_profile_hash = uint64_t{426};
   uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
-                            at_least_once_aggregation_procedure.get(), &aggregate);
+                            *at_least_once_aggregation_procedure, aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
       at_least_once_aggregation_procedure->GenerateObservations(
-          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+          util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
 
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
@@ -581,16 +581,16 @@
   uint64_t first_system_profile_hash = uint64_t{213};
   uint32_t kFirstDayNumEvents = event_vector_buffer_max;
   AddOccurrenceEventsForDay(kFirstDayNumEvents, kFirstDayIndex, first_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate);
+                            *select_first_aggregation_procedure, aggregate);
 
   uint64_t second_system_profile_hash = uint64_t{426};
   uint32_t kSecondDayNumEvents = event_vector_buffer_max + 1;
   AddOccurrenceEventsForDay(kSecondDayNumEvents, kFirstDayIndex + 1, second_system_profile_hash,
-                            select_first_aggregation_procedure.get(), &aggregate);
+                            *select_first_aggregation_procedure, aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
       select_first_aggregation_procedure->GenerateObservations(
-          util::TimeInfo::FromDayIndex(kFinalDayIndex), &aggregate);
+          util::TimeInfo::FromDayIndex(kFinalDayIndex), aggregate);
 
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
index 8472172..de17231 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.cc
@@ -15,19 +15,19 @@
     : AggregationProcedure(metric, report), is_expedited_(report.expedited_sending()) {}
 
 void AtLeastOnceAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
+    const logger::EventRecord & /*event_record*/, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
   // TODO(fxbug.dev/53775): Handle the case where event_record is malformed.
-  aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+  aggregate_data.mutable_at_least_once()->set_at_least_once(true);
 }
 
-void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void AtLeastOnceAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                          const AggregateData &aggregate_data) {
   if (aggregate_data.at_least_once().at_least_once()) {
-    merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+    merged_aggregate_data.mutable_at_least_once()->set_at_least_once(true);
     if (aggregate_data.at_least_once().last_day_index() >
-        merged_aggregate_data->at_least_once().last_day_index()) {
-      merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+        merged_aggregate_data.at_least_once().last_day_index()) {
+      merged_aggregate_data.mutable_at_least_once()->set_last_day_index(
           aggregate_data.at_least_once().last_day_index());
     }
   }
@@ -41,13 +41,13 @@
     const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
   std::set<std::vector<uint32_t>> event_vectors_to_send;
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                         aggregate_data->event_codes().end());
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
       if (!event_vectors.count(event_vector)) {
         continue;
       }
-      if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) {
+      if (aggregate_data.data().at_least_once().last_day_index() >= time_info.day_index) {
         continue;
       }
 
@@ -69,7 +69,7 @@
   return logger::Encoder::EncodeIntegerObservation(data);
 }
 
-void AtLeastOnceAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void AtLeastOnceAggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
                                                             util::TimeInfo info,
                                                             uint64_t system_profile_hash) const {
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
@@ -87,19 +87,19 @@
     const std::set<std::vector<uint32_t>> &event_vectors =
         SelectEventVectorsForObservation(buckets);
     for (AggregateDataToGenerate &bucket : buckets) {
-      for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-        std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                           aggregate_data->event_codes().end());
+      for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+        std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                           aggregate_data.event_codes().end());
         if (!event_vectors.count(event_vector)) {
           continue;
         }
-        if (!aggregate_data->data().at_least_once().at_least_once()) {
+        if (!aggregate_data.data().at_least_once().at_least_once()) {
           continue;
         }
-        if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) {
+        if (aggregate_data.data().at_least_once().last_day_index() >= info.day_index) {
           continue;
         }
-        aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
+        aggregate_data.mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
       }
     }
   }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
index 5acf645..0e63c20 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure.h
@@ -19,17 +19,17 @@
                                            const ReportDefinition &report);
 
   void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
-                           AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+                           AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
   // Call observation generation more frequently so that data can be sent as soon as possible.
   [[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
-  void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+  void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
                              uint64_t system_profile_hash) const override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
index 77bc7a1..4c2819f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_aggregation_procedure_test.cc
@@ -45,8 +45,7 @@
   ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
 
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
   ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -69,7 +68,7 @@
 
   std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -81,7 +80,7 @@
 
   std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_at_least_once());
   EXPECT_FALSE(merged_data.at_least_once().at_least_once());
@@ -96,7 +95,7 @@
 
   std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -110,7 +109,7 @@
 
   std::unique_ptr<AtLeastOnceAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
@@ -130,11 +129,11 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate report_aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -156,12 +155,12 @@
   ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -181,11 +180,11 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate report_aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -208,7 +207,7 @@
 
   // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
   // that day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
   ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
             1u);
@@ -221,7 +220,7 @@
   }
 
   // Check that calling observation generation again for the same day generates no observation.
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 1u);
@@ -230,13 +229,13 @@
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
 
   // Check that obsolete aggregates get cleaned up the next day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 }
 
@@ -254,14 +253,14 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate report_aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   // Check that the observation is generated for the next 7 days.
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -283,7 +282,7 @@
     ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
 
     // Check that obsolete aggregates get cleaned up.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
     // Days in the 7-day window that had events continue to contain data until the last day when it
@@ -311,7 +310,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -331,14 +330,14 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate report_aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   // Check that the observation is generated for the next 7 days.
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -360,7 +359,7 @@
     ASSERT_EQ(obs_event_codes.size(), kNumEventCodes);
 
     // Check that obsolete aggregates get cleaned up.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
     // Days in the 7-day window that had events continue to contain data.
@@ -380,7 +379,7 @@
     }
 
     // Check that calling observation generation again for the same day generates no observation.
-    observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+    observations_or = procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     observations = observations_or.ConsumeValueOrDie();
     ASSERT_EQ(observations.size(), 1u);
@@ -391,7 +390,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -411,11 +410,11 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(), num_events);
 
   ReportAggregate report_aggregate;
-  AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(num_events, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -447,7 +446,7 @@
   ReportAggregate report_aggregate;
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
index eb02a78..224ecff 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.cc
@@ -24,10 +24,10 @@
       string_buffer_max_(metric.string_buffer_max()) {}
 
 void AtLeastOnceStringAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket *bucket) {
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket &bucket) {
   Map<uint32_t, UniqueString> *unique_strings =
-      aggregate_data->mutable_unique_strings()->mutable_unique_strings();
+      aggregate_data.mutable_unique_strings()->mutable_unique_strings();
 
   std::string bytes =
       util::FarmhashFingerprint(event_record.event()->string_event().string_value());
@@ -35,8 +35,8 @@
   // Check if the current string event value's byte representation has appeared before in
   // the string hashes of the current period bucket, if so, then initialize a UniqueString message
   // if the index of the string hash doesn't exist in the current UniqueString mapping.
-  for (int i = 0; i < bucket->string_hashes_size(); i++) {
-    if (bucket->string_hashes(i) == bytes) {
+  for (int i = 0; i < bucket.string_hashes_size(); i++) {
+    if (bucket.string_hashes(i) == bytes) {
       if (!unique_strings->contains(i)) {
         (*unique_strings)[i] = UniqueString();
       }
@@ -44,31 +44,31 @@
     }
   }
 
-  if (bucket->string_hashes_size() < string_buffer_max_) {
+  if (bucket.string_hashes_size() < string_buffer_max_) {
     // Add new entry
-    (*unique_strings)[bucket->string_hashes_size()] = UniqueString();
-    bucket->add_string_hashes(bytes);
+    (*unique_strings)[bucket.string_hashes_size()] = UniqueString();
+    bucket.add_string_hashes(bytes);
   }
 }
 
 void AtLeastOnceStringAggregationProcedure::MergeAggregateData(
-    AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
+    AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
   // Merge in the aggregate data's mapping of indexes to their count.
   // This only works correctly if the AggregateData objects are both part of the same
   // AggregationPeriodBucket, such that their string_index values both refer to the same repeated
   // string_hashes field in the bucket.
   for (const auto &[string_index, unique_string] :
        aggregate_data.unique_strings().unique_strings()) {
-    if (merged_aggregate_data->unique_strings().unique_strings().contains(string_index)) {
-      if (unique_string.last_day_index() > merged_aggregate_data->unique_strings()
+    if (merged_aggregate_data.unique_strings().unique_strings().contains(string_index)) {
+      if (unique_string.last_day_index() > merged_aggregate_data.unique_strings()
                                                .unique_strings()
                                                .at(string_index)
                                                .last_day_index()) {
-        (*merged_aggregate_data->mutable_unique_strings()->mutable_unique_strings())[string_index]
+        (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index]
             .set_last_day_index(unique_string.last_day_index());
       }
     } else {
-      (*merged_aggregate_data->mutable_unique_strings()->mutable_unique_strings())[string_index] =
+      (*merged_aggregate_data.mutable_unique_strings()->mutable_unique_strings())[string_index] =
           unique_string;
     }
   }
@@ -92,16 +92,16 @@
   // of string hashes in the hashes vector above.
   std::map<std::string, uint32_t> seen_hashes;
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                         aggregate_data->event_codes().end());
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
       if (!(event_vectors.count(event_vector) > 0)) {
         continue;
       }
 
       // Create or add on to a unique event vector's string historgram indices.
       for (const auto &[index, unique_string] :
-           aggregate_data->data().unique_strings().unique_strings()) {
+           aggregate_data.data().unique_strings().unique_strings()) {
         if (unique_string.last_day_index() >= time_info.day_index) {
           continue;
         }
@@ -142,7 +142,7 @@
 }
 
 void AtLeastOnceStringAggregationProcedure::ObservationsCommitted(
-    ReportAggregate *aggregate, util::TimeInfo info, uint64_t system_profile_hash) const {
+    ReportAggregate &aggregate, util::TimeInfo info, uint64_t system_profile_hash) const {
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
       GetAggregateDataToGenerate(info, aggregate);
   auto data_to_generate_it = data_to_generate.find(system_profile_hash);
@@ -158,15 +158,15 @@
     const std::set<std::vector<uint32_t>> &event_vectors =
         SelectEventVectorsForObservation(buckets);
     for (AggregateDataToGenerate &bucket : buckets) {
-      for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-        std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                           aggregate_data->event_codes().end());
+      for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+        std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                           aggregate_data.event_codes().end());
         if (!(event_vectors.count(event_vector) > 0)) {
           continue;
         }
 
         for (auto &[index, unique_string] :
-             *aggregate_data->mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
+             *aggregate_data.mutable_data()->mutable_unique_strings()->mutable_unique_strings()) {
           if (unique_string.last_day_index() >= info.day_index) {
             continue;
           }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
index d63a13d..05453d5 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure.h
@@ -19,17 +19,17 @@
   explicit AtLeastOnceStringAggregationProcedure(const MetricDefinition &metric,
                                                  const ReportDefinition &report);
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket *bucket) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket &bucket) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
   // Call observation generation more frequently so that data can be sent as soon as possible.
   [[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
-  void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+  void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
                              uint64_t system_profile_hash) const override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
index 13e1365..2388006 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/at_least_once_string_aggregation_procedure_test.cc
@@ -23,8 +23,8 @@
  protected:
   void AddStringEventsForDay(uint32_t day_index,
                              const std::map<uint32_t, std::vector<std::string>>& events_to_strings,
-                             uint64_t system_profile_hash, AggregationProcedure* procedure,
-                             ReportAggregate* aggregate) {
+                             uint64_t system_profile_hash, AggregationProcedure& procedure,
+                             ReportAggregate& aggregate) {
     std::unique_ptr<logger::EventRecord> record =
         MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
 
@@ -35,8 +35,8 @@
       event->set_event_code(0, event_code);
       for (const std::string& str : strings) {
         event->set_string_value(str);
-        procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
-                                   util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index)));
+        procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+                                  util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index)));
       }
     }
   }
@@ -74,8 +74,8 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
             events_to_strings.size());
 
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   std::vector<std::string> expected_hashes;
   expected_hashes.reserve(kTestStrings.size());
@@ -116,7 +116,7 @@
 
   std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
       kStringMetricMetricId, kStringMetricUniqueDeviceStringCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 3);
   ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(0));
@@ -133,7 +133,7 @@
 
   std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_unique_strings());
   EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 0);
@@ -149,7 +149,7 @@
 
   std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
   ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(0));
@@ -168,7 +168,7 @@
 
   std::unique_ptr<AtLeastOnceStringAggregationProcedure> procedure = GetProcedure(
       kOccurrenceMetricMetricId, kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.unique_strings().unique_strings_size(), 2);
   ASSERT_TRUE(merged_data.unique_strings().unique_strings().contains(1));
@@ -226,11 +226,11 @@
       {kTestStrings2, kTestHashes2},
       {kTestStrings3, kTestHashes3},
   };
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -267,13 +267,13 @@
   }
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   ASSERT_EQ(report_aggregate.daily().by_day_index_size(), 0);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -319,11 +319,11 @@
       {kTestStrings1, kTestHashes1},
       {kTestStrings2, kTestHashes2},
   };
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -361,7 +361,7 @@
 
   // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
   // that day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   ASSERT_NE(report_aggregate.daily().by_day_index_size(), 0);
   EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
   EXPECT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
@@ -377,7 +377,7 @@
   }
 
   // Check that calling observation generation again for the same day generates no observation.
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 1u);
@@ -386,13 +386,13 @@
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
 
   // Check that obsolete aggregates get cleaned up the next day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 }
 
@@ -435,8 +435,8 @@
       {kTestStrings1, kTestHashes1},
       {kTestStrings2, kTestHashes2},
   };
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   std::vector<std::string> expected_hashes;
   expected_hashes.reserve(kTestStrings.size());
@@ -447,7 +447,7 @@
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -478,7 +478,7 @@
     }
 
     // Check that obsolete aggregates get cleaned up.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
     // Days in the 7-day window that had events continue to contain data until the last day when it
@@ -506,7 +506,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -571,16 +571,16 @@
       {2, kTestStrings2},
       {5, kTestStrings3},
   };
-  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash,
-                        procedure.get(), &report_aggregate);
+  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
+                        report_aggregate);
 
   const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
   const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
       {2, kTestStrings4},
       {7, kTestStrings5},
   };
-  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash,
-                        procedure.get(), &report_aggregate);
+  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
+                        report_aggregate);
 
   // This a vector string hashes that is a combination of hashes across 2 days for a single event
   // vector.
@@ -605,7 +605,7 @@
   util::TimeInfo time_info;
   time_info.day_index = kDayIndexDay2;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -653,11 +653,11 @@
   }
 
   // Commit observation
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
 
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndexDay2 + 7;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -702,8 +702,8 @@
       {kTestStrings1, kTestHashes1},
       {kTestStrings2, kTestHashes2},
   };
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   std::vector<std::string> expected_hashes;
   expected_hashes.reserve(kTestStrings.size());
@@ -714,7 +714,7 @@
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -746,7 +746,7 @@
 
     // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
     // that day.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
     // Days in the 7-day window that had events continue to contain data.
@@ -766,7 +766,7 @@
     }
 
     // Check that calling observation generation again for the same day generates no observation.
-    observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+    observations_or = procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     observations = observations_or.ConsumeValueOrDie();
     ASSERT_EQ(observations.size(), 1u);
@@ -777,7 +777,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   EXPECT_EQ(observations.size(), 0u);
@@ -821,8 +821,8 @@
   ASSERT_GE(GetReportDef(metric_id, report_index).event_vector_buffer_max(),
             events_to_strings.size());
 
-  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, procedure.get(),
-                        &report_aggregate);
+  AddStringEventsForDay(kDayIndex, events_to_strings, system_profile_hash, *procedure,
+                        report_aggregate);
 
   EXPECT_LT(string_buffer_max, kTestStrings.size());
   EXPECT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).string_hashes_size(),
@@ -880,15 +880,15 @@
   const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_1 = {
       {0, kTestStrings1},
   };
-  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash,
-                        procedure.get(), &report_aggregate);
+  AddStringEventsForDay(kDayIndexDay1, events_to_strings_day_1, system_profile_hash, *procedure,
+                        report_aggregate);
 
   const uint32_t kDayIndexDay2 = kDayIndexDay1 + 1;
   const std::map<uint32_t, std::vector<std::string>> events_to_strings_day_2 = {
       {3, kTestStrings2},
   };
-  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash,
-                        procedure.get(), &report_aggregate);
+  AddStringEventsForDay(kDayIndexDay2, events_to_strings_day_2, system_profile_hash, *procedure,
+                        report_aggregate);
 
   const std::map<uint32_t, std::vector<std::string>> events_to_hashes = {
       {0, kTestHashes1},
@@ -907,7 +907,7 @@
   time_info.day_index = kDayIndexDay2;
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -940,11 +940,11 @@
     ASSERT_THAT(actualHashes, IsSubsetOf(test_hashes));
   }
 
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
 
   // Check that generating an observation again for the same day will result in no observation, even
   // though a string has not been observed due to the string buffer max being reached.
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
index 961ade1..84faa4d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
@@ -33,15 +33,15 @@
 }
 
 void CountAggregationProcedure::UpdateAggregateData(const logger::EventRecord &event_record,
-                                                    AggregateData *aggregate_data,
-                                                    AggregationPeriodBucket * /*bucket*/) {
+                                                    AggregateData &aggregate_data,
+                                                    AggregationPeriodBucket & /*bucket*/) {
   uint64_t occurrence_count = event_record.event()->occurrence_event().count();
-  aggregate_data->set_count(aggregate_data->count() + occurrence_count);
+  aggregate_data.set_count(aggregate_data.count() + occurrence_count);
 }
 
-void CountAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void CountAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                    const AggregateData &aggregate_data) {
-  merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+  merged_aggregate_data.set_count(merged_aggregate_data.count() + aggregate_data.count());
 }
 
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
@@ -50,13 +50,13 @@
     const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
   std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                         aggregate_data->event_codes().end());
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
       if (!event_vectors.count(event_vector)) {
         continue;
       }
-      aggregates_by_event_code[event_vector].push_back(&aggregate_data->data());
+      aggregates_by_event_code[event_vector].push_back(&aggregate_data.data());
     }
   }
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
index 155b201..8182ab4 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
@@ -19,10 +19,10 @@
 
   [[nodiscard]] bool IsDaily() const override;
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
index 7f7d17d..8a9723a 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure_test.cc
@@ -31,8 +31,7 @@
   const uint32_t kHourId = 1;
   const uint64_t system_profile_hash = uint64_t{56789};
   const uint32_t kNumEventCodes = 100;
-  AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, procedure.get(),
-                             &aggregate);
+  AddOccurrenceEventsForHour(kNumEventCodes, kHourId, system_profile_hash, *procedure, aggregate);
 
   ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -55,8 +54,8 @@
   const uint32_t kHourId = 1;
   const uint64_t system_profile_hash = uint64_t{56789};
   const uint32_t kNumEventCodes = 100;
-  AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash,
-                                      procedure.get(), &aggregate);
+  AddOccurrenceEventsForHourWithCount(kNumEventCodes, 5, kHourId, system_profile_hash, *procedure,
+                                      aggregate);
 
   ASSERT_TRUE(aggregate.hourly().by_hour_id().contains(kHourId));
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -78,7 +77,7 @@
 
   std::unique_ptr<CountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 30);
 }
@@ -89,7 +88,7 @@
 
   std::unique_ptr<CountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 0);
 }
@@ -101,7 +100,7 @@
 
   std::unique_ptr<CountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 10);
 }
@@ -113,7 +112,7 @@
 
   std::unique_ptr<CountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 20);
 }
@@ -130,14 +129,13 @@
   const uint32_t kEndHourId = 11;
   const uint64_t system_profile_hash = uint64_t{56789};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
-                               &aggregate);
+    AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, *procedure, aggregate);
   }
 
   util::TimeInfo info;
   info.hour_id = kEndHourId;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(info, &aggregate);
+      procedure->GenerateObservations(info, aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -151,7 +149,7 @@
     EXPECT_EQ(value.event_codes(0), value.value());
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+  procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
   EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
 
@@ -167,14 +165,13 @@
   const uint32_t kEndHourId = 11;
   const uint64_t system_profile_hash = uint64_t{56789};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, procedure.get(),
-                               &aggregate);
+    AddOccurrenceEventsForHour(kNumEventCodes, hour_id, system_profile_hash, *procedure, aggregate);
   }
 
   util::TimeInfo info;
   info.hour_id = kEndHourId;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(info, &aggregate);
+      procedure->GenerateObservations(info, aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -188,7 +185,7 @@
     EXPECT_EQ(value.event_codes(0), value.value());
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+  procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
   EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
index d1ea447..604eb3c 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.cc
@@ -25,14 +25,14 @@
           (metric.has_int_buckets()) ? metric.int_buckets() : report.int_buckets())) {}
 
 void IntegerHistogramAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
   switch (metric_type()) {
     case MetricDefinition::INTEGER: {
       const IntegerEvent &int_event = event_record.event()->integer_event();
 
       ::google::protobuf::Map<uint32_t, int64_t> *histogram =
-          aggregate_data->mutable_integer_histogram()->mutable_histogram();
+          aggregate_data.mutable_integer_histogram()->mutable_histogram();
       uint32_t index = integer_buckets_->BucketIndex(int_event.value());
 
       (*histogram)[index] += 1;
@@ -42,7 +42,7 @@
           event_record.event()->integer_histogram_event();
 
       ::google::protobuf::Map<uint32_t, int64_t> *histogram =
-          aggregate_data->mutable_integer_histogram()->mutable_histogram();
+          aggregate_data.mutable_integer_histogram()->mutable_histogram();
       for (const HistogramBucket &bucket : int_histogram_event.buckets()) {
         (*histogram)[bucket.index()] += static_cast<int64_t>(bucket.count());
       }
@@ -53,10 +53,10 @@
   }
 }
 
-void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void IntegerHistogramAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                               const AggregateData &aggregate_data) {
   for (const auto &[index, count] : aggregate_data.integer_histogram().histogram()) {
-    (*merged_aggregate_data->mutable_integer_histogram()->mutable_histogram())[index] += count;
+    (*merged_aggregate_data.mutable_integer_histogram()->mutable_histogram())[index] += count;
   }
 }
 
@@ -66,13 +66,13 @@
   std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
   data.reserve(bucket.aggregate_data.size());
 
-  for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
+  for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
     std::vector<std::tuple<uint32_t, int64_t>> histogram;
-    for (const auto &[key, value] : aggregate_data->data().integer_histogram().histogram()) {
+    for (const auto &[key, value] : aggregate_data.data().integer_histogram().histogram()) {
       histogram.emplace_back(std::make_tuple(key, value));
     }
-    std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
-                                      aggregate_data->event_codes().end());
+    std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+                                      aggregate_data.event_codes().end());
     data.emplace_back(std::make_tuple(event_codes, histogram));
   }
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
index 6112ded..dde3cee 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure.h
@@ -18,10 +18,10 @@
   explicit IntegerHistogramAggregationProcedure(const MetricDefinition &metric,
                                                 const ReportDefinition &report);
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
index 581e08b..3e7eac3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/integer_histogram_aggregation_procedure_test.cc
@@ -17,7 +17,7 @@
 class IntegerHistogramAggregationProcedureTest : public testing::TestAggregationProcedure {
  public:
   void LogIntegerEvents(uint32_t hour_id, uint32_t num_event_codes, uint64_t system_profile_hash,
-                        AggregationProcedure* procedure, ReportAggregate* aggregate) {
+                        AggregationProcedure& procedure, ReportAggregate& aggregate) {
     std::unique_ptr<logger::EventRecord> record =
         MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
     IntegerEvent* event = record->event()->mutable_integer_event();
@@ -25,15 +25,15 @@
     for (int i = 0; i < num_event_codes; i++) {
       event->set_event_code(0, i);
       event->set_value(i);
-      procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
-                                 util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+      procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+                                util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
     }
   }
 
   void LogIntegerHistogramEvents(uint32_t hour_id, uint32_t num_event_codes,
                                  const std::map<uint32_t, uint64_t>& histogram,
-                                 uint64_t system_profile_hash, AggregationProcedure* procedure,
-                                 ReportAggregate* aggregate) {
+                                 uint64_t system_profile_hash, AggregationProcedure& procedure,
+                                 ReportAggregate& aggregate) {
     std::unique_ptr<logger::EventRecord> record =
         MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
     IntegerHistogramEvent* event = record->event()->mutable_integer_histogram_event();
@@ -45,8 +45,8 @@
     event->add_event_code(0);
     for (int i = 0; i < num_event_codes; i++) {
       event->set_event_code(0, i);
-      procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
-                                 util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+      procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+                                util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
     }
   }
 
@@ -68,7 +68,7 @@
 
   const uint32_t kHourId = 1;
   const uint64_t system_profile_hash = uint64_t{1867};
-  LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
+  LogIntegerEvents(kHourId, kNumEventCodes, system_profile_hash, *procedure, aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -90,7 +90,7 @@
   const uint32_t kHourId = 1;
   const uint64_t system_profile_hash = uint64_t{1867};
   LogIntegerHistogramEvents(kHourId, kNumEventCodes, {{1, 10}, {2, 100}, {3, 50}},
-                            system_profile_hash, procedure.get(), &aggregate);
+                            system_profile_hash, *procedure, aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -110,7 +110,7 @@
 
   std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 3);
   ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
@@ -127,7 +127,7 @@
 
   std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_integer_histogram());
   EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 0);
@@ -141,7 +141,7 @@
 
   std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
   ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(0));
@@ -158,7 +158,7 @@
 
   std::unique_ptr<IntegerHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_histogram().histogram_size(), 2);
   ASSERT_TRUE(merged_data.integer_histogram().histogram().contains(1));
@@ -179,11 +179,11 @@
   const uint32_t kEndHourId = 11;
   const uint64_t system_profile_hash = uint64_t{1867};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, procedure.get(), &aggregate);
+    LogIntegerEvents(hour_id, kNumEventCodes, system_profile_hash, *procedure, aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -199,7 +199,7 @@
     EXPECT_EQ(value.bucket_counts(0), 1);
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                    system_profile_hash);
   EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
@@ -218,11 +218,11 @@
   const std::map<uint32_t, uint64_t> kLoggedHistogram = {{1, 10}, {2, 100}, {3, 50}};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
     LogIntegerHistogramEvents(hour_id, kNumEventCodes, kLoggedHistogram, system_profile_hash,
-                              procedure.get(), &aggregate);
+                              *procedure, aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -240,7 +240,7 @@
     }
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                    system_profile_hash);
   EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
index 30ef332..a7b4ba7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.cc
@@ -62,16 +62,17 @@
 NumericStatAggregationProcedure::GenerateSingleObservation(
     const std::vector<AggregateDataToGenerate> &buckets,
     const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo & /*time_info*/) {
-  std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
+  std::map<std::vector<uint32_t>, std::vector<std::reference_wrapper<const AggregateData>>>
+      aggregates_by_event_code;
 
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                         aggregate_data->event_codes().end());
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
       if (!event_vectors.count(event_vector)) {
         continue;
       }
-      aggregates_by_event_code[event_vector].push_back(&aggregate_data->data());
+      aggregates_by_event_code[event_vector].push_back(aggregate_data.data());
     }
   }
 
@@ -89,135 +90,135 @@
 }
 
 void SumNumericStatAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  aggregate_data->set_integer_value(aggregate_data->integer_value() +
-                                    event_record.event()->integer_event().value());
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  aggregate_data.set_integer_value(aggregate_data.integer_value() +
+                                   event_record.event()->integer_event().value());
 }
 
-void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SumNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                             const AggregateData &aggregate_data) {
-  merged_aggregate_data->set_integer_value(merged_aggregate_data->integer_value() +
-                                           aggregate_data.integer_value());
+  merged_aggregate_data.set_integer_value(merged_aggregate_data.integer_value() +
+                                          aggregate_data.integer_value());
 }
 
 int64_t SumNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   int64_t value = 0;
-  for (const auto *aggregate : aggregates) {
-    value += aggregate->integer_value();
+  for (const AggregateData &aggregate : aggregates) {
+    value += aggregate.integer_value();
   }
   return value;
 }
 
 void MinNumericStatAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  if (aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
-    aggregate_data->set_integer_value(
-        std::min(aggregate_data->integer_value(), event_record.event()->integer_event().value()));
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+    aggregate_data.set_integer_value(
+        std::min(aggregate_data.integer_value(), event_record.event()->integer_event().value()));
   } else {
-    aggregate_data->set_integer_value(event_record.event()->integer_event().value());
+    aggregate_data.set_integer_value(event_record.event()->integer_event().value());
   }
 }
 
-void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MinNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                             const AggregateData &aggregate_data) {
   if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
-    if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
-      merged_aggregate_data->set_integer_value(
-          std::min(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+    if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+      merged_aggregate_data.set_integer_value(
+          std::min(aggregate_data.integer_value(), merged_aggregate_data.integer_value()));
     } else {
-      merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+      merged_aggregate_data.set_integer_value(aggregate_data.integer_value());
     }
   }
 }
 
 int64_t MinNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   int64_t value = std::numeric_limits<int64_t>::max();
-  for (const auto *aggregate : aggregates) {
-    value = std::min(value, aggregate->integer_value());
+  for (const AggregateData &aggregate : aggregates) {
+    value = std::min(value, aggregate.integer_value());
   }
   return value;
 }
 
 void MaxNumericStatAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  if (aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
-    aggregate_data->set_integer_value(
-        std::max(aggregate_data->integer_value(), event_record.event()->integer_event().value()));
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+    aggregate_data.set_integer_value(
+        std::max(aggregate_data.integer_value(), event_record.event()->integer_event().value()));
   } else {
-    aggregate_data->set_integer_value(event_record.event()->integer_event().value());
+    aggregate_data.set_integer_value(event_record.event()->integer_event().value());
   }
 }
 
-void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MaxNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                             const AggregateData &aggregate_data) {
   if (aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
-    if (merged_aggregate_data->aggregate_data_case() == AggregateData::kIntegerValue) {
-      merged_aggregate_data->set_integer_value(
-          std::max(aggregate_data.integer_value(), merged_aggregate_data->integer_value()));
+    if (merged_aggregate_data.aggregate_data_case() == AggregateData::kIntegerValue) {
+      merged_aggregate_data.set_integer_value(
+          std::max(aggregate_data.integer_value(), merged_aggregate_data.integer_value()));
     } else {
-      merged_aggregate_data->set_integer_value(aggregate_data.integer_value());
+      merged_aggregate_data.set_integer_value(aggregate_data.integer_value());
     }
   }
 }
 
 int64_t MaxNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   int64_t value = std::numeric_limits<int64_t>::min();
-  for (const auto *aggregate : aggregates) {
-    value = std::max(value, aggregate->integer_value());
+  for (const AggregateData &aggregate : aggregates) {
+    value = std::max(value, aggregate.integer_value());
   }
   return value;
 }
 
 void MeanNumericStatAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  SumAndCount *sum_and_count = aggregate_data->mutable_sum_and_count();
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  SumAndCount *sum_and_count = aggregate_data.mutable_sum_and_count();
   sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
   sum_and_count->set_count(sum_and_count->count() + 1);
 }
 
-void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void MeanNumericStatAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                              const AggregateData &aggregate_data) {
-  SumAndCount *sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+  SumAndCount *sum_and_count = merged_aggregate_data.mutable_sum_and_count();
   sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
   sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
 }
 
 int64_t MeanNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   int64_t sum = 0;
   uint32_t count = 0;
-  for (const auto *aggregate : aggregates) {
-    sum += aggregate->sum_and_count().sum();
-    count += aggregate->sum_and_count().count();
+  for (const AggregateData &aggregate : aggregates) {
+    sum += aggregate.sum_and_count().sum();
+    count += aggregate.sum_and_count().count();
   }
   return sum / count;
 }
 
 void MedianNumericStatAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  aggregate_data->mutable_integer_values()->add_value(
-      event_record.event()->integer_event().value());
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  aggregate_data.mutable_integer_values()->add_value(event_record.event()->integer_event().value());
 }
 
 void MedianNumericStatAggregationProcedure::MergeAggregateData(
-    AggregateData *merged_aggregate_data, const AggregateData &aggregate_data) {
-  merged_aggregate_data->mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
+    AggregateData &merged_aggregate_data, const AggregateData &aggregate_data) {
+  merged_aggregate_data.mutable_integer_values()->MergeFrom(aggregate_data.integer_values());
 }
 
 namespace {
-std::vector<int64_t> CollectValues(const std::vector<const AggregateData *> &aggregates) {
+std::vector<int64_t> CollectValues(
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   std::vector<int64_t> values;
-  for (const auto *aggregate : aggregates) {
-    values.insert(values.end(), aggregate->integer_values().value().begin(),
-                  aggregate->integer_values().value().end());
+  for (const AggregateData &aggregate : aggregates) {
+    values.insert(values.end(), aggregate.integer_values().value().begin(),
+                  aggregate.integer_values().value().end());
   }
 
   sort(values.begin(), values.end());
@@ -227,7 +228,7 @@
 }  // namespace
 
 int64_t MedianNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   std::vector<int64_t> values = CollectValues(aggregates);
 
   if (values.size() % 2 == 0) {
@@ -237,7 +238,7 @@
 }
 
 int64_t PercentileNNumericStatAggregationProcedure::CollectValue(
-    const std::vector<const AggregateData *> &aggregates) {
+    const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) {
   std::vector<int64_t> values = CollectValues(aggregates);
 
   auto index = static_cast<uint32_t>((static_cast<double>(percentile_n_) / 100.0) *
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
index 9ebcd06..eb19c5b 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure.h
@@ -33,7 +33,8 @@
       const util::TimeInfo & /*time_info*/) override;
 
  protected:
-  virtual int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) = 0;
+  virtual int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) = 0;
 };
 
 class SumNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -42,16 +43,17 @@
                                               const ReportDefinition &report)
       : NumericStatAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override { return "SUM_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 };
 
 class MinNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -60,16 +62,17 @@
                                               const ReportDefinition &report)
       : NumericStatAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override { return "MIN_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 };
 
 class MaxNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -78,16 +81,17 @@
                                               const ReportDefinition &report)
       : NumericStatAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override { return "MAX_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 };
 
 class MeanNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -96,16 +100,17 @@
                                                const ReportDefinition &report)
       : NumericStatAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override { return "MEAN_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 };
 
 class MedianNumericStatAggregationProcedure : public NumericStatAggregationProcedure {
@@ -114,16 +119,17 @@
                                                  const ReportDefinition &report)
       : NumericStatAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override { return "MEDIAN_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 };
 
 class PercentileNNumericStatAggregationProcedure : public MedianNumericStatAggregationProcedure {
@@ -136,7 +142,8 @@
   [[nodiscard]] std::string DebugString() const override { return "PERCENTILE_N_NUMERIC_STAT"; }
 
  private:
-  int64_t CollectValue(const std::vector<const AggregateData *> &aggregates) override;
+  int64_t CollectValue(
+      const std::vector<std::reference_wrapper<const AggregateData>> &aggregates) override;
 
   uint32_t percentile_n_;
 };
diff --git a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
index 9e096ef..efd01f7 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/numeric_stat_aggregation_procedure_test.cc
@@ -34,20 +34,20 @@
                      int64_t /* expected value */>> {
  public:
   void LogIntegerSequence(const std::vector<int64_t> &values, util::TimeInfo time_info,
-                          uint64_t system_profile_hash, AggregationProcedure *procedure,
-                          ReportAggregate *aggregate) {
+                          uint64_t system_profile_hash, AggregationProcedure &procedure,
+                          ReportAggregate &aggregate) {
     std::unique_ptr<logger::EventRecord> record = MakeEventRecord(time_info);
     IntegerEvent *event = record->event()->mutable_integer_event();
     for (int64_t value : values) {
       event->set_value(value);
-      procedure->UpdateAggregate(
+      procedure.UpdateAggregate(
           *record, aggregate, system_profile_hash,
           util::FromUnixSeconds(util::HourIdToUnixSeconds(time_info.hour_id)));
     }
     event->add_event_code(1);
     for (int64_t value : values) {
       event->set_value(value);
-      procedure->UpdateAggregate(
+      procedure.UpdateAggregate(
           *record, aggregate, system_profile_hash,
           util::FromUnixSeconds(util::HourIdToUnixSeconds(time_info.hour_id)));
     }
@@ -130,26 +130,26 @@
   return name;
 }
 
-void CheckDebugString(int64_t daily_report_type, AggregationProcedure *procedure) {
+void CheckDebugString(int64_t daily_report_type, const AggregationProcedure &procedure) {
   switch (daily_report_type) {
     case kIntegerMetricUniqueDeviceNumericStatsReport7DaySumReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "SUM_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "SUM_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMinReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "MIN_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "MIN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMaxReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "MAX_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "MAX_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMeanReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "MEAN_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "MEAN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7DayMedianReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "MEDIAN_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "MEDIAN_NUMERIC_STAT");
       break;
     case kIntegerMetricUniqueDeviceNumericStatsReport7Day75thPercentileReportIndex:
     case kIntegerMetricUniqueDeviceNumericStatsReport7Day99thPercentileReportIndex:
-      ASSERT_EQ(procedure->DebugString(), "PERCENTILE_N_NUMERIC_STAT");
+      ASSERT_EQ(procedure.DebugString(), "PERCENTILE_N_NUMERIC_STAT");
       break;
   }
 }
@@ -162,7 +162,7 @@
   uint32_t metric_id = kIntegerMetricMetricId;
   std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, daily_report_type);
 
-  CheckDebugString(daily_report_type, procedure.get());
+  CheckDebugString(daily_report_type, *procedure);
 
   ReportAggregate report_aggregate;
   const uint32_t kDayIndex = 7;
@@ -177,12 +177,12 @@
     } else {
       window.assign(integer_sequence.begin() + (window_size * day), integer_sequence.end());
     }
-    LogIntegerSequence(window, TimeInfo::FromDayIndex(day + 1), system_profile_hash,
-                       procedure.get(), &report_aggregate);
+    LogIntegerSequence(window, TimeInfo::FromDayIndex(day + 1), system_profile_hash, *procedure,
+                       report_aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromDayIndex(kDayIndex), &report_aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromDayIndex(kDayIndex), report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -197,7 +197,7 @@
   ASSERT_EQ(integer_obs.values(1).value(), expected_value);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, util::TimeInfo::FromDayIndex(kDayIndex),
+  procedure->ObservationsCommitted(report_aggregate, util::TimeInfo::FromDayIndex(kDayIndex),
                                    system_profile_hash);
   EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex - 6), 0u);
 }
@@ -208,7 +208,7 @@
   uint32_t metric_id = kIntegerMetricMetricId;
   std::unique_ptr<AggregationProcedure> procedure = GetProcedureFor(metric_id, hourly_report_type);
 
-  CheckDebugString(daily_report_type, procedure.get());
+  CheckDebugString(daily_report_type, *procedure);
 
   ReportAggregate report_aggregate;
   const uint32_t kHourId = 20;
@@ -216,11 +216,11 @@
 
   for (uint32_t hour = 0; hour <= kHourId; hour += 2) {
     LogIntegerSequence(integer_sequence, TimeInfo::FromHourId(hour), system_profile_hash,
-                       procedure.get(), &report_aggregate);
+                       *procedure, report_aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId), &report_aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromHourId(kHourId), report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -235,7 +235,7 @@
   ASSERT_EQ(integer_obs.values(1).value(), expected_value);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, util::TimeInfo::FromHourId(kHourId),
+  procedure->ObservationsCommitted(report_aggregate, util::TimeInfo::FromHourId(kHourId),
                                    system_profile_hash);
   EXPECT_EQ(report_aggregate.hourly().by_hour_id().count(kHourId), 0u);
 }
@@ -304,7 +304,7 @@
 
   std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_value(), 300);
 }
@@ -315,7 +315,7 @@
 
   std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_value(), 0);
 }
@@ -327,7 +327,7 @@
 
   std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_value(), 200);
 }
@@ -339,7 +339,7 @@
 
   std::unique_ptr<SumNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsSumReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.integer_value(), 100);
 }
@@ -361,7 +361,7 @@
 
   std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), -15);
@@ -373,7 +373,7 @@
 
   std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), 0);
@@ -386,7 +386,7 @@
 
   std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), -15);
@@ -399,7 +399,7 @@
 
   std::unique_ptr<MinNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMinReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), 100);
@@ -422,7 +422,7 @@
 
   std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), 100);
@@ -434,7 +434,7 @@
 
   std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), 0);
@@ -447,7 +447,7 @@
 
   std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), -15);
@@ -460,7 +460,7 @@
 
   std::unique_ptr<MaxNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMaxReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.has_integer_value());
   EXPECT_EQ(merged_data.integer_value(), 100);
@@ -485,7 +485,7 @@
 
   std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 30);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
@@ -497,7 +497,7 @@
 
   std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 0);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
@@ -511,7 +511,7 @@
 
   std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 10);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
@@ -525,7 +525,7 @@
 
   std::unique_ptr<MeanNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMeanReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 20);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
@@ -550,7 +550,7 @@
 
   std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
 }
@@ -561,7 +561,7 @@
 
   std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
 }
@@ -574,7 +574,7 @@
 
   std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
 }
@@ -587,7 +587,7 @@
 
   std::unique_ptr<MedianNumericStatAggregationProcedure> procedure =
       GetProcedure(kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStatsMedianReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
 }
@@ -611,7 +611,7 @@
 
   std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
       kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 10, 100, 150));
 }
@@ -622,7 +622,7 @@
 
   std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
       kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), IsEmpty());
 }
@@ -635,7 +635,7 @@
 
   std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
       kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(-15, 150));
 }
@@ -648,7 +648,7 @@
 
   std::unique_ptr<PercentileNNumericStatAggregationProcedure> procedure = GetProcedure(
       kIntegerMetricMetricId, kIntegerMetricHourlyValueNumericStats99thPercentileReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_THAT(merged_data.integer_values().value(), UnorderedElementsAre(10, 100));
 }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
index 23138ea..615b74f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.cc
@@ -18,18 +18,18 @@
 }
 
 void SelectFirstAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord & /*event_record*/, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
-  aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+    const logger::EventRecord & /*event_record*/, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
+  aggregate_data.mutable_at_least_once()->set_at_least_once(true);
 }
 
-void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SelectFirstAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                          const AggregateData &aggregate_data) {
   if (aggregate_data.at_least_once().at_least_once()) {
-    merged_aggregate_data->mutable_at_least_once()->set_at_least_once(true);
+    merged_aggregate_data.mutable_at_least_once()->set_at_least_once(true);
     if (aggregate_data.at_least_once().last_day_index() >
-        merged_aggregate_data->at_least_once().last_day_index()) {
-      merged_aggregate_data->mutable_at_least_once()->set_last_day_index(
+        merged_aggregate_data.at_least_once().last_day_index()) {
+      merged_aggregate_data.mutable_at_least_once()->set_last_day_index(
           aggregate_data.at_least_once().last_day_index());
     }
   }
@@ -43,13 +43,13 @@
     const std::set<std::vector<uint32_t>> &event_vectors, const util::TimeInfo &time_info) {
   std::set<std::vector<uint32_t>> event_vectors_to_send;
   for (const AggregateDataToGenerate &bucket : buckets) {
-    for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-      std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                         aggregate_data->event_codes().end());
+    for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
       if (!event_vectors.count(event_vector)) {
         continue;
       }
-      if (aggregate_data->data().at_least_once().last_day_index() >= time_info.day_index) {
+      if (aggregate_data.data().at_least_once().last_day_index() >= time_info.day_index) {
         continue;
       }
 
@@ -71,7 +71,7 @@
   return logger::Encoder::EncodeIntegerObservation(data);
 }
 
-void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate *aggregate,
+void SelectFirstAggregationProcedure::ObservationsCommitted(ReportAggregate &aggregate,
                                                             util::TimeInfo info,
                                                             uint64_t system_profile_hash) const {
   std::map<uint64_t, std::vector<AggregateDataToGenerate>> data_to_generate =
@@ -89,19 +89,19 @@
     const std::set<std::vector<uint32_t>> &event_vectors =
         SelectEventVectorsForObservation(buckets);
     for (AggregateDataToGenerate &bucket : buckets) {
-      for (EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-        std::vector<uint32_t> event_vector(aggregate_data->event_codes().begin(),
-                                           aggregate_data->event_codes().end());
+      for (EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+        std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                           aggregate_data.event_codes().end());
         if (!event_vectors.count(event_vector)) {
           continue;
         }
-        if (!aggregate_data->data().at_least_once().at_least_once()) {
+        if (!aggregate_data.data().at_least_once().at_least_once()) {
           continue;
         }
-        if (aggregate_data->data().at_least_once().last_day_index() >= info.day_index) {
+        if (aggregate_data.data().at_least_once().last_day_index() >= info.day_index) {
           continue;
         }
-        aggregate_data->mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
+        aggregate_data.mutable_data()->mutable_at_least_once()->set_last_day_index(info.day_index);
       }
     }
   }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
index 0a540a7..3a3e648 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure.h
@@ -16,17 +16,17 @@
                                            const ReportDefinition &report);
 
   void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
-                           AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+                           AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override;
   [[nodiscard]] bool IsDaily() const override { return true; }
   // Call observation generation more frequently so that data can be sent as soon as possible.
   [[nodiscard]] bool IsExpedited() const override { return is_expedited_; }
-  void ObservationsCommitted(ReportAggregate *aggregate, util::TimeInfo info,
+  void ObservationsCommitted(ReportAggregate &aggregate, util::TimeInfo info,
                              uint64_t system_profile_hash) const override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
index 1f6ee10..6e36562 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_first_aggregation_procedure_test.cc
@@ -45,8 +45,7 @@
 
   ReportAggregate aggregate;
   // Add events for 2 different event vectors: {1} and {2}.
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
 
   // Check that |aggregate| was updated for the first event vector but not the second.
   ASSERT_TRUE(aggregate.daily().by_day_index().contains(kDayIndex));
@@ -69,7 +68,7 @@
   std::unique_ptr<SelectFirstAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -82,7 +81,7 @@
   std::unique_ptr<SelectFirstAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_at_least_once());
   EXPECT_FALSE(merged_data.at_least_once().at_least_once());
@@ -98,7 +97,7 @@
   std::unique_ptr<SelectFirstAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 101);
@@ -113,7 +112,7 @@
   std::unique_ptr<SelectFirstAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_TRUE(merged_data.at_least_once().at_least_once());
   EXPECT_EQ(merged_data.at_least_once().last_day_index(), 100);
@@ -133,11 +132,11 @@
 
   ReportAggregate report_aggregate;
   // Add events for 2 different event vectors: {1} and {2}.
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -151,12 +150,12 @@
   ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 0u);
@@ -176,11 +175,11 @@
 
   ReportAggregate report_aggregate;
   // Add events for 2 different event vectors: {1} and {2}.
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -195,7 +194,7 @@
 
   // Check that aggregates get marked as sent and don't get cleaned up while they are still needed
   // that day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
   ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
             1u);
@@ -205,7 +204,7 @@
   EXPECT_EQ(system_profile_agg.by_event_code(0).data().at_least_once().last_day_index(), kDayIndex);
 
   // Check that calling observation generation again for the same day generates no observation.
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 1u);
@@ -214,13 +213,13 @@
 
   // Check that calling observation generation the next day generates no observation.
   time_info.day_index++;
-  observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+  observations_or = procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 0u);
 
   // Check that obsolete aggregates get cleaned up the next day.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(kDayIndex));
 }
 
@@ -240,15 +239,15 @@
   // Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
   // ending on |kDayIndex|.
   for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
-    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
-                              &report_aggregate);
+    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, *procedure,
+                              report_aggregate);
   }
 
   // Check that the observation is generated for the next 7 days.
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -262,7 +261,7 @@
     ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
 
     // Check that obsolete aggregates get cleaned up.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 6));
     // Days in the 7-day window that had events continue to contain data.
@@ -285,7 +284,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 0u);
@@ -307,15 +306,15 @@
   // Add events for 2 different event vectors: {1} and {2}, for each of the 7 days in the window
   // ending on |kDayIndex|.
   for (uint32_t day = kDayIndex; day > kDayIndex - 7; --day) {
-    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, procedure.get(),
-                              &report_aggregate);
+    AddOccurrenceEventsForDay(kNumEventCodes, day, system_profile_hash, *procedure,
+                              report_aggregate);
   }
 
   // Check that the observation is generated for the next 7 days.
   for (int i = 0; i < 7; i++) {
     time_info.day_index = kDayIndex + i;
     lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-        procedure->GenerateObservations(time_info, &report_aggregate);
+        procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -329,7 +328,7 @@
     ASSERT_EQ(integer_obs.values(0).event_codes(0), 1u);
 
     // Check that obsolete aggregates get cleaned up.
-    procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+    procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
     // No data for days beyond the 7-day window.
     EXPECT_FALSE(report_aggregate.daily().by_day_index().contains(time_info.day_index - 7));
     // Days in the 7-day window that had events continue to contain data.
@@ -349,7 +348,7 @@
     }
 
     // Check that calling observation generation again for the same day generates no observation.
-    observations_or = procedure->GenerateObservations(time_info, &report_aggregate);
+    observations_or = procedure->GenerateObservations(time_info, report_aggregate);
     ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
     observations = observations_or.ConsumeValueOrDie();
     ASSERT_EQ(observations.size(), 1u);
@@ -360,7 +359,7 @@
   // After 7 days the observation is no longer generated.
   time_info.day_index = kDayIndex + 7;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
   ASSERT_EQ(observations.size(), 0u);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
index 75d7e4d..964c70d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.cc
@@ -15,16 +15,16 @@
     : AggregationProcedure(metric, report) {}
 
 void SelectMostCommonAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket * /*bucket*/) {
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket & /*bucket*/) {
   // TODO(fxbug.dev/53775): Handle the case where event_record is malformed.
-  aggregate_data->set_count(aggregate_data->count() +
-                            event_record.event()->occurrence_event().count());
+  aggregate_data.set_count(aggregate_data.count() +
+                           event_record.event()->occurrence_event().count());
 }
 
-void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void SelectMostCommonAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                               const AggregateData &aggregate_data) {
-  merged_aggregate_data->set_count(merged_aggregate_data->count() + aggregate_data.count());
+  merged_aggregate_data.set_count(merged_aggregate_data.count() + aggregate_data.count());
 }
 
 std::string SelectMostCommonAggregationProcedure::DebugString() const {
@@ -48,10 +48,10 @@
     uint32_t event_vector_count = 0;
 
     for (const AggregateDataToGenerate &bucket : buckets) {
-      for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-        if (std::equal(aggregate_data->event_codes().begin(), aggregate_data->event_codes().end(),
+      for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+        if (std::equal(aggregate_data.event_codes().begin(), aggregate_data.event_codes().end(),
                        event_vector.begin(), event_vector.end())) {
-          event_vector_count += aggregate_data->data().count();
+          event_vector_count += aggregate_data.data().count();
           continue;
         }
       }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
index e2430ff..7a9bda5 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure.h
@@ -18,10 +18,10 @@
   explicit SelectMostCommonAggregationProcedure(const MetricDefinition &metric,
                                                 const ReportDefinition &report);
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket * /*bucket*/) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket & /*bucket*/) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   [[nodiscard]] std::string DebugString() const override;
diff --git a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
index 67ae97e..759b837 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/select_most_common_aggregation_procedure_test.cc
@@ -43,8 +43,7 @@
   ASSERT_GE(report.event_vector_buffer_max(), kNumEventCodes);
 
   ReportAggregate aggregate;
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure, aggregate);
 
   ASSERT_EQ(aggregate.daily().by_day_index().count(kDayIndex), 1u);
   ASSERT_EQ(aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(), 1u);
@@ -66,7 +65,7 @@
   std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 221);
 }
@@ -78,7 +77,7 @@
   std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 0);
 }
@@ -91,7 +90,7 @@
   std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 100);
 }
@@ -104,7 +103,7 @@
   std::unique_ptr<SelectMostCommonAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId,
                    kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.count(), 121);
 }
@@ -121,7 +120,7 @@
   ReportAggregate report_aggregate;
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -144,11 +143,11 @@
   ReportAggregate report_aggregate;
   // Log |event_code| OccurrenceEvents, each with a count of 1, for each |event_code| in the range
   // [1, kNumEventCodes]. The most common event code will be |kNumEventCodes|.
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex, system_profile_hash, *procedure,
+                            report_aggregate);
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -163,7 +162,7 @@
   EXPECT_EQ(integer_obs.values(0).value(), 1);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   EXPECT_EQ(report_aggregate.daily().by_day_index().count(kDayIndex), 0u);
 }
 
@@ -184,18 +183,18 @@
   const uint32_t kMostCommonEventCode = 1;
   // Log |event_code| OccurrenceEvents, each with a count of 1, for each |event_code| in the
   // range [1, kNumEventCodes].
-  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex - 1, system_profile_hash, procedure.get(),
-                            &report_aggregate);
+  AddOccurrenceEventsForDay(kNumEventCodes, kDayIndex - 1, system_profile_hash, *procedure,
+                            report_aggregate);
   // For another day in the aggregation period, log |kNumEventCodes| + 1 OccurrenceEvents, each with
   // a count of 1, for event code |kMostCommonEventCode|. The most common event code over the
   // aggregation period is now |kMostCommonEventCode|.
   for (int i = 0; i < kNumEventCodes + 1; ++i) {
-    AddOccurrenceEventsForDay(kMostCommonEventCode, kDayIndex, system_profile_hash, procedure.get(),
-                              &report_aggregate);
+    AddOccurrenceEventsForDay(kMostCommonEventCode, kDayIndex, system_profile_hash, *procedure,
+                              report_aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(time_info, &report_aggregate);
+      procedure->GenerateObservations(time_info, report_aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -210,7 +209,7 @@
   EXPECT_EQ(integer_obs.values(0).value(), 1);
 
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&report_aggregate, time_info, system_profile_hash);
+  procedure->ObservationsCommitted(report_aggregate, time_info, system_profile_hash);
   // Days in the 7-day window that had events continue to contain data.
   ASSERT_TRUE(report_aggregate.daily().by_day_index().contains(kDayIndex));
   ASSERT_EQ(report_aggregate.daily().by_day_index().at(kDayIndex).system_profile_aggregates_size(),
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
index 6f232e3..db8e02d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.cc
@@ -16,36 +16,35 @@
 namespace cobalt::local_aggregation {
 
 void StringHistogramAggregationProcedure::UpdateAggregateData(
-    const logger::EventRecord &event_record, AggregateData *aggregate_data,
-    AggregationPeriodBucket *bucket) {
-  StringHistogram *histogram = aggregate_data->mutable_string_histogram();
+    const logger::EventRecord &event_record, AggregateData &aggregate_data,
+    AggregationPeriodBucket &bucket) {
+  StringHistogram *histogram = aggregate_data.mutable_string_histogram();
 
   std::string bytes =
       util::FarmhashFingerprint(event_record.event()->string_event().string_value());
 
-  for (int i = 0; i < bucket->string_hashes_size(); i++) {
-    if (bucket->string_hashes(i) == bytes) {
+  for (int i = 0; i < bucket.string_hashes_size(); i++) {
+    if (bucket.string_hashes(i) == bytes) {
       (*histogram->mutable_histogram())[i] += 1;
       return;
     }
   }
 
-  if (bucket->string_hashes_size() < string_buffer_max_) {
+  if (bucket.string_hashes_size() < string_buffer_max_) {
     // Add new entry
-    (*histogram->mutable_histogram())[bucket->string_hashes_size()] += 1;
-    bucket->add_string_hashes(bytes);
+    (*histogram->mutable_histogram())[bucket.string_hashes_size()] += 1;
+    bucket.add_string_hashes(bytes);
   }
 }
 
-void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData *merged_aggregate_data,
+void StringHistogramAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
                                                              const AggregateData &aggregate_data) {
   // Merge in the aggregate data's mapping of indexes to their count.
   // This only works correctly if the AggregateData objects are both part of the same
   // AggregationPeriodBucket, such that their string_index values both refer to the same repeated
   // string_hashes field in the bucket.
   for (const auto &[string_index, count] : aggregate_data.string_histogram().histogram()) {
-    (*merged_aggregate_data->mutable_string_histogram()->mutable_histogram())[string_index] +=
-        count;
+    (*merged_aggregate_data.mutable_string_histogram()->mutable_histogram())[string_index] += count;
   }
 }
 
@@ -55,10 +54,10 @@
   std::vector<std::tuple<std::vector<uint32_t>, std::vector<std::tuple<uint32_t, int64_t>>>> data;
   data.reserve(bucket.aggregate_data.size());
 
-  for (const EventCodesAggregateData *aggregate_data : bucket.aggregate_data) {
-    const StringHistogram &histogram = aggregate_data->data().string_histogram();
-    std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
-                                      aggregate_data->event_codes().end());
+  for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+    const StringHistogram &histogram = aggregate_data.data().string_histogram();
+    std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+                                      aggregate_data.event_codes().end());
     std::vector<std::tuple<uint32_t, int64_t>> event_code_histogram;
     for (const auto &[index, count] : histogram.histogram()) {
       event_code_histogram.emplace_back(index, count);
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
index aa67ed8..b53f303 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure.h
@@ -18,10 +18,10 @@
       : HourlyAggregationProcedure(metric, report),
         string_buffer_max_(metric.string_buffer_max()) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket *bucket) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket &bucket) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
index 6f358e6..7b9b0bc 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/string_histogram_aggregation_procedure_test.cc
@@ -22,7 +22,7 @@
  protected:
   void LogStringEvents(uint32_t hour_id, uint32_t num_event_codes,
                        const std::vector<std::string>& strings, uint64_t system_profile_hash,
-                       AggregationProcedure* procedure, ReportAggregate* aggregate) {
+                       AggregationProcedure& procedure, ReportAggregate& aggregate) {
     std::unique_ptr<logger::EventRecord> record =
         MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
     StringEvent* event = record->event()->mutable_string_event();
@@ -31,8 +31,8 @@
       event->set_event_code(0, i);
       for (const std::string& str : strings) {
         event->set_string_value(str);
-        procedure->UpdateAggregate(*record, aggregate, system_profile_hash,
-                                   util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
+        procedure.UpdateAggregate(*record, aggregate, system_profile_hash,
+                                  util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id)));
       }
     }
   }
@@ -59,8 +59,8 @@
       "Integer a ullamcorper dolor.",
       "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
   };
-  LogStringEvents(kHourId, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
-                  &aggregate);
+  LogStringEvents(kHourId, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+                  aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 1);
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -80,7 +80,7 @@
 
   std::unique_ptr<StringHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.string_histogram().histogram_size(), 3);
   ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
@@ -97,7 +97,7 @@
 
   std::unique_ptr<StringHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_FALSE(merged_data.has_string_histogram());
   EXPECT_EQ(merged_data.string_histogram().histogram_size(), 0);
@@ -111,7 +111,7 @@
 
   std::unique_ptr<StringHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
   ASSERT_TRUE(merged_data.string_histogram().histogram().contains(0));
@@ -128,7 +128,7 @@
 
   std::unique_ptr<StringHistogramAggregationProcedure> procedure =
       GetProcedure(kStringMetricMetricId, kStringMetricStringCountsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.string_histogram().histogram_size(), 2);
   ASSERT_TRUE(merged_data.string_histogram().histogram().contains(1));
@@ -153,12 +153,12 @@
       "Praesent vel nulla quis metus consectetur aliquam sed ut felis.",
   };
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
-                    &aggregate);
+    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+                    aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -185,7 +185,7 @@
     }
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                    system_profile_hash);
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
@@ -213,12 +213,12 @@
       "Pellentesque dictum quam nec lectus sagittis interdum.",
   };
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, procedure.get(),
-                    &aggregate);
+    LogStringEvents(hour_id, kNumEventCodes, kTestStrings, system_profile_hash, *procedure,
+                    aggregate);
   }
 
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), &aggregate);
+      procedure->GenerateObservations(util::TimeInfo::FromHourId(kEndHourId), aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -234,7 +234,7 @@
     ASSERT_EQ(value.bucket_counts_size(), 5);
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, util::TimeInfo::FromHourId(kEndHourId),
+  procedure->ObservationsCommitted(aggregate, util::TimeInfo::FromHourId(kEndHourId),
                                    system_profile_hash);
   ASSERT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
index a7572dd..365b325 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.cc
@@ -14,31 +14,31 @@
 
 namespace cobalt::local_aggregation {
 
-void SumAndCountAggregationProcedure::UpdateAggregateData(const logger::EventRecord& event_record,
-                                                          AggregateData* aggregate_data,
-                                                          AggregationPeriodBucket* /*bucket*/) {
-  SumAndCount* sum_and_count = aggregate_data->mutable_sum_and_count();
+void SumAndCountAggregationProcedure::UpdateAggregateData(const logger::EventRecord &event_record,
+                                                          AggregateData &aggregate_data,
+                                                          AggregationPeriodBucket & /*bucket*/) {
+  SumAndCount *sum_and_count = aggregate_data.mutable_sum_and_count();
   sum_and_count->set_count(sum_and_count->count() + 1);
   sum_and_count->set_sum(sum_and_count->sum() + event_record.event()->integer_event().value());
 }
 
-void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData* merged_aggregate_data,
-                                                         const AggregateData& aggregate_data) {
-  SumAndCount* sum_and_count = merged_aggregate_data->mutable_sum_and_count();
+void SumAndCountAggregationProcedure::MergeAggregateData(AggregateData &merged_aggregate_data,
+                                                         const AggregateData &aggregate_data) {
+  SumAndCount *sum_and_count = merged_aggregate_data.mutable_sum_and_count();
   sum_and_count->set_sum(sum_and_count->sum() + aggregate_data.sum_and_count().sum());
   sum_and_count->set_count(sum_and_count->count() + aggregate_data.sum_and_count().count());
 }
 
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
-SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate& bucket) {
+SumAndCountAggregationProcedure::GenerateHourlyObservation(const AggregateDataToGenerate &bucket) {
   std::vector<std::tuple<std::vector<uint32_t>, int64_t, uint32_t>> data;
   data.reserve(bucket.aggregate_data.size());
 
-  for (const EventCodesAggregateData* aggregate_data : bucket.aggregate_data) {
-    std::vector<uint32_t> event_codes(aggregate_data->event_codes().begin(),
-                                      aggregate_data->event_codes().end());
-    data.emplace_back(std::make_tuple(event_codes, aggregate_data->data().sum_and_count().sum(),
-                                      aggregate_data->data().sum_and_count().count()));
+  for (const EventCodesAggregateData &aggregate_data : bucket.aggregate_data) {
+    std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
+                                      aggregate_data.event_codes().end());
+    data.emplace_back(std::make_tuple(event_codes, aggregate_data.data().sum_and_count().sum(),
+                                      aggregate_data.data().sum_and_count().count()));
   }
 
   if (data.empty()) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
index fbc28b3..88e2f9f 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure.h
@@ -17,10 +17,10 @@
   SumAndCountAggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report)
       : HourlyAggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
-                           AggregationPeriodBucket *bucket) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData &aggregate_data,
+                           AggregationPeriodBucket &bucket) override;
 
-  void MergeAggregateData(AggregateData *merged_aggregate_data,
+  void MergeAggregateData(AggregateData &merged_aggregate_data,
                           const AggregateData &aggregate_data) override;
 
   lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
diff --git a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
index 0bfed7b..dc769c3 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/sum_and_count_aggregation_procedure_test.cc
@@ -34,8 +34,7 @@
   const int64_t kValue = 42;
   const uint32_t kHourId = 1;
   const uint64_t system_profile_hash = uint64_t{634354};
-  AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, procedure.get(),
-                   &aggregate);
+  AddIntegerEvents(kNumEventCodes, kValue, kHourId, system_profile_hash, *procedure, aggregate);
 
   ASSERT_EQ(aggregate.hourly().by_hour_id().count(kHourId), 1u);
   ASSERT_EQ(aggregate.hourly().by_hour_id().at(kHourId).system_profile_aggregates_size(), 1u);
@@ -60,7 +59,7 @@
 
   std::unique_ptr<SumAndCountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 30);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 300);
@@ -72,7 +71,7 @@
 
   std::unique_ptr<SumAndCountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 0);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 0);
@@ -86,7 +85,7 @@
 
   std::unique_ptr<SumAndCountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 10);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 200);
@@ -100,7 +99,7 @@
 
   std::unique_ptr<SumAndCountAggregationProcedure> procedure =
       GetProcedure(kOccurrenceMetricMetricId, kOccurrenceMetricHourlyDeviceHistogramsReportIndex);
-  procedure->MergeAggregateData(&merged_data, data);
+  procedure->MergeAggregateData(merged_data, data);
 
   EXPECT_EQ(merged_data.sum_and_count().count(), 20);
   EXPECT_EQ(merged_data.sum_and_count().sum(), 100);
@@ -119,14 +118,13 @@
   const uint32_t kEndHourId = 11;
   const uint64_t system_profile_hash = uint64_t{634354};
   for (int hour_id = 1; hour_id <= kEndHourId; hour_id += 2) {
-    AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, procedure.get(),
-                     &aggregate);
+    AddIntegerEvents(kNumEventCodes, kValue, hour_id, system_profile_hash, *procedure, aggregate);
   }
 
   util::TimeInfo info;
   info.hour_id = kEndHourId;
   lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> observations_or =
-      procedure->GenerateObservations(info, &aggregate);
+      procedure->GenerateObservations(info, aggregate);
   ASSERT_EQ(observations_or.status().error_code(), StatusCode::OK);
   std::vector<ObservationAndSystemProfile> observations = observations_or.ConsumeValueOrDie();
 
@@ -142,7 +140,7 @@
     EXPECT_EQ(sum_and_count.event_codes(0) * kValue, sum_and_count.sum());
   }
   // Check that obsolete aggregates get cleaned up.
-  procedure->ObservationsCommitted(&aggregate, info, system_profile_hash);
+  procedure->ObservationsCommitted(aggregate, info, system_profile_hash);
   EXPECT_EQ(aggregate.hourly().by_hour_id_size(), 0);
 }
 
diff --git a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
index 4de47b5..75ca062 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/testing/test_aggregation_procedure.h
@@ -73,7 +73,7 @@
 
   void AddOccurrenceEventsForDay(
       uint32_t num_events, uint32_t day_index, uint64_t system_profile_hash,
-      AggregationProcedure* procedure, ReportAggregate* aggregate,
+      AggregationProcedure& procedure, ReportAggregate& aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     AddOccurrenceEventsForDayWithCount(num_events, 1, day_index, system_profile_hash, procedure,
                                        aggregate, event_time);
@@ -81,7 +81,7 @@
 
   void AddOccurrenceEventsForDayWithCount(
       uint32_t num_events, uint64_t count, uint32_t day_index, uint64_t system_profile_hash,
-      AggregationProcedure* procedure, ReportAggregate* aggregate,
+      AggregationProcedure& procedure, ReportAggregate& aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     auto record = MakeEventRecord(util::TimeInfo::FromDayIndex(day_index));
     OccurrenceEvent* event = record->event()->mutable_occurrence_event();
@@ -91,7 +91,7 @@
     for (uint32_t i = 1; i <= num_events; i++) {
       event->set_event_code(0, i);
       for (uint32_t j = 0; j < i; j++) {
-        procedure->UpdateAggregate(
+        procedure.UpdateAggregate(
             *record, aggregate, system_profile_hash,
             event_time.value_or(util::FromUnixSeconds(util::DayIndexToUnixSeconds(day_index))));
       }
@@ -100,7 +100,7 @@
 
   void AddOccurrenceEventsForHour(
       uint32_t num_events, uint32_t hour_id, uint64_t system_profile_hash,
-      AggregationProcedure* procedure, ReportAggregate* aggregate,
+      AggregationProcedure& procedure, ReportAggregate& aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     AddOccurrenceEventsForHourWithCount(num_events, 1, hour_id, system_profile_hash, procedure,
                                         aggregate, event_time);
@@ -108,7 +108,7 @@
 
   void AddOccurrenceEventsForHourWithCount(
       uint32_t num_events, uint64_t count, uint32_t hour_id, uint64_t system_profile_hash,
-      AggregationProcedure* procedure, ReportAggregate* aggregate,
+      AggregationProcedure& procedure, ReportAggregate& aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
     OccurrenceEvent* event = record->event()->mutable_occurrence_event();
@@ -118,7 +118,7 @@
     for (uint32_t i = 1; i <= num_events; i++) {
       event->set_event_code(0, i);
       for (uint32_t j = 0; j < i; j++) {
-        procedure->UpdateAggregate(
+        procedure.UpdateAggregate(
             *record, aggregate, system_profile_hash,
             event_time.value_or(util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))));
       }
@@ -127,7 +127,7 @@
 
   void AddIntegerEvents(
       uint32_t num_events, int64_t value, uint32_t hour_id, uint64_t system_profile_hash,
-      AggregationProcedure* procedure, ReportAggregate* aggregate,
+      AggregationProcedure& procedure, ReportAggregate& aggregate,
       std::optional<std::chrono::system_clock::time_point> event_time = std::nullopt) {
     auto record = MakeEventRecord(util::TimeInfo::FromHourId(hour_id));
     IntegerEvent* event = record->event()->mutable_integer_event();
@@ -137,7 +137,7 @@
     for (uint32_t i = 1; i <= num_events; i++) {
       event->set_event_code(0, i);
       for (uint32_t j = 0; j < i; j++) {
-        procedure->UpdateAggregate(
+        procedure.UpdateAggregate(
             *record, aggregate, system_profile_hash,
             event_time.value_or(util::FromUnixSeconds(util::HourIdToUnixSeconds(hour_id))));
       }
diff --git a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
index 71927dc..27f062f 100644
--- a/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
+++ b/src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.cc
@@ -103,7 +103,7 @@
 
   ++system_profile_aggregates_it;
   while (system_profile_aggregates_it != system_profile_aggregates->end()) {
-    procedure->MergeSystemProfileAggregates(&merged_system_profile_aggregate,
+    procedure->MergeSystemProfileAggregates(merged_system_profile_aggregate,
                                             *system_profile_aggregates_it);
 
     // After merging, remove the old aggregate from the repeated field.
diff --git a/src/local_aggregation_1_1/local_aggregation.cc b/src/local_aggregation_1_1/local_aggregation.cc
index f395318..e110def 100644
--- a/src/local_aggregation_1_1/local_aggregation.cc
+++ b/src/local_aggregation_1_1/local_aggregation.cc
@@ -101,7 +101,7 @@
       // Calculate the hash of the final filtered system profile.
       uint64_t system_profile_hash = util::Farmhash64(filtered_system_profile.SerializeAsString());
 
-      procedure->UpdateAggregate(event_record, &report_aggregate, system_profile_hash,
+      procedure->UpdateAggregate(event_record, report_aggregate, system_profile_hash,
                                  event_timestamp);
 
       // Make sure the final filtered system profile is stored.
diff --git a/src/local_aggregation_1_1/observation_generator.cc b/src/local_aggregation_1_1/observation_generator.cc
index b750daf..3233670 100644
--- a/src/local_aggregation_1_1/observation_generator.cc
+++ b/src/local_aggregation_1_1/observation_generator.cc
@@ -136,12 +136,11 @@
 
         if (procedure) {
           if (aggregate.aggregate()->mutable_by_report_id()->count(report.id())) {
-            ReportAggregate* report_aggregate =
-                &aggregate.aggregate()->mutable_by_report_id()->at(report.id());
+            ReportAggregate& report_aggregate =
+                aggregate.aggregate()->mutable_by_report_id()->at(report.id());
 
-            Status status =
-                GenerateObservationsForReportAggregate(aggregate, report_aggregate, procedure.get(),
-                                                       system_time, metric, metric_ref, report);
+            Status status = GenerateObservationsForReportAggregate(
+                aggregate, report_aggregate, *procedure, system_time, *metric, metric_ref, report);
 
             if (!status.ok()) {
               generation_errors.push_back(status);
@@ -172,9 +171,9 @@
 }
 
 Status ObservationGenerator::GenerateObservationsForReportAggregate(
-    const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate,
-    AggregationProcedure* procedure, std::chrono::system_clock::time_point end_time,
-    const MetricDefinition* metric, const logger::MetricRef& metric_ref,
+    const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
+    AggregationProcedure& procedure, std::chrono::system_clock::time_point end_time,
+    const MetricDefinition& metric, const logger::MetricRef& metric_ref,
     const ReportDefinition& report) {
   SystemProfile current_filtered_system_profile;
   MetadataBuilder::FilteredSystemProfile(report, system_data_->system_profile(),
@@ -183,23 +182,23 @@
 
   CB_ASSIGN_OR_RETURN(
       std::vector<util::TimeInfo> to_generate,
-      backfill_manager_.CalculateBackfill(procedure->GetLastTimeInfo(*report_aggregate), end_time,
-                                          *civil_time_converter_, *metric, procedure->IsDaily()));
+      backfill_manager_.CalculateBackfill(procedure.GetLastTimeInfo(report_aggregate), end_time,
+                                          *civil_time_converter_, metric, procedure.IsDaily()));
 
   // In general, don't generate observations for the hour ID (if hourly) or the day index (if daily)
   // corresponding to the current time. However, if `report` is a daily expedited report, then do
   // generate observations for the current day.
   //
   // TODO(fxbug.dev/92131): Move this conditional into BackfillManager.
-  if (procedure->IsDaily() && procedure->IsExpedited()) {
+  if (procedure.IsDaily() && procedure.IsExpedited()) {
     CB_ASSIGN_OR_RETURN(util::TimeInfo end_time_info,
-                        util::TimeInfo::FromTimePoint(end_time, *civil_time_converter_, *metric));
+                        util::TimeInfo::FromTimePoint(end_time, *civil_time_converter_, metric));
     to_generate.push_back(end_time_info);
   }
 
   for (util::TimeInfo time_info : to_generate) {
     CB_ASSIGN_OR_RETURN(std::vector<ObservationAndSystemProfile> observations,
-                        procedure->GenerateObservations(time_info, report_aggregate));
+                        procedure.GenerateObservations(time_info, report_aggregate));
 
     if (!observations.empty()) {
       for (ObservationAndSystemProfile& observation : observations) {
@@ -209,7 +208,7 @@
 
         CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations,
                             privacy_encoder_->MaybeMakePrivateObservations(
-                                std::move(observation.observation), *metric, report));
+                                std::move(observation.observation), metric, report));
 
         CB_RETURN_IF_ERROR(WriteObservations(report_aggregate, procedure, time_info, metric_ref,
                                              report, private_observations, *system_profile,
@@ -218,7 +217,7 @@
     } else {
       // Allow the privacy encoder a chance to create a fabricated observation.
       CB_ASSIGN_OR_RETURN(std::vector<std::unique_ptr<Observation>> private_observations,
-                          privacy_encoder_->MaybeMakePrivateObservations(nullptr, *metric, report));
+                          privacy_encoder_->MaybeMakePrivateObservations(nullptr, metric, report));
 
       // Use the current system profile if any fabricated private observations are created.
       // TODO(fxbug.dev/92955): choose plausible SystemProfiles for fabricated observations.
@@ -231,7 +230,7 @@
   return Status::OkStatus();
 }
 Status ObservationGenerator::WriteObservations(
-    ReportAggregate* report_aggregate, AggregationProcedure* procedure, util::TimeInfo time_info,
+    ReportAggregate& report_aggregate, AggregationProcedure& procedure, util::TimeInfo time_info,
     const logger::MetricRef& metric_ref, const ReportDefinition& report,
     const std::vector<std::unique_ptr<Observation>>& private_observations,
     const SystemProfile& system_profile, std::optional<uint64_t> commit_system_profile_hash) {
@@ -248,7 +247,7 @@
   }
 
   if (commit_system_profile_hash != std::nullopt) {
-    procedure->ObservationsCommitted(report_aggregate, time_info, *commit_system_profile_hash);
+    procedure.ObservationsCommitted(report_aggregate, time_info, *commit_system_profile_hash);
   }
   return Status::OkStatus();
 }
diff --git a/src/local_aggregation_1_1/observation_generator.h b/src/local_aggregation_1_1/observation_generator.h
index 5516648..61add03 100644
--- a/src/local_aggregation_1_1/observation_generator.h
+++ b/src/local_aggregation_1_1/observation_generator.h
@@ -85,12 +85,12 @@
   // GenerateObservationsForReportAggregate generates all needed observations for a given
   // ReportAggregate, returning a Status::OK if all observations were generated.
   Status GenerateObservationsForReportAggregate(
-      const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate* report_aggregate,
-      AggregationProcedure* procedure, std::chrono::system_clock::time_point end_time,
-      const MetricDefinition* metric, const logger::MetricRef& metric_ref,
+      const LocalAggregateStorage::MetricAggregateRef& aggregate, ReportAggregate& report_aggregate,
+      AggregationProcedure& procedure, std::chrono::system_clock::time_point end_time,
+      const MetricDefinition& metric, const logger::MetricRef& metric_ref,
       const ReportDefinition& report);
 
-  Status WriteObservations(ReportAggregate* report_aggregate, AggregationProcedure* procedure,
+  Status WriteObservations(ReportAggregate& report_aggregate, AggregationProcedure& procedure,
                            util::TimeInfo time_info, const logger::MetricRef& metric_ref,
                            const ReportDefinition& report,
                            const std::vector<std::unique_ptr<Observation>>& private_observations,
diff --git a/src/logger/BUILD.gn b/src/logger/BUILD.gn
index 87b3b05..5b4c8a0 100644
--- a/src/logger/BUILD.gn
+++ b/src/logger/BUILD.gn
@@ -32,6 +32,7 @@
   ]
   public_deps = [
     ":project_context",
+    "$cobalt_root/src/lib/util:datetime_util",
     "$cobalt_root/src/pb",
     "$cobalt_root/src/registry:cobalt_registry_proto",
   ]
diff --git a/src/logger/event_record.h b/src/logger/event_record.h
index 350d070..2822e8d 100644
--- a/src/logger/event_record.h
+++ b/src/logger/event_record.h
@@ -9,6 +9,7 @@
 
 #include <third_party/abseil-cpp/absl/strings/str_cat.h>
 
+#include "src/lib/util/datetime_util.h"
 #include "src/lib/util/status_builder.h"
 #include "src/logger/project_context.h"
 #include "src/logging.h"
@@ -47,6 +48,13 @@
   // Get the Event that is to be logged.
   [[nodiscard]] Event* event() const { return event_.get(); }
 
+  [[nodiscard]] util::TimeInfo GetTimeInfo() const {
+    return {
+        .day_index = event_->day_index(),
+        .hour_id = event_->hour_id(),
+    };
+  }
+
   // Get the SystemProfile that is to be logged with the event. Only used for Cobalt 1.1 metrics.
   [[nodiscard]] SystemProfile* system_profile() const { return system_profile_.get(); }