// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/local_aggregation/observation_generator.h"

#include <memory>

#include <gtest/gtest.h>

#include "absl/strings/escaping.h"
#include "src/algorithms/random/random.h"
#include "src/algorithms/random/test_secure_random.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation/backfill_manager.h"
#include "src/local_aggregation/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/local_aggregation/testing/test_privacy_registry.cb.h"
#include "src/local_aggregation/testing/test_registry.cb.h"
#include "src/logger/fake_logger.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/observation_writer.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/logger/types.h"
#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/testing/testing.h"
#include "src/system_data/client_secret.h"
#include "src/system_data/fake_system_data.h"

namespace cobalt::local_aggregation {

// Convenience aliases for types that the tests below refer to repeatedly.
using TimeInfo = util::TimeInfo;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;

// The expected size in bytes of an Observation's |random_id| field. The tests also expect a
// stored observation's contribution ID to have this size.
constexpr size_t kRandomIdSize = 8u;

// Identifier of the test project, built from the `kCustomerId` and `kProjectId` constants. Used
// to look up metric aggregates in the local aggregate storage.
const lib::ProjectIdentifier kProjectIdentifier =
    lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId);

namespace {

// Decodes a Base64-encoded, serialized CobaltRegistry.
//
// Returns the parsed registry on success. Logs an error and returns nullptr when
// |registryBase64| is not valid Base64 or when the decoded bytes do not parse as a
// CobaltRegistry.
std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) {
  std::string serialized_registry;
  const bool decoded = absl::Base64Unescape(registryBase64, &serialized_registry);
  if (!decoded) {
    LOG(ERROR) << "Unable to decode Base64 String";
    return nullptr;
  }

  auto parsed_registry = std::make_unique<CobaltRegistry>();
  if (parsed_registry->ParseFromString(serialized_registry)) {
    return parsed_registry;
  }

  LOG(ERROR) << "Unable to parse registry from bytes";
  return nullptr;
}

// A PrivacyEncoder test double whose output is fixed at construction time.
//
// When |return_private_observations| is false, the observation handed to
// MaybeMakePrivateObservations is passed through as the only result. When it is true, the input
// observation is discarded and |num_private_observations| default-constructed Observations are
// returned instead.
class FakePrivacyEncoder : public logger::PrivacyEncoder {
 public:
  explicit FakePrivacyEncoder(bool return_private_observations, int num_private_observations = 0)
      : PrivacyEncoder(std::make_unique<TestSecureRandomNumberGenerator>(0),
                       std::make_unique<RandomNumberGenerator>(0)),
        return_private_observations_(return_private_observations),
        num_private_observations_(num_private_observations) {}

  // Returns either the unmodified input observation or a fixed number of empty observations,
  // depending on how this fake was constructed. The metric and report definitions are ignored.
  lib::statusor::StatusOr<std::vector<std::unique_ptr<Observation>>> MaybeMakePrivateObservations(
      std::unique_ptr<Observation> observation, const MetricDefinition& /*metric_def*/,
      const ReportDefinition& /*report_def*/) override {
    std::vector<std::unique_ptr<Observation>> result;

    if (return_private_observations_) {
      // Emit the configured number of empty observations; a non-positive count emits none.
      for (int remaining = num_private_observations_; remaining > 0; --remaining) {
        result.push_back(std::make_unique<Observation>());
      }
    } else {
      result.push_back(std::move(observation));
    }

    return result;
  }

 private:
  // Whether to replace the input observation with freshly created empty observations.
  bool return_private_observations_;
  // How many empty observations to return when |return_private_observations_| is true.
  int num_private_observations_;
};

}  // namespace

// Test fixture that assembles the collaborators an ObservationGenerator is constructed from: a
// fake validated clock, a MetadataBuilder, a ProjectContextFactory loaded from a test registry,
// and a LocalAggregateStorage using the Immediate storage strategy. Individual tests populate the
// storage, call ConstructObservationGenerator(), and then drive GenerateObservationsOnce().
class ObservationGeneratorTest : public util::testing::TestWithFiles {
 public:
  // Creates the test folder, the clocks and the metadata builder, then loads the default test
  // registry via SetRegistry().
  void SetUp() override {
    MakeTestFolder();

    system_clock_ = std::make_unique<util::IncrementingSystemClock>();
    // Move the clock forward by 100 days' worth of hours before recording the starting time.
    system_clock_->increment_by(std::chrono::hours(util::kNumHoursPerDay * 100));
    starting_time_ = system_clock_->now();
    test_clock_ = std::make_unique<util::FakeValidatedClock>(system_clock_.get());
    test_clock_->SetAccurate(true);
    metadata_builder_ = std::make_unique<MetadataBuilder>(system_data_, test_clock_.get(),
                                                          system_data_cache_path(), fs());
    // The current SystemProfile has a system version of 101 and 1 experiment ID.
    system_data_.SetVersion("101");
    metadata_builder_->SnapshotSystemData();
    SetRegistry();
  }

  // Rebuilds the project context factory, the local aggregate storage and the cached project
  // context from |registry|. Defaults to the registry decoded from `kCobaltRegistryBase64`.
  void SetRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(kCobaltRegistryBase64)) {
    project_context_factory_ = std::make_unique<logger::ProjectContextFactory>(
        util::WrapNotNullUniquePtr(std::move(registry)).value());
    aggregate_storage_ = LocalAggregateStorage::New(
        LocalAggregateStorage::StorageStrategy::Immediate, local_aggregation_store_path(), fs(),
        *project_context_factory_, *metadata_builder_, 0);
    project_context_ = GetProjectContext();
  }

  // Fetches the stored aggregate for |metric_id| in the test project. Records a (non-fatal) test
  // failure if the lookup does not succeed; the value is extracted regardless.
  MetricAggregateRef GetMetricAggregate(uint32_t metric_id) {
    lib::statusor::StatusOr<MetricAggregateRef> metric_aggregate_or =
        aggregate_storage_->GetMetricAggregate(kProjectIdentifier.ForMetric(metric_id));
    EXPECT_TRUE(metric_aggregate_or.ok());

    return std::move(metric_aggregate_or).value();
  }

  // Asks the current project context factory for the context of the test customer and project.
  std::shared_ptr<const logger::ProjectContext> GetProjectContext() {
    return project_context_factory_->NewProjectContext(
        lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId));
  }

  // Creates the ObservationGenerator under test from the fixture's storage, project context
  // factory and system data plus the supplied writer, privacy encoder and time converter, and
  // then installs |internal_metrics| (nullptr by default) on it.
  void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
                                     std::unique_ptr<FakePrivacyEncoder> privacy_encoder,
                                     util::CivilTimeConverterInterface& civil_time_converter,
                                     logger::InternalMetrics* internal_metrics = nullptr,
                                     bool generate_observations_with_current_system_profile = false,
                                     bool test_dont_backfill_empty_reports = false) {
    observation_generator_ = std::make_unique<ObservationGenerator>(
        *aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
        std::move(privacy_encoder), civil_time_converter,
        generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
    observation_generator_->ResetInternalMetrics(internal_metrics);
  }

  // Shuts down the generator, if a test constructed one.
  void TearDown() override {
    if (observation_generator_) {
      observation_generator_->ShutDown();
    }
  }

  // Forwards to ObservationGenerator::GenerateObservationsOnce() and returns its status.
  // Requires ConstructObservationGenerator() to have been called first.
  Status GenerateObservationsOnce(std::chrono::system_clock::time_point system_time) {
    return observation_generator_->GenerateObservationsOnce(system_time);
  }

  // NOTE(review): BasicLogger is not defined in the visible part of this file; confirm this
  // friend declaration is still needed.
  friend class BasicLogger;

 protected:
  system_data::FakeSystemData system_data_;
  std::unique_ptr<util::IncrementingSystemClock> system_clock_;
  std::unique_ptr<util::FakeValidatedClock> test_clock_;
  // The value of `system_clock_` captured in SetUp(), after the initial 100-day advance.
  std::chrono::system_clock::time_point starting_time_;

  std::unique_ptr<MetadataBuilder> metadata_builder_;
  std::unique_ptr<logger::ProjectContextFactory> project_context_factory_;
  // The `ProjectContext` for the test customer and project.
  std::shared_ptr<const logger::ProjectContext> project_context_;
  std::unique_ptr<LocalAggregateStorage> aggregate_storage_;
  // The object under test; null until ConstructObservationGenerator() is called.
  std::unique_ptr<ObservationGenerator> observation_generator_;
};

class TestObservationStoreWriter : public observation_store::ObservationStoreWriterInterface {
 public:
  explicit TestObservationStoreWriter(
      std::function<void(std::unique_ptr<observation_store::StoredObservation>,
                         std::unique_ptr<ObservationMetadata>)>
          watcher)
      : watcher_(std::move(watcher)) {}

  Status StoreObservation(std::unique_ptr<observation_store::StoredObservation> observation,
                          std::unique_ptr<ObservationMetadata> metadata) override {
    watcher_(std::move(observation), std::move(metadata));
    return Status::OkStatus();
  }

 private:
  std::function<void(std::unique_ptr<observation_store::StoredObservation>,
                     std::unique_ptr<ObservationMetadata>)>
      watcher_;
};

// Parameters for the value-parameterized tests below. The field order must stay as is because
// the test instantiations use designated initializers.
struct ObservationGeneratorTestParams {
  // ID of the metric whose aggregate data the test populates.
  uint32_t metric_id;
  // ID of the report whose observations the test inspects.
  uint32_t report_id;
  // Factory for the civil time converter the test runs with.
  std::function<std::unique_ptr<util::CivilTimeConverterInterface>()> converter_fn;
};

// The ObservationGeneratorTest fixture combined with an ObservationGeneratorTestParams parameter,
// for use with TEST_P.
class ObservationGeneratorTestWithParams
    : public ObservationGeneratorTest,
      public ::testing::WithParamInterface<ObservationGeneratorTestParams> {};

// Distinct suite names over the same parameterized fixture, so that each suite can be
// instantiated with its own parameter set.
using HourlyObservationGeneratorTest = ObservationGeneratorTestWithParams;
using DailyObservationGeneratorTest = ObservationGeneratorTestWithParams;
using SystemProfileObservationGeneratorTest = ObservationGeneratorTestWithParams;

// Stores one hourly aggregate per hour (count = (offset + 1) * 100) for the parameterized
// metric/report, then runs the generator for every second hour and checks that the most recently
// written observation carries the metadata and count of the hour that just ended.
TEST_P(HourlyObservationGeneratorTest, GeneratesHourlyObservations) {
  // The delta between the hour IDs for consecutive hours, assuming no change in time zone
  // (including DST status).
  const int kHourIdsPerHour = 2;
  const uint32_t kMaxHourIdOffset = 101;

  ObservationGeneratorTestParams params = GetParam();
  std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
  const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
                          util::TimePointToHourId(starting_time_, *converter, metric));

  SystemProfile system_profile;
  system_profile.set_os(SystemProfile::FUCHSIA);
  system_profile.set_system_version("100");
  uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());

  {
    MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
    ReportAggregate* report_agg =
        &(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];

    // One aggregate per hour; the count encodes the hour ID offset so it can be verified below.
    for (uint32_t i = 0; i <= kMaxHourIdOffset; i += kHourIdsPerHour) {
      uint32_t count = (i + 1) * 100;
      SystemProfileAggregate* agg =
          (*report_agg->mutable_hourly()->mutable_by_hour_id())[start_hour_id + i]
              .add_system_profile_aggregates();
      agg->set_system_profile_hash(system_profile_hash);
      agg->add_by_event_code()->mutable_data()->set_count(count);
    }

    metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Remember the last observation written for the metric/report under test.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  uint32_t metric_id = params.metric_id;
  uint32_t report_id = params.report_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
          last_metadata = std::move(metadata);
          if (observation->has_unencrypted()) {
            last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
            last_contribution_id = observation->contribution_id();
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *converter);

  for (uint32_t i = 0; i <= kMaxHourIdOffset; i += 2 * kHourIdsPerHour) {
    // Generate observations for the hour ending at `end_time`.
    std::chrono::system_clock::time_point end_time =
        starting_time_ + std::chrono::hours(i / kHourIdsPerHour);
    GenerateObservationsOnce(end_time + std::chrono::hours(1));

    CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index,
                            util::TimePointToDayIndex(end_time, *converter, metric));
    EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
    // Fail cleanly, rather than dereferencing a null pointer, if nothing was written.
    ASSERT_TRUE(last_metadata);
    EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
    EXPECT_EQ(last_metadata->day_index(), day_index)
        << "Incorrect day index for hour ID offset i: " << i;
    EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
    EXPECT_EQ(last_metadata->system_profile().board_name(), "");
    ASSERT_TRUE(last_observation);
    EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
    ASSERT_TRUE(last_observation->has_integer());
    ASSERT_GT(last_observation->integer().values_size(), 0);
    EXPECT_EQ(last_observation->integer().values(0).value(), (i + 1) * 100)
        << "Incorrect count for hour ID offset i: " << i;
  }
}

// Runs the hourly test for two metrics (`kOccurrenceMetricMetricId` and
// `kOccurrenceOtherTzMetricMetricId`), each paired with its own FleetwideOccurrenceCounts report,
// once with a UtcTimeConverter and once with a FakeCivilTimeConverter(2, false).
// NOTE(review): the fake converter's arguments are presumably (UTC offset in hours, is-DST) --
// confirm against FakeCivilTimeConverter.
INSTANTIATE_TEST_SUITE_P(
    GenerateHourlyObservations, HourlyObservationGeneratorTest,
    ::testing::Values(
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricFleetwideOccurrenceCountsReportReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricFleetwideOccurrenceCountsReportReportId,
            .converter_fn =
                []() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId,
            .converter_fn = []() {
              return std::make_unique<util::FakeCivilTimeConverter>(2, false);
            }}));

// Checks hourly observation generation across a simulated start of daylight saving time: the hour
// before the transition and the hour after it must each yield an observation with the right day
// index and count, even though the hour ID jumps by 3 across the transition.
TEST_F(ObservationGeneratorTest, GeneratesHourlyObservationsWithDstStart) {
  // The report whose observations this test inspects.
  const auto report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId;

  // Simulate a switch from PST (UTC-8h) to PDT (UTC-7h) that takes effect 1 hour after the test's
  // starting time.
  auto converter = std::make_unique<util::FakeCivilTimeConverter>(
      /*start_utc_offset=*/-8, /*start_isdst=*/false,
      /*end_utc_offset=*/-7, /*end_isdst=*/true,
      /*threshold=*/starting_time_ + std::chrono::hours(1));
  const MetricDefinition& metric = *project_context_->GetMetric(kOccurrenceOtherTzMetricMetricId);
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
                          util::TimePointToHourId(starting_time_, *converter, metric));

  // The system profile that every stored aggregate is attributed to.
  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  // Capture the most recent observation written for the report under test.
  std::unique_ptr<ObservationMetadata> captured_metadata;
  std::unique_ptr<Observation> captured_observation;
  std::string captured_contribution_id;

  TestObservationStoreWriter test_writer(
      [&](std::unique_ptr<observation_store::StoredObservation> stored,
          std::unique_ptr<ObservationMetadata> stored_metadata) {
        const bool is_target_report =
            stored_metadata->metric_id() == kOccurrenceOtherTzMetricMetricId &&
            stored_metadata->report_id() == report_id;
        if (!is_target_report) {
          return;
        }
        captured_metadata = std::move(stored_metadata);
        if (stored->has_unencrypted()) {
          captured_observation = std::unique_ptr<Observation>(stored->release_unencrypted());
          captured_contribution_id = stored->contribution_id();
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *converter);

  // Store a count of 100 for the starting hour.
  {
    MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
    ReportAggregate& report_agg = (*metric_agg.aggregate()->mutable_by_report_id())[report_id];
    auto* by_hour_id = report_agg.mutable_hourly()->mutable_by_hour_id();

    SystemProfileAggregate* profile_agg = (*by_hour_id)[start_hour_id].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->add_by_event_code()->mutable_data()->set_count(100);

    metric_agg.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Advance 1 hour to the time zone transition threshold. The hour ID grows by 3.
  system_clock_->increment_by(std::chrono::hours(1));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t next_hour_id,
                          util::TimePointToHourId(system_clock_->now(), *converter, metric));
  ASSERT_EQ(next_hour_id, start_hour_id + 3);
  GenerateObservationsOnce(system_clock_->now());

  // The observation for the starting hour should now have been written.
  EXPECT_EQ(captured_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(captured_metadata);
  EXPECT_EQ(captured_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(captured_metadata->day_index(), util::HourIdToDayIndex(start_hour_id));
  EXPECT_EQ(captured_metadata->system_profile().system_version(), profile.system_version());
  EXPECT_EQ(captured_metadata->system_profile().board_name(), "");
  ASSERT_TRUE(captured_observation);
  EXPECT_EQ(captured_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(captured_observation->has_integer());
  ASSERT_GT(captured_observation->integer().values_size(), 0);
  EXPECT_EQ(captured_observation->integer().values(0).value(), 100);

  // Store a count of 200 for the first hour after the transition.
  {
    MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
    ReportAggregate& report_agg = (*metric_agg.aggregate()->mutable_by_report_id())[report_id];
    auto* by_hour_id = report_agg.mutable_hourly()->mutable_by_hour_id();

    SystemProfileAggregate* profile_agg = (*by_hour_id)[next_hour_id].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->add_by_event_code()->mutable_data()->set_count(200);

    metric_agg.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Advance 1 more hour. The hour ID grows by 2 relative to `next_hour_id`, i.e. to
  // `start_hour_id + 5`.
  system_clock_->increment_by(std::chrono::hours(1));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t final_hour_id,
                          util::TimePointToHourId(system_clock_->now(), *converter, metric));
  ASSERT_EQ(final_hour_id, start_hour_id + 5);
  GenerateObservationsOnce(system_clock_->now());

  // The observation for the post-transition hour should now have been written.
  EXPECT_EQ(captured_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(captured_metadata);
  EXPECT_EQ(captured_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(captured_metadata->day_index(), util::HourIdToDayIndex(next_hour_id));
  EXPECT_EQ(captured_metadata->system_profile().system_version(), profile.system_version());
  EXPECT_EQ(captured_metadata->system_profile().board_name(), "");
  ASSERT_TRUE(captured_observation);
  EXPECT_EQ(captured_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(captured_observation->has_integer());
  ASSERT_GT(captured_observation->integer().values_size(), 0);
  EXPECT_EQ(captured_observation->integer().values(0).value(), 200);
}

// Checks hourly observation generation across a simulated end of daylight saving time: the hour
// before the transition and the hour after it must each yield an observation with the right day
// index and count, even though the hour ID advances by only 1 across the transition.
TEST_F(ObservationGeneratorTest, GeneratesHourlyObservationsWithDstEnd) {
  // The report whose observations this test inspects.
  const auto report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId;

  // Simulate a switch from PDT (UTC-7h) to PST (UTC-8h) that takes effect 1 hour after the test's
  // starting time.
  auto converter = std::make_unique<util::FakeCivilTimeConverter>(
      /*start_utc_offset=*/-7, /*start_isdst=*/true,
      /*end_utc_offset=*/-8, /*end_isdst=*/false,
      /*threshold=*/starting_time_ + std::chrono::hours(1));
  const MetricDefinition& metric = *project_context_->GetMetric(kOccurrenceOtherTzMetricMetricId);
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
                          util::TimePointToHourId(starting_time_, *converter, metric));

  // The system profile that every stored aggregate is attributed to.
  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  // Capture the most recent observation written for the report under test.
  std::unique_ptr<ObservationMetadata> captured_metadata;
  std::unique_ptr<Observation> captured_observation;
  std::string captured_contribution_id;

  TestObservationStoreWriter test_writer(
      [&](std::unique_ptr<observation_store::StoredObservation> stored,
          std::unique_ptr<ObservationMetadata> stored_metadata) {
        const bool is_target_report =
            stored_metadata->metric_id() == kOccurrenceOtherTzMetricMetricId &&
            stored_metadata->report_id() == report_id;
        if (!is_target_report) {
          return;
        }
        captured_metadata = std::move(stored_metadata);
        if (stored->has_unencrypted()) {
          captured_observation = std::unique_ptr<Observation>(stored->release_unencrypted());
          captured_contribution_id = stored->contribution_id();
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *converter);

  // Store a count of 100 for the starting hour.
  {
    MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
    ReportAggregate& report_agg = (*metric_agg.aggregate()->mutable_by_report_id())[report_id];
    auto* by_hour_id = report_agg.mutable_hourly()->mutable_by_hour_id();

    SystemProfileAggregate* profile_agg = (*by_hour_id)[start_hour_id].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->add_by_event_code()->mutable_data()->set_count(100);

    metric_agg.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Advance 1 hour to the time zone transition threshold. The hour ID grows by 1.
  system_clock_->increment_by(std::chrono::hours(1));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t next_hour_id,
                          util::TimePointToHourId(system_clock_->now(), *converter, metric));
  ASSERT_EQ(next_hour_id, start_hour_id + 1);
  GenerateObservationsOnce(system_clock_->now());

  // The observation for the starting hour should now have been written.
  EXPECT_EQ(captured_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(captured_metadata);
  EXPECT_EQ(captured_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(captured_metadata->day_index(), util::HourIdToDayIndex(start_hour_id));
  EXPECT_EQ(captured_metadata->system_profile().system_version(), profile.system_version());
  EXPECT_EQ(captured_metadata->system_profile().board_name(), "");
  ASSERT_TRUE(captured_observation);
  EXPECT_EQ(captured_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(captured_observation->has_integer());
  ASSERT_GT(captured_observation->integer().values_size(), 0);
  EXPECT_EQ(captured_observation->integer().values(0).value(), 100);

  // Store a count of 200 for the first hour after the transition.
  {
    MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
    ReportAggregate& report_agg = (*metric_agg.aggregate()->mutable_by_report_id())[report_id];
    auto* by_hour_id = report_agg.mutable_hourly()->mutable_by_hour_id();

    SystemProfileAggregate* profile_agg = (*by_hour_id)[next_hour_id].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->add_by_event_code()->mutable_data()->set_count(200);

    metric_agg.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Advance 1 more hour. The hour ID grows by 2 relative to `next_hour_id`, i.e. to
  // `start_hour_id + 3`.
  system_clock_->increment_by(std::chrono::hours(1));
  CB_ASSERT_OK_AND_ASSIGN(uint32_t final_hour_id,
                          util::TimePointToHourId(system_clock_->now(), *converter, metric));
  ASSERT_EQ(final_hour_id, start_hour_id + 3);
  GenerateObservationsOnce(system_clock_->now());

  // The observation for the post-transition hour should now have been written.
  EXPECT_EQ(captured_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(captured_metadata);
  EXPECT_EQ(captured_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(captured_metadata->day_index(), util::HourIdToDayIndex(next_hour_id));
  EXPECT_EQ(captured_metadata->system_profile().system_version(), profile.system_version());
  EXPECT_EQ(captured_metadata->system_profile().board_name(), "");
  ASSERT_TRUE(captured_observation);
  EXPECT_EQ(captured_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(captured_observation->has_integer());
  ASSERT_GT(captured_observation->integer().values_size(), 0);
  EXPECT_EQ(captured_observation->integer().values(0).value(), 200);
}

// Stores an "at least once" daily aggregate for each of six consecutive days for the
// parameterized metric/report, then runs the generator once per day and checks that the most
// recently written observation carries the expected metadata and a single integer value of 1.
TEST_P(DailyObservationGeneratorTest, GeneratesDailyObservations) {
  const uint32_t kLastDayOffset = 5;

  const ObservationGeneratorTestParams params = GetParam();
  std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
  const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, metric));

  // The system profile that every stored aggregate is attributed to.
  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  {
    MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
    ReportAggregate& report_agg =
        (*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
    auto* by_day_index = report_agg.mutable_daily()->mutable_by_day_index();

    for (uint32_t day_offset = 0; day_offset <= kLastDayOffset; ++day_offset) {
      SystemProfileAggregate* profile_agg =
          (*by_day_index)[start_day_index + day_offset].add_system_profile_aggregates();
      profile_agg->set_system_profile_hash(profile_hash);
      profile_agg->add_by_event_code()
          ->mutable_data()
          ->mutable_at_least_once()
          ->set_at_least_once(true);
    }
    metric_agg.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_TRUE(metric_agg.Save().ok());
  }

  // Capture the most recent observation written for the metric/report under test.
  std::unique_ptr<ObservationMetadata> captured_metadata;
  std::unique_ptr<Observation> captured_observation;
  std::string captured_contribution_id;
  TestObservationStoreWriter test_writer(
      [&](std::unique_ptr<observation_store::StoredObservation> stored,
          std::unique_ptr<ObservationMetadata> stored_metadata) {
        const bool is_target_report = stored_metadata->metric_id() == params.metric_id &&
                                      stored_metadata->report_id() == params.report_id;
        if (!is_target_report) {
          return;
        }
        captured_metadata = std::move(stored_metadata);
        if (stored->has_unencrypted()) {
          captured_observation = std::unique_ptr<Observation>(stored->release_unencrypted());
          captured_contribution_id = stored->contribution_id();
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *converter);

  for (uint32_t day_offset = 0; day_offset <= kLastDayOffset; ++day_offset) {
    // Generate observations for the day ending at `end_time`.
    const std::chrono::system_clock::time_point end_time =
        starting_time_ + std::chrono::hours(day_offset * util::kNumHoursPerDay);
    GenerateObservationsOnce(end_time + std::chrono::hours(util::kNumHoursPerDay));

    CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index,
                            util::TimePointToDayIndex(end_time, *converter, metric));
    EXPECT_EQ(captured_contribution_id.size(), kRandomIdSize);
    ASSERT_TRUE(captured_metadata);
    EXPECT_EQ(captured_metadata->customer_id(), kCustomerId);
    EXPECT_EQ(captured_metadata->day_index(), day_index)
        << "Incorrect day index for day index offset i: " << day_offset;
    EXPECT_EQ(captured_metadata->system_profile().system_version(), profile.system_version());
    EXPECT_EQ(captured_metadata->system_profile().board_name(), "");
    ASSERT_TRUE(captured_observation);
    EXPECT_EQ(captured_observation->random_id().size(), kRandomIdSize);
    ASSERT_TRUE(captured_observation->has_integer());
    ASSERT_EQ(captured_observation->integer().values_size(), 1);
    EXPECT_EQ(captured_observation->integer().values(0).value(), 1)
        << "Incorrect count for day index offset i: " << day_offset;
  }
}

// Runs the daily test for two metrics (`kOccurrenceMetricMetricId` and
// `kOccurrenceOtherTzMetricMetricId`), each paired with its own UniqueDeviceCounts 1-day report,
// once with a UtcTimeConverter and once with a FakeCivilTimeConverter(2, false).
// NOTE(review): the fake converter's arguments are presumably (UTC offset in hours, is-DST) --
// confirm against FakeCivilTimeConverter.
INSTANTIATE_TEST_SUITE_P(
    GenerateDailyObservations, DailyObservationGeneratorTest,
    ::testing::Values(
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricUniqueDeviceCountsReport1DayReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsReport1DayReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        // The report ID must belong to the metric it is paired with, so this entry uses the
        // report constant of kOccurrenceMetricMetricId rather than that of the OtherTz metric.
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricUniqueDeviceCountsReport1DayReportId,
            .converter_fn =
                []() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsReport1DayReportId,
            .converter_fn = []() {
              return std::make_unique<util::FakeCivilTimeConverter>(2, false);
            }}));

// Marks the starting day as the last generated day for the UniqueDeviceCounts 1-day report
// without storing any aggregate data, then runs the generator at successive day boundaries and
// checks that no observation is ever written for that report.
TEST_F(ObservationGeneratorTest, GeneratesNoObservationsForNoAggregateData) {
  const uint32_t kNumDays = 5;
  const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
                          TimeInfo::FromTimePoint(starting_time_, *converter, *metric));

  {
    MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
    ReportAggregate* report =
        &(*aggregate.aggregate()
               ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
    report->mutable_daily()->set_last_day_index(starting_time_info.day_index);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Capture anything written for the report under test; nothing is expected.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == kOccurrenceMetricMetricId &&
            metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
          last_metadata = std::move(metadata);
          if (observation->has_unencrypted()) {
            last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *converter);

  // Step the generation time forward one day per iteration, from the starting time through
  // `kNumDays` days later, so that every intermediate day is exercised rather than repeating the
  // same call.
  for (uint32_t i = 0; i <= kNumDays; ++i) {
    GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay * i));

    EXPECT_FALSE(last_metadata);
    EXPECT_FALSE(last_observation);
  }
}

// Verifies that one failed generation pass does not prevent later passes from
// producing the stored daily observations once the default registry is back.
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsAsExpectedDespiteFailure) {
  const uint32_t kNumDays = 5;
  const MetricDefinition& occurrence_metric =
      *project_context_->GetMetric(kOccurrenceMetricMetricId);
  util::UtcTimeConverter utc_converter;
  CB_ASSERT_OK_AND_ASSIGN(
      uint32_t first_day,
      util::TimePointToDayIndex(starting_time_, utc_converter, occurrence_metric));

  SystemProfile stored_profile;
  stored_profile.set_os(SystemProfile::FUCHSIA);
  stored_profile.set_system_version("100");
  const uint64_t stored_profile_hash = util::Farmhash64(stored_profile.SerializeAsString());

  {
    // Seed kNumDays consecutive days of "at least once" data for the report.
    MetricAggregateRef aggregate_ref = GetMetricAggregate(kOccurrenceMetricMetricId);
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()
              ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
    auto& by_day = *report_agg.mutable_daily()->mutable_by_day_index();
    for (uint32_t day = first_day; day < first_day + kNumDays; ++day) {
      SystemProfileAggregate* profile_agg = by_day[day].add_system_profile_aggregates();
      profile_agg->set_system_profile_hash(stored_profile_hash);
      auto* event_agg = profile_agg->add_by_event_code();
      event_agg->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    }
    aggregate_ref.StoreFilteredSystemProfile(stored_profile_hash, stored_profile);
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  // Remember the most recent write for the report under test.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id](
          std::unique_ptr<observation_store::StoredObservation> stored,
          std::unique_ptr<ObservationMetadata> meta) {
        if (meta->metric_id() != kOccurrenceMetricMetricId ||
            meta->report_id() != kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
          return;
        }
        last_metadata = std::move(meta);
        if (!stored->has_unencrypted()) {
          return;
        }
        last_observation.reset(stored->release_unencrypted());
        last_contribution_id = stored->contribution_id();
      });

  // Set the MetricType to an invalid type.
  SetRegistry(testing::MutateProject(
      GetRegistry(kCobaltRegistryBase64), kCustomerId, kProjectId, [](ProjectConfig* config) {
        auto* metric_def = config->mutable_metrics(kOccurrenceMetricMetricIndex);
        metric_def->set_metric_type(MetricDefinition::INTEGER_HISTOGRAM);
      }));

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                utc_converter);

  // This generation should fail because the aggregation procedure
  // couldn't be created.
  const auto failed_pass =
      GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(first_day + 1)));
  EXPECT_NE(StatusCode::OK, failed_pass.error_code());

  // Reset to default registry.
  SetRegistry();
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                utc_converter);

  // Data should still be collected as expected.
  for (uint32_t day = first_day; day < first_day + kNumDays; ++day) {
    const auto pass =
        GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(day + 1)));
    EXPECT_EQ(StatusCode::OK, pass.error_code());

    EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
    ASSERT_TRUE(last_metadata);
    EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
    EXPECT_EQ(last_metadata->day_index(), day);
    const SystemProfile& reported_profile = last_metadata->system_profile();
    EXPECT_EQ(reported_profile.system_version(), stored_profile.system_version());
    EXPECT_EQ(reported_profile.board_name(), "");
    ASSERT_TRUE(last_observation);
    EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
    ASSERT_TRUE(last_observation->has_integer());
    const auto& integer_obs = last_observation->integer();
    ASSERT_EQ(integer_obs.values_size(), 1);
    EXPECT_EQ(integer_obs.values(0).value(), 1);
  }
}

// With the 1-day unique device counts report switched to the REPORT_ALL system
// profile selection policy, generating after each stored day yields an
// observation for that day holding a single integer value of 1.
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsWithReportAllAsExpected) {
  SetRegistry(testing::MutateProject(
      GetRegistry(kCobaltRegistryBase64), kCustomerId, kProjectId, [](ProjectConfig* config) {
        auto* metric_def = config->mutable_metrics(kOccurrenceMetricMetricIndex);
        auto* report_def =
            metric_def->mutable_reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex);
        report_def->set_system_profile_selection(SystemProfileSelectionPolicy::REPORT_ALL);
      }));

  const uint32_t kNumDays = 5;
  const MetricDefinition& occurrence_metric =
      *project_context_->GetMetric(kOccurrenceMetricMetricId);
  util::UtcTimeConverter utc_converter;
  CB_ASSERT_OK_AND_ASSIGN(
      uint32_t first_day,
      util::TimePointToDayIndex(starting_time_, utc_converter, occurrence_metric));

  SystemProfile stored_profile;
  stored_profile.set_os(SystemProfile::FUCHSIA);
  stored_profile.set_system_version("100");
  const uint64_t stored_profile_hash = util::Farmhash64(stored_profile.SerializeAsString());

  {
    // Seed kNumDays consecutive days of "at least once" data for the report.
    MetricAggregateRef aggregate_ref = GetMetricAggregate(kOccurrenceMetricMetricId);
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()
              ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
    auto& by_day = *report_agg.mutable_daily()->mutable_by_day_index();
    for (uint32_t day = first_day; day < first_day + kNumDays; ++day) {
      SystemProfileAggregate* profile_agg = by_day[day].add_system_profile_aggregates();
      profile_agg->set_system_profile_hash(stored_profile_hash);
      auto* event_agg = profile_agg->add_by_event_code();
      event_agg->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    }
    aggregate_ref.StoreFilteredSystemProfile(stored_profile_hash, stored_profile);
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  // Remember the most recent write for the report under test.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id](
          std::unique_ptr<observation_store::StoredObservation> stored,
          std::unique_ptr<ObservationMetadata> meta) {
        if (meta->metric_id() != kOccurrenceMetricMetricId ||
            meta->report_id() != kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
          return;
        }
        last_metadata = std::move(meta);
        if (!stored->has_unencrypted()) {
          return;
        }
        last_observation.reset(stored->release_unencrypted());
        last_contribution_id = stored->contribution_id();
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                utc_converter);

  // Generate at the start of the day after each stored day.
  for (uint32_t day = first_day; day < first_day + kNumDays; ++day) {
    GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(day + 1)));

    EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
    ASSERT_TRUE(last_metadata);
    EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
    EXPECT_EQ(last_metadata->day_index(), day);
    ASSERT_TRUE(last_observation);
    EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
    ASSERT_TRUE(last_observation->has_integer());
    const auto& integer_obs = last_observation->integer();
    ASSERT_EQ(integer_obs.values_size(), 1);
    EXPECT_EQ(integer_obs.values(0).value(), 1);
  }
}

// Verifies a single hourly generation pass for the fleetwide occurrence counts
// report when FakePrivacyEncoder is constructed with (true, kNumPrivateObs):
// kNumPrivateObs observations and metadatas are written, exactly one stored
// observation carries a contribution ID, and the metadata uses the system
// profile stored with the aggregate.
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservations) {
  // Declared const to match their k-prefixed names and the equivalent
  // constants in the sibling tests.
  const uint32_t kMaxHourOffset = 101;
  const int kNumPrivateObs = 2;

  const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
                          TimeInfo::FromTimePoint(starting_time_, *converter, *metric));

  SystemProfile system_profile;
  system_profile.set_os(SystemProfile::FUCHSIA);
  system_profile.set_system_version("100");
  uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
  {
    // Seed hourly counts for every second hour ID starting at the current
    // one, and mark the hour ID two before the start as already generated.
    MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
    auto* report =
        &(*aggregate.aggregate()
               ->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
    report->mutable_hourly()->set_last_hour_id(starting_time_info.hour_id - 2);
    for (uint32_t i = starting_time_info.hour_id; i < starting_time_info.hour_id + kMaxHourOffset;
         i += 2) {
      SystemProfileAggregate* system_profile_agg =
          (*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
      system_profile_agg->set_system_profile_hash(system_profile_hash);
      system_profile_agg->add_by_event_code()->mutable_data()->set_count(
          (i - starting_time_info.hour_id + 1) * 100);
    }
    aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Collect every write for the report under test, and count how many stored
  // observations have a non-empty contribution ID.
  std::vector<Observation> observations;
  observations.reserve(kNumPrivateObs);
  std::vector<ObservationMetadata> metadatas;
  metadatas.reserve(kNumPrivateObs);
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == kOccurrenceMetricMetricId &&
            metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(
      observation_writer, std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs), *converter);
  GenerateObservationsOnce(starting_time_ + util::kOneHour);

  EXPECT_EQ(observations.size(), kNumPrivateObs);
  EXPECT_EQ(contribution_count, 1);
  for (const Observation& obs : observations) {
    EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
  }
  EXPECT_EQ(metadatas.size(), kNumPrivateObs);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info.hour_id));
    // Stored data's system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
}

// Verifies that, with FakePrivacyEncoder constructed with
// (true, kNumPrivateObs), an hourly generation pass still writes
// kNumPrivateObs observations for the fleetwide occurrence counts report even
// though no hourly data is stored, and that the metadata then carries the
// current system profile.
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservationsForNoObservation) {
  // Declared const to match its k-prefixed name and the equivalent constants
  // in the sibling tests.
  const int kNumPrivateObs = 2;
  const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
                          TimeInfo::FromTimePoint(starting_time_, *converter, *metric));

  {
    // Only mark the hour ID two before the start as already generated; no
    // by-hour-id entries are added.
    MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
    auto* report =
        &(*aggregate.aggregate()
               ->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
    report->mutable_hourly()->set_last_hour_id(starting_time_info.hour_id - 2);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Collect every write for the report under test, and count how many stored
  // observations have a non-empty contribution ID.
  std::vector<Observation> observations;
  observations.reserve(kNumPrivateObs);
  std::vector<ObservationMetadata> metadatas;
  metadatas.reserve(kNumPrivateObs);
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == kOccurrenceMetricMetricId &&
            metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(
      observation_writer, std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs), *converter);
  GenerateObservationsOnce(starting_time_ + util::kOneHour);

  EXPECT_EQ(observations.size(), kNumPrivateObs);
  EXPECT_EQ(contribution_count, 1);
  for (const Observation& obs : observations) {
    EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
  }
  EXPECT_EQ(metadatas.size(), kNumPrivateObs);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info.hour_id));
    // Current system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(),
              system_data_.system_profile().system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
}

// When the aggregate refers to a system profile hash whose profile was never
// stored, the emitted metadata is expected to carry the current system
// profile's OS and version rather than those of the unstored profile.
TEST_P(SystemProfileObservationGeneratorTest, GenerateObservationsFallbackToCurrentSystemProfile) {
  const ObservationGeneratorTestParams& test_params = GetParam();
  std::unique_ptr<util::CivilTimeConverterInterface> time_converter = test_params.converter_fn();
  const MetricDefinition& metric_def = *project_context_->GetMetric(test_params.metric_id);
  CB_ASSERT_OK_AND_ASSIGN(
      uint32_t first_day, util::TimePointToDayIndex(starting_time_, *time_converter, metric_def));

  SystemProfile unstored_profile;
  unstored_profile.set_os(SystemProfile::ANDROID);
  unstored_profile.set_system_version("100");
  const uint64_t unstored_profile_hash =
      util::Farmhash64(unstored_profile.SerializeAsString());

  {
    MetricAggregateRef aggregate_ref = GetMetricAggregate(test_params.metric_id);
    // Use the report that has experiment IDs in the system profile.
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()->mutable_by_report_id())[test_params.report_id];
    auto& by_day = *report_agg.mutable_daily()->mutable_by_day_index();
    SystemProfileAggregate* profile_agg = by_day[first_day].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(unstored_profile_hash);
    auto* event_agg = profile_agg->add_by_event_code();
    event_agg->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    // Don't store the system_profile.
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  // Remember the most recent write for the parameterized metric and report.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  const uint32_t wanted_metric_id = test_params.metric_id;
  const uint32_t wanted_report_id = test_params.report_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id, wanted_metric_id,
       wanted_report_id](std::unique_ptr<observation_store::StoredObservation> stored,
                         std::unique_ptr<ObservationMetadata> meta) {
        if (meta->metric_id() != wanted_metric_id || meta->report_id() != wanted_report_id) {
          return;
        }
        last_metadata = std::move(meta);
        if (!stored->has_unencrypted()) {
          return;
        }
        last_observation.reset(stored->release_unencrypted());
        last_contribution_id = stored->contribution_id();
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *time_converter);

  GenerateObservationsOnce(starting_time_ + util::kOneDay);

  EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(last_metadata);
  EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(last_metadata->day_index(), first_day);
  // System profile fields were set from the current system profile.
  const SystemProfile& reported_profile = last_metadata->system_profile();
  EXPECT_NE(reported_profile.os(), unstored_profile.os());
  EXPECT_NE(reported_profile.system_version(), unstored_profile.system_version());
  EXPECT_EQ(reported_profile.os(), system_data_.system_profile().os());
  EXPECT_EQ(reported_profile.system_version(), system_data_.system_profile().system_version());
  EXPECT_EQ(reported_profile.board_name(), "");
  ASSERT_TRUE(last_observation);
  EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(last_observation->has_integer());
  const auto& integer_obs = last_observation->integer();
  ASSERT_EQ(integer_obs.values_size(), 1);
  EXPECT_EQ(integer_obs.values(0).value(), 1);
}

// Use a report that has experiment IDs in the system profile.
//
// With generate_observations_with_current_system_profile enabled, fields that
// the stored profile leaves unset are expected to come from the current system
// profile, while the stored OS is kept.
TEST_P(SystemProfileObservationGeneratorTest, GenerateObservationsWithCurrentSystemProfile) {
  const ObservationGeneratorTestParams& test_params = GetParam();
  std::unique_ptr<util::CivilTimeConverterInterface> time_converter = test_params.converter_fn();
  const MetricDefinition& metric_def = *project_context_->GetMetric(test_params.metric_id);
  CB_ASSERT_OK_AND_ASSIGN(
      uint32_t first_day, util::TimePointToDayIndex(starting_time_, *time_converter, metric_def));

  // No system profile fields except OS.
  SystemProfile stored_profile;
  stored_profile.set_os(SystemProfile::ANDROID);
  const uint64_t stored_profile_hash = util::Farmhash64(stored_profile.SerializeAsString());

  {
    MetricAggregateRef aggregate_ref = GetMetricAggregate(test_params.metric_id);
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()->mutable_by_report_id())[test_params.report_id];
    auto& by_day = *report_agg.mutable_daily()->mutable_by_day_index();
    SystemProfileAggregate* profile_agg = by_day[first_day].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(stored_profile_hash);
    auto* event_agg = profile_agg->add_by_event_code();
    event_agg->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    aggregate_ref.StoreFilteredSystemProfile(stored_profile_hash, stored_profile);
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  // Remember the most recent write for the parameterized metric and report.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  const uint32_t wanted_metric_id = test_params.metric_id;
  const uint32_t wanted_report_id = test_params.report_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id, wanted_metric_id,
       wanted_report_id](std::unique_ptr<observation_store::StoredObservation> stored,
                         std::unique_ptr<ObservationMetadata> meta) {
        if (meta->metric_id() != wanted_metric_id || meta->report_id() != wanted_report_id) {
          return;
        }
        last_metadata = std::move(meta);
        if (!stored->has_unencrypted()) {
          return;
        }
        last_observation.reset(stored->release_unencrypted());
        last_contribution_id = stored->contribution_id();
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *time_converter, /*internal_metrics=*/nullptr,
                                /*generate_observations_with_current_system_profile=*/true);
  // Generate observations for the day with day index `first_day`.
  GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));

  EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(last_metadata);
  EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(last_metadata->day_index(), first_day);
  // Unset system profile fields were set from the current system
  // profile.
  const SystemProfile& reported_profile = last_metadata->system_profile();
  EXPECT_EQ(reported_profile.os(), stored_profile.os());
  EXPECT_EQ(reported_profile.system_version(), system_data_.system_profile().system_version());
  ASSERT_EQ(reported_profile.experiment_ids_size(), 1);
  EXPECT_EQ(reported_profile.experiment_ids(0), system_data_.system_profile().experiment_ids(0));
  EXPECT_EQ(reported_profile.board_name(), "");
  ASSERT_TRUE(last_observation);
  EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(last_observation->has_integer());
  const auto& integer_obs = last_observation->integer();
  ASSERT_EQ(integer_obs.values_size(), 1);
  EXPECT_EQ(integer_obs.values(0).value(), 1);
}

// Use a report that has experiment IDs in the system profile.
//
// With generate_observations_with_current_system_profile enabled, fields that
// the stored profile already sets (OS, version, experiment IDs) are expected
// to be reported unchanged.
TEST_P(SystemProfileObservationGeneratorTest,
       GenerateObservationsWithCurrentSystemProfileDoesNotOverwrite) {
  const ObservationGeneratorTestParams& test_params = GetParam();
  std::unique_ptr<util::CivilTimeConverterInterface> time_converter = test_params.converter_fn();
  const MetricDefinition& metric_def = *project_context_->GetMetric(test_params.metric_id);
  CB_ASSERT_OK_AND_ASSIGN(
      uint32_t first_day, util::TimePointToDayIndex(starting_time_, *time_converter, metric_def));

  SystemProfile stored_profile;
  stored_profile.set_os(SystemProfile::ANDROID);
  stored_profile.set_system_version("100");
  stored_profile.add_experiment_ids(2222);
  stored_profile.add_experiment_ids(3333);
  const uint64_t stored_profile_hash = util::Farmhash64(stored_profile.SerializeAsString());

  {
    MetricAggregateRef aggregate_ref = GetMetricAggregate(test_params.metric_id);
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()->mutable_by_report_id())[test_params.report_id];
    auto& by_day = *report_agg.mutable_daily()->mutable_by_day_index();
    SystemProfileAggregate* profile_agg = by_day[first_day].add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(stored_profile_hash);
    auto* event_agg = profile_agg->add_by_event_code();
    event_agg->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    aggregate_ref.StoreFilteredSystemProfile(stored_profile_hash, stored_profile);
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  // Remember the most recent write for the parameterized metric and report.
  std::unique_ptr<ObservationMetadata> last_metadata;
  std::unique_ptr<Observation> last_observation;
  std::string last_contribution_id;
  const uint32_t wanted_metric_id = test_params.metric_id;
  const uint32_t wanted_report_id = test_params.report_id;
  TestObservationStoreWriter test_writer(
      [&last_metadata, &last_observation, &last_contribution_id, wanted_metric_id,
       wanted_report_id](std::unique_ptr<observation_store::StoredObservation> stored,
                         std::unique_ptr<ObservationMetadata> meta) {
        if (meta->metric_id() != wanted_metric_id || meta->report_id() != wanted_report_id) {
          return;
        }
        last_metadata = std::move(meta);
        if (!stored->has_unencrypted()) {
          return;
        }
        last_observation.reset(stored->release_unencrypted());
        last_contribution_id = stored->contribution_id();
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                *time_converter, /*internal_metrics=*/nullptr,
                                /*generate_observations_with_current_system_profile=*/true);
  // Generate observations for the day with day index `first_day`.
  GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));

  EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
  ASSERT_TRUE(last_metadata);
  EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
  EXPECT_EQ(last_metadata->day_index(), first_day);
  // All system profile fields were set from the aggregate system
  // profile.
  const SystemProfile& reported_profile = last_metadata->system_profile();
  EXPECT_EQ(reported_profile.os(), stored_profile.os());
  EXPECT_EQ(reported_profile.system_version(), stored_profile.system_version());
  ASSERT_EQ(reported_profile.experiment_ids_size(), 2);
  EXPECT_EQ(reported_profile.experiment_ids(0), stored_profile.experiment_ids(0));
  EXPECT_EQ(reported_profile.experiment_ids(1), stored_profile.experiment_ids(1));
  EXPECT_EQ(reported_profile.board_name(), "");
  ASSERT_TRUE(last_observation);
  EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
  ASSERT_TRUE(last_observation->has_integer());
  const auto& integer_obs = last_observation->integer();
  ASSERT_EQ(integer_obs.values_size(), 1);
  EXPECT_EQ(integer_obs.values(0).value(), 1);
}

// Instantiates every SystemProfileObservationGeneratorTest case with four
// parameter sets: two metric/report pairs (kOccurrenceMetric and
// kOccurrenceOtherTzMetric, each with its SelectFirst 1-day unique device
// counts report), each combined with a UtcTimeConverter and with a
// FakeCivilTimeConverter(2, false).
// NOTE(review): the meaning of the FakeCivilTimeConverter arguments is not
// visible in this file section; confirm against its declaration.
INSTANTIATE_TEST_SUITE_P(
    GenerateObservationWithSystemProfile, SystemProfileObservationGeneratorTest,
    ::testing::Values(
        // UTC converter, default occurrence metric.
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        // UTC converter, "other TZ" occurrence metric.
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
            .converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
        // Fake civil time converter, default occurrence metric.
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceMetricMetricId,
            .report_id = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
            .converter_fn =
                []() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
        // Fake civil time converter, "other TZ" occurrence metric.
        ObservationGeneratorTestParams{
            .metric_id = kOccurrenceOtherTzMetricMetricId,
            .report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
            .converter_fn = []() {
              return std::make_unique<util::FakeCivilTimeConverter>(2, false);
            }}));

// FakeLogger variant whose LogInteger first requests a metric aggregate from
// the owning fixture's aggregate storage, so that internal-metric logging
// touches the same storage the observation generator uses.
class BasicLogger : public logger::testing::FakeLogger {
 public:
  // `test` is the fixture whose aggregate_storage_ is accessed on each
  // LogInteger call; it is stored as a raw, non-owning pointer.
  explicit BasicLogger(ObservationGeneratorTest* test) : fixture_(test) {}

  // Touches the aggregate storage, then delegates to FakeLogger::LogInteger
  // and returns its status.
  Status LogInteger(uint32_t metric_id, int64_t value,
                    const std::vector<uint32_t>& event_codes) override {
    // Lock the aggregate store to simulate actually performing log. The
    // returned value is intentionally discarded right away.
    fixture_->aggregate_storage_->GetMetricAggregate(
        lib::CustomerIdentifier(1).ForProject(1).ForMetric(1));
    return FakeLogger::LogInteger(metric_id, value, event_codes);
  }

 private:
  // Non-owning pointer to the test fixture.
  ObservationGeneratorTest* fixture_;
};

// Runs repeated hourly generation against a Delayed-strategy aggregate store
// and a file-backed observation store whose internal metrics are logged via
// BasicLogger (which accesses the aggregate store). The test makes no
// assertions on output; it passes by running to completion.
TEST_F(ObservationGeneratorTest, DoesNotDeadlock) {
  aggregate_storage_ = LocalAggregateStorage::New(LocalAggregateStorage::StorageStrategy::Delayed,
                                                  local_aggregation_store_path(), fs(),
                                                  *project_context_factory_, *metadata_builder_, 0);
  observation_store::FileObservationStore file_store(10000, 10000, 10000, fs(), nullptr,
                                                     test_folder() + "/obs_store");
  BasicLogger storage_touching_logger(this);
  logger::InternalMetricsImpl internal_metrics(storage_touching_logger, nullptr);
  file_store.ResetInternalMetrics(&internal_metrics);

  const MetricDefinition& occurrence_metric =
      *project_context_->GetMetric(kOccurrenceMetricMetricId);
  util::UtcTimeConverter utc_converter;
  CB_ASSERT_OK_AND_ASSIGN(
      TimeInfo start_info,
      TimeInfo::FromTimePoint(starting_time_, utc_converter, occurrence_metric));

  const uint32_t kMaxHourOffset = 101;
  SystemProfile stored_profile;
  stored_profile.set_os(SystemProfile::FUCHSIA);
  stored_profile.set_system_version("100");
  const uint64_t stored_profile_hash = util::Farmhash64(stored_profile.SerializeAsString());

  {
    // Seed hourly counts for every second hour ID starting at the current one.
    MetricAggregateRef aggregate_ref = GetMetricAggregate(kOccurrenceMetricMetricId);
    ReportAggregate& report_agg =
        (*aggregate_ref.aggregate()
              ->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
    auto& by_hour = *report_agg.mutable_hourly()->mutable_by_hour_id();
    for (uint32_t hour = start_info.hour_id; hour < start_info.hour_id + kMaxHourOffset;
         hour += 2) {
      SystemProfileAggregate* profile_agg = by_hour[hour].add_system_profile_aggregates();
      profile_agg->set_system_profile_hash(stored_profile_hash);
      auto* event_agg = profile_agg->add_by_event_code();
      event_agg->mutable_data()->set_count((hour - start_info.hour_id + 1) * 100);
    }
    aggregate_ref.StoreFilteredSystemProfile(stored_profile_hash, stored_profile);
    ASSERT_TRUE(aggregate_ref.Save().ok());
  }

  logger::ObservationWriter observation_writer(file_store, nullptr);
  ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
                                utc_converter, &internal_metrics);

  // Generate at every fourth hour ID across the seeded range.
  for (uint32_t hour = start_info.hour_id; hour < start_info.hour_id + kMaxHourOffset;
       hour += 4) {
    GenerateObservationsOnce(util::FromUnixSeconds(util::HourIdToUnixSeconds(hour + 1)));
  }
}

// Fixture for tests that run the observation generator with a real
// logger::PrivacyEncoder (instead of FakePrivacyEncoder) and the privacy test
// registry.
class PrivacyObservationGeneratorTest : public ObservationGeneratorTest {
 public:
  // Runs the base fixture set-up, then switches to the privacy registry.
  void SetUp() override {
    ObservationGeneratorTest::SetUp();
    SetRegistry(GetRegistry(privacy::kCobaltRegistryBase64));
  }

  // Builds observation_generator_ around a PrivacyEncoder whose two random
  // number generators are both constructed with the argument 0, then installs
  // `internal_metrics` on the new generator.
  void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
                                     util::CivilTimeConverterInterface& civil_time_converter,
                                     logger::InternalMetrics* internal_metrics = nullptr,
                                     bool generate_observations_with_current_system_profile = false,
                                     bool test_dont_backfill_empty_reports = false) {
    auto secure_rng = std::make_unique<TestSecureRandomNumberGenerator>(0);
    auto rng = std::make_unique<RandomNumberGenerator>(0);
    auto encoder =
        std::make_unique<logger::PrivacyEncoder>(std::move(secure_rng), std::move(rng));

    observation_generator_ = std::make_unique<ObservationGenerator>(
        *aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
        std::move(encoder), civil_time_converter,
        generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
    observation_generator_->ResetInternalMetrics(internal_metrics);
  }
};

// Verifies observation generation for the 1-day unique device counts report when an aggregate
// exists for the day being reported.
//
// An at-least-once aggregate is stored under |start_day_index| together with its system profile,
// and observations are generated at the start of the following day. The test expects a
// PrivateIndex observation followed by a ReportParticipation observation, exactly one stored
// observation carrying a contribution id, and metadata that uses the stored system profile.
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservations) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  SystemProfile system_profile;
  system_profile.set_os(SystemProfile::FUCHSIA);
  system_profile.set_system_version("100");
  uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
  {
    // Seed the stored aggregate: last generated day is the day before |start_day_index|, and
    // |start_day_index| holds a single at-least-once event for |system_profile|.
    MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
    ReportAggregate* report =
        &(*aggregate.aggregate()->mutable_by_report_id())
            [privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId];
    report->mutable_daily()->set_last_day_index(start_day_index - 1);
    SystemProfileAggregate* system_profile_agg =
        (*report->mutable_daily()->mutable_by_day_index())[start_day_index]
            .add_system_profile_aggregates();
    system_profile_agg->set_system_profile_hash(system_profile_hash);
    system_profile_agg->add_by_event_code()
        ->mutable_data()
        ->mutable_at_least_once()
        ->set_at_least_once(true);
    aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Collect only what is written for the report under test.
  std::vector<Observation> observations;
  std::vector<ObservationMetadata> metadatas;
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
            metadata->report_id() ==
                privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, *converter);
  GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));

  EXPECT_EQ(contribution_count, 1);
  // Fatal on mismatch: observations[0] and observations[1] are indexed below.
  ASSERT_EQ(observations.size(), 2);
  for (const Observation& obs : observations) {
    EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
  }
  EXPECT_EQ(observations[0].observation_type_case(),
            Observation::ObservationTypeCase::kPrivateIndex);
  EXPECT_EQ(observations[1].observation_type_case(),
            Observation::ObservationTypeCase::kReportParticipation);
  EXPECT_EQ(metadatas.size(), 2);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    EXPECT_EQ(metadata.day_index(), start_day_index);
    // Stored data's system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
}

// Verifies observation generation for the 1-day unique device counts report when no events were
// aggregated for the day being reported.
//
// Only |last_day_index| is stored (set to the day before |start_day_index|). After generating at
// the start of the following day, the test expects a single ReportParticipation observation whose
// metadata uses the current system profile from |system_data_|.
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservationsForNoObservation) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  {
    // Record only the last generated day; no per-day aggregates are stored.
    MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
    ReportAggregate* report =
        &(*aggregate.aggregate()->mutable_by_report_id())
            [privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId];
    report->mutable_daily()->set_last_day_index(start_day_index - 1);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Collect only what is written for the report under test.
  std::vector<Observation> observations;
  std::vector<ObservationMetadata> metadatas;
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
            metadata->report_id() ==
                privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, *converter);
  GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));

  EXPECT_EQ(contribution_count, 1);
  // Fatal on mismatch: observations[0] is indexed below.
  ASSERT_EQ(observations.size(), 1);
  EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize);
  EXPECT_EQ(observations[0].observation_type_case(),
            Observation::ObservationTypeCase::kReportParticipation);
  EXPECT_EQ(metadatas.size(), 1);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    EXPECT_EQ(metadata.day_index(), start_day_index);
    // Current system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(),
              system_data_.system_profile().system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
}

// Verifies the first generation run for the 1-day unique device counts report when nothing has
// been stored for it beforehand.
//
// The test expects three ReportParticipation observations, for day indices
// |start_day_index| - 2, |start_day_index| - 1 and |start_day_index| in that order, each carrying
// a contribution id and the current system profile from |system_data_|.
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservationsInitialBackfill) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  // Collect only what is written for the report under test.
  std::vector<Observation> observations;
  std::vector<ObservationMetadata> metadatas;
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
            metadata->report_id() ==
                privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, *converter);
  GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));

  // Initially 3 observations should be generated for the backstop backfill.
  EXPECT_EQ(observations.size(), 3);
  EXPECT_EQ(contribution_count, 3);
  for (const Observation& obs : observations) {
    EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
    EXPECT_EQ(obs.observation_type_case(), Observation::ObservationTypeCase::kReportParticipation);
  }
  // Fatal on mismatch: metadatas[0..2] are indexed below.
  ASSERT_EQ(metadatas.size(), 3);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    // Current system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(),
              system_data_.system_profile().system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
  EXPECT_EQ(metadatas[0].day_index(), start_day_index - 2);
  EXPECT_EQ(metadatas[1].day_index(), start_day_index - 1);
  EXPECT_EQ(metadatas[2].day_index(), start_day_index);
}

// Verifies observation generation for the 7-day unique device counts report when an aggregate
// exists for a single day.
//
// An at-least-once aggregate is stored under |start_day_index|. For each of the next seven
// generation runs (one per day) the test expects a PrivateIndex observation followed by a
// ReportParticipation observation that use the stored system profile. On the eighth run only a
// ReportParticipation observation with the current system profile is expected.
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservations) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  SystemProfile system_profile;
  system_profile.set_os(SystemProfile::FUCHSIA);
  system_profile.set_system_version("100");
  uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
  {
    // Seed the stored aggregate: last generated day is the day before |start_day_index|, and
    // |start_day_index| holds a single at-least-once event for |system_profile|.
    MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
    ReportAggregate* report =
        &(*aggregate.aggregate()->mutable_by_report_id())
            [privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId];
    report->mutable_daily()->set_last_day_index(start_day_index - 1);
    SystemProfileAggregate* system_profile_agg =
        (*report->mutable_daily()->mutable_by_day_index())[start_day_index]
            .add_system_profile_aggregates();
    system_profile_agg->set_system_profile_hash(system_profile_hash);
    system_profile_agg->add_by_event_code()
        ->mutable_data()
        ->mutable_at_least_once()
        ->set_at_least_once(true);
    aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // Private and Report participation observations generated for every day of the week.
  for (int i = 0; i < 7; i++) {
    // Fresh collectors per day so that each run is checked in isolation.
    std::vector<Observation> observations;
    std::vector<ObservationMetadata> metadatas;
    int contribution_count = 0;
    TestObservationStoreWriter test_writer(
        [&observations, &metadatas, &contribution_count](
            std::unique_ptr<observation_store::StoredObservation> observation,
            std::unique_ptr<ObservationMetadata> metadata) {
          if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
              metadata->report_id() ==
                  privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
            metadatas.push_back(*metadata);
            if (observation->has_unencrypted()) {
              observations.push_back(observation->unencrypted());
            }
            if (!observation->contribution_id().empty()) {
              contribution_count++;
            }
          }
        });

    logger::ObservationWriter observation_writer(test_writer, nullptr);
    ConstructObservationGenerator(observation_writer, *converter);
    GenerateObservationsOnce(
        util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));

    EXPECT_EQ(contribution_count, 1);
    // Fatal on mismatch: observations[0] and observations[1] are indexed below.
    ASSERT_EQ(observations.size(), 2);
    for (const Observation& obs : observations) {
      EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
    }
    EXPECT_EQ(observations[0].observation_type_case(),
              Observation::ObservationTypeCase::kPrivateIndex);
    EXPECT_EQ(observations[1].observation_type_case(),
              Observation::ObservationTypeCase::kReportParticipation);
    EXPECT_EQ(metadatas.size(), 2);
    for (const ObservationMetadata& metadata : metadatas) {
      EXPECT_EQ(metadata.customer_id(), kCustomerId);
      EXPECT_EQ(metadata.day_index(), start_day_index + i);
      // Stored data's system profile is used.
      EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
      EXPECT_EQ(metadata.system_profile().board_name(), "");
    }
  }

  std::vector<Observation> observations;
  std::vector<ObservationMetadata> metadatas;
  int contribution_count = 0;
  TestObservationStoreWriter test_writer(
      [&observations, &metadatas, &contribution_count](
          std::unique_ptr<observation_store::StoredObservation> observation,
          std::unique_ptr<ObservationMetadata> metadata) {
        if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
            metadata->report_id() ==
                privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
          metadatas.push_back(*metadata);
          if (observation->has_unencrypted()) {
            observations.push_back(observation->unencrypted());
          }
          if (!observation->contribution_id().empty()) {
            contribution_count++;
          }
        }
      });

  logger::ObservationWriter observation_writer(test_writer, nullptr);
  ConstructObservationGenerator(observation_writer, *converter);
  GenerateObservationsOnce(
      util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + 7)));

  // Only Report participation observation generated after the week is over.
  EXPECT_EQ(contribution_count, 1);
  // Fatal on mismatch: observations[0] is indexed below.
  ASSERT_EQ(observations.size(), 1);
  EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize);
  EXPECT_EQ(observations[0].observation_type_case(),
            Observation::ObservationTypeCase::kReportParticipation);
  EXPECT_EQ(metadatas.size(), 1);
  for (const ObservationMetadata& metadata : metadatas) {
    EXPECT_EQ(metadata.customer_id(), kCustomerId);
    EXPECT_EQ(metadata.day_index(), start_day_index + 7);
    // Current system profile is used.
    EXPECT_EQ(metadata.system_profile().system_version(),
              system_data_.system_profile().system_version());
    EXPECT_EQ(metadata.system_profile().board_name(), "");
  }
}

// Verifies observation generation for the 7-day unique device counts report when no events were
// aggregated.
//
// Only |last_day_index| is stored (set to the day before |start_day_index|). For each of eleven
// consecutive daily generation runs the test expects a single ReportParticipation observation
// whose metadata uses the current system profile from |system_data_|.
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservationsForNoObservation) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  {
    // Record only the last generated day; no per-day aggregates are stored.
    MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
    ReportAggregate* report =
        &(*aggregate.aggregate()->mutable_by_report_id())
            [privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId];
    report->mutable_daily()->set_last_day_index(start_day_index - 1);
    ASSERT_TRUE(aggregate.Save().ok());
  }

  // ReportParticipation observations should be generated for all days, even after the week.
  for (int i = 0; i <= 10; i++) {
    // Fresh collectors per day so that each run is checked in isolation.
    std::vector<Observation> observations;
    std::vector<ObservationMetadata> metadatas;
    int contribution_count = 0;
    TestObservationStoreWriter test_writer(
        [&observations, &metadatas, &contribution_count](
            std::unique_ptr<observation_store::StoredObservation> observation,
            std::unique_ptr<ObservationMetadata> metadata) {
          if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
              metadata->report_id() ==
                  privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
            metadatas.push_back(*metadata);
            if (observation->has_unencrypted()) {
              observations.push_back(observation->unencrypted());
            }
            if (!observation->contribution_id().empty()) {
              contribution_count++;
            }
          }
        });

    logger::ObservationWriter observation_writer(test_writer, nullptr);
    ConstructObservationGenerator(observation_writer, *converter);
    GenerateObservationsOnce(
        util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));

    EXPECT_EQ(contribution_count, 1) << " day " << i;
    // Fatal on mismatch: observations[0] is indexed below.
    ASSERT_EQ(observations.size(), 1) << " day " << i;
    EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize) << " day " << i;
    EXPECT_EQ(observations[0].observation_type_case(),
              Observation::ObservationTypeCase::kReportParticipation)
        << " day " << i;
    EXPECT_EQ(metadatas.size(), 1) << " day " << i;
    for (const ObservationMetadata& metadata : metadatas) {
      EXPECT_EQ(metadata.customer_id(), kCustomerId) << " day " << i;
      EXPECT_EQ(metadata.day_index(), start_day_index + i) << " day " << i;
      // Current system profile is used.
      EXPECT_EQ(metadata.system_profile().system_version(),
                system_data_.system_profile().system_version())
          << " day " << i;
      EXPECT_EQ(metadata.system_profile().board_name(), "") << " day " << i;
    }
  }
}

// Verifies the first generation run, and the ten daily runs after it, when nothing has been
// stored beforehand.
//
// Phase 1: the first run is expected to produce three ReportParticipation observations, for day
// indices |start_day_index| - 2, |start_day_index| - 1 and |start_day_index| in that order.
// Phase 2: each of the following ten daily runs is expected to produce a single
// ReportParticipation observation for the 7-day report, using the current system profile.
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservationsInitialBackfill) {
  const MetricDefinition* metric =
      project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
  std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
  CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
                          util::TimePointToDayIndex(starting_time_, *converter, *metric));

  {
    std::vector<Observation> observations;
    std::vector<ObservationMetadata> metadatas;
    int contribution_count = 0;
    // NOTE(review): this phase filters on the 1-day report id although the rest of the test
    // targets the 7-day report -- confirm whether this is intentional.
    TestObservationStoreWriter test_writer(
        [&observations, &metadatas, &contribution_count](
            std::unique_ptr<observation_store::StoredObservation> observation,
            std::unique_ptr<ObservationMetadata> metadata) {
          if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
              metadata->report_id() ==
                  privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
            metadatas.push_back(*metadata);
            if (observation->has_unencrypted()) {
              observations.push_back(observation->unencrypted());
            }
            if (!observation->contribution_id().empty()) {
              contribution_count++;
            }
          }
        });

    logger::ObservationWriter observation_writer(test_writer, nullptr);
    ConstructObservationGenerator(observation_writer, *converter);
    GenerateObservationsOnce(
        util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));

    // Initially 3 observations should be generated for the backstop backfill.
    EXPECT_EQ(observations.size(), 3);
    EXPECT_EQ(contribution_count, 3);
    for (const Observation& obs : observations) {
      EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
      EXPECT_EQ(obs.observation_type_case(),
                Observation::ObservationTypeCase::kReportParticipation);
    }
    // Fatal on mismatch: metadatas[0..2] are indexed below.
    ASSERT_EQ(metadatas.size(), 3);
    for (const ObservationMetadata& metadata : metadatas) {
      EXPECT_EQ(metadata.customer_id(), kCustomerId);
      // Current system profile is used.
      EXPECT_EQ(metadata.system_profile().system_version(),
                system_data_.system_profile().system_version());
      EXPECT_EQ(metadata.system_profile().board_name(), "");
    }
    EXPECT_EQ(metadatas[0].day_index(), start_day_index - 2);
    EXPECT_EQ(metadatas[1].day_index(), start_day_index - 1);
    EXPECT_EQ(metadatas[2].day_index(), start_day_index);
  }

  // After the initial backfill, ReportParticipation observations should be generated for all days,
  // even after the week.
  for (int i = 1; i <= 10; i++) {
    // Fresh collectors per day so that each run is checked in isolation.
    std::vector<Observation> observations;
    std::vector<ObservationMetadata> metadatas;
    int contribution_count = 0;
    TestObservationStoreWriter test_writer(
        [&observations, &metadatas, &contribution_count](
            std::unique_ptr<observation_store::StoredObservation> observation,
            std::unique_ptr<ObservationMetadata> metadata) {
          if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
              metadata->report_id() ==
                  privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
            metadatas.push_back(*metadata);
            if (observation->has_unencrypted()) {
              observations.push_back(observation->unencrypted());
            }
            if (!observation->contribution_id().empty()) {
              contribution_count++;
            }
          }
        });

    logger::ObservationWriter observation_writer(test_writer, nullptr);
    ConstructObservationGenerator(observation_writer, *converter);
    GenerateObservationsOnce(
        util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));

    EXPECT_EQ(contribution_count, 1) << " day " << i;
    // Fatal on mismatch: observations[0] is indexed below.
    ASSERT_EQ(observations.size(), 1) << " day " << i;
    EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize) << " day " << i;
    EXPECT_EQ(observations[0].observation_type_case(),
              Observation::ObservationTypeCase::kReportParticipation)
        << " day " << i;
    EXPECT_EQ(metadatas.size(), 1) << " day " << i;
    for (const ObservationMetadata& metadata : metadatas) {
      EXPECT_EQ(metadata.customer_id(), kCustomerId) << " day " << i;
      EXPECT_EQ(metadata.day_index(), start_day_index + i) << " day " << i;
      // Current system profile is used.
      EXPECT_EQ(metadata.system_profile().system_version(),
                system_data_.system_profile().system_version())
          << " day " << i;
      EXPECT_EQ(metadata.system_profile().board_name(), "") << " day " << i;
    }
  }
}

}  // namespace cobalt::local_aggregation
