[EventLogger] Fix duplicated logging

Cobalt 1.1 would log to each report a number of times equal to the
number of reports in a metric. This change fixes that, so that each
report is logged to only once.

Additionally, found bug in implementation of UNIQUE_DEVICE_HISTOGRAMS,
and UNIQUE_DEVICE_NUMERIC_STATS, where the values would be aggregated as
hourly, instead of daily.

Change-Id: I402ca8f322846ce6d5f3f42b9a83c93f4989763e
Reviewed-on: https://fuchsia-review.googlesource.com/c/cobalt/+/552530
Reviewed-by: Cameron Dale <camrdale@google.com>
Reviewed-by: Alexandre Zani <azani@google.com>
Commit-Queue: Zach Bush <zmbush@google.com>
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
index 2a0ff18..a5eb48d 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.cc
@@ -14,6 +14,24 @@
 
 namespace cobalt::local_aggregation {
 
+bool CountAggregationProcedure::IsDaily() const {
+  switch (report_type()) {
+    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
+    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
+    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS:
+      return false;
+
+    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
+    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
+      return true;
+
+    default:
+      LOG(ERROR) << "Unexpected report_type for CountAggregationProcedure: " << report_type()
+                 << ". Defaulting to IsDaily=true.";
+      return true;
+  }
+}
+
 void CountAggregationProcedure::UpdateAggregateData(const logger::EventRecord &event_record,
                                                     AggregateData *aggregate_data,
                                                     EventCodeAggregate * /*aggregate*/) {
@@ -22,14 +40,30 @@
 }
 
 lib::statusor::StatusOr<std::unique_ptr<Observation>>
-CountAggregationProcedure::GenerateHourlyObservation(EventCodeAggregate *aggregate) {
-  std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data;
-  data.reserve(aggregate->by_event_code_size());
+CountAggregationProcedure::GenerateSingleObservation(
+    const std::vector<EventCodeAggregate *> &aggregates,
+    const std::set<std::vector<uint32_t>> &event_vectors) {
+  std::map<std::vector<uint32_t>, std::vector<const AggregateData *>> aggregates_by_event_code;
+  for (const EventCodeAggregate *aggregate : aggregates) {
+    for (const EventCodesAggregateData &aggregate_data : aggregate->by_event_code()) {
+      std::vector<uint32_t> event_vector(aggregate_data.event_codes().begin(),
+                                         aggregate_data.event_codes().end());
+      if (!event_vectors.count(event_vector)) {
+        continue;
+      }
+      aggregates_by_event_code[event_vector].push_back(&aggregate_data.data());
+    }
+  }
 
-  for (const EventCodesAggregateData &aggregate_data : aggregate->by_event_code()) {
-    std::vector<uint32_t> event_codes(aggregate_data.event_codes().begin(),
-                                      aggregate_data.event_codes().end());
-    data.emplace_back(std::make_tuple(event_codes, aggregate_data.data().count()));
+  std::vector<std::tuple<std::vector<uint32_t>, int64_t>> data;
+  data.reserve(aggregates_by_event_code.size());
+
+  for (auto [event_codes, aggregates] : aggregates_by_event_code) {
+    int64_t count = 0;
+    for (const AggregateData *aggregate : aggregates) {
+      count += aggregate->count();
+    }
+    data.emplace_back(std::make_tuple(event_codes, count));
   }
 
   if (data.empty()) {
diff --git a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
index 7c8bd6a..3ab1b80 100644
--- a/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
+++ b/src/local_aggregation_1_1/aggregation_procedures/count_aggregation_procedure.h
@@ -12,17 +12,19 @@
 
 namespace cobalt::local_aggregation {
 
-class CountAggregationProcedure : public HourlyAggregationProcedure {
+class CountAggregationProcedure : public AggregationProcedure {
  public:
   CountAggregationProcedure(const MetricDefinition &metric, const ReportDefinition &report)
-      : HourlyAggregationProcedure(metric, report) {}
+      : AggregationProcedure(metric, report) {}
 
-  void UpdateAggregateData(const logger::EventRecord & /*event_record*/,
-                           AggregateData *aggregate_data,
-                           EventCodeAggregate * /*aggregate*/) override;
+  [[nodiscard]] bool IsDaily() const override;
 
-  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateHourlyObservation(
-      EventCodeAggregate *aggregates) override;
+  void UpdateAggregateData(const logger::EventRecord &event_record, AggregateData *aggregate_data,
+                           EventCodeAggregate *aggregate) override;
+
+  lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation(
+      const std::vector<EventCodeAggregate *> &aggregates,
+      const std::set<std::vector<uint32_t>> &event_vectors) override;
 
   [[nodiscard]] std::string DebugString() const override;
 };
diff --git a/src/logger/BUILD.gn b/src/logger/BUILD.gn
index 641b9b2..802e850 100644
--- a/src/logger/BUILD.gn
+++ b/src/logger/BUILD.gn
@@ -185,6 +185,7 @@
     "$cobalt_root/src:tracing",
     "$cobalt_root/src/local_aggregation:event_aggregator",
     "$cobalt_root/src/local_aggregation_1_1:local_aggregation",
+    "$cobalt_root/src/local_aggregation_1_1:proto",
   ]
 }
 
diff --git a/src/logger/event_loggers.cc b/src/logger/event_loggers.cc
index e3219dd..e031f5f 100644
--- a/src/logger/event_loggers.cc
+++ b/src/logger/event_loggers.cc
@@ -288,26 +288,38 @@
   // useful information.
   auto trace = TraceEvent(*event_record);
 
-  for (const auto& report : event_record->metric()->reports()) {
-    auto status = MaybeUpdateLocalAggregation(report, *event_record);
-    if (status != kOK) {
-      TraceLogFailure(status, *event_record, trace, report);
+  Status status;
+  if (IsOnePointOne()) {
+    const cobalt::util::Status cobalt_status = local_aggregation()->AddEvent(*event_record);
+    status = FromStatus(cobalt_status);
+    if (!cobalt_status.ok()) {
+      LOG(ERROR) << "Error occurred while locally aggregating event: "
+                 << cobalt_status.error_message();
+      TraceLogFailure(status, *event_record, trace, event_record->metric()->reports(0));
       return status;
     }
+  } else {
+    for (const auto& report : event_record->metric()->reports()) {
+      status = MaybeUpdateLocalAggregation(report, *event_record);
+      if (status != kOK) {
+        TraceLogFailure(status, *event_record, trace, report);
+        return status;
+      }
 
-    // If we are processing the final report, then we set may_invalidate
-    // to true in order to allow data to be moved out of |event_record|
-    // instead of being copied. One example where this is useful is when
-    // creating an immediate Observation of type Histogram. In that case
-    // we can move the histogram from the Event to the Observation and
-    // avoid copying. Since the |event_record| is invalidated, any other
-    // operation on the |event_record| must be performed before this for
-    // loop.
-    bool may_invalidate = ++report_index == num_reports;
-    status = MaybeGenerateImmediateObservation(report, may_invalidate, event_record.get());
-    if (status != kOK) {
-      TraceLogFailure(status, *event_record, trace, report);
-      return status;
+      // If we are processing the final report, then we set may_invalidate
+      // to true in order to allow data to be moved out of |event_record|
+      // instead of being copied. One example where this is useful is when
+      // creating an immediate Observation of type Histogram. In that case
+      // we can move the histogram from the Event to the Observation and
+      // avoid copying. Since the |event_record| is invalidated, any other
+      // operation on the |event_record| must be performed before this for
+      // loop.
+      bool may_invalidate = ++report_index == num_reports;
+      status = MaybeGenerateImmediateObservation(report, may_invalidate, event_record.get());
+      if (status != kOK) {
+        TraceLogFailure(status, *event_record, trace, report);
+        return status;
+      }
     }
   }
 
@@ -419,6 +431,10 @@
 // and returns OK.
 Status EventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& /*report*/,
                                                 const EventRecord& /*event_record*/) {
+  // In Cobalt 1.1 calling MaybeUpdateLocalAggregation is considered an error.
+  if (IsOnePointOne()) {
+    return kOther;
+  }
   return kOK;
 }
 
@@ -426,6 +442,10 @@
                                                       bool may_invalidate,
                                                       EventRecord* event_record) {
   TRACE_DURATION("cobalt_core", "EventLogger::MaybeGenerateImmediateObservation");
+  // In Cobalt 1.1 calling MaybeGenerateImmediateObservation is considered an error.
+  if (IsOnePointOne()) {
+    return kOther;
+  }
 
   auto encoder_result = MaybeEncodeImmediateObservation(report, may_invalidate, event_record);
   if (encoder_result.status != kOK) {
@@ -820,27 +840,6 @@
                             event_record.project_context()->FullMetricName(metric));
 }
 
-Status OccurrenceEventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                                          const EventRecord& event_record) {
-  switch (report.report_type()) {
-    case ReportDefinition::FLEETWIDE_OCCURRENCE_COUNTS:
-    case ReportDefinition::UNIQUE_DEVICE_COUNTS:
-    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
-    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
-    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
-    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS: {
-      const cobalt::util::Status status = local_aggregation()->AddEvent(event_record);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error occurred while locally aggregating Occurrence event: "
-                   << status.error_message();
-      }
-      return FromStatus(status);
-    }
-    default:
-      return Status::kInvalidArguments;
-  }
-}
-
 ///////////// IntegerEventLogger method implementations //////////////////////////
 
 Status IntegerEventLogger::ValidateEvent(const EventRecord& event_record) {
@@ -859,27 +858,6 @@
                             event_record.project_context()->FullMetricName(metric));
 }
 
-Status IntegerEventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                                       const EventRecord& event_record) {
-  switch (report.report_type()) {
-    case ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS:
-    case ReportDefinition::HOURLY_VALUE_HISTOGRAMS:
-    case ReportDefinition::FLEETWIDE_HISTOGRAMS:
-    case ReportDefinition::FLEETWIDE_MEANS:
-    case ReportDefinition::UNIQUE_DEVICE_NUMERIC_STATS:
-    case ReportDefinition::HOURLY_VALUE_NUMERIC_STATS: {
-      const cobalt::util::Status status = local_aggregation()->AddEvent(event_record);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error occurred while locally aggregating Integer event: "
-                   << status.error_message();
-      }
-      return FromStatus(status);
-    }
-    default:
-      return Status::kInvalidArguments;
-  }
-}
-
 ///////////// IntegerHistogramEventLogger method implementations //////////////////////////
 
 Status IntegerHistogramEventLogger::ValidateEvent(const EventRecord& event_record) {
@@ -944,22 +922,6 @@
   return kOK;
 }
 
-Status IntegerHistogramEventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                                                const EventRecord& event_record) {
-  switch (report.report_type()) {
-    case ReportDefinition::FLEETWIDE_HISTOGRAMS: {
-      const cobalt::util::Status status = local_aggregation()->AddEvent(event_record);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error occurred while locally aggregating IntegerHistogram event: "
-                   << status.error_message();
-      }
-      return FromStatus(status);
-    }
-    default:
-      return Status::kInvalidArguments;
-  }
-}
-
 ///////////// StringEventLogger method implementations //////////////////////////
 
 Status StringEventLogger::ValidateEvent(const EventRecord& event_record) {
@@ -978,22 +940,6 @@
                             event_record.project_context()->FullMetricName(metric));
 }
 
-Status StringEventLogger::MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                                      const EventRecord& event_record) {
-  switch (report.report_type()) {
-    case ReportDefinition::STRING_COUNTS: {
-      const cobalt::util::Status status = local_aggregation()->AddEvent(event_record);
-      if (!status.ok()) {
-        LOG(ERROR) << "Error occurred while locally aggregating String event: "
-                   << status.error_message();
-      }
-      return FromStatus(status);
-    }
-    default:
-      return Status::kInvalidArguments;
-  }
-}
-
 /////////////// CustomEventLogger method implementations ///////////////////////
 
 Status CustomEventLogger::ValidateEvent(const EventRecord& /*event_record*/) {
diff --git a/src/logger/event_loggers.h b/src/logger/event_loggers.h
index ca38327..c0433db 100644
--- a/src/logger/event_loggers.h
+++ b/src/logger/event_loggers.h
@@ -127,6 +127,8 @@
                                                           bool may_invalidate,
                                                           EventRecord* event_record);
 
+  [[nodiscard]] virtual bool IsOnePointOne() const { return false; };
+
   // Traces an |event_record| into a string if the metric is tagged with
   // also_log_locally. If not, it will return the empty string.
   static std::string TraceEvent(const EventRecord& event_record);
@@ -266,8 +268,7 @@
 
  private:
   Status ValidateEvent(const EventRecord& event_record) override;
-  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                     const EventRecord& event_record) override;
+  [[nodiscard]] bool IsOnePointOne() const override { return true; };
 };
 
 // Implementation of EventLogger for metrics of type INTEGER.
@@ -278,8 +279,7 @@
 
  private:
   Status ValidateEvent(const EventRecord& event_record) override;
-  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                     const EventRecord& event_record) override;
+  [[nodiscard]] bool IsOnePointOne() const override { return true; };
 };
 
 // Implementation of EventLogger for metrics of type INTEGER_HISTOGRAM.
@@ -290,8 +290,7 @@
 
  private:
   Status ValidateEvent(const EventRecord& event_record) override;
-  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                     const EventRecord& event_record) override;
+  [[nodiscard]] bool IsOnePointOne() const override { return true; };
 };
 
 // Implementation of EventLogger for metrics of type STRING.
@@ -302,8 +301,7 @@
 
  private:
   Status ValidateEvent(const EventRecord& event_record) override;
-  Status MaybeUpdateLocalAggregation(const ReportDefinition& report,
-                                     const EventRecord& event_record) override;
+  [[nodiscard]] bool IsOnePointOne() const override { return true; };
 };
 
 // Implementation of EventLogger for metrics of type CUSTOM.
diff --git a/src/logger/event_loggers_test.cc b/src/logger/event_loggers_test.cc
index f6612ee..f8ac1c4 100644
--- a/src/logger/event_loggers_test.cc
+++ b/src/logger/event_loggers_test.cc
@@ -19,12 +19,17 @@
 #include "src/lib/util/encrypted_message_util.h"
 #include "src/lib/util/testing/test_with_files.h"
 #include "src/local_aggregation/event_aggregator_mgr.h"
+#include "src/local_aggregation/local_aggregation.pb.h"
 #include "src/local_aggregation/test_utils/test_event_aggregator_mgr.h"
+#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
+#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
 #include "src/local_aggregation_1_1/local_aggregation.h"
+#include "src/local_aggregation_1_1/local_aggregation.pb.h"
 #include "src/logger/encoder.h"
 #include "src/logger/logger_test_utils.h"
 #include "src/logger/project_context.h"
 #include "src/logger/status.h"
+#include "src/logger/test_registries/cobalt1.1_test_registry.cb.h"
 #include "src/logger/testing_constants.h"
 #include "src/pb/metadata_builder.h"
 #include "src/pb/observation.pb.h"
@@ -1610,7 +1615,7 @@
     cfg.local_aggregation_backfill_days = 0;
     cfg.local_aggregate_proto_store_path = aggregate_store_path();
     cfg.obs_history_proto_store_path = obs_history_path();
-    cfg.local_aggregate_store_dir = test_folder() + "/local_aggregation_store_dir";
+    cfg.local_aggregate_store_dir = local_aggregation_store_path();
 
     event_aggregator_mgr_ = std::make_unique<TestEventAggregatorManager>(cfg, fs(), encoder_.get(),
                                                                          observation_writer_.get());
@@ -1620,6 +1625,25 @@
                                                             observation_writer_.get());
   }
 
+  local_aggregation::ImmediateLocalAggregateStorage GetLocalAggregateStorage() {
+    return local_aggregation::ImmediateLocalAggregateStorage(local_aggregation_store_path(), fs(),
+                                                             project_context_factory_.get());
+  }
+
+  local_aggregation::ReportAggregate GetReportAggregate(
+      uint32_t metric_id, MetricDefinition::MetricType metric_type, uint32_t report_id,
+      local_aggregation::LocalAggregateStorage* store) {
+    std::unique_ptr<EventRecord> event_record =
+        GetEventRecordWithMetricType(metric_id, metric_type);
+    local_aggregation::LocalAggregateStorage::MetricAggregateRef aggregate =
+        store
+            ->GetMetricAggregate(event_record->project_context()->project().customer_id(),
+                                 event_record->project_context()->project().project_id(),
+                                 event_record->metric()->id())
+            .ConsumeValueOrDie();
+    return aggregate.aggregate()->by_report_id().at(report_id);
+  }
+
   std::unique_ptr<EventLogger> GetEventLoggerForMetricType(
       MetricDefinition::MetricType metric_type) {
     auto logger =
@@ -1714,14 +1738,20 @@
       ReportDefinition::UNIQUE_DEVICE_HISTOGRAMS);
   auto event_record = GetEventRecordWithMetricType(testing::cobalt_new::kSettingsChangedMetricId,
                                                    MetricDefinition::OCCURRENCE);
-
   auto event_logger = GetEventLoggerForMetricType(MetricDefinition::OCCURRENCE);
-  auto status = MaybeUpdateLocalAggregation(event_logger.get(), report, *event_record);
+  EXPECT_EQ(kOK, event_logger->Log(std::move(event_record), std::chrono::system_clock::now()));
 
-  EXPECT_EQ(kOK, status);
-  // TODO(camrdale): once local_aggregation_1_1 supports testing like the event aggregator does,
-  // add something like this check for the new local aggregation:
-  // EXPECT_EQ(1, event_aggregator_mgr_->NumPerDeviceNumericAggregatesInStore());
+  event_record = GetEventRecordWithMetricType(testing::cobalt_new::kSettingsChangedMetricId,
+                                              MetricDefinition::OCCURRENCE);
+  local_aggregation::ImmediateLocalAggregateStorage store = GetLocalAggregateStorage();
+  local_aggregation::ReportAggregate report_aggregate = GetReportAggregate(
+      testing::cobalt_new::kSettingsChangedMetricId, MetricDefinition::OCCURRENCE,
+      testing::cobalt_new::kSettingsChangedSettingsChangedUniqueDeviceHistogramsReportId, &store);
+  ASSERT_TRUE(report_aggregate.has_daily());
+  local_aggregation::EventCodeAggregate daily_aggregate =
+      report_aggregate.daily().by_day_index().begin()->second;
+  local_aggregation::AggregateData data = daily_aggregate.by_event_code().at(0).data();
+  EXPECT_EQ(data.count(), 1);
 }
 
 TEST_F(EventLoggersLocalAggregationTest, IntegerFleetwideMeansValidation) {
@@ -1750,12 +1780,20 @@
                                                    MetricDefinition::INTEGER);
 
   auto event_logger = GetEventLoggerForMetricType(MetricDefinition::INTEGER);
-  auto status = MaybeUpdateLocalAggregation(event_logger.get(), report, *event_record);
+  EXPECT_EQ(kOK, event_logger->Log(std::move(event_record), std::chrono::system_clock::now()));
 
-  EXPECT_EQ(kOK, status);
-  // TODO(camrdale): once local_aggregation_1_1 supports testing like the event aggregator does,
-  // add something like this check for the new local aggregation:
-  // EXPECT_EQ(1, event_aggregator_mgr_->NumPerDeviceNumericAggregatesInStore());
+  event_record = GetEventRecordWithMetricType(testing::cobalt_new::kStreamingTimeMetricId,
+                                              MetricDefinition::INTEGER);
+  local_aggregation::ImmediateLocalAggregateStorage store = GetLocalAggregateStorage();
+  local_aggregation::ReportAggregate report_aggregate = GetReportAggregate(
+      testing::cobalt_new::kStreamingTimeMetricId, MetricDefinition::INTEGER,
+      testing::cobalt_new::kStreamingTimeStreamingTimeAggregationReportId, &store);
+  ASSERT_TRUE(report_aggregate.has_hourly());
+  local_aggregation::EventCodeAggregate hourly_aggregate =
+      report_aggregate.hourly().by_hour_id().begin()->second;
+  local_aggregation::AggregateData data = hourly_aggregate.by_event_code().at(0).data();
+  ASSERT_TRUE(data.has_sum_and_count());
+  EXPECT_EQ(data.sum_and_count().sum(), 10);
 }
 
 TEST_F(EventLoggersLocalAggregationTest, IntegerHistogramFleetwideHistogramsValidation) {
@@ -1799,12 +1837,24 @@
       testing::cobalt_new::kFileSystemWriteTimesMetricId, MetricDefinition::INTEGER_HISTOGRAM);
 
   auto event_logger = GetEventLoggerForMetricType(MetricDefinition::INTEGER_HISTOGRAM);
-  auto status = MaybeUpdateLocalAggregation(event_logger.get(), report, *event_record);
+  EXPECT_EQ(kOK, event_logger->Log(std::move(event_record), std::chrono::system_clock::now()));
 
-  EXPECT_EQ(kOK, status);
-  // TODO(camrdale): once local_aggregation_1_1 supports testing like the event aggregator does,
-  // add something like this check for the new local aggregation:
-  // EXPECT_EQ(1, event_aggregator_mgr_->NumPerDeviceNumericAggregatesInStore());
+  event_record = GetEventRecordWithMetricType(testing::cobalt_new::kFileSystemWriteTimesMetricId,
+                                              MetricDefinition::INTEGER_HISTOGRAM);
+  local_aggregation::ImmediateLocalAggregateStorage store = GetLocalAggregateStorage();
+  local_aggregation::ReportAggregate report_aggregate = GetReportAggregate(
+      testing::cobalt_new::kFileSystemWriteTimesMetricId, MetricDefinition::INTEGER_HISTOGRAM,
+      testing::cobalt_new::kFileSystemWriteTimesFileSystemWriteTimesHistogramReportId, &store);
+  ASSERT_TRUE(report_aggregate.has_hourly());
+  local_aggregation::EventCodeAggregate hourly_aggregate =
+      report_aggregate.hourly().by_hour_id().begin()->second;
+  local_aggregation::AggregateData data = hourly_aggregate.by_event_code().at(0).data();
+  ASSERT_TRUE(data.has_integer_histogram());
+  const auto& histogram = data.integer_histogram();
+  EXPECT_EQ(histogram.histogram().at(0), 100);
+  EXPECT_EQ(histogram.histogram().at(1), 101);
+  EXPECT_EQ(histogram.histogram().at(2), 102);
+  EXPECT_EQ(histogram.histogram().at(3), 103);
 }
 
 TEST_F(EventLoggersLocalAggregationTest, StringStringHistogramsValidation) {
@@ -1832,12 +1882,21 @@
                                                    MetricDefinition::STRING);
 
   auto event_logger = GetEventLoggerForMetricType(MetricDefinition::STRING);
-  auto status = MaybeUpdateLocalAggregation(event_logger.get(), report, *event_record);
+  EXPECT_EQ(kOK, event_logger->Log(std::move(event_record), std::chrono::system_clock::now()));
 
-  EXPECT_EQ(kOK, status);
-  // TODO(camrdale): once local_aggregation_1_1 supports testing like the event aggregator does,
-  // add something like this check for the new local aggregation:
-  // EXPECT_EQ(1, event_aggregator_mgr_->NumPerDeviceNumericAggregatesInStore());
+  event_record = GetEventRecordWithMetricType(testing::cobalt_new::kComponentMetricId,
+                                              MetricDefinition::STRING);
+  local_aggregation::ImmediateLocalAggregateStorage store = GetLocalAggregateStorage();
+  local_aggregation::ReportAggregate report_aggregate =
+      GetReportAggregate(testing::cobalt_new::kComponentMetricId, MetricDefinition::STRING,
+                         testing::cobalt_new::kComponentComponentHistogramReportId, &store);
+  ASSERT_TRUE(report_aggregate.has_hourly());
+  local_aggregation::EventCodeAggregate hourly_aggregate =
+      report_aggregate.hourly().by_hour_id().begin()->second;
+  local_aggregation::AggregateData data = hourly_aggregate.by_event_code().at(0).data();
+  ASSERT_TRUE(data.has_string_histogram());
+  const auto& histogram = data.string_histogram();
+  EXPECT_EQ(histogram.histogram().at(0), 1);
 }
 
 }  // namespace internal
diff --git a/src/logger/test_registries/cobalt1.1_test_registry.yaml b/src/logger/test_registries/cobalt1.1_test_registry.yaml
index ee74590..b9cadbc 100644
--- a/src/logger/test_registries/cobalt1.1_test_registry.yaml
+++ b/src/logger/test_registries/cobalt1.1_test_registry.yaml
@@ -25,6 +25,9 @@
     - report_name: "SettingsChanged_UniqueDeviceHistograms"
       id: 1
       report_type: UNIQUE_DEVICE_HISTOGRAMS
+    - report_name: "SettingsChanged_UniqueDeviceHistograms_Two"
+      id: 2
+      report_type: UNIQUE_DEVICE_HISTOGRAMS
 
 - id: 2
   metric_name: "StreamingTime"
@@ -40,6 +43,9 @@
     - report_name: "StreamingTime_Aggregation"
       id: 1
       report_type: FLEETWIDE_MEANS
+    - report_name: "StreamingTime_Aggregation_Two"
+      id: 2
+      report_type: FLEETWIDE_MEANS
 
 - id: 3
   metric_name: "FileSystemWriteTimes"
@@ -62,6 +68,14 @@
           floor: 0
           num_buckets: 120
           step_size: 1000
+    - report_name: "FileSystemWriteTimes_Histogram_Two"
+      id: 2
+      report_type: FLEETWIDE_HISTOGRAMS
+      int_buckets:
+        linear:
+          floor: 0
+          num_buckets: 120
+          step_size: 1000
 
 - id: 4
   metric_name: "Component"
@@ -74,3 +88,6 @@
     - report_name: "Component_Histogram"
       id: 1
       report_type: STRING_COUNTS
+    - report_name: "Component_Histogram_Two"
+      id: 2
+      report_type: STRING_COUNTS