blob: 88bd49e5644a56acae50072e1c8af19ae223b7e2 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/observation_generator.h"
#include <memory>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "src/algorithms/random/test_secure_random.h"
#include "src/lib/util/clock.h"
#include "src/lib/util/datetime_util.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/aggregation_procedures/aggregation_procedure.h"
#include "src/local_aggregation_1_1/backfill_manager.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/local_aggregation.pb.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/fake_logger.h"
#include "src/logger/internal_metrics.h"
#include "src/logger/internal_metrics_config.cb.h"
#include "src/logger/observation_writer.h"
#include "src/logger/privacy_encoder.h"
#include "src/logger/project_context_factory.h"
#include "src/logger/types.h"
#include "src/observation_store/file_observation_store.h"
#include "src/observation_store/observation_store.h"
#include "src/observation_store/observation_store_internal.pb.h"
#include "src/pb/metadata_builder.h"
#include "src/pb/observation_batch.pb.h"
#include "src/public/lib/registry_identifiers.h"
#include "src/public/lib/status.h"
#include "src/registry/testing/testing.h"
#include "src/system_data/client_secret.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using TimeInfo = util::TimeInfo;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
// The expected size in bytes of an Observation's |random_id| field.
constexpr size_t kRandomIdSize = 8u;
const lib::ProjectIdentifier kProjectIdentifier =
lib::CustomerIdentifier(kCustomerId).ForProject(kProjectId);
namespace {
std::unique_ptr<CobaltRegistry> GetRegistry() {
std::string bytes;
if (!absl::Base64Unescape(kCobaltRegistryBase64, &bytes)) {
LOG(ERROR) << "Unable to decode Base64 String";
return nullptr;
}
auto registry = std::make_unique<CobaltRegistry>();
if (!registry->ParseFromString(bytes)) {
LOG(ERROR) << "Unable to parse registry from bytes";
return nullptr;
}
return registry;
}
class FakePrivacyEncoder : public logger::PrivacyEncoder {
public:
explicit FakePrivacyEncoder(bool return_private_observations, int num_private_observations = 0)
: PrivacyEncoder(std::make_unique<TestSecureRandomNumberGenerator>(0),
std::make_unique<RandomNumberGenerator>(0)),
return_private_observations_(return_private_observations),
num_private_observations_(num_private_observations) {}
lib::statusor::StatusOr<std::vector<std::unique_ptr<Observation>>> MaybeMakePrivateObservations(
std::unique_ptr<Observation> observation, const MetricDefinition& /*metric_def*/,
const ReportDefinition& /*report_def*/) override {
std::vector<std::unique_ptr<Observation>> observations;
if (!return_private_observations_) {
observations.push_back(std::move(observation));
return observations;
}
for (int i = 0; i < num_private_observations_; i++) {
auto observation = std::make_unique<Observation>();
observations.push_back(std::move(observation));
}
return observations;
}
private:
bool return_private_observations_;
int num_private_observations_;
};
} // namespace
class ObservationGeneratorTest : public util::testing::TestWithFiles {
public:
void SetUp() override {
MakeTestFolder();
system_clock_ = std::make_unique<util::IncrementingSystemClock>();
system_clock_->increment_by(std::chrono::hours(24 * 100));
starting_time_info_ = TimeInfo::FromTimePoint(system_clock_->now(), MetricDefinition::UTC);
test_clock_ = std::make_unique<util::FakeValidatedClock>(system_clock_.get());
test_clock_->SetAccurate(true);
metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, test_clock_.get(),
system_data_cache_path(), fs());
// The current SystemProfile has a system version of 101 and 1 experiment token.
system_data_.SetVersion("101");
metadata_builder_->SnapshotSystemData();
SetRegistry();
}
void SetRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry()) {
project_context_factory_ = std::make_unique<logger::ProjectContextFactory>(std::move(registry));
aggregate_storage_ = LocalAggregateStorage::New(
LocalAggregateStorage::StorageStrategy::Immediate, local_aggregation_store_path(), fs(),
project_context_factory_.get(), metadata_builder_.get(), 0);
}
MetricAggregateRef GetMetricAggregate(uint32_t metric_id) {
lib::statusor::StatusOr<MetricAggregateRef> metric_aggregate_or =
aggregate_storage_->GetMetricAggregate(kProjectIdentifier.ForMetric(metric_id));
EXPECT_TRUE(metric_aggregate_or.ok());
return metric_aggregate_or.ConsumeValueOrDie();
}
void ConstructObservationGenerator(
const logger::ObservationWriter* observation_writer,
std::unique_ptr<FakePrivacyEncoder> privacy_encoder,
bool generate_observations_with_current_system_profile = false) {
observation_generator_ = std::make_unique<ObservationGenerator>(
aggregate_storage_.get(), project_context_factory_.get(), &system_data_, observation_writer,
std::move(privacy_encoder), generate_observations_with_current_system_profile);
}
void TearDown() override { observation_generator_->ShutDown(); }
Status GenerateObservationsOnce(util::TimeInfo utc, util::TimeInfo local) {
return observation_generator_->GenerateObservationsOnce(utc, local);
}
friend class BasicLogger;
protected:
system_data::FakeSystemData system_data_;
std::unique_ptr<util::IncrementingSystemClock> system_clock_;
std::unique_ptr<util::FakeValidatedClock> test_clock_;
TimeInfo starting_time_info_;
std::unique_ptr<MetadataBuilder> metadata_builder_;
std::unique_ptr<logger::ProjectContextFactory> project_context_factory_;
std::unique_ptr<LocalAggregateStorage> aggregate_storage_;
std::unique_ptr<ObservationGenerator> observation_generator_;
};
class TestObservationStoreWriter : public observation_store::ObservationStoreWriterInterface {
public:
explicit TestObservationStoreWriter(
std::function<void(std::unique_ptr<observation_store::StoredObservation>,
std::unique_ptr<ObservationMetadata>)>
watcher)
: watcher_(std::move(watcher)) {}
Status StoreObservation(std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) override {
watcher_(std::move(observation), std::move(metadata));
return Status::OkStatus();
}
private:
std::function<void(std::unique_ptr<observation_store::StoredObservation>,
std::unique_ptr<ObservationMetadata>)>
watcher_;
};
TEST_F(ObservationGeneratorTest, GeneratesHourlyObservationsAsExpected) {
const uint32_t kMaxHourId = 101;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
for (uint32_t i = starting_time_info_.hour_id; i < starting_time_info_.hour_id + kMaxHourId;
i += 2) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()->mutable_data()->set_count(
(i - starting_time_info_.hour_id + 1) * 100);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = starting_time_info_.hour_id; i < starting_time_info_.hour_id + kMaxHourId;
i += 4) {
GenerateObservationsOnce(TimeInfo::FromHourId(i), TimeInfo::FromHourId(i));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), util::HourIdToDayIndex(i)) << "Error for i: " << i;
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_GT(last_observation->integer().values_size(), 0);
EXPECT_EQ(last_observation->integer().values(0).value(),
(i - starting_time_info_.hour_id + 1) * 100);
}
}
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsAsExpected) {
const uint32_t kMaxDayIndex = 5;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), i);
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
}
TEST_F(ObservationGeneratorTest, GeneratesNoObservationsForNoAggregate) {
const uint32_t kMaxDayIndex = 5;
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i));
EXPECT_FALSE(last_metadata);
EXPECT_FALSE(last_observation);
}
}
TEST_F(ObservationGeneratorTest, GeneratesNoObservationsForNoAggregateData) {
const uint32_t kMaxDayIndex = 5;
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
report->mutable_daily()->set_last_day_index(starting_time_info_.day_index);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i));
EXPECT_FALSE(last_metadata);
EXPECT_FALSE(last_observation);
}
}
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsAsExpectedDespiteFailure) {
const uint32_t kMaxDayIndex = 5;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
// Set the MetricType to an invalid type.
SetRegistry(
testing::MutateProject(GetRegistry(), kCustomerId, kProjectId, [](ProjectConfig* project) {
project->mutable_metrics(kOccurrenceMetricMetricIndex)
->set_metric_type(MetricDefinition::INTEGER_HISTOGRAM);
}));
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
// This generation should fail because the aggregation procedure couldn't be created.
EXPECT_NE(
StatusCode::OK,
GenerateObservationsOnce(TimeInfo::FromDayIndex(0), TimeInfo::FromDayIndex(0)).error_code());
// Reset to default registry.
SetRegistry();
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
// Data should still be collected as expected.
for (uint32_t i = starting_time_info_.day_index + 1;
i <= starting_time_info_.day_index + kMaxDayIndex; i += 1) {
EXPECT_EQ(StatusCode::OK,
GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i))
.error_code());
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), i);
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
}
TEST_F(ObservationGeneratorTest, GeneratesDailyObservationsWithReportAllAsExpected) {
SetRegistry(
testing::MutateProject(GetRegistry(), kCustomerId, kProjectId, [](ProjectConfig* project) {
project->mutable_metrics(kOccurrenceMetricMetricIndex)
->mutable_reports(kOccurrenceMetricUniqueDeviceCountsReport1DayReportIndex)
->set_system_profile_selection(SystemProfileSelectionPolicy::REPORT_ALL);
}));
const uint32_t kMaxDayIndex = 5;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport1DayReportId];
for (uint32_t i = 1; i <= kMaxDayIndex; i += 1) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricUniqueDeviceCountsReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = 1; i <= kMaxDayIndex; i += 1) {
GenerateObservationsOnce(TimeInfo::FromDayIndex(i), TimeInfo::FromDayIndex(i));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), i);
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
}
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservations) {
uint32_t kMaxHourId = 101;
int kNumPrivateObs = 2;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
auto* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
report->mutable_hourly()->set_last_hour_id(starting_time_info_.hour_id - 2);
for (uint32_t i = starting_time_info_.hour_id; i < starting_time_info_.hour_id + kMaxHourId;
i += 2) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()->mutable_data()->set_count(
(i - starting_time_info_.hour_id + 1) * 100);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
observations.reserve(kNumPrivateObs);
std::vector<ObservationMetadata> metadatas;
metadatas.reserve(kNumPrivateObs);
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer,
std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs));
GenerateObservationsOnce(TimeInfo::FromHourId(starting_time_info_.hour_id),
TimeInfo::FromHourId(starting_time_info_.hour_id));
EXPECT_EQ(observations.size(), kNumPrivateObs);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(metadatas.size(), kNumPrivateObs);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info_.hour_id));
// Stored data's system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(), system_profile.system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(ObservationGeneratorTest, GeneratesPrivateObservationsForNoObservation) {
int kNumPrivateObs = 2;
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
auto* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
report->mutable_hourly()->set_last_hour_id(starting_time_info_.hour_id - 2);
ASSERT_TRUE(aggregate.Save().ok());
}
std::vector<Observation> observations;
observations.reserve(kNumPrivateObs);
std::vector<ObservationMetadata> metadatas;
metadatas.reserve(kNumPrivateObs);
int contribution_count = 0;
TestObservationStoreWriter test_writer(
[&observations, &metadatas, &contribution_count](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() == kOccurrenceMetricFleetwideOccurrenceCountsReportReportId) {
metadatas.push_back(*metadata);
if (observation->has_unencrypted()) {
observations.push_back(observation->unencrypted());
}
if (!observation->contribution_id().empty()) {
contribution_count++;
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer,
std::make_unique<FakePrivacyEncoder>(true, kNumPrivateObs));
GenerateObservationsOnce(TimeInfo::FromHourId(starting_time_info_.hour_id),
TimeInfo::FromHourId(starting_time_info_.hour_id));
EXPECT_EQ(observations.size(), kNumPrivateObs);
EXPECT_EQ(contribution_count, 1);
for (const Observation& obs : observations) {
EXPECT_EQ(obs.random_id().size(), kRandomIdSize);
}
EXPECT_EQ(metadatas.size(), kNumPrivateObs);
for (const ObservationMetadata& metadata : metadatas) {
EXPECT_EQ(metadata.customer_id(), kCustomerId);
EXPECT_EQ(metadata.day_index(), util::HourIdToDayIndex(starting_time_info_.hour_id));
// Current system profile is used.
EXPECT_EQ(metadata.system_profile().system_version(),
system_data_.system_profile().system_version());
EXPECT_EQ(metadata.system_profile().board_name(), "");
}
}
TEST_F(ObservationGeneratorTest, GenerateObservationsWithCurrentSystemProfile) {
const uint32_t kDayIndex = starting_time_info_.day_index + 1;
SystemProfile system_profile;
// No system profile expect OS.
system_profile.set_os(SystemProfile::ANDROID);
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
// Use the report that has experiment tokens in the system profile.
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId];
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[kDayIndex]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() ==
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false),
/*generate_observations_with_current_system_profile=*/true);
GenerateObservationsOnce(TimeInfo::FromDayIndex(kDayIndex), TimeInfo::FromDayIndex(kDayIndex));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), kDayIndex);
// Unset system profile fields were set from the current system profile.
EXPECT_EQ(last_metadata->system_profile().os(), system_profile.os());
EXPECT_EQ(last_metadata->system_profile().system_version(),
system_data_.system_profile().system_version());
ASSERT_EQ(last_metadata->system_profile().experiment_tokens_size(), 1);
EXPECT_EQ(last_metadata->system_profile().experiment_tokens(0).token(),
system_data_.system_profile().experiment_tokens(0).token());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
TEST_F(ObservationGeneratorTest, GenerateObservationsWithCurrentSystemProfileDoesNotOverwrite) {
const uint32_t kDayIndex = starting_time_info_.day_index + 1;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::ANDROID);
system_profile.set_system_version("100");
ExperimentToken* token = system_profile.add_experiment_tokens();
token->set_ns("ns2222");
token->set_token("token2222");
token = system_profile.add_experiment_tokens();
token->set_ns("ns3333");
token->set_token("token3333");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
// Use the report that has experiment tokens in the system profile.
ReportAggregate* report =
&(*aggregate.aggregate()->mutable_by_report_id())
[kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId];
SystemProfileAggregate* system_profile_agg =
(*report->mutable_daily()->mutable_by_day_index())[kDayIndex]
.add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()
->mutable_data()
->mutable_at_least_once()
->set_at_least_once(true);
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
std::unique_ptr<ObservationMetadata> last_metadata;
std::unique_ptr<Observation> last_observation;
std::string last_contribution_id;
TestObservationStoreWriter test_writer(
[&last_metadata, &last_observation, &last_contribution_id](
std::unique_ptr<observation_store::StoredObservation> observation,
std::unique_ptr<ObservationMetadata> metadata) {
if (metadata->report_id() ==
kOccurrenceMetricUniqueDeviceCountsSelectFirstReport1DayReportId) {
last_metadata = std::move(metadata);
if (observation->has_unencrypted()) {
last_observation = std::unique_ptr<Observation>(observation->release_unencrypted());
last_contribution_id = observation->contribution_id();
}
}
});
logger::ObservationWriter observation_writer(&test_writer, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false),
/*generate_observations_with_current_system_profile=*/true);
GenerateObservationsOnce(TimeInfo::FromDayIndex(kDayIndex), TimeInfo::FromDayIndex(kDayIndex));
EXPECT_EQ(last_contribution_id.size(), kRandomIdSize);
ASSERT_TRUE(last_metadata);
EXPECT_EQ(last_metadata->customer_id(), kCustomerId);
EXPECT_EQ(last_metadata->day_index(), kDayIndex);
// All system profile fields were set from the aggregate system profile.
EXPECT_EQ(last_metadata->system_profile().os(), system_profile.os());
EXPECT_EQ(last_metadata->system_profile().system_version(), system_profile.system_version());
ASSERT_EQ(last_metadata->system_profile().experiment_tokens_size(), 2);
EXPECT_EQ(last_metadata->system_profile().experiment_tokens(0).token(),
system_profile.experiment_tokens(0).token());
EXPECT_EQ(last_metadata->system_profile().experiment_tokens(1).token(),
system_profile.experiment_tokens(1).token());
EXPECT_EQ(last_metadata->system_profile().board_name(), "");
ASSERT_TRUE(last_observation);
EXPECT_EQ(last_observation->random_id().size(), kRandomIdSize);
ASSERT_TRUE(last_observation->has_integer());
ASSERT_EQ(last_observation->integer().values_size(), 1);
EXPECT_EQ(last_observation->integer().values(0).value(), 1);
}
class BasicLogger : public logger::testing::FakeLogger {
public:
explicit BasicLogger(ObservationGeneratorTest* test) : test_(test) {}
Status LogInteger(uint32_t metric_id, int64_t value,
const std::vector<uint32_t>& event_codes) override {
// Lock the aggregate store to simulate actually performing log.
test_->aggregate_storage_->GetMetricAggregate(
lib::CustomerIdentifier(1).ForProject(1).ForMetric(1));
return logger::testing::FakeLogger::LogInteger(metric_id, value, event_codes);
}
private:
ObservationGeneratorTest* test_;
};
TEST_F(ObservationGeneratorTest, DoesNotDeadlock) {
aggregate_storage_ = LocalAggregateStorage::New(
LocalAggregateStorage::StorageStrategy::Delayed, local_aggregation_store_path(), fs(),
project_context_factory_.get(), metadata_builder_.get(), 0);
observation_store::FileObservationStore obs_store(10000, 10000, 10000, fs(), nullptr,
test_folder() + "/obs_store");
BasicLogger logger(this);
logger::InternalMetricsImpl internal_metrics(&logger, nullptr);
obs_store.ResetInternalMetrics(&internal_metrics);
const uint32_t kMaxHourId = 101;
SystemProfile system_profile;
system_profile.set_os(SystemProfile::FUCHSIA);
system_profile.set_system_version("100");
uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());
{
MetricAggregateRef aggregate = GetMetricAggregate(kOccurrenceMetricMetricId);
ReportAggregate* report =
&(*aggregate.aggregate()
->mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
for (uint32_t i = starting_time_info_.hour_id; i < starting_time_info_.hour_id + kMaxHourId;
i += 2) {
SystemProfileAggregate* system_profile_agg =
(*report->mutable_hourly()->mutable_by_hour_id())[i].add_system_profile_aggregates();
system_profile_agg->set_system_profile_hash(system_profile_hash);
system_profile_agg->add_by_event_code()->mutable_data()->set_count(
(i - starting_time_info_.hour_id + 1) * 100);
}
aggregate.StoreFilteredSystemProfile(system_profile_hash, system_profile);
ASSERT_TRUE(aggregate.Save().ok());
}
logger::ObservationWriter observation_writer(&obs_store, nullptr);
ConstructObservationGenerator(&observation_writer, std::make_unique<FakePrivacyEncoder>(false));
for (uint32_t i = starting_time_info_.hour_id; i < starting_time_info_.hour_id + kMaxHourId;
i += 4) {
GenerateObservationsOnce(TimeInfo::FromHourId(i), TimeInfo::FromHourId(i));
}
}
} // namespace cobalt::local_aggregation