| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/immediate_local_aggregate_storage.h" |
| |
| #include <memory> |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "absl/strings/escaping.h" |
| #include "absl/strings/str_cat.h" |
| #include "src/lib/util/hash.h" |
| #include "src/lib/util/testing/test_with_files.h" |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| #include "src/local_aggregation_1_1/testing/test_registry.cb.h" |
| #include "src/logger/project_context_factory.h" |
| #include "src/logging.h" |
| #include "src/pb/metadata_builder.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/registry/cobalt_registry.pb.h" |
| #include "src/system_data/fake_system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| using lib::statusor::StatusOr; |
| using ::testing::Contains; |
| using ::testing::Not; |
| using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef; |
| |
| namespace { |
| |
| std::unique_ptr<CobaltRegistry> GetRegistry() { |
| std::string bytes; |
| if (!absl::Base64Unescape(kCobaltRegistryBase64, &bytes)) { |
| LOG(ERROR) << "Unable to decode Base64 String"; |
| return nullptr; |
| } |
| |
| auto registry = std::make_unique<CobaltRegistry>(); |
| if (!registry->ParseFromString(bytes)) { |
| LOG(ERROR) << "Unable to parse registry from bytes"; |
| return nullptr; |
| } |
| |
| return registry; |
| } |
| |
| } // namespace |
| |
| class ImmediateLocalAggregateStorageTest : public util::testing::TestWithFiles { |
| private: |
| void SetUp() override { |
| MakeTestFolder(); |
| ReplaceRegistry(); |
| } |
| |
| public: |
| void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(), |
| size_t per_project_reserved_bytes = 0) { |
| system_data_.ResetChangeCallbacks(); |
| system_data_.SetVersion("100"); |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(std::move(registry)); |
| mock_clock_ = |
| std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0)); |
| mock_clock_->increment_by(std::chrono::hours(24 * 100)); |
| validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get()); |
| validated_clock_->SetAccurate(true); |
| metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(), |
| system_data_cache_path(), fs()); |
| metadata_builder_->SnapshotSystemData(); |
| storage_ = std::make_unique<ImmediateLocalAggregateStorage>( |
| test_folder(), fs(), global_project_context_factory_.get(), metadata_builder_.get(), |
| per_project_reserved_bytes); |
| } |
| |
| protected: |
| system_data::FakeSystemData system_data_; |
| std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
| std::unique_ptr<util::IncrementingSystemClock> mock_clock_; |
| std::unique_ptr<util::FakeValidatedClock> validated_clock_; |
| std::unique_ptr<MetadataBuilder> metadata_builder_; |
| std::unique_ptr<ImmediateLocalAggregateStorage> storage_; |
| }; |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, MakesExpectedFiles) { |
| // Root directory should contain expected customer_id: 123 |
| ASSERT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Contains("123")); |
| |
| // Customer 123 directory should contain expected project: 100 |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(), |
| Contains("100")); |
| |
| // Customer 123 Project 100 directory should *not* contain the metric file: 1 |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Not(Contains("1"))); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile()); |
| agg.Save(); |
| } |
| |
| EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), |
| Contains(kFilteredSystemProfilesFile)); |
| // Customer 123 Project 100 directory should contain the metric file: 1 |
| EXPECT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, CanReadAlreadyWrittenFiles) { |
| SystemProfile system_profile; |
| system_profile.set_os(SystemProfile::FUCHSIA); |
| system_profile.set_system_version("100"); |
| uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString()); |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| agg.aggregate()->set_version(100); |
| agg.StoreFilteredSystemProfile(system_profile_hash, system_profile); |
| ASSERT_TRUE(agg.Save().ok()); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> retrieved_system_profile = |
| agg.RetrieveFilteredSystemProfile(system_profile_hash); |
| EXPECT_TRUE(agg.Save().ok()); |
| EXPECT_EQ(retrieved_system_profile.ConsumeValueOrDie().system_version(), |
| system_profile.system_version()); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldReports) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(1000); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[1000]; |
| SystemProfileAggregate* system_profile_aggregate = |
| mutable_bucket.add_system_profile_aggregates(); |
| EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code(); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| |
| auto registry = GetRegistry(); |
| google::protobuf::RepeatedPtrField<ReportDefinition>* reports = |
| registry->mutable_customers(0) |
| ->mutable_projects(0) |
| ->mutable_metrics(kOccurrenceMetricMetricIndex) |
| ->mutable_reports(); |
| reports->erase(reports->begin() + kOccurrenceMetricUniqueDeviceCountsReport7DaysReportIndex); |
| ReplaceRegistry(std::move(registry)); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef metric_agg = agg_or.ConsumeValueOrDie(); |
| EXPECT_FALSE(metric_agg.aggregate()->by_report_id().contains( |
| kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId)); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldMetrics) { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| agg_or.ConsumeValueOrDie().Save(); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| |
| auto registry = GetRegistry(); |
| google::protobuf::RepeatedPtrField<MetricDefinition>* metrics = |
| registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics(); |
| metrics->erase(metrics->begin() + kOccurrenceMetricMetricIndex); |
| ReplaceRegistry(std::move(registry)); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Not(Contains("1"))); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, CleansUpOldProjects) { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| agg_or.ConsumeValueOrDie().Save(); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(), |
| Contains("100")); |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| |
| auto registry = GetRegistry(); |
| registry->mutable_customers(0)->mutable_projects()->RemoveLast(); |
| ReplaceRegistry(std::move(registry)); |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(), |
| Not(Contains("100"))); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, MigrateDailyToSystemProfileAggregates) { |
| { |
| // Have the storage create an aggregate file with a single MetricAggregate in it. |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains("1")); |
| |
| // Load the written metric aggregate file. |
| MetricAggregate metric_agg; |
| Status status = fs()->Read(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| &metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // Update the stored local aggregate data to use the deprecated_by_event_code field. |
| uint32_t day_index = cobalt::util::TimeToDayIndex( |
| std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC); |
| ReportAggregate& report_agg = |
| (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(day_index); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code(); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| |
| // Write the updated local aggregate storage file and recreate the storage so it reads the file. |
| status = fs()->Write(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie(); |
| report_agg = migrated_metric_agg.aggregate()->by_report_id().at( |
| kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId); |
| ASSERT_TRUE(report_agg.daily().by_day_index().contains(day_index)); |
| const AggregationPeriodBucket& bucket = report_agg.daily().by_day_index().at(day_index); |
| EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0); |
| ASSERT_EQ(bucket.system_profile_aggregates_size(), 1); |
| const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), |
| util::DayIndexToUnixSeconds(day_index)); |
| EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::DayIndexToUnixSeconds(day_index)); |
| EXPECT_TRUE(system_profile_aggregate.by_event_code(0).data().at_least_once().at_least_once()); |
| |
| StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile( |
| system_profile_aggregate.system_profile_hash()); |
| ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK); |
| SystemProfile system_profile = system_profile_or.ConsumeValueOrDie(); |
| EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version()); |
| EXPECT_EQ(system_profile.os(), system_data_.system_profile().os()); |
| EXPECT_EQ(system_profile.board_name(), ""); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, MigrateHourlyToSystemProfileAggregates) { |
| { |
| // Have the storage create an aggregate file with a single MetricAggregate in it. |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains("1")); |
| |
| // Load the written metric aggregate file. |
| MetricAggregate metric_agg; |
| Status status = fs()->Read(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| &metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // Update the stored local aggregate data to use the deprecated_by_event_code field. |
| uint32_t hour_id = cobalt::util::TimeToHourId( |
| std::chrono::system_clock::to_time_t(mock_clock_->now()), MetricDefinition::UTC); |
| ReportAggregate& report_agg = |
| (*metric_agg |
| .mutable_by_report_id())[kOccurrenceMetricFleetwideOccurrenceCountsReportReportId]; |
| report_agg.mutable_hourly()->set_last_hour_id(hour_id); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id]; |
| EventCodesAggregateData* data = mutable_bucket.add_deprecated_by_event_code(); |
| data->mutable_data()->set_count(10); |
| |
| // Write the updated local aggregate storage file and recreate the storage so it reads the file. |
| status = fs()->Write(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie(); |
| report_agg = migrated_metric_agg.aggregate()->by_report_id().at( |
| kOccurrenceMetricFleetwideOccurrenceCountsReportReportId); |
| ASSERT_TRUE(report_agg.hourly().by_hour_id().contains(hour_id)); |
| const AggregationPeriodBucket& bucket = report_agg.hourly().by_hour_id().at(hour_id); |
| EXPECT_EQ(bucket.deprecated_by_event_code_size(), 0); |
| ASSERT_EQ(bucket.system_profile_aggregates_size(), 1); |
| const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), util::HourIdToUnixSeconds(hour_id)); |
| EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), util::HourIdToUnixSeconds(hour_id)); |
| EXPECT_EQ(system_profile_aggregate.by_event_code(0).data().count(), 10); |
| |
| StatusOr<SystemProfile> system_profile_or = migrated_metric_agg.RetrieveFilteredSystemProfile( |
| system_profile_aggregate.system_profile_hash()); |
| ASSERT_EQ(system_profile_or.status().error_code(), StatusCode::OK); |
| SystemProfile system_profile = system_profile_or.ConsumeValueOrDie(); |
| EXPECT_EQ(system_profile.system_version(), system_data_.system_profile().system_version()); |
| EXPECT_EQ(system_profile.os(), system_data_.system_profile().os()); |
| EXPECT_EQ(system_profile.board_name(), ""); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, MigrateAtLeastOnce) { |
| { |
| // Have the storage create an aggregate file with a single MetricAggregate in it. |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains("1")); |
| |
| // Load the written metric aggregate file. |
| MetricAggregate metric_agg; |
| Status status = fs()->Read(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| &metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| |
| // Update the stored local aggregate data to use the deprecated_at_least_once type. |
| uint32_t day_index = 100; |
| ReportAggregate& report_agg = |
| (*metric_agg.mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(day_index); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| SystemProfileAggregate* system_profile_aggregate = mutable_bucket.add_system_profile_aggregates(); |
| EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code(); |
| data->mutable_data()->set_deprecated_at_least_once(true); |
| |
| // Write the updated local aggregate storage file and recreate the storage so it reads the file. |
| status = fs()->Write(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId, "/", |
| kOccurrenceMetricMetricId), |
| metric_agg); |
| ASSERT_EQ(status.error_code(), StatusCode::OK); |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| report_agg = agg_or.ConsumeValueOrDie().aggregate()->by_report_id().at( |
| kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId); |
| ASSERT_TRUE(report_agg.daily().by_day_index().contains(day_index)); |
| const AggregationPeriodBucket& bucket = report_agg.daily().by_day_index().at(day_index); |
| EXPECT_FALSE( |
| bucket.system_profile_aggregates(0).by_event_code(0).data().has_deprecated_at_least_once()); |
| EXPECT_TRUE( |
| bucket.system_profile_aggregates(0).by_event_code(0).data().at_least_once().at_least_once()); |
| EXPECT_EQ( |
| bucket.system_profile_aggregates(0).by_event_code(0).data().at_least_once().last_day_index(), |
| day_index); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, DeleteDataWorks) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| agg.StoreFilteredSystemProfile(uint64_t{12345}, SystemProfile()); |
| agg.Save(); |
| } |
| |
| EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), |
| Contains(kFilteredSystemProfilesFile)); |
| ASSERT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Contains("123")); |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123")).ConsumeValueOrDie(), |
| Contains("100")); |
| EXPECT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| |
| storage_->DeleteData(); |
| |
| EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), |
| Not(Contains(kFilteredSystemProfilesFile))); |
| EXPECT_THAT(fs()->ListFiles(test_folder()).ConsumeValueOrDie(), Not(Contains("123"))); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, StoreAndSave) { |
| SystemProfile system_profile; |
| system_profile.set_os(SystemProfile::FUCHSIA); |
| system_profile.set_system_version("100"); |
| uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString()); |
| |
| { |
| // Have the storage create an aggregate file with a single MetricAggregate in it. |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| // Add some aggregate data broken down by system profile for the metric. |
| uint32_t day_index = 100; |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(day_index - 1); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| SystemProfileAggregate* system_profile_agg = mutable_bucket.add_system_profile_aggregates(); |
| system_profile_agg->set_system_profile_hash(system_profile_hash); |
| system_profile_agg->set_last_seen_timestamp(1000); |
| system_profile_agg->set_first_seen_timestamp(1000); |
| EventCodesAggregateData* data = system_profile_agg->add_by_event_code(); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| |
| agg.StoreFilteredSystemProfile(system_profile_hash, system_profile); |
| |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| // Allow the system to garbage collect unused system profiles. |
| storage_->GarbageCollection(); |
| |
| { |
| // Make sure the system profile can be retrieved. |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> stored_system_profile = |
| agg.RetrieveFilteredSystemProfile(system_profile_hash); |
| ASSERT_EQ(stored_system_profile.status().error_code(), StatusCode::OK); |
| EXPECT_EQ(stored_system_profile.ConsumeValueOrDie().system_version(), |
| system_profile.system_version()); |
| } |
| |
| // Reload the storage to make sure the system profile changes are persisted to files. |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> stored_system_profile = |
| agg.RetrieveFilteredSystemProfile(system_profile_hash); |
| ASSERT_EQ(stored_system_profile.status().error_code(), StatusCode::OK); |
| EXPECT_EQ(stored_system_profile.ConsumeValueOrDie().system_version(), |
| system_profile.system_version()); |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, GarbageCollection) { |
| uint32_t day_index = 100; |
| SystemProfile system_profile; |
| system_profile.set_os(SystemProfile::FUCHSIA); |
| system_profile.set_system_version("100"); |
| uint64_t system_profile_hash = util::Farmhash64(system_profile.SerializeAsString()); |
| |
| // Have the storage create an aggregate file with a single MetricAggregate in it. |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| // Add some aggregate data broken down by system profile for the metric. |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(day_index - 1); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| SystemProfileAggregate* system_profile_agg = mutable_bucket.add_system_profile_aggregates(); |
| system_profile_agg->set_system_profile_hash(system_profile_hash); |
| system_profile_agg->set_last_seen_timestamp(1000); |
| system_profile_agg->set_first_seen_timestamp(1000); |
| EventCodesAggregateData* data = system_profile_agg->add_by_event_code(); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| |
| agg.StoreFilteredSystemProfile(system_profile_hash, system_profile); |
| |
| // Save all the changes to the metric aggregate. |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } |
| |
| // Allow the system to garbage collect unused system profiles, none should be removed. |
| storage_->GarbageCollection(); |
| |
| // Update the system profile due to an OTA to a new version. |
| SystemProfile updated_system_profile(system_profile); |
| updated_system_profile.set_system_version("101"); |
| uint64_t updated_system_profile_hash = |
| util::Farmhash64(updated_system_profile.SerializeAsString()); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> stored_system_profile = |
| agg.RetrieveFilteredSystemProfile(system_profile_hash); |
| ASSERT_EQ(stored_system_profile.status().error_code(), StatusCode::OK); |
| EXPECT_EQ(stored_system_profile.ConsumeValueOrDie().system_version(), |
| system_profile.system_version()); |
| |
| // Update the aggregate data to use the updated system profile. |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| SystemProfileAggregate* system_profile_agg = |
| mutable_bucket.mutable_system_profile_aggregates(0); |
| system_profile_agg->set_system_profile_hash(updated_system_profile_hash); |
| system_profile_agg->set_last_seen_timestamp(1001); |
| |
| agg.StoreFilteredSystemProfile(updated_system_profile_hash, updated_system_profile); |
| |
| // Save all the changes to the metric aggregate. |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } |
| |
| // Allow the system to garbage collect unused system profiles, like the old system profile. |
| storage_->GarbageCollection(); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> stored_system_profile = |
| agg.RetrieveFilteredSystemProfile(system_profile_hash); |
| EXPECT_EQ(stored_system_profile.status().error_code(), StatusCode::NOT_FOUND); |
| stored_system_profile = agg.RetrieveFilteredSystemProfile(updated_system_profile_hash); |
| ASSERT_EQ(stored_system_profile.status().error_code(), StatusCode::OK); |
| EXPECT_EQ(stored_system_profile.ConsumeValueOrDie().system_version(), |
| updated_system_profile.system_version()); |
| |
| // Remove the aggregate data that used the updated system profile. |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| (*report_agg.mutable_daily()->mutable_by_day_index()).erase(day_index); |
| |
| // Save all the changes to the metric aggregate. |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } |
| |
| // Allow the system to garbage collect unused system profiles, all should be removed. |
| storage_->GarbageCollection(); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| StatusOr<SystemProfile> stored_system_profile = |
| agg.RetrieveFilteredSystemProfile(updated_system_profile_hash); |
| EXPECT_EQ(stored_system_profile.status().error_code(), StatusCode::NOT_FOUND); |
| } |
| } |
| |
| TEST_F(ImmediateLocalAggregateStorageTest, UsageTrackingWorks) { |
| // Set per-project-reserved-bytes to 2 |
| ReplaceRegistry(GetRegistry(), 2); |
| |
| lib::ProjectIdentifier proj = lib::CustomerIdentifier(123).ForProject(100); |
| ASSERT_EQ(storage_->AmountStored(), 0); |
| ASSERT_EQ(storage_->AmountStored(proj), 0); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = std::move(agg_or.ValueOrDie()); |
| agg.aggregate()->set_version(100); // Aggregate should now take up 2 bytes |
| agg.Save(); |
| } |
| |
| ASSERT_EQ(storage_->SlushUsed(), 0); |
| ASSERT_EQ(storage_->AmountStored(), 2); |
| ASSERT_EQ(storage_->AmountStored(proj), 2); |
| |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = std::move(agg_or.ValueOrDie()); |
| agg.aggregate()->set_version(1000000); // Aggregate should now take up 4 bytes. |
| agg.Save(); |
| } |
| |
| ASSERT_EQ(storage_->SlushUsed(), 2); |
| ASSERT_EQ(storage_->AmountStored(), 4); |
| ASSERT_EQ(storage_->AmountStored(proj), 4); |
| |
| // Add another 4 bytes. All should be counted by slush. |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(proj.ForMetric(2)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = std::move(agg_or.ValueOrDie()); |
| agg.aggregate()->set_version(1000000); // Aggregate should now take up 4 bytes. |
| agg.Save(); |
| } |
| |
| ASSERT_EQ(storage_->SlushUsed(), 6); |
| ASSERT_EQ(storage_->AmountStored(), 8); |
| ASSERT_EQ(storage_->AmountStored(proj), 8); |
| |
| // Reloading the store from disk, the usage counts should remain the same. |
| ReplaceRegistry(GetRegistry(), 2); |
| ASSERT_EQ(storage_->SlushUsed(), 6); |
| ASSERT_EQ(storage_->AmountStored(), 8); |
| ASSERT_EQ(storage_->AmountStored(proj), 8); |
| |
| // Reloading with different slush size should only update SlushUsed. |
| ReplaceRegistry(GetRegistry(), 4); |
| ASSERT_EQ(storage_->SlushUsed(), 4); |
| ASSERT_EQ(storage_->AmountStored(), 8); |
| ASSERT_EQ(storage_->AmountStored(proj), 8); |
| } |
| |
| class ImmediateLocalAggregateStorageMissingDirectoryTest : public util::testing::TestWithFiles { |
| private: |
| void SetUp() override { |
| MakeTestFolder(); |
| ReplaceRegistry(); |
| } |
| |
| public: |
| void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry()) { |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(std::move(registry)); |
| storage_ = std::make_unique<ImmediateLocalAggregateStorage>( |
| test_subfolder(), fs(), global_project_context_factory_.get(), nullptr, 0); |
| } |
| |
| protected: |
| std::string test_subfolder() { return absl::StrCat(test_folder(), "/sub"); } |
| |
| std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
| std::unique_ptr<ImmediateLocalAggregateStorage> storage_; |
| }; |
| |
| TEST_F(ImmediateLocalAggregateStorageMissingDirectoryTest, DontAssumeDirectoryExists) { |
| // Root directory should contain expected customer_id: 123 |
| ASSERT_THAT(fs()->ListFiles(test_subfolder()).ConsumeValueOrDie(), Contains("123")); |
| |
| // Customer 123 directory should contain expected project: 100 |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123")).ConsumeValueOrDie(), |
| Contains("100")); |
| |
| // Customer 123 Project 100 directory should *not* contain the metric file: 1 |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123/100")).ConsumeValueOrDie(), |
| Not(Contains("1"))); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(123).ForProject(100).ForMetric(1)); |
| ASSERT_TRUE(agg_or.ok()); |
| agg_or.ConsumeValueOrDie().Save(); |
| |
| // Customer 123 Project 100 directory should contain the metric file: 1 |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_subfolder(), "/123/100")).ConsumeValueOrDie(), |
| Contains("1")); |
| } |
| |
| } // namespace cobalt::local_aggregation |