blob: 0183c41167043f3ed63ee70f6db580aa5cb52a18 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation/observation_generator.h"
#include <memory>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "src/algorithms/random/random.h"
#include "src/algorithms/random/test_secure_random.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation/backfill_manager.h"
#include "src/local_aggregation/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/local_aggregation/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation/local_aggregation.pb.h"
#include "src/local_aggregation/testing/test_privacy_registry.cb.h"
#include "src/local_aggregation/testing/test_registry.cb.h"
#include "src/logger/fake_logger.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/observation_writer.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/logger/types.h"
#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/public/lib/statusor/status_macros.h"
#include "src/registry/testing/testing.h"
#include "src/system_data/client_secret.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using TimeInfo = util::TimeInfo;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
// The expected size in bytes of an Observation's |random_id| field.
constexpr size_t kRandomIdSize = 8u;
const lib::ProjectIdentifier kProjectIdentifier =
lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId);
namespace {
std::unique_ptr<CobaltRegistry> GetRegistry(const std::string& registryBase64) {
std::string bytes;
if (!absl::Base64Unescape(registryBase64, &bytes)) {
LOG(ERROR) << "Unable to decode Base64 String";
return nullptr;
}
auto registry = std::make_unique<CobaltRegistry>();
if (!registry->ParseFromString(bytes)) {
LOG(ERROR) << "Unable to parse registry from bytes";
return nullptr;
}
return registry;
}
class FakePrivacyEncoder : public logger::PrivacyEncoder {
public:
explicit FakePrivacyEncoder(bool return_private_observations, int num_private_observations = 0)
: PrivacyEncoder(std::make_unique<TestSecureRandomNumberGenerator>(0),
std::make_unique<RandomNumberGenerator>(0)),
return_private_observations_(return_private_observations),
num_private_observations_(num_private_observations) {}
lib::statusor::StatusOr<std::vector<std::unique_ptr<Observation>>> MaybeMakePrivateObservations(
std::unique_ptr<Observation> observation, const MetricDefinition& /*metric_def*/,
const ReportDefinition& /*report_def*/) override {
std::vector<std::unique_ptr<Observation>> observations;
if (!return_private_observations_) {
observations.push_back(std::move(observation));
return observations;
}
for (int i = 0; i < num_private_observations_; i++) {
auto observation = std::make_unique<Observation>();
observations.push_back(std::move(observation));
}
return observations;
}
private:
bool return_private_observations_;
int num_private_observations_;
};
} // namespace
class ObservationGeneratorTest : public util::testing::TestWithFiles {
public:
void SetUp() override {
MakeTestFolder();
system_clock_ = std::make_unique<util::IncrementingSystemClock>();
system_clock_->increment_by(std::chrono::hours(util::kNumHoursPerDay * 100));
starting_time_ = system_clock_->now();
test_clock_ = std::make_unique<util::FakeValidatedClock>(system_clock_.get());
test_clock_->SetAccurate(true);
// The current SystemProfile has a system version of 101 and 1 experiment ID.
system_data_.SetVersion("101");
SetRegistry();
}
void SetRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(kCobaltRegistryBase64)) {
project_context_factory_ = std::make_unique<logger::ProjectContextFactory>(
util::WrapNotNullUniquePtr(std::move(registry)).value());
aggregate_storage_ = LocalAggregateStorage::New(
LocalAggregateStorage::StorageStrategy::Immediate, local_aggregation_store_path(), fs(),
*project_context_factory_, system_data_, 0);
project_context_ = GetProjectContext();
}
MetricAggregateRef GetMetricAggregate(uint32_t metric_id) {
lib::statusor::StatusOr<MetricAggregateRef> metric_aggregate_or =
aggregate_storage_->GetMetricAggregate(kProjectIdentifier.ForMetric(metric_id));
EXPECT_TRUE(metric_aggregate_or.ok());
return std::move(metric_aggregate_or).value();
}
std::shared_ptr<const logger::ProjectContext> GetProjectContext() {
return project_context_factory_->NewProjectContext(
lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId));
}
void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
std::unique_ptr<FakePrivacyEncoder> privacy_encoder,
util::CivilTimeConverterInterface& civil_time_converter,
logger::InternalMetrics* internal_metrics = nullptr,
bool generate_observations_with_current_system_profile = false,
bool test_dont_backfill_empty_reports = false) {
observation_generator_ = std::make_unique<ObservationGenerator>(
*aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
std::move(privacy_encoder), civil_time_converter,
generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
observation_generator_->ResetInternalMetrics(internal_metrics);
}
void TearDown() override {
if (observation_generator_) {
observation_generator_->ShutDown();
}
}
Status GenerateObservationsOnce(std::chrono::system_clock::time_point system_time) {
return observation_generator_->GenerateObservationsOnce(system_time);
}
friend class BasicLogger;
protected:
system_data::FakeSystemData system_data_;
std::unique_ptr<util::IncrementingSystemClock> system_clock_;
std::unique_ptr<util::FakeValidatedClock> test_clock_;
std::chrono::system_clock::time_point starting_time_;
std::unique_ptr<logger::ProjectContextFactory> project_context_factory_;
// The `ProjectContext` for the test customer and project.
std::shared_ptr<const logger::ProjectContext> project_context_;
std::unique_ptr<LocalAggregateStorage> aggregate_storage_;
std::unique_ptr<ObservationGenerator> observation_generator_;
};
class TestObservationStoreWriter : public observation_store::ObservationStoreWriterInterface {
public:
explicit TestObservationStoreWriter(
std::function<void(std::unique_ptr<observation_store::StoredObservation>,
std::unique_ptr<ObservationMetadata>)>
watcher)
: watcher_(std::move(watcher)) {}
Status StoreObservation(std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) override {
watcher_(std::move(observation), std::move(metadata));
return Status::OkStatus();
}
private:
std::function<void(std::unique_ptr<observation_store::StoredObservation>,
std::unique_ptr<ObservationMetadata>)>
watcher_;
};
struct ObservationGeneratorTestParams {
uint32_t metric_id;
uint32_t report_id;
std::function<std::unique_ptr<util::CivilTimeConverterInterface>()> converter_fn;
};
class ObservationGeneratorTestWithParams
: public ObservationGeneratorTest,
public ::testing::WithParamInterface<ObservationGeneratorTestParams> {};
using HourlyObservationGeneratorTest = ObservationGeneratorTestWithParams;
using DailyObservationGeneratorTest = ObservationGeneratorTestWithParams;
using SystemProfileObservationGeneratorTest = ObservationGeneratorTestWithParams;
TEST_P(HourlyObservationGeneratorTest, GeneratesHourlyObservations) {
// The delta between the hour IDs for consecutive hours, assuming no change in time zone
// (including DST status).
const int kHourIdsPerHour = 2;
const uint32_t kMaxHourIdOffset = 101;
ObservationGeneratorTestParams params = GetParam();
std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
util::TimePointToHourId(starting_time_, *converter, metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
for (uint32_t i = 0; i <= kMaxHourIdOffset; i += kHourIdsPerHour) {
uint32_t count = (i + 1) * 100;
SystemProfileAggregate* agg =
(*report_agg->mutable_hourly()->mutable_by_hour_id())[start_hour_id + i]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->set_count(count);
}
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
uint32_t metric_id = params.metric_id;
uint32_t report_id = params.report_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
for (uint32_t i = 0; i <= kMaxHourIdOffset; i += 2 * kHourIdsPerHour) {
// Generate observations for the hour ending at `end_time`.
std::chrono::system_clock::time_point end_time =
starting_time_ + std::chrono::hours(i / kHourIdsPerHour);
GenerateObservationsOnce(end_time + std::chrono::hours(1));
CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index,
util::TimePointToDayIndex(end_time, *converter, metric));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), day_index)
<< "Incorrect day index for hour ID offset i: " << i;
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(), (i + 1) * 100)
<< "Incorrect count for hour ID offset i: " << i;
}
}
INSTANTIATE_TEST_SUITE_P(
GenerateHourlyObservations, HourlyObservationGeneratorTest,
::testing::Values(
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceMetricFleetwideOccurrenceCountsReportReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceMetricFleetwideOccurrenceCountsReportReportId,
.converter_fn =
[]() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId,
.converter_fn = []() {
return std::make_unique<util::FakeCivilTimeConverter>(2, false);
}}));
TEST_F(ObservationGeneratorTest, GeneratesHourlyObservationsWithDstStart) {
// Use a civil time converter that simulates a transition from PST (UTC-8h) to PDT (UTC-7h), with
// the transition occurring 1 hour after the start of the test.
std::unique_ptr<util::FakeCivilTimeConverter> converter =
std::make_unique<util::FakeCivilTimeConverter>(
/*start_utc_offset=*/-8, /*start_isdst=*/false,
/*end_utc_offset=*/-7, /*end_isdst=*/true,
/*threshold=*/starting_time_ + std::chrono::hours(1));
const MetricDefinition& metric = *project_context_->GetMetric(kOccurrenceOtherTzMetricMetricId);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
util::TimePointToHourId(starting_time_, *converter, metric));
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceOtherTzMetricMetricId &&
metadata->report_id() ==
kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())
[kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId];
SystemProfileAggregate* agg =
(*report_agg->mutable_hourly()->mutable_by_hour_id())[start_hour_id]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->set_count(100);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
// Increment the clock by 1 hour, reaching the time zone transition threshold. The hour ID
// increases by 3.
system_clock_->increment_by(std::chrono::hours(1));
CB_ASSERT_OK_AND_ASSIGN(uint32_t next_hour_id,
util::TimePointToHourId(system_clock_->now(), *converter, metric));
ASSERT_EQ(next_hour_id, start_hour_id + 3);
GenerateObservationsOnce(system_clock_->now());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), util::HourIdToDayIndex(start_hour_id));
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(), 100);
{
MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())
[kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId];
SystemProfileAggregate* agg =
(*report_agg->mutable_hourly()->mutable_by_hour_id())[next_hour_id]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->set_count(200);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
// Increment the clock by 1 hour. The hour ID increases by 2 since `next_hour_id`, for a total of
// `start_hour_id + 5`.
system_clock_->increment_by(std::chrono::hours(1));
CB_ASSERT_OK_AND_ASSIGN(uint32_t final_hour_id,
util::TimePointToHourId(system_clock_->now(), *converter, metric));
ASSERT_EQ(final_hour_id, start_hour_id + 5);
GenerateObservationsOnce(system_clock_->now());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), util::HourIdToDayIndex(next_hour_id));
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(), 200);
}
TEST_F(ObservationGeneratorTest, GeneratesHourlyObservationsWithDstEnd) {
// Use a civil time converter that simulates a transition from PDT (UTC-7h) to PST (UTC-8h), with
// the transition occurring 1 hour after the start of the test.
std::unique_ptr<util::FakeCivilTimeConverter> converter =
std::make_unique<util::FakeCivilTimeConverter>(
/*start_utc_offset=*/-7, /*start_isdst=*/true,
/*end_utc_offset=*/-8, /*end_isdst=*/false,
/*threshold=*/starting_time_ + std::chrono::hours(1));
const MetricDefinition& metric = *project_context_->GetMetric(kOccurrenceOtherTzMetricMetricId);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_hour_id,
util::TimePointToHourId(starting_time_, *converter, metric));
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceOtherTzMetricMetricId &&
metadata->report_id() ==
kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())
[kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId];
SystemProfileAggregate* agg =
(*report_agg->mutable_hourly()->mutable_by_hour_id())[start_hour_id]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->set_count(100);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
// Increment the clock by 1 hour, reaching the time zone transition threshold. The hour ID
// increases by 1.
system_clock_->increment_by(std::chrono::hours(1));
CB_ASSERT_OK_AND_ASSIGN(uint32_t next_hour_id,
util::TimePointToHourId(system_clock_->now(), *converter, metric));
ASSERT_EQ(next_hour_id, start_hour_id + 1);
GenerateObservationsOnce(system_clock_->now());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), util::HourIdToDayIndex(start_hour_id));
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(), 100);
{
MetricAggregateRef metric_agg = GetMetricAggregate(metric.id());
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())
[kOccurrenceOtherTzMetricFleetwideOccurrenceCountsReportReportId];
SystemProfileAggregate* agg =
(*report_agg->mutable_hourly()->mutable_by_hour_id())[next_hour_id]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->set_count(200);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
// Increment the clock by 1 hour. The hour ID increases by 2 since `next_hour_id`, for a total of
// `start_hour_id + 3`.
system_clock_->increment_by(std::chrono::hours(1));
CB_ASSERT_OK_AND_ASSIGN(uint32_t final_hour_id,
util::TimePointToHourId(system_clock_->now(), *converter, metric));
ASSERT_EQ(final_hour_id, start_hour_id + 3);
GenerateObservationsOnce(system_clock_->now());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), util::HourIdToDayIndex(next_hour_id));
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(), 200);
}
TEST_P(DailyObservationGeneratorTest, GeneratesDailyObservations) {
const uint32_t kMaxDayIndexOffset = 5;
ObservationGeneratorTestParams params = GetParam();
std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
for (uint32_t i = 0; i <= kMaxDayIndexOffset; i += 1) {
SystemProfileAggregate* agg =
(*report_agg->mutable_daily()->mutable_by_day_index())[start_day_index + i]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
}
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
uint32_t metric_id = params.metric_id;
uint32_t report_id = params.report_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
for (uint32_t i = 0; i <= kMaxDayIndexOffset; i += 1) {
// Generate observations for the day ending at `end_time`.
std::chrono::system_clock::time_point end_time =
starting_time_ + std::chrono::hours(i * util::kNumHoursPerDay);
GenerateObservationsOnce(end_time + std::chrono::hours(util::kNumHoursPerDay));
CB_ASSERT_OK_AND_ASSIGN(uint32_t day_index,
util::TimePointToDayIndex(end_time, *converter, metric));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), day_index)
<< "Incorrect day index for day index offset i: " << i;
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1)
<< "Incorrect count for day index offset i: " << i;
}
}
INSTANTIATE_TEST_SUITE_P(
GenerateDailyObservations, DailyObservationGeneratorTest,
::testing::Values(
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceMetricUniqueDeviceCountsReport1DayReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsReport1DayReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsReport1DayReportId,
.converter_fn =
[]() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsReport1DayReportId,
.converter_fn = []() {
return std::make_unique<util::FakeCivilTimeConverter>(2, false);
}}));
TEST_F(ObservationGeneratorTest, GeneratesNoObservationsForNoAggregateData) {
const uint32_t kNumDays = 5;
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
TimeInfo::FromTimePoint(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
report->mutable_daily()->set_last_day_index(starting_time_info.day_index);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceMetricMetricId &&
metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
for (uint32_t i = 0; i <= kNumDays; ++i) {
GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay * kNumDays));
EXPECT_FALSE(last_metadata);
EXPECT_FALSE(last_observation);
}
}
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsAsExpectedDespiteFailure) {
const uint32_t kNumDays = 5;
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
for (uint32_t i = start_day_index; i < start_day_index + kNumDays; i += 1) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceMetricMetricId &&
metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
// Set the MetricType to an invalid type.
SetRegistry(testing::MutateProject(GetRegistry(kCobaltRegistryBase64), kCustomerId, kProjectId,
[](ProjectConfig* project) {
project->mutable_metrics(kOccurrenceMetricMetricIndex)
->set_metric_type(MetricDefinition::INTEGER_HISTOGRAM);
}));
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
// This generation should fail because the aggregation procedure
// couldn't be created.
EXPECT_NE(StatusCode::OK,
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)))
.error_code());
// Reset to default registry.
SetRegistry();
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
// Data should still be collected as expected.
for (uint32_t i = start_day_index; i < start_day_index + kNumDays; i += 1) {
EXPECT_EQ(StatusCode::OK,
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(i + 1)))
.error_code());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), i);
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
}
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsWithReportAllAsExpected) {
SetRegistry(testing::MutateProject(
GetRegistry(kCobaltRegistryBase64), kCustomerId, kProjectId, [](ProjectConfig* project) {
project->mutable_metrics(kOccurrenceMetricMetricIndex)
->mutable_reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex)
->set_system_profile_selection(SystemProfileSelectionPolicy::REPORT_ALL);
}));
const uint32_t kNumDays = 5;
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
for (uint32_t i = start_day_index; i < start_day_index + kNumDays; i += 1) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceMetricMetricId &&
metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
for (uint32_t i = start_day_index; i < start_day_index + kNumDays; i += 1) {
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(i + 1)));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), i);
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
}
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservations) {
uint32_t kMaxHourOffset = 101;
int kNumPrivateObs = 2;
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
TimeInfo::FromTimePoint(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
auto* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
report->mutable_hourly()->set_last_hour_id(starting_time_info.hour_id - 2);
for (uint32_t i = starting_time_info.hour_id; i < starting_time_info.hour_id + kMaxHourOffset;
i += 2) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()->mutable_data()->set_count(
(i - starting_time_info.hour_id + 1) * 100);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
observations.reserve(kNumPrivateObs);
std::vector<ObservationMetadata> metadatas;
metadatas.reserve(kNumPrivateObs);
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceMetricMetricId &&
metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(
observation_writer, std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs), *converter);
GenerateObservationsOnce(starting_time_ + util::kOneHour);
EXPECT_EQ(observations.size(), kNumPrivateObs);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(metadatas.size(), kNumPrivateObs);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info.hour_id));
// Stored data's system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservationsForNoObservation) {
int kNumPrivateObs = 2;
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
TimeInfo::FromTimePoint(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
auto* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
report->mutable_hourly()->set_last_hour_id(starting_time_info.hour_id - 2);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
observations.reserve(kNumPrivateObs);
std::vector<ObservationMetadata> metadatas;
metadatas.reserve(kNumPrivateObs);
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == kOccurrenceMetricMetricId &&
metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(
observation_writer, std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs), *converter);
GenerateObservationsOnce(starting_time_ + util::kOneHour);
EXPECT_EQ(observations.size(), kNumPrivateObs);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(metadatas.size(), kNumPrivateObs);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info.hour_id));
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_P(SystemProfileObservationGeneratorTest, GenerateObservationsFallbackToCurrentSystemProfile) {
ObservationGeneratorTestParams params = GetParam();
std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::ANDROID);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
// Use the report that has experiment IDs in the system profile.
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
SystemProfileAggregate* agg =
(*report_agg->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
// Don't store the system_profile.
ASSERT_TRUE(metric_agg.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
uint32_t metric_id = params.metric_id;
uint32_t report_id = params.report_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter);
GenerateObservationsOnce(starting_time_ + util::kOneDay);
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), start_day_index);
// System profile fields were set from the current system profile.
EXPECT_NE(last_metadata->system_profile().os(), system_profile.os());
EXPECT_NE(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().os(), system_data_.system_profile().os());
EXPECT_EQ(last_metadata->system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
// Use a report that has experiment IDs in the system profile.
TEST_P(SystemProfileObservationGeneratorTest, GenerateObservationsWithCurrentSystemProfile) {
ObservationGeneratorTestParams params = GetParam();
std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, metric));
SystemProfile system_profile;
// No system profile fields except OS.
system_profile.set_os(SystemProfile::ANDROID);
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
SystemProfileAggregate* agg =
(*report_agg->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
uint32_t metric_id = params.metric_id;
uint32_t report_id = params.report_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter, /*internal_metrics=*/nullptr,
/*generate_observations_with_current_system_profile=*/true);
// Generate observations for the day with day index `start_day_index`.
GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), start_day_index);
// Unset system profile fields were set from the current system
// profile.
EXPECT_EQ(last_metadata->system_profile().os(), system_profile.os());
EXPECT_EQ(last_metadata->system_profile().system_version(),
system_data_.system_profile().system_version());
ASSERT_EQ(last_metadata->system_profile().experiment_ids_size(), 1);
EXPECT_EQ(last_metadata->system_profile().experiment_ids(0),
system_data_.system_profile().experiment_ids(0));
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
// Use a report that has experiment IDs in the system profile.
TEST_P(SystemProfileObservationGeneratorTest,
GenerateObservationsWithCurrentSystemProfileDoesNotOverwrite) {
ObservationGeneratorTestParams params = GetParam();
std::unique_ptr<util::CivilTimeConverterInterface> converter = params.converter_fn();
const MetricDefinition& metric = *project_context_->GetMetric(params.metric_id);
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::ANDROID);
system_profile.set_system_version("100");
system_profile.add_experiment_ids(2222);
system_profile.add_experiment_ids(3333);
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef metric_agg = GetMetricAggregate(params.metric_id);
ReportAggregate* report_agg =
&(*metric_agg.aggregate()->mutable_by_report_id())[params.report_id];
SystemProfileAggregate* agg =
(*report_agg->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
agg->set_system_profile_hash(system_profile_hash);
agg->add_by_event_code()->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
metric_agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(metric_agg.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
uint32_t metric_id = params.metric_id;
uint32_t report_id = params.report_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id, &metric_id, &report_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == metric_id && metadata->report_id() == report_id) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter, /*internal_metrics=*/nullptr,
/*generate_observations_with_current_system_profile=*/true);
// Generate observations for the day with day index `start_day_index`.
GenerateObservationsOnce(starting_time_ + std::chrono::hours(util::kNumHoursPerDay));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), start_day_index);
// All system profile fields were set from the aggregate system
// profile.
EXPECT_EQ(last_metadata->system_profile().os(), system_profile.os());
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
ASSERT_EQ(last_metadata->system_profile().experiment_ids_size(), 2);
EXPECT_EQ(last_metadata->system_profile().experiment_ids(0), system_profile.experiment_ids(0));
EXPECT_EQ(last_metadata->system_profile().experiment_ids(1), system_profile.experiment_ids(1));
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
INSTANTIATE_TEST_SUITE_P(
GenerateObservationWithSystemProfile, SystemProfileObservationGeneratorTest,
::testing::Values(
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
.converter_fn = []() { return std::make_unique<util::UtcTimeConverter>(); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceMetricMetricId,
.report_id = kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
.converter_fn =
[]() { return std::make_unique<util::FakeCivilTimeConverter>(2, false); }},
ObservationGeneratorTestParams{
.metric_id = kOccurrenceOtherTzMetricMetricId,
.report_id = kOccurrenceOtherTzMetricUniqueDeviceCountsSelectFirstReport1DayReportId,
.converter_fn = []() {
return std::make_unique<util::FakeCivilTimeConverter>(2, false);
}}));
class BasicLogger : public logger::testing::FakeLogger {
public:
explicit BasicLogger(ObservationGeneratorTest* test) : test_(test) {}
Status LogInteger(uint32_t metric_id, int64_t value,
const std::vector<uint32_t>& event_codes) override {
// Lock the aggregate store to simulate actually performing log.
test_->aggregate_storage_->GetMetricAggregate(
lib::CustomerIdentifier(1).ForProject(1).ForMetric(1));
return logger::testing::FakeLogger::LogInteger(metric_id, value, event_codes);
}
private:
ObservationGeneratorTest* test_;
};
TEST_F(ObservationGeneratorTest, DoesNotDeadlock) {
aggregate_storage_ = LocalAggregateStorage::New(LocalAggregateStorage::StorageStrategy::Delayed,
local_aggregation_store_path(), fs(),
*project_context_factory_, system_data_, 0);
observation_store::FileObservationStore obs_store(10000, 10000, 10000, fs(), nullptr,
test_folder() + "/obs_store");
BasicLogger logger(this);
logger::InternalMetricsImpl internal_metrics(logger, nullptr);
obs_store.ResetInternalMetrics(&internal_metrics);
const MetricDefinition* metric = project_context_->GetMetric(kOccurrenceMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(TimeInfo starting_time_info,
TimeInfo::FromTimePoint(starting_time_, *converter, *metric));
const uint32_t kMaxHourOffset = 101;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
for (uint32_t i = starting_time_info.hour_id; i < starting_time_info.hour_id + kMaxHourOffset;
i += 2) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()->mutable_data()->set_count(
(i - starting_time_info.hour_id + 1) * 100);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
logger::ObservationWriter observation_writer(obs_store, nullptr);
ConstructObservationGenerator(observation_writer, std::make_unique<FakePrivacyEncoder>(false),
*converter, &internal_metrics);
for (uint32_t i = starting_time_info.hour_id; i < starting_time_info.hour_id + kMaxHourOffset;
i += 4) {
GenerateObservationsOnce(util::FromUnixSeconds(util::HourIdToUnixSeconds(i + 1)));
}
}
// Test class for tests that use the real privacy generator.
class PrivacyObservationGeneratorTest : public ObservationGeneratorTest {
public:
void SetUp() override {
ObservationGeneratorTest::SetUp();
SetRegistry(GetRegistry(privacy::kCobaltRegistryBase64));
}
void ConstructObservationGenerator(const logger::ObservationWriter& observation_writer,
util::CivilTimeConverterInterface& civil_time_converter,
logger::InternalMetrics* internal_metrics = nullptr,
bool generate_observations_with_current_system_profile = false,
bool test_dont_backfill_empty_reports = false) {
std::unique_ptr<logger::PrivacyEncoder> privacy_encoder =
std::make_unique<logger::PrivacyEncoder>(
std::make_unique<TestSecureRandomNumberGenerator>(0),
std::make_unique<RandomNumberGenerator>(0));
observation_generator_ = std::make_unique<ObservationGenerator>(
*aggregate_storage_, *project_context_factory_, system_data_, observation_writer,
std::move(privacy_encoder), civil_time_converter,
generate_observations_with_current_system_profile, test_dont_backfill_empty_reports);
observation_generator_->ResetInternalMetrics(internal_metrics);
}
};
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservations) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
EXPECT_EQ(observations.size(), 2);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kPrivateIndex);
EXPECT_EQ(observations[1].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation);
EXPECT_EQ(metadatas.size(), 2);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), start_day_index);
// Stored data's system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservationsForNoObservation) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
EXPECT_EQ(observations.size(), 1);
EXPECT_EQ(contribution_count, 1);
EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize);
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation);
EXPECT_EQ(metadatas.size(), 1);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), start_day_index);
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservationsNewReport) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// Initially no observations should be generated for the new report.
EXPECT_EQ(observations.size(), 0);
EXPECT_EQ(contribution_count, 0);
EXPECT_EQ(metadatas.size(), 0);
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesDailyPrivateObservationsBackfillAfterSomeTime) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId];
// Initialize the report to simulate 10 days of being offline.
report->mutable_daily()->set_last_day_index(start_day_index - 10);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport1DayReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// Only 3 observations should be generated for the backstop backfill.
EXPECT_EQ(observations.size(), 3);
EXPECT_EQ(contribution_count, 3);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
EXPECT_EQ(obs.observation_type_case(), Observation::ObservationTypeCase::kReportParticipation);
}
ASSERT_EQ(metadatas.size(), 3);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
EXPECT_EQ(metadatas[0].day_index(), start_day_index - 2);
EXPECT_EQ(metadatas[1].day_index(), start_day_index - 1);
EXPECT_EQ(metadatas[2].day_index(), start_day_index);
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservations) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
// Private and Report participation observations generated for every day of the week.
for (int i = 0; i < 7; i++) {
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));
EXPECT_EQ(observations.size(), 2);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kPrivateIndex);
EXPECT_EQ(observations[1].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation);
EXPECT_EQ(metadatas.size(), 2);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), start_day_index + i);
// Stored data's system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + 7)));
// Only Report participation observation generated after the week is over.
EXPECT_EQ(observations.size(), 1);
EXPECT_EQ(contribution_count, 1);
EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize);
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation);
EXPECT_EQ(metadatas.size(), 1);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), start_day_index + 7);
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservationsForNoObservation) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
ASSERT_TRUE(aggregate.Save().ok());
}
// ReportParticipation observations should be generated for all days, even after the week.
for (int i = 0; i <= 10; i++) {
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));
EXPECT_EQ(observations.size(), 1) << " day " << i;
EXPECT_EQ(contribution_count, 1) << " day " << i;
EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize) << " day " << i;
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation)
<< " day " << i;
EXPECT_EQ(metadatas.size(), 1) << " day " << i;
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId) << " day " << i;
EXPECT_EQ(metadata.day_index(), start_day_index + i) << " day " << i;
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version())
<< " day " << i;
EXPECT_EQ(metadata.system_profile().board_name(), "") << " day " << i;
}
}
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservationsNewReport) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
{
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// Initially no observations should be generated for the new report.
EXPECT_EQ(observations.size(), 0);
EXPECT_EQ(contribution_count, 0);
EXPECT_EQ(metadatas.size(), 0);
}
// After the initial backfill, ReportParticipation observations should be generated for all days,
// even after the week.
for (int i = 1; i <= 10; i++) {
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1 + i)));
EXPECT_EQ(observations.size(), 1) << " day " << i;
EXPECT_EQ(contribution_count, 1) << " day " << i;
EXPECT_EQ(observations[0].random_id().size(), kRandomIdSize) << " day " << i;
EXPECT_EQ(observations[0].observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation)
<< " day " << i;
EXPECT_EQ(metadatas.size(), 1) << " day " << i;
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId) << " day " << i;
EXPECT_EQ(metadata.day_index(), start_day_index + i) << " day " << i;
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version())
<< " day " << i;
EXPECT_EQ(metadata.system_profile().board_name(), "") << " day " << i;
}
}
}
TEST_F(PrivacyObservationGeneratorTest, GeneratesWeeklyPrivateObservationsBackfillAfterSomeTime) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId];
// Initialize the report to simulate 10 days of being offline.
report->mutable_daily()->set_last_day_index(start_day_index - 10);
ASSERT_TRUE(aggregate.Save().ok());
}
{
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyUniqueDeviceCountsReport7DaysReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(
util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// Only 3 observations should be generated for the backstop backfill.
EXPECT_EQ(observations.size(), 3);
EXPECT_EQ(contribution_count, 3);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
EXPECT_EQ(obs.observation_type_case(),
Observation::ObservationTypeCase::kReportParticipation);
}
ASSERT_EQ(metadatas.size(), 3);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
EXPECT_EQ(metadatas[0].day_index(), start_day_index - 2);
EXPECT_EQ(metadatas[1].day_index(), start_day_index - 1);
EXPECT_EQ(metadatas[2].day_index(), start_day_index);
}
}
TEST_F(PrivacyObservationGeneratorTest, SkipsReportsNotInTheReleaseStage) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kOccurrenceMetricPrivacyMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kOccurrenceMetricPrivacyMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kOccurrenceMetricPrivacyNotCollectedDebugReportReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kOccurrenceMetricPrivacyMetricId &&
metadata->report_id() ==
privacy::kOccurrenceMetricPrivacyNotCollectedDebugReportReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// No observations should be generated for the DEBUG report on the GA device.
EXPECT_EQ(observations.size(), 0);
EXPECT_EQ(contribution_count, 0);
EXPECT_EQ(metadatas.size(), 0);
}
TEST_F(PrivacyObservationGeneratorTest, SkipsMetricsNotInTheReleaseStage) {
const MetricDefinition* metric =
project_context_->GetMetric(privacy::kNotCollectedDebugMetricMetricId);
std::unique_ptr<util::UtcTimeConverter> converter = std::make_unique<util::UtcTimeConverter>();
CB_ASSERT_OK_AND_ASSIGN(uint32_t start_day_index,
util::TimePointToDayIndex(starting_time_, *converter, *metric));
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(privacy::kNotCollectedDebugMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[privacy::kNotCollectedDebugMetricUniqueDeviceCountsReport1DayReportId];
report->mutable_daily()->set_last_day_index(start_day_index - 1);
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[start_day_index]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
std::vector<ObservationMetadata> metadatas;
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->metric_id() == privacy::kNotCollectedDebugMetricMetricId &&
metadata->report_id() ==
privacy::kNotCollectedDebugMetricUniqueDeviceCountsReport1DayReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(test_writer, nullptr);
ConstructObservationGenerator(observation_writer, *converter);
GenerateObservationsOnce(util::FromUnixSeconds(util::DayIndexToUnixSeconds(start_day_index + 1)));
// No observations should be generated for the DEBUG metric on the GA device.
EXPECT_EQ(observations.size(), 0);
EXPECT_EQ(contribution_count, 0);
EXPECT_EQ(metadatas.size(), 0);
}
} // namespace cobalt::local_aggregation