blob: 9e56cdf4b51d66ff06d3fa4267249f276ea39343 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h"
#include <memory>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "src/lib/util/hash.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/pb/metadata_builder.h"
#include "src/public/lib/statusor/statusor.h"
#include "src/registry/cobalt_registry.pb.h"
#include "src/system_data/fake_system_data.h"
namespace cobalt::local_aggregation {
using lib::statusor::StatusOr;
using ::testing::Contains;
using ::testing::Not;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
namespace {
// Builds the CobaltRegistry used by these tests from the Base64 string kCobaltRegistryBase64.
//
// Returns nullptr, after logging an error, if the string cannot be Base64-decoded or if the
// decoded bytes cannot be parsed as a CobaltRegistry.
std::unique_ptr<CobaltRegistry> GetRegistry() {
  std::string decoded;
  const bool decode_ok = absl::Base64Unescape(kCobaltRegistryBase64, &decoded);
  if (decode_ok) {
    auto parsed = std::make_unique<CobaltRegistry>();
    if (parsed->ParseFromString(decoded)) {
      return parsed;
    }
    LOG(ERROR) << "Unable to parse registry from bytes";
    return nullptr;
  }
  LOG(ERROR) << "Unable to decode Base64 String";
  return nullptr;
}
} // namespace
// Test fixture that owns an ImmediateLocalAggregateStorage rooted at the test folder, along with
// the fake system data, mock clock, metadata builder and project context factory it is built from.
class ImmediateLocalAggregateStorageTest : public util::testing::TestWithFiles {
 private:
  // Creates the test folder and builds the storage from the default test registry.
  void SetUp() override {
    MakeTestFolder();
    ReplaceRegistry();
  }

 public:
  // Discards the current storage (if any) and builds a fresh one, together with all of its
  // dependencies, from `registry`. Files already written under the test folder are left in place,
  // so this also serves to "restart" the storage against existing on-disk state.
  //
  // registry: The registry handed to the new ProjectContextFactory. Defaults to GetRegistry().
  // per_project_reserved_bytes: Forwarded unchanged to the ImmediateLocalAggregateStorage
  //     constructor.
  void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(),
                       size_t per_project_reserved_bytes = 0) {
    system_data_.ResetChangeCallbacks();

    // Tear down the previous objects in reverse dependency order before replacing anything. Each
    // of them was constructed from raw pointers to the objects rebuilt below, so destroying the
    // dependents first guarantees none of them ever outlives the object it points at.
    storage_.reset();
    metadata_builder_.reset();
    validated_clock_.reset();

    system_data_.SetVersion("100");
    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(std::move(registry));

    // Advance the mock clock by 100 days (24 * 100 hours) and mark the validated clock accurate.
    mock_clock_ =
        std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0));
    mock_clock_->increment_by(std::chrono::hours(24 * 100));
    validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get());
    validated_clock_->SetAccurate(true);

    metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(),
                                                          system_data_cache_path(), fs());
    metadata_builder_->SnapshotSystemData();

    storage_ = std::make_unique<ImmediateLocalAggregateStorage>(
        test_folder(), fs(), global_project_context_factory_.get(), metadata_builder_.get(),
        per_project_reserved_bytes);
  }

 protected:
  // Members are declared so that dependents come after their dependencies; this makes the
  // implicit destruction order (reverse of declaration) safe as well.
  system_data::FakeSystemData system_data_;
  std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
  std::unique_ptr<util::IncrementingSystemClock> mock_clock_;
  std::unique_ptr<util::FakeValidatedClock> validated_clock_;
  std::unique_ptr<MetadataBuilder> metadata_builder_;
  std::unique_ptr<ImmediateLocalAggregateStorage> storage_;
};
// Verifies the directory layout the storage creates on construction, and that saving a metric
// aggregate creates both the metric file and the filtered system profiles file.
TEST_F(ImmediateLocalAggregateStorageTest, MakesExpectedFiles) {
  // Root directory should contain expected customer_id: 123
  ASSERT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Contains("123"));
  // Customer 123 directory should contain expected project: 100
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(),
              Contains("100"));
  // Customer 123 Project 100 directory should *not* contain the metric file: 1
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Not(Contains("1")));

  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
    // Check the save result so a write failure is reported here rather than as a confusing
    // "file missing" failure below.
    ASSERT_TRUE(agg.Save().ok());
  }

  EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(),
              Contains(kFilteredSystemProfilesFile));
  // Customer 123 Project 100 directory should contain the metric file: 1
  EXPECT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Contains("1"));
}
// Verifies that a filtered system profile saved by one storage instance can be retrieved by a
// fresh storage instance created over the same files.
TEST_F(ImmediateLocalAggregateStorageTest, CanReadAlreadyWrittenFiles) {
  SystemProfile system_profile;
  system_profile.set_os(SystemProfile::FUCHSIA);
  system_profile.set_system_version("100");
  uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString());

  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    agg.StoreFilteredSystemProfile(system_profile_hash, system_profile);
    ASSERT_TRUE(agg.Save().ok());
  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.

  // Recreate the storage so the data has to come from the files written above.
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
  ASSERT_TRUE(agg_or.ok());
  MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
  StatusOr<SystemProfile> retrieved_system_profile =
      agg.RetrieveFilteredSystemProfile(system_profile_hash);
  // Fail the test cleanly if the profile is missing, instead of dying in ConsumeValueOrDie().
  ASSERT_EQ(retrieved_system_profile.status().error_code(), StatusCode::OK);
  EXPECT_TRUE(agg.Save().ok());
  EXPECT_EQ(retrieved_system_profile.ConsumeValueOrDie().system_version(),
            system_profile.system_version());
}
// Verifies that aggregate data stored for a report is dropped once that report has been removed
// from the registry and the storage is recreated.
TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldReports) {
  const auto occurrence_metric = lib::CustomerIdentifier(kCustomerId)
                                     .ForProject(kProjectId)
                                     .ForMetric(kOccurrenceMetricMetricId);

  {
    // Store one day's worth of at-least-once data for the 7-day unique device counts report.
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_TRUE(ref_or.ok());
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();

    auto* by_report_id = ref.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*by_report_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(1000);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[1000];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);

    ASSERT_EQ(ref.Save().error_code(), StatusCode::OK);
  }

  // The metric file must now exist on disk.
  const std::string project_dir = absl::StrCat(test_folder(), "/123/100");
  ASSERT_THAT(fs()->ListFiles(project_dir).ConsumeValueOrDie(), Contains("1"));

  // Drop the report from the registry and rebuild the storage with the trimmed registry.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  google::protobuf::RepeatedPtrField<ReportDefinition>* report_defs =
      trimmed_registry->mutable_customers(0)
          ->mutable_projects(0)
          ->mutable_metrics(kOccurrenceMetricMetricIndex)
          ->mutable_reports();
  report_defs->erase(report_defs->begin() +
                     kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex);
  ReplaceRegistry(std::move(trimmed_registry));

  // The removed report's data should no longer be present in the metric aggregate.
  StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(occurrence_metric);
  ASSERT_EQ(reloaded_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
  EXPECT_FALSE(reloaded.aggregate()->by_report_id().contains(
      kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
}
// Verifies that a metric's aggregate file is deleted once the metric has been removed from the
// registry and the storage is recreated.
TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldMetrics) {
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    // Check the save result so a write failure is reported at its source.
    ASSERT_TRUE(agg.Save().ok());
  }  // Scope ensures the MetricAggregateRef gets deleted before the storage is replaced.

  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Contains("1"));

  // Remove the metric from the registry and rebuild the storage.
  auto registry = GetRegistry();
  google::protobuf::RepeatedPtrField<MetricDefinition>* metrics =
      registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics();
  metrics->erase(metrics->begin() + kOccurrenceMetricMetricIndex);
  ReplaceRegistry(std::move(registry));

  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Not(Contains("1")));
}
// Verifies that a project's directory is deleted once the project has been removed from the
// registry and the storage is recreated.
TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldProjects) {
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    // Check the save result so a write failure is reported at its source.
    ASSERT_TRUE(agg.Save().ok());
  }  // Scope ensures the MetricAggregateRef gets deleted before the storage is replaced.

  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(),
              Contains("100"));
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Contains("1"));

  // Remove the last project of the first customer from the registry and rebuild the storage.
  auto registry = GetRegistry();
  registry->mutable_customers(0)->mutable_projects()->RemoveLast();
  ReplaceRegistry(std::move(registry));

  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(),
              Not(Contains("100")));
}
// Verifies that daily data stored in the deprecated_by_event_code field of a bucket is moved into
// a single SystemProfileAggregate when the storage loads the file.
TEST_F(ImmediateLocalAggregateStorageTest, MigrateDailyToSystemProfileAggregates) {
  {
    // Have the storage create an aggregate file with a single MetricAggregate in it.
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
                  .ConsumeValueOrDie(),
              Contains("1"));

  // Load the written metric aggregate file.
  const std::string metric_file = absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId,
                                               "/", kOccurrenceMetricMetricId);
  MetricAggregate metric_agg;
  Status status = fs()->Read(metric_file, &metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_by_event_code field.
  uint32_t day_index = cobalt::util::TimeToDayIndex(
      std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC);
  ReportAggregate& report_agg =
      (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
  report_agg.mutable_daily()->set_last_day_index(day_index);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
  EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code();
  data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(metric_file, metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Take a separate copy of the migrated report rather than assigning through `report_agg`, which
  // is a reference into the hand-built fixture proto. Guard the lookup so a missing report fails
  // the test instead of aborting inside Map::at().
  const auto& migrated_by_report_id = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(
      migrated_by_report_id.contains(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
  const ReportAggregate migrated_report_agg =
      migrated_by_report_id.at(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);

  ASSERT_TRUE(migrated_report_agg.daily().by_day_index().contains(day_index));
  const AggregationPeriodBucket& bucket = migrated_report_agg.daily().by_day_index().at(day_index);
  EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0);
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(),
            util::DayIndexToUnixSeconds(day_index));
  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::DayIndexToUnixSeconds(day_index));
  // Make sure index 0 exists before reading it.
  ASSERT_GE(system_profile_aggregate.by_event_code_size(), 1);
  EXPECT_TRUE(system_profile_aggregate.by_event_code(0).data().at_least_once().at_least_once());

  // The migrated aggregate should reference a stored system profile matching the current system
  // data.
  StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile(
      system_profile_aggregate.system_profile_hash());
  ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK);
  SystemProfile system_profile = system_profile_or.ConsumeValueOrDie();
  EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version());
  EXPECT_EQ(system_profile.os(), system_data_.system_profile().os());
  EXPECT_EQ(system_profile.board_name(), "");
}
// Verifies that hourly data stored in the deprecated_by_event_code field of a bucket is moved
// into a single SystemProfileAggregate when the storage loads the file.
TEST_F(ImmediateLocalAggregateStorageTest, MigrateHourlyToSystemProfileAggregates) {
  {
    // Have the storage create an aggregate file with a single MetricAggregate in it.
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
                  .ConsumeValueOrDie(),
              Contains("1"));

  // Load the written metric aggregate file.
  const std::string metric_file = absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId,
                                               "/", kOccurrenceMetricMetricId);
  MetricAggregate metric_agg;
  Status status = fs()->Read(metric_file, &metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_by_event_code field.
  uint32_t hour_id = cobalt::util::TimeToHourId(
      std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC);
  ReportAggregate& report_agg =
      (*metric_agg
            .mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId];
  report_agg.mutable_hourly()->set_last_hour_id(hour_id);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id];
  EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code();
  data->mutable_data()->set_count(10);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(metric_file, metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Take a separate copy of the migrated report rather than assigning through `report_agg`, which
  // is a reference into the hand-built fixture proto. Guard the lookup so a missing report fails
  // the test instead of aborting inside Map::at().
  const auto& migrated_by_report_id = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(
      migrated_by_report_id.contains(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId));
  const ReportAggregate migrated_report_agg =
      migrated_by_report_id.at(kOccurrenceMetricFleetwideOccurrenceCountsReportReportId);

  ASSERT_TRUE(migrated_report_agg.hourly().by_hour_id().contains(hour_id));
  const AggregationPeriodBucket& bucket = migrated_report_agg.hourly().by_hour_id().at(hour_id);
  EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0);
  ASSERT_EQ(bucket.system_profile_aggregates_size(), 1);
  const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0);
  EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), util::HourIdToUnixSeconds(hour_id));
  EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::HourIdToUnixSeconds(hour_id));
  // Make sure index 0 exists before reading it.
  ASSERT_GE(system_profile_aggregate.by_event_code_size(), 1);
  EXPECT_EQ(system_profile_aggregate.by_event_code(0).data().count(), 10);

  // The migrated aggregate should reference a stored system profile matching the current system
  // data.
  StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile(
      system_profile_aggregate.system_profile_hash());
  ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK);
  SystemProfile system_profile = system_profile_or.ConsumeValueOrDie();
  EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version());
  EXPECT_EQ(system_profile.os(), system_data_.system_profile().os());
  EXPECT_EQ(system_profile.board_name(), "");
}
// Verifies that data stored with the deprecated_at_least_once field is converted to the
// at_least_once message (with last_day_index set to the bucket's day) when the storage loads the
// file.
TEST_F(ImmediateLocalAggregateStorageTest, MigrateAtLeastOnce) {
  {
    // Have the storage create an aggregate file with a single MetricAggregate in it.
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                         .ForProject(kProjectId)
                                         .ForMetric(kOccurrenceMetricMetricId));
    ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);
    ASSERT_EQ(agg.Save().error_code(), StatusCode::OK);
  }  // Scope ensures the first MetricAggregateRef gets deleted before another one is created.
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId))
                  .ConsumeValueOrDie(),
              Contains("1"));

  // Load the written metric aggregate file.
  const std::string metric_file = absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId,
                                               "/", kOccurrenceMetricMetricId);
  MetricAggregate metric_agg;
  Status status = fs()->Read(metric_file, &metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);

  // Update the stored local aggregate data to use the deprecated_at_least_once type.
  uint32_t day_index = 100;
  ReportAggregate& report_agg =
      (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
  report_agg.mutable_daily()->set_last_day_index(day_index);
  AggregationPeriodBucket& mutable_bucket =
      (*report_agg.mutable_daily()->mutable_by_day_index())[day_index];
  SystemProfileAggregate* system_profile_aggregate = mutable_bucket.add_system_profile_aggregates();
  EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code();
  data->mutable_data()->set_deprecated_at_least_once(true);

  // Write the updated local aggregate storage file and recreate the storage so it reads the file.
  status = fs()->Write(metric_file, metric_agg);
  ASSERT_EQ(status.error_code(), StatusCode::OK);
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> agg_or =
      storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId)
                                       .ForProject(kProjectId)
                                       .ForMetric(kOccurrenceMetricMetricId));
  ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie();

  // Take a separate copy of the migrated report rather than assigning through `report_agg`, which
  // is a reference into the hand-built fixture proto. Guard the lookup so a missing report fails
  // the test instead of aborting inside Map::at().
  const auto& migrated_by_report_id = migrated_metric_agg.aggregate()->by_report_id();
  ASSERT_TRUE(
      migrated_by_report_id.contains(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId));
  const ReportAggregate migrated_report_agg =
      migrated_by_report_id.at(kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId);

  ASSERT_TRUE(migrated_report_agg.daily().by_day_index().contains(day_index));
  const AggregationPeriodBucket& bucket = migrated_report_agg.daily().by_day_index().at(day_index);
  // Make sure the indexed elements exist before reading them.
  ASSERT_GE(bucket.system_profile_aggregates_size(), 1);
  ASSERT_GE(bucket.system_profile_aggregates(0).by_event_code_size(), 1);
  const auto& migrated_data = bucket.system_profile_aggregates(0).by_event_code(0).data();
  EXPECT_FALSE(migrated_data.has_deprecated_at_least_once());
  EXPECT_TRUE(migrated_data.at_least_once().at_least_once());
  EXPECT_EQ(migrated_data.at_least_once().last_day_index(), day_index);
}
// Verifies that DeleteData() removes both the filtered system profiles file and the customer
// directory from the storage root.
TEST_F(ImmediateLocalAggregateStorageTest, DeleteDataWorks) {
  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.StoreFilteredSystemProfile(12345ULL, SystemProfile());
    // Check the save result so a write failure is reported at its source.
    ASSERT_TRUE(agg.Save().ok());
  }

  // Everything should be on disk before the delete.
  EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(),
              Contains(kFilteredSystemProfilesFile));
  ASSERT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Contains("123"));
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(),
              Contains("100"));
  EXPECT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(),
              Contains("1"));

  storage_->DeleteData();

  EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(),
              Not(Contains(kFilteredSystemProfilesFile)));
  EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Not(Contains("123")));
}
// Verifies that a system profile stored alongside aggregate data that references it survives a
// garbage collection pass and is still retrievable after the storage is reloaded from disk.
TEST_F(ImmediateLocalAggregateStorageTest, StoreAndSave) {
  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  const auto occurrence_metric = lib::CustomerIdentifier(kCustomerId)
                                     .ForProject(kProjectId)
                                     .ForMetric(kOccurrenceMetricMetricId);

  {
    // Ask the storage for the metric's aggregate; this is what gets written to the file.
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();

    // Populate one daily bucket with data that is keyed by the system profile's hash.
    const uint32_t kDayIndex = 100;
    auto* by_report_id = ref.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*by_report_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(kDayIndex - 1);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[kDayIndex];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->set_last_seen_timestamp(1000);
    profile_agg->set_first_seen_timestamp(1000);
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);

    ref.StoreFilteredSystemProfile(profile_hash, profile);
    ASSERT_EQ(ref.Save().error_code(), StatusCode::OK);
  }  // The ref must be released here, before the next one is requested.

  // Give the storage a chance to garbage collect system profiles that are not in use.
  storage_->GarbageCollection();

  {
    // The profile is still referenced, so it must still be retrievable.
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();
    StatusOr<SystemProfile> found_or = ref.RetrieveFilteredSystemProfile(profile_hash);
    ASSERT_EQ(found_or.status().error_code(), StatusCode::OK);
    EXPECT_EQ(found_or.ConsumeValueOrDie().system_version(), profile.system_version());
  }

  // Rebuild the storage from the files to confirm the system profile was persisted.
  ReplaceRegistry();

  StatusOr<MetricAggregateRef> reloaded_or = storage_->GetMetricAggregate(occurrence_metric);
  ASSERT_EQ(reloaded_or.status().error_code(), StatusCode::OK);
  MetricAggregateRef reloaded = reloaded_or.ConsumeValueOrDie();
  StatusOr<SystemProfile> persisted_or = reloaded.RetrieveFilteredSystemProfile(profile_hash);
  ASSERT_EQ(persisted_or.status().error_code(), StatusCode::OK);
  EXPECT_EQ(persisted_or.ConsumeValueOrDie().system_version(), profile.system_version());
}
// Walks a system profile through its lifecycle: stored and referenced (kept by garbage
// collection), replaced by an updated profile (old one collected), and finally unreferenced
// (updated one collected as well).
TEST_F(ImmediateLocalAggregateStorageTest, GarbageCollection) {
  const uint32_t kDayIndex = 100;

  SystemProfile profile;
  profile.set_os(SystemProfile::FUCHSIA);
  profile.set_system_version("100");
  const uint64_t profile_hash = util::Farmhash64(profile.SerializeAsString());

  const auto occurrence_metric = lib::CustomerIdentifier(kCustomerId)
                                     .ForProject(kProjectId)
                                     .ForMetric(kOccurrenceMetricMetricId);

  // Step 1: create the metric aggregate with data that references the original profile.
  {
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();

    // Populate one daily bucket with data keyed by the profile's hash.
    auto* by_report_id = ref.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*by_report_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    auto* daily = report.mutable_daily();
    daily->set_last_day_index(kDayIndex - 1);
    AggregationPeriodBucket& day_bucket = (*daily->mutable_by_day_index())[kDayIndex];
    SystemProfileAggregate* profile_agg = day_bucket.add_system_profile_aggregates();
    profile_agg->set_system_profile_hash(profile_hash);
    profile_agg->set_last_seen_timestamp(1000);
    profile_agg->set_first_seen_timestamp(1000);
    EventCodesAggregateData* event_data = profile_agg->add_by_event_code();
    event_data->mutable_data()->mutable_at_least_once()->set_at_least_once(true);
    ref.StoreFilteredSystemProfile(profile_hash, profile);

    // Persist everything changed above.
    ASSERT_EQ(ref.Save().error_code(), StatusCode::OK);
  }

  // The only stored profile is in use, so this pass should not remove anything.
  storage_->GarbageCollection();

  // Step 2: simulate an OTA by switching the data over to a profile with a newer version.
  SystemProfile new_profile = profile;
  new_profile.set_system_version("101");
  const uint64_t new_profile_hash = util::Farmhash64(new_profile.SerializeAsString());
  {
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();

    // The original profile must have survived the first garbage collection.
    StatusOr<SystemProfile> original_or = ref.RetrieveFilteredSystemProfile(profile_hash);
    ASSERT_EQ(original_or.status().error_code(), StatusCode::OK);
    EXPECT_EQ(original_or.ConsumeValueOrDie().system_version(), profile.system_version());

    // Point the existing aggregate at the new profile.
    auto* by_report_id = ref.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*by_report_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    AggregationPeriodBucket& day_bucket =
        (*report.mutable_daily()->mutable_by_day_index())[kDayIndex];
    SystemProfileAggregate* profile_agg = day_bucket.mutable_system_profile_aggregates(0);
    profile_agg->set_system_profile_hash(new_profile_hash);
    profile_agg->set_last_seen_timestamp(1001);
    ref.StoreFilteredSystemProfile(new_profile_hash, new_profile);

    // Persist everything changed above.
    ASSERT_EQ(ref.Save().error_code(), StatusCode::OK);
  }

  // The original profile is no longer referenced, so this pass should remove it.
  storage_->GarbageCollection();

  // Step 3: confirm the swap, then delete the data that references the new profile.
  {
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();

    StatusOr<SystemProfile> original_or = ref.RetrieveFilteredSystemProfile(profile_hash);
    EXPECT_EQ(original_or.status().error_code(), StatusCode::NOT_FOUND);

    StatusOr<SystemProfile> updated_or = ref.RetrieveFilteredSystemProfile(new_profile_hash);
    ASSERT_EQ(updated_or.status().error_code(), StatusCode::OK);
    EXPECT_EQ(updated_or.ConsumeValueOrDie().system_version(), new_profile.system_version());

    // Erase the bucket, which held the only reference to the new profile.
    auto* by_report_id = ref.aggregate()->mutable_by_report_id();
    ReportAggregate& report =
        (*by_report_id)[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId];
    report.mutable_daily()->mutable_by_day_index()->erase(kDayIndex);

    // Persist everything changed above.
    ASSERT_EQ(ref.Save().error_code(), StatusCode::OK);
  }

  // Nothing references any profile now, so this pass should remove the remaining one.
  storage_->GarbageCollection();

  // Step 4: the new profile should be gone too.
  {
    StatusOr<MetricAggregateRef> ref_or = storage_->GetMetricAggregate(occurrence_metric);
    ASSERT_EQ(ref_or.status().error_code(), StatusCode::OK);
    MetricAggregateRef ref = ref_or.ConsumeValueOrDie();
    StatusOr<SystemProfile> updated_or = ref.RetrieveFilteredSystemProfile(new_profile_hash);
    EXPECT_EQ(updated_or.status().error_code(), StatusCode::NOT_FOUND);
  }
}
// Verifies that AmountStored() and SlushUsed() track the size of saved aggregates, that bytes
// beyond the per-project reservation are counted as slush, and that the counts are rebuilt
// correctly when the storage is reloaded from disk.
TEST_F(ImmediateLocalAggregateStorageTest, UsageTrackingWorks) {
  // Set per-project-reserved-bytes to 2
  ReplaceRegistry(GetRegistry(), 2);
  lib::ProjectIdentifier proj = lib::CustomerIdentifier(123).ForProject(100);
  ASSERT_EQ(storage_->AmountStored(), 0);
  ASSERT_EQ(storage_->AmountStored(proj), 0);

  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(100);  // Aggregate should now take up 2 bytes
    // The byte counts asserted below are only meaningful if the save succeeded.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 0);
  ASSERT_EQ(storage_->AmountStored(), 2);
  ASSERT_EQ(storage_->AmountStored(proj), 2);

  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(1000000);  // Aggregate should now take up 4 bytes.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 2);
  ASSERT_EQ(storage_->AmountStored(), 4);
  ASSERT_EQ(storage_->AmountStored(proj), 4);

  // Add another 4 bytes. All should be counted by slush.
  {
    StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(2));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    agg.aggregate()->set_version(1000000);  // Aggregate should now take up 4 bytes.
    ASSERT_TRUE(agg.Save().ok());
  }
  ASSERT_EQ(storage_->SlushUsed(), 6);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);

  // Reloading the store from disk, the usage counts should remain the same.
  ReplaceRegistry(GetRegistry(), 2);
  ASSERT_EQ(storage_->SlushUsed(), 6);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);

  // Reloading with different slush size should only update SlushUsed.
  ReplaceRegistry(GetRegistry(), 4);
  ASSERT_EQ(storage_->SlushUsed(), 4);
  ASSERT_EQ(storage_->AmountStored(), 8);
  ASSERT_EQ(storage_->AmountStored(proj), 8);
}
// Test fixture that points an ImmediateLocalAggregateStorage at "<test_folder>/sub", a directory
// that this fixture never creates itself. The storage is built with a null metadata builder and
// no per-project reserved bytes.
class ImmediateLocalAggregateStorageMissingDirectoryTest : public util::testing::TestWithFiles {
 private:
  // Creates the test folder (but not the subfolder) and builds the storage.
  void SetUp() override {
    MakeTestFolder();
    ReplaceRegistry();
  }

 public:
  // Discards the current storage (if any) and builds a fresh one from `registry`, which defaults
  // to GetRegistry().
  void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry()) {
    // Destroy the old storage first: it was constructed from a raw pointer to the factory that is
    // about to be replaced, and must not outlive it.
    storage_.reset();
    global_project_context_factory_ =
        std::make_unique<logger::ProjectContextFactory>(std::move(registry));
    storage_ = std::make_unique<ImmediateLocalAggregateStorage>(
        test_subfolder(), fs(), global_project_context_factory_.get(), nullptr, 0);
  }

 protected:
  // Path of the storage root used by this fixture.
  std::string test_subfolder() { return absl::StrCat(test_folder(), "/sub"); }

  // The factory is declared before the storage so that the storage is destroyed first.
  std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
  std::unique_ptr<ImmediateLocalAggregateStorage> storage_;
};
// Verifies that the storage creates its directory layout, and can save a metric file, when it is
// rooted at the fixture's subfolder rather than at the test folder itself.
TEST_F(ImmediateLocalAggregateStorageMissingDirectoryTest, DontAssumeDirectoryExists) {
  // Root directory should contain expected customer_id: 123
  ASSERT_THAT(fs()->ListFiles(test_subfolder()).ConsumeValueOrDie(), Contains("123"));
  // Customer 123 directory should contain expected project: 100
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123")).ConsumeValueOrDie(),
              Contains("100"));
  // Customer 123 Project 100 directory should *not* contain the metric file: 1
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123/100")).ConsumeValueOrDie(),
              Not(Contains("1")));

  {
    StatusOr<MetricAggregateRef> agg_or =
        storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1));
    ASSERT_TRUE(agg_or.ok());
    MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
    // Check the save result so a write failure is reported at its source.
    ASSERT_TRUE(agg.Save().ok());
  }

  // Customer 123 Project 100 directory should contain the metric file: 1
  ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123/100")).ConsumeValueOrDie(),
              Contains("1"));
}
} // namespace cobalt::local_aggregation