// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation_1_1/local_aggregation.h"

#include <time.h>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/testing/report_all_test_registry_with_report_all_set.cb.h"
#include "src/local_aggregation_1_1/testing/report_all_test_registry_with_select_last_set.cb.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/event_record.h"
#include "src/logger/logger_test_utils.h"
#include "src/logger/observation_writer.h"
#include "src/logger/project_context_factory.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/system_data/client_secret.h"
#include "src/system_data/fake_system_data.h"

namespace cobalt::local_aggregation {
using ::testing::Contains;
using ::testing::ContainsRegex;
using ::testing::HasSubstr;
using ::testing::Not;
using TimeInfo = util::TimeInfo;

namespace {

// Wall-clock instant used as "now" when each test begins: 2014-01-09 19:10:00 UTC.
// The struct is deliberately left non-const because timegm() takes a mutable std::tm*.
std::tm kStartingTime = {
    .tm_sec = 0,
    .tm_min = 10,    // ten minutes past the hour
    .tm_hour = 19,   // 7PM
    .tm_mday = 9,    // day of the month
    .tm_mon = 0,     // zero-based month: January
    .tm_year = 114,  // years since 1900: 2014
};
// kStartingTime interpreted as UTC and converted to a system_clock time point.
const std::chrono::system_clock::time_point kStartingTimePoint =
    std::chrono::system_clock::from_time_t(timegm(&kStartingTime));

std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) {
  std::string bytes;
  if (!absl::Base64Unescape(registryBase64, &bytes)) {
    LOG(ERROR) << "Unable to decode Base64 String";
    return nullptr;
  }

  auto registry = std::make_unique<CobaltRegistry>();
  if (!registry->ParseFromString(bytes)) {
    LOG(ERROR) << "Unable to parse registry from bytes";
    return nullptr;
  }

  return registry;
}

}  // namespace

class LocalAggregationTest : public util::testing::TestWithFiles {
 private:
  void SetUp() override {
    TestWithFiles::SetUp();
    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(GetRegistry(kCobaltRegistryBase64));
    mock_clock_ =
        std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0));
    mock_clock_->set_time(kStartingTimePoint);
    validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get());
    validated_clock_->SetAccurate(true);
    civil_time_converter_ = std::make_unique<util::UtcTimeConverter>();
    metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(),
                                                          system_data_cache_path(), fs());
    system_data_.SetVersion("never used");
    metadata_builder_->SnapshotSystemData();
    test_writer_ = std::make_unique<logger::testing::FakeObservationStore>();
    observation_writer_ = std::make_unique<logger::ObservationWriter>(test_writer_.get(), nullptr);
  }

 protected:
  std::unique_ptr<LocalAggregation> MakeLocalAggregation(
      cobalt::StorageQuotas storage_quotas = cobalt::kDefaultStorageQuotas) {
    CobaltConfig cfg = {
        .storage_quotas = storage_quotas,
        .local_aggregate_store_dir = local_aggregation_store_path(),
        .client_secret = system_data::ClientSecret::GenerateNewSecret(),
    };

    return std::make_unique<LocalAggregation>(
        cfg, global_project_context_factory_.get(), &system_data_, metadata_builder_.get(), fs(),
        observation_writer_.get(), civil_time_converter_.get());
  }

  void OverrideRegistry(const std::string& registryBase64) {
    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(GetRegistry(registryBase64));
  }

  std::shared_ptr<const logger::ProjectContext> GetProjectContext() {
    return global_project_context_factory_->NewProjectContext(
        lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId));
  }

  void VerifyStoredIntegerObservation(
      int observation_num, lib::ReportIdentifier report, uint32_t day_index,
      const std::string& system_version, std::vector<int> expected_count,
      std::vector<std::vector<uint32_t>> expected_event_codes = {}) {
    ObservationMetadata* metadata = test_writer_->metadata_received[observation_num].get();
    EXPECT_EQ(metadata->customer_id(), report.customer_id())
        << " for observation: " << observation_num;
    EXPECT_EQ(metadata->project_id(), report.project_id())
        << " for observation: " << observation_num;
    EXPECT_EQ(metadata->metric_id(), report.metric_id()) << " for observation: " << observation_num;
    EXPECT_EQ(metadata->report_id(), report.report_id()) << " for observation: " << observation_num;
    EXPECT_EQ(metadata->day_index(), day_index) << " for observation: " << observation_num;
    EXPECT_EQ(metadata->system_profile().system_version(), system_version)
        << " for observation: " << observation_num;
    EXPECT_EQ(metadata->system_profile().product_name(), "")
        << " for observation: " << observation_num;
    observation_store::StoredObservation* stored_observation =
        test_writer_->messages_received[observation_num].get();
    ASSERT_TRUE(stored_observation->has_unencrypted()) << " for observation: " << observation_num;
    ASSERT_TRUE(stored_observation->unencrypted().has_integer())
        << " for observation: " << observation_num;
    ASSERT_EQ(stored_observation->unencrypted().integer().values_size(), expected_count.size())
        << " for observation: " << observation_num;
    if (!expected_event_codes.empty()) {
      ASSERT_EQ(expected_event_codes.size(), expected_count.size())
          << " for observation: " << observation_num;
    }
    for (int i = 0; i < expected_count.size(); i++) {
      EXPECT_EQ(stored_observation->unencrypted().integer().values(i).value(), expected_count[i])
          << " for observation " << observation_num << " integer value: " << i;
      if (!expected_event_codes.empty()) {
        EXPECT_THAT(stored_observation->unencrypted().integer().values(i).event_codes(),
                    testing::ElementsAreArray(expected_event_codes[i]))
            << " for observation " << observation_num << " integer value: " << i;
      }
    }
  }

  system_data::FakeSystemData system_data_;
  std::unique_ptr<logger::testing::FakeObservationStore> test_writer_;
  std::unique_ptr<logger::ObservationWriter> observation_writer_;
  std::unique_ptr<util::IncrementingSystemClock> mock_clock_;
  std::unique_ptr<util::UtcTimeConverter> civil_time_converter_;

 private:
  std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
  std::unique_ptr<util::FakeValidatedClock> validated_clock_;
  std::unique_ptr<MetadataBuilder> metadata_builder_;
};

// Adding an occurrence event must create an entry named after the metric ID inside the project's
// directory of the local aggregate store.
TEST_F(LocalAggregationTest, AddEventWorks) {
  std::unique_ptr<LocalAggregation> local_aggregation = MakeLocalAggregation();

  const std::string project_dir =
      absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId);
  const std::string metric_entry = absl::StrCat(kOccurrenceMetricMetricId);
  auto list_project_dir = [this, &project_dir] {
    return fs()->ListFiles(project_dir).ConsumeValueOrDie();
  };

  // Nothing has been logged yet, so the metric must not have an entry.
  ASSERT_THAT(list_project_dir(), Not(Contains(metric_entry)));

  auto event_record =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId)
          .ValueOrDie();
  event_record->event()->mutable_occurrence_event();
  local_aggregation->AddEvent(*event_record, mock_clock_->now());

  // After the event has been added, the metric's entry must exist.
  ASSERT_THAT(list_project_dir(), Contains(metric_entry));
}

// Once local aggregation has been disabled, adding an event must not create an entry for the
// metric in the project's directory of the local aggregate store.
TEST_F(LocalAggregationTest, DisableWorks) {
  std::unique_ptr<LocalAggregation> local_aggregation = MakeLocalAggregation();
  local_aggregation->Disable(true);

  const std::string project_dir =
      absl::StrCat(local_aggregation_store_path(), "/", kCustomerId, "/", kProjectId);
  const std::string metric_entry = absl::StrCat(kOccurrenceMetricMetricId);
  auto list_project_dir = [this, &project_dir] {
    return fs()->ListFiles(project_dir).ConsumeValueOrDie();
  };

  // No data is present before any event is added.
  ASSERT_THAT(list_project_dir(), Not(Contains(metric_entry)));

  auto event_record =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kOccurrenceMetricMetricId)
          .ValueOrDie();
  event_record->event()->mutable_occurrence_event();
  local_aggregation->AddEvent(*event_record, mock_clock_->now());

  // The event was added while disabled, so there is still no data.
  ASSERT_THAT(list_project_dir(), Not(Contains(metric_entry)));
}

// Walks the occurrence metric through three simulated days of hourly and daily aggregation runs
// and checks, after every run, exactly which observations were written and with which system
// version. Every aggregation run is also required to report an OK status.
TEST_F(LocalAggregationTest, GenerateObservationsWorks) {
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  lib::MetricIdentifier metric_id =
      project_context->Identifier().ForMetric(kOccurrenceMetricMetricId);
  const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId);
  ASSERT_NE(metric, nullptr);
  // This test assumes that this metric has no reports with added privacy.
  for (const ReportDefinition& report : metric->reports()) {
    ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
  }

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  // Before any events occur, aggregation run happens, no observations are generated.
  Status status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 0);

  // 10 minutes later, 4 events occur.
  mock_clock_->increment_by(std::chrono::minutes(10));
  std::unique_ptr<logger::EventRecord> record =
      logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).ValueOrDie();
  record->event()->mutable_occurrence_event()->set_count(1);
  std::chrono::system_clock::time_point now = mock_clock_->now();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
                                                  now, *civil_time_converter_, *record->metric()));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
                                                now, *civil_time_converter_, *record->metric()));
  record->event()->set_day_index(day_index);
  record->event()->set_hour_id(hour_id);
  record->system_profile()->set_system_version("100");
  // The product name is set here but VerifyStoredIntegerObservation expects it to be empty in the
  // stored metadata.
  record->system_profile()->set_product_name("should-be-filtered");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(50));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
  // hourly_device_histograms_report_all.
  ASSERT_EQ(test_writer_->num_observations_added(), 3);
  VerifyStoredIntegerObservation(
      0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "100", {4});
  VerifyStoredIntegerObservation(
      1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
      {4});
  VerifyStoredIntegerObservation(
      2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "100", {4});

  // An hour later at 9:10PM, no more events, next aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Still only the original 3 observations created.
  ASSERT_EQ(test_writer_->num_observations_added(), 3);

  // 45 minutes later, 2 more events occur, with a new system version.
  mock_clock_->increment_by(std::chrono::minutes(45));
  now = mock_clock_->now();
  record->event()->set_day_index(day_index);
  CB_ASSERT_OK_AND_ASSIGN(
      hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
  record->event()->set_hour_id(hour_id);
  record->system_profile()->set_system_version("101");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(15));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 3 new observations created, this time with an incremental count of 2.
  ASSERT_EQ(test_writer_->num_observations_added(), 6);
  VerifyStoredIntegerObservation(
      3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "101", {2});
  VerifyStoredIntegerObservation(
      4, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "101",
      {2});
  VerifyStoredIntegerObservation(
      5, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "101", {2});

  // An hour later at 11:10PM, no more events, next aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Still only the expected 6 observations created.
  ASSERT_EQ(test_writer_->num_observations_added(), 6);

  // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 10 new observations are created, one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS
  // reports, two for each of the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
  ASSERT_EQ(test_writer_->num_observations_added(), 16);
  VerifyStoredIntegerObservation(
      6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
      "101", {1});
  VerifyStoredIntegerObservation(
      7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
      "101", {1});
  // SELECT_FIRST reports use the first system version, others use the second.
  VerifyStoredIntegerObservation(
      8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
      day_index, "100", {1});

  // This report is REPORT_ALL and generates 2 observations
  VerifyStoredIntegerObservation(
      10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
      day_index, "101", {1});
  VerifyStoredIntegerObservation(
      11, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
      day_index, "100", {1});

  // This report is REPORT_ALL and generates 2 observations
  VerifyStoredIntegerObservation(
      12, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "101", {1});
  VerifyStoredIntegerObservation(
      13, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "100", {1});

  VerifyStoredIntegerObservation(
      14,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
      day_index, "101", {1});
  VerifyStoredIntegerObservation(
      15,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
      day_index, "101", {1});

  // 45 minutes later, 1 more event occurs, again with a new system version.
  day_index += 1;
  mock_clock_->increment_by(std::chrono::minutes(45));
  now = mock_clock_->now();
  record->event()->set_day_index(day_index);
  CB_ASSERT_OK_AND_ASSIGN(
      hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
  record->event()->set_hour_id(hour_id);
  record->system_profile()->set_system_version("102");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 15 minutes later (1 hour from the previous run) at 1:10AM, aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(15));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 3 new observations created, this time with an incremental count of 1.
  ASSERT_EQ(test_writer_->num_observations_added(), 19);
  VerifyStoredIntegerObservation(
      16, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "102", {1});
  VerifyStoredIntegerObservation(
      17, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "102",
      {1});
  VerifyStoredIntegerObservation(
      18, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "102", {1});

  // The rest of the day, no more events occur so 22 hourly aggregation runs create no new
  // observations.
  for (int i = 1; i <= 22; i++) {
    mock_clock_->increment_by(std::chrono::minutes(60));
    status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
    ASSERT_EQ(status.error_code(), StatusCode::OK) << " for hourly run: " << i;
    ASSERT_EQ(test_writer_->num_observations_added(), 19);
  }

  // At 12:10AM (just after midnight UTC), daily event aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 10 new observations are created: one for each of the non-REPORT_ALL UNIQUE_DEVICE_COUNTS
  // reports, 1 for the 1-day REPORT_ALL report and 3 for the 7-day REPORT_ALL report.
  ASSERT_EQ(test_writer_->num_observations_added(), 29);
  VerifyStoredIntegerObservation(
      19, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
      "102", {1});
  VerifyStoredIntegerObservation(
      20, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
      "102", {1});
  // SELECT_FIRST 1-day report uses the new system version.
  VerifyStoredIntegerObservation(
      21, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
      day_index, "102", {1});
  // SELECT_FIRST 7-day report continues to use the original system version.
  VerifyStoredIntegerObservation(
      22, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      23, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
      day_index, "102", {1});

  // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
  // is deterministic based on the hash value)
  VerifyStoredIntegerObservation(
      24, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "101", {1});
  VerifyStoredIntegerObservation(
      25, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      26, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "102", {1});

  VerifyStoredIntegerObservation(
      27,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
      day_index, "102", {1});
  VerifyStoredIntegerObservation(
      28,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
      day_index, "102", {1});

  // The next day, no events occur so all 23 hourly aggregation runs create no new observations.
  for (int i = 1; i <= 23; i++) {
    mock_clock_->increment_by(std::chrono::minutes(60));
    status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
    ASSERT_EQ(status.error_code(), StatusCode::OK) << " for hourly run: " << i;
    ASSERT_EQ(test_writer_->num_observations_added(), 29);
  }

  // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 6 new observations are created for the next day, only for the 7-day window
  // UNIQUE_DEVICE_COUNTS reports, (3 for the REPORT_ALL 7-day report).
  ASSERT_EQ(test_writer_->num_observations_added(), 35);
  day_index += 1;
  VerifyStoredIntegerObservation(
      29, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
      "102", {1});
  // SELECT_FIRST 7-day report continues to use the original system version.
  VerifyStoredIntegerObservation(
      30, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
      day_index, "100", {1});

  // 7 day REPORT_ALL metric reports an observation for each previously seen system version. (Order
  // is deterministic based on the hash value)
  VerifyStoredIntegerObservation(
      31, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "101", {1});
  VerifyStoredIntegerObservation(
      32, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      33, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "102", {1});

  VerifyStoredIntegerObservation(
      34,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
      day_index, "102", {1});
}

// Checks that when writing to the observation store fails, the aggregation run reports a non-OK
// status, no observations are counted as added, and the affected hourly and daily observations
// are generated by a later run once writing succeeds again.
TEST_F(LocalAggregationTest, GenerateObservationsWriterFailures) {
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  lib::MetricIdentifier metric_id =
      project_context->Identifier().ForMetric(kOccurrenceMetricMetricId);
  const MetricDefinition* metric = project_context->GetMetric(kOccurrenceMetricMetricId);
  ASSERT_NE(metric, nullptr);
  // This test assumes that this metric has no reports with added privacy.
  for (const ReportDefinition& report : metric->reports()) {
    ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
  }

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  // Before any events occur, aggregation run happens, no observations are generated.
  Status status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 0);

  // 10 minutes later, 4 events occur.
  mock_clock_->increment_by(std::chrono::minutes(10));
  std::unique_ptr<logger::EventRecord> record =
      logger::EventRecord::MakeEventRecord(project_context, kOccurrenceMetricMetricId).ValueOrDie();
  record->event()->mutable_occurrence_event()->set_count(1);
  std::chrono::system_clock::time_point now = mock_clock_->now();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
                                                  now, *civil_time_converter_, *record->metric()));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
                                                now, *civil_time_converter_, *record->metric()));
  record->event()->set_day_index(day_index);
  record->event()->set_hour_id(hour_id);
  record->system_profile()->set_system_version("100");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
  // But errors occur in the writing to the ObservationStore.
  test_writer_->SetFailCalls(true);
  mock_clock_->increment_by(std::chrono::minutes(50));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_NE(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 0);

  // An hour later at 9:10PM, no more events, next aggregation run happens.
  // The previously aggregated events should be retried. This time the writing succeeds.
  test_writer_->SetFailCalls(false);
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 3 observations created, for fleetwide_occurrence_counts_report, hourly_device_histograms, and
  // hourly_device_histograms_report_all.
  ASSERT_EQ(test_writer_->num_observations_added(), 3);
  VerifyStoredIntegerObservation(
      0, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "100", {4});
  VerifyStoredIntegerObservation(
      1, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
      {4});
  VerifyStoredIntegerObservation(
      2, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "100", {4});

  // 45 minutes later, 2 more events occur.
  mock_clock_->increment_by(std::chrono::minutes(45));
  now = mock_clock_->now();
  record->event()->set_day_index(day_index);
  CB_ASSERT_OK_AND_ASSIGN(
      hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
  record->event()->set_hour_id(hour_id);
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
  // But errors occur again in the writing to the ObservationStore.
  test_writer_->SetFailCalls(true);
  mock_clock_->increment_by(std::chrono::minutes(15));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_NE(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 3);

  // 30 minutes later, 1 more event occurs.
  mock_clock_->increment_by(std::chrono::minutes(30));
  now = mock_clock_->now();
  record->event()->set_day_index(day_index);
  CB_ASSERT_OK_AND_ASSIGN(
      hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
  record->event()->set_hour_id(hour_id);
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 30 minutes later at 11:10PM, next aggregation run happens, but still fails.
  mock_clock_->increment_by(std::chrono::minutes(30));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_NE(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 3);

  // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens and fails.
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_NE(status.error_code(), StatusCode::OK);
  ASSERT_EQ(test_writer_->num_observations_added(), 3);

  // An hour later at 1:10AM (after midnight UTC), now aggregation succeeds, both daily and hourly.
  test_writer_->SetFailCalls(false);
  mock_clock_->increment_by(std::chrono::minutes(60));
  status = aggregation->GenerateAggregatedObservations(mock_clock_->now());
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // 14 total new observations created (17 including the 3 from the earlier successful run).
  // 6 new hourly observations created, 3 with an incremental count of 2, 3 with count of 1.
  // 8 new daily observations are created, one for each of the UNIQUE_DEVICE_COUNTS reports.
  ASSERT_EQ(test_writer_->num_observations_added(), 17);
  VerifyStoredIntegerObservation(
      3, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "100", {2});
  VerifyStoredIntegerObservation(
      4, metric_id.ForReport(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId), day_index,
      "100", {1});
  VerifyStoredIntegerObservation(
      5, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport1DayReportId), day_index,
      "100", {1});
  VerifyStoredIntegerObservation(
      6, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId), day_index,
      "100", {1});
  VerifyStoredIntegerObservation(
      7, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      8, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectFirstReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      9, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport1DayReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      10, metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsReportAllReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      11,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport1DayReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      12,
      metric_id.ForReport(kOccurrenceMetricUniqueDeviceCountsSelectMostCommonReport7DaysReportId),
      day_index, "100", {1});
  VerifyStoredIntegerObservation(
      13, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
      {2});
  VerifyStoredIntegerObservation(
      14, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportId), day_index, "100",
      {1});
  // The last two of the 17 observations are the remaining hourly report's pair, following the same
  // count-of-2 then count-of-1 order as the other hourly reports above.
  VerifyStoredIntegerObservation(
      15, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "100", {2});
  VerifyStoredIntegerObservation(
      16, metric_id.ForReport(kOccurrenceMetricHourlyDeviceHistogramsReportAllReportId), day_index,
      "100", {1});
}

// Simulates hourly aggregation runs across several days for the expedited occurrence metric and
// checks, after every run, how many observations were written and what they contain.
TEST_F(LocalAggregationTest, GenerateObservationsExpedited) {
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();
  lib::MetricIdentifier metric_id =
      project_context->Identifier().ForMetric(kExpeditedOccurrenceMetricMetricId);
  const MetricDefinition* metric = project_context->GetMetric(kExpeditedOccurrenceMetricMetricId);
  // This test assumes that this metric has no reports with added privacy.
  for (const ReportDefinition& report : metric->reports()) {
    ASSERT_EQ(ReportDefinition::NO_ADDED_PRIVACY, report.privacy_level());
  }

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  // Start 9th January, 2014 at 10 minutes after 7PM, UTC.
  // The struct is value-initialized because strptime() only stores the fields named in the format
  // string; without this the remaining fields would be indeterminate when passed to timegm().
  std::tm tm = {};
  ASSERT_TRUE(strptime("2014-01-09 19:10:00", "%Y-%m-%d %H:%M:%S", &tm) != nullptr);
  std::chrono::time_point<std::chrono::system_clock> system_time =
      std::chrono::system_clock::from_time_t(timegm(&tm));

  // Before any events occur, aggregation run happens, no observations are generated.
  aggregation->GenerateAggregatedObservations(system_time);
  ASSERT_EQ(test_writer_->num_observations_added(), 0);

  // 10 minutes later, 4 events occur with event code 45, with two different system_versions.
  system_time += std::chrono::minutes(10);
  std::unique_ptr<logger::EventRecord> record =
      logger::EventRecord::MakeEventRecord(project_context, kExpeditedOccurrenceMetricMetricId)
          .ValueOrDie();
  record->event()->mutable_occurrence_event()->set_count(1);
  record->event()->mutable_occurrence_event()->add_event_code(
      ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45);
  std::chrono::system_clock::time_point now = mock_clock_->now();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index, cobalt::util::TimePointToDayIndex(
                                                  now, *civil_time_converter_, *record->metric()));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t hour_id, cobalt::util::TimePointToHourId(
                                                now, *civil_time_converter_, *record->metric()));
  record->event()->set_day_index(day_index);
  record->event()->set_hour_id(hour_id);
  record->system_profile()->set_system_version("99");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  record->system_profile()->set_system_version("100");
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 50 minutes later (1 hour from the previous run) at 8:10PM, aggregation run happens.
  system_time += std::chrono::minutes(50);
  aggregation->GenerateAggregatedObservations(system_time);

  // 8 observations created, one for each expedited non-REPORT_ALL UNIQUE_DEVICE_COUNTS reports, two
  // for the REPORT_ALL UNIQUE_DEVICE_COUNTS reports.
  ASSERT_EQ(test_writer_->num_observations_added(), 8);
  VerifyStoredIntegerObservation(
      0, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      1, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      2, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst1DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      3, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      4, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "99", {1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      5, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      6, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99", {1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      7, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});

  // An hour later at 9:10PM, no more events, next aggregation run happens.
  system_time += std::chrono::minutes(60);
  aggregation->GenerateAggregatedObservations(system_time);

  // Still only the original observations created, expedited observations are not re-sent.
  ASSERT_EQ(test_writer_->num_observations_added(), 8);

  // 45 minutes later, 2 more events occur, one with a different event code 46.
  system_time += std::chrono::minutes(45);
  // The first of the two events repeats event code 45, which has already been reported today.
  record->event()->mutable_occurrence_event()->set_event_code(
      0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code45);
  now = system_time;
  record->event()->set_day_index(day_index);
  CB_ASSERT_OK_AND_ASSIGN(
      hour_id, cobalt::util::TimePointToHourId(now, *civil_time_converter_, *record->metric()));
  record->event()->set_hour_id(hour_id);
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  // The second event carries the new event code 46.
  record->event()->mutable_occurrence_event()->set_event_code(
      0, ExpeditedOccurrenceMetricMetricDimensionFirstDimension_Code46);
  ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());

  // 15 minutes later (1 hour from the previous run) at 10:10PM, aggregation run happens.
  system_time += std::chrono::minutes(15);
  aggregation->GenerateAggregatedObservations(system_time);

  // 4 new observations with the new event code 46: one for each AT_LEAST_ONCE report and one for
  // each REPORT_ALL report (system_version "100").
  // Note that the expedited SELECT_FIRST report was already sent, so doesn't get new observations.
  // Also, no new observations for event code 45 are created, as they were already sent.
  ASSERT_EQ(test_writer_->num_observations_added(), 12);
  VerifyStoredIntegerObservation(
      8, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce1DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  VerifyStoredIntegerObservation(
      9, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  VerifyStoredIntegerObservation(
      10, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll1DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  VerifyStoredIntegerObservation(
      11, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});

  // An hour later at 11:10PM, no more events, next aggregation run happens.
  system_time += std::chrono::minutes(60);
  aggregation->GenerateAggregatedObservations(system_time);

  // Still only the expected original observation created.
  ASSERT_EQ(test_writer_->num_observations_added(), 12);

  // An hour later at 12:10AM (after midnight UTC), daily event aggregation run happens.
  system_time += std::chrono::minutes(60);
  aggregation->GenerateAggregatedObservations(system_time);

  // Note that the daily expedited reports were already sent, so they don't get new observations.
  // But 4 more expedited observations are created for the next day's 7day reports: an AT_LEAST_ONCE
  // report containing the codes 45 and 46, a SELECT_FIRST report for the code 45, and two
  // REPORT_ALL reports for the two system_versions reported earlier.
  ASSERT_EQ(test_writer_->num_observations_added(), 16);
  day_index += 1;
  VerifyStoredIntegerObservation(
      12, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
      {1, 1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
       {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  VerifyStoredIntegerObservation(
      13, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      14, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      15, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
      {1, 1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
       {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});

  // The next day, no events occur so all 23 hourly aggregation runs create no new observations.
  for (int i = 1; i <= 23; i++) {
    system_time += std::chrono::minutes(60);
    aggregation->GenerateAggregatedObservations(system_time);
    ASSERT_EQ(test_writer_->num_observations_added(), 16);
  }

  // Finally, at 12:10AM (just after midnight UTC), daily event aggregation run happens.
  system_time += std::chrono::minutes(60);
  aggregation->GenerateAggregatedObservations(system_time);

  // 4 new observations are created, again only for the 7-day window UNIQUE_DEVICE_COUNTS reports.
  ASSERT_EQ(test_writer_->num_observations_added(), 20);
  day_index += 1;
  VerifyStoredIntegerObservation(
      16, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId), day_index, "100",
      {1, 1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
       {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  VerifyStoredIntegerObservation(
      17, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId), day_index, "100",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      18, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "99",
      {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
  VerifyStoredIntegerObservation(
      19, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index, "100",
      {1, 1},
      {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
       {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});

  // For the next 4 days (3 days of data have been sent above) no more events occur, and the 7-day
  // reports continue to be sent at the start of the day.
  for (int j = 0; j < 4; j++) {
    // The 23 hourly aggregation runs.
    for (int i = 1; i <= 23; i++) {
      system_time += std::chrono::minutes(60);
      aggregation->GenerateAggregatedObservations(system_time);
      ASSERT_EQ(test_writer_->num_observations_added(), 20 + 4 * j);
    }

    // The 12:10AM (just after midnight UTC) daily event aggregation run.
    system_time += std::chrono::minutes(60);
    aggregation->GenerateAggregatedObservations(system_time);

    // 4 new observations are created for the 7-day window UNIQUE_DEVICE_COUNTS reports.
    ASSERT_EQ(test_writer_->num_observations_added(), 24 + 4 * j);
    day_index += 1;
    VerifyStoredIntegerObservation(
        20 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricAtLeastOnce7DayReportId),
        day_index, "100", {1, 1},
        {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
         {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
    VerifyStoredIntegerObservation(
        21 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricSelectFirst7DayReportId),
        day_index, "100", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
    VerifyStoredIntegerObservation(
        22 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
        "99", {1}, {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45}});
    VerifyStoredIntegerObservation(
        23 + 4 * j, metric_id.ForReport(kExpeditedOccurrenceMetricReportAll7DayReportId), day_index,
        "100", {1, 1},
        {{ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code45},
         {ExpeditedOccurrenceMetricMetricDimensionFirstDimension::Code46}});
  }

  // Finally, on the 8th day, and all subsequent days, no observations are created at all.
  for (int i = 1; i <= 72; i++) {  // run 72 more hours to be sure
    system_time += std::chrono::minutes(60);
    aggregation->GenerateAggregatedObservations(system_time);
    ASSERT_EQ(test_writer_->num_observations_added(), 36);
  }
}

// Fills the aggregate store with distinct histogram buckets until the configured quotas are
// exhausted, then checks that AddEvent reports RESOURCE_EXHAUSTED with the expected details.
TEST_F(LocalAggregationTest, StorageQuotasWork) {
  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation({
      .per_project_reserved_bytes = 60,
      .total_capacity_bytes = 120,
  });

  auto record =
      logger::EventRecord::MakeEventRecord(GetProjectContext(), kIntegerHistogramMetricMetricId)
          .ValueOrDie();
  ::cobalt::HistogramBucket* bucket =
      record->event()->mutable_integer_histogram_event()->add_buckets();
  bucket->set_count(100);
  record->event()->set_day_index(1);
  record->event()->set_hour_id(1);

  // Add entries to the histogram, until its size exceeds the per_project_reserved_bytes.
  int num_to_slush = 0;
  while (aggregation->SlushUsed() == 0) {
    bucket->set_index(num_to_slush);
    ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
    num_to_slush++;
  }

  // Must have stored at least a couple events successfully before exceeding
  // per_project_reserved_bytes.
  ASSERT_GT(num_to_slush, 2);

  // We should be able to add another num_to_slush events successfully.
  for (int i = 0; i < num_to_slush; i++) {
    bucket->set_index(i + num_to_slush);
    ASSERT_TRUE(aggregation->AddEvent(*record, mock_clock_->now()).ok());
  }

  bool failure_occurred = false;
  // The most recent RESOURCE_EXHAUSTED status. It is kept separately from the per-call status so
  // that the detail checks below always inspect a failing call, even if a later call succeeds.
  Status exhausted_status;
  // Add another 8 events, it should fail on one of these.
  for (int i = 0; i < 8; i++) {
    bucket->set_index(num_to_slush * 2 + i);
    Status status = aggregation->AddEvent(*record, mock_clock_->now());
    if (status.error_code() == StatusCode::RESOURCE_EXHAUSTED) {
      failure_occurred = true;
      exhausted_status = status;
    }
  }

  ASSERT_TRUE(failure_occurred);
  ASSERT_THAT(exhausted_status.error_details(), HasSubstr("per_project_reserved_bytes=60"));
  ASSERT_THAT(exhausted_status.error_details(), ContainsRegex("project_bytes=1.."));
  ASSERT_THAT(exhausted_status.error_details(), HasSubstr("SlushSize=60"));
  ASSERT_THAT(exhausted_status.error_details(), ContainsRegex("SlushUsed=6."));
}

// Constructing a LocalAggregation whose per-project reservation (120 bytes) is larger than its
// total capacity (100 bytes) is expected to abort with a "no space in slush" message.
TEST_F(LocalAggregationTest, CrashesIfTooManyBytes) {
  const auto construct_with_oversized_reservation = [&] {
    std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation({
        .per_project_reserved_bytes = 120,
        .total_capacity_bytes = 100,
    });
  };

  ASSERT_DEATH(construct_with_oversized_reservation(), "There is no space in slush");
}

// Logs two events under the registry variant named "with_select_last_set", switches to the
// "with_report_all_set" variant, logs two more events, and then checks the observations generated
// one day later.
TEST_F(LocalAggregationTest, MigrateToReportAll) {
  OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
  uint32_t day_index = 0;
  uint32_t hour_id = 0;

  // Phase 1: one event per system version while the first registry is active.
  {
    auto event_record =
        logger::EventRecord::MakeEventRecord(
            project_context,
            report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId)
            .ValueOrDie();
    event_record->event()->mutable_occurrence_event()->set_count(1);
    std::chrono::system_clock::time_point event_time = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(
        day_index, cobalt::util::TimePointToDayIndex(event_time, *civil_time_converter_,
                                                     *event_record->metric()));
    CB_ASSERT_OK_AND_ASSIGN(
        hour_id, cobalt::util::TimePointToHourId(event_time, *civil_time_converter_,
                                                 *event_record->metric()));
    event_record->event()->set_day_index(day_index);
    event_record->event()->set_hour_id(hour_id);

    for (const char* system_version : {"100", "101"}) {
      event_record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*event_record, mock_clock_->now()).error_code(),
                StatusCode::OK);
    }
  }

  // Phase 2: swap the registry, rebuild the aggregation, and log the same two events again.
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  project_context = GetProjectContext();
  aggregation = MakeLocalAggregation();
  {
    auto event_record =
        logger::EventRecord::MakeEventRecord(
            project_context,
            report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
            .ValueOrDie();
    event_record->event()->mutable_occurrence_event()->set_count(1);
    std::chrono::system_clock::time_point event_time = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(
        day_index, cobalt::util::TimePointToDayIndex(event_time, *civil_time_converter_,
                                                     *event_record->metric()));
    CB_ASSERT_OK_AND_ASSIGN(
        hour_id, cobalt::util::TimePointToHourId(event_time, *civil_time_converter_,
                                                 *event_record->metric()));
    event_record->event()->set_day_index(day_index);
    event_record->event()->set_hour_id(hour_id);

    for (const char* system_version : {"100", "101"}) {
      event_record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*event_record, mock_clock_->now()).error_code(),
                StatusCode::OK);
    }
  }

  // Run aggregation a day later and inspect what was written.
  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), 2);
  EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(
      report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId);
  const auto report_all_report_id =
      report_all_test::with_report_all_set::
          kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId;

  // The first two events will be logged to the non partitioned data portion. When the
  // migration happens, those two will be moved to whatever value the SystemProfile currently
  // holds. Thus 2 ungrouped + 1 grouped = 3
  VerifyStoredIntegerObservation(0, metric_id.ForReport(report_all_report_id), day_index, "101",
                                 {3});
  VerifyStoredIntegerObservation(1, metric_id.ForReport(report_all_report_id), day_index, "100",
                                 {1});
}

// Mirror image of the migration above: events are first logged under the "with_report_all_set"
// registry variant, then under "with_select_last_set", and the observations generated one day
// later are checked.
TEST_F(LocalAggregationTest, MigrateFromReportAll) {
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();
  util::TimeInfo time_info;

  // Phase 1: one event per system version while the first registry is active.
  {
    auto event_record =
        logger::EventRecord::MakeEventRecord(
            project_context,
            report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
            .ValueOrDie();
    event_record->event()->mutable_occurrence_event()->set_count(1);
    std::chrono::system_clock::time_point event_time = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(time_info,
                            util::TimeInfo::FromTimePoint(event_time, *civil_time_converter_,
                                                          *event_record->metric()));
    event_record->event()->set_day_index(time_info.day_index);
    event_record->event()->set_hour_id(time_info.hour_id);

    for (const char* system_version : {"100", "101"}) {
      event_record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*event_record, mock_clock_->now()).error_code(),
                StatusCode::OK);
    }
  }

  // Phase 2: swap the registry, rebuild the aggregation, and log the same two events again.
  OverrideRegistry(report_all_test::with_select_last_set::kRegistryBase64);
  project_context = GetProjectContext();
  aggregation = MakeLocalAggregation();
  {
    auto event_record =
        logger::EventRecord::MakeEventRecord(
            project_context,
            report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId)
            .ValueOrDie();
    event_record->event()->mutable_occurrence_event()->set_count(1);
    std::chrono::system_clock::time_point event_time = mock_clock_->now();
    CB_ASSERT_OK_AND_ASSIGN(time_info,
                            util::TimeInfo::FromTimePoint(event_time, *civil_time_converter_,
                                                          *event_record->metric()));
    event_record->event()->set_day_index(time_info.day_index);
    event_record->event()->set_hour_id(time_info.hour_id);

    for (const char* system_version : {"100", "101"}) {
      event_record->system_profile()->set_system_version(system_version);
      ASSERT_EQ(aggregation->AddEvent(*event_record, mock_clock_->now()).error_code(),
                StatusCode::OK);
    }
  }

  // Run aggregation a day later and inspect what was written.
  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), 1);
  EXPECT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  lib::MetricIdentifier metric_id = project_context->Identifier().ForMetric(
      report_all_test::with_select_last_set::kOccurrenceMetricReportAllMetricId);
  const auto select_last_report_id =
      report_all_test::with_select_last_set::
          kOccurrenceMetricReportAllUniqueDeviceCounts1DayReportAllReportId;

  // All 4 events should be grouped back together under system_version 101 (the most recently
  // seen).
  VerifyStoredIntegerObservation(0, metric_id.ForReport(select_last_report_id),
                                 time_info.day_index, "101", {4});
}

// Logs events under 15 different system versions, where "Version N" receives exactly N events,
// and checks that each generated observation carries the count matching its system version.
TEST_F(LocalAggregationTest, ReportAllWorks) {
  const int32_t kNumPartitions = 15;
  OverrideRegistry(report_all_test::with_report_all_set::kRegistryBase64);
  std::shared_ptr<const logger::ProjectContext> project_context = GetProjectContext();

  std::unique_ptr<LocalAggregation> aggregation = MakeLocalAggregation();

  auto event_record =
      logger::EventRecord::MakeEventRecord(
          project_context, report_all_test::with_report_all_set::kOccurrenceMetricReportAllMetricId)
          .ValueOrDie();
  event_record->event()->mutable_occurrence_event()->set_count(1);
  std::chrono::system_clock::time_point event_time = mock_clock_->now();
  CB_ASSERT_OK_AND_ASSIGN(
      util::TimeInfo time_info,
      util::TimeInfo::FromTimePoint(event_time, *civil_time_converter_, *event_record->metric()));
  event_record->event()->set_day_index(time_info.day_index);
  event_record->event()->set_hour_id(time_info.hour_id);

  // Pass number `first` logs versions first..kNumPartitions, so version N is logged in passes
  // 1..N, i.e. N times in total.
  for (int32_t first = 1; first <= kNumPartitions; first++) {
    for (int32_t version = first; version <= kNumPartitions; version++) {
      event_record->system_profile()->set_system_version(absl::StrCat("Version ", version));
      ASSERT_EQ(aggregation->AddEvent(*event_record, mock_clock_->now()).error_code(),
                StatusCode::OK);
    }
  }

  aggregation->GenerateAggregatedObservations(mock_clock_->now() + util::kOneDay);
  ASSERT_EQ(test_writer_->num_observations_added(), kNumPartitions);
  ASSERT_EQ(test_writer_->metadata_received[0]->customer_id(), kCustomerId);

  // Metadata entries are read at the same position as the observation being visited.
  size_t position = 0;
  for (const std::unique_ptr<observation_store::StoredObservation>& stored :
       test_writer_->messages_received) {
    ASSERT_TRUE(stored->has_unencrypted());

    const Observation& observation = stored->unencrypted();
    ASSERT_TRUE(observation.has_integer());
    ASSERT_GT(observation.integer().values_size(), 0);

    // The version number is whatever follows the first space in "Version N".
    const std::string system_version =
        test_writer_->metadata_received[position]->system_profile().system_version();
    const std::string number_text = system_version.substr(system_version.find(' '));
    const int system_version_number = std::stoi(number_text);
    ASSERT_EQ(observation.integer().values(0).value(), system_version_number);
    ++position;
  }
}

}  // namespace cobalt::local_aggregation
