| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h" |
| |
| #include <gmock/gmock.h> |
| #include <gtest/gtest.h> |
| |
| #include "absl/strings/escaping.h" |
| #include "src/lib/util/testing/test_with_files.h" |
| #include "src/local_aggregation_1_1/testing/test_registry.cb.h" |
| #include "src/logging.h" |
| #include "src/public/lib/statusor/statusor.h" |
| #include "src/system_data/fake_system_data.h" |
| |
| namespace cobalt::local_aggregation { |
| |
| using lib::statusor::StatusOr; |
| using ::testing::Contains; |
| using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef; |
| |
| namespace { |
| |
| std::unique_ptr<CobaltRegistry> GetRegistry() { |
| std::string bytes; |
| if (!absl::Base64Unescape(kCobaltRegistryBase64, &bytes)) { |
| LOG(ERROR) << "Unable to decode Base64 String"; |
| return nullptr; |
| } |
| |
| auto registry = std::make_unique<CobaltRegistry>(); |
| if (!registry->ParseFromString(bytes)) { |
| LOG(ERROR) << "Unable to parse registry from bytes"; |
| return nullptr; |
| } |
| |
| return registry; |
| } |
| |
| } // namespace |
| |
| class LocalAggregateStorageTest : public util::testing::TestWithFiles { |
| private: |
| void SetUp() override { |
| MakeTestFolder(); |
| ReplaceRegistry(); |
| } |
| |
| public: |
| void ReplaceRegistry(std::unique_ptr<CobaltRegistry> registry = GetRegistry(), |
| int64_t per_project_reserved_bytes = 0) { |
| system_data_.ResetChangeCallbacks(); |
| system_data_.SetVersion("100"); |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(std::move(registry)); |
| mock_clock_ = |
| std::make_unique<util::IncrementingSystemClock>(std::chrono::system_clock::duration(0)); |
| mock_clock_->increment_by(std::chrono::hours(24 * 100)); |
| validated_clock_ = std::make_unique<util::FakeValidatedClock>(mock_clock_.get()); |
| validated_clock_->SetAccurate(true); |
| metadata_builder_ = std::make_unique<MetadataBuilder>(&system_data_, validated_clock_.get(), |
| system_data_cache_path(), fs()); |
| metadata_builder_->SnapshotSystemData(); |
| storage_ = LocalAggregateStorage::New( |
| LocalAggregateStorage::StorageStrategy::Immediate, test_folder(), fs(), |
| global_project_context_factory_.get(), metadata_builder_.get(), per_project_reserved_bytes); |
| } |
| |
| protected: |
| system_data::FakeSystemData system_data_; |
| std::unique_ptr<util::IncrementingSystemClock> mock_clock_; |
| std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
| std::unique_ptr<LocalAggregateStorage> storage_; |
| std::unique_ptr<util::FakeValidatedClock> validated_clock_; |
| std::unique_ptr<MetadataBuilder> metadata_builder_; |
| }; |
| |
| TEST_F(LocalAggregateStorageTest, HandlesBookkeepingAsExpected) { |
| lib::ProjectIdentifier proj = lib::CustomerIdentifier(123).ForProject(100); |
| |
| // Passing a positive size_increase works as expected |
| storage_->UpdateProjectSizeBy(proj, 10); |
| EXPECT_EQ(storage_->AmountStored(proj), 10); |
| |
| // Passing a negative size_increase works as expected |
| storage_->UpdateProjectSizeBy(proj, -5); |
| EXPECT_EQ(storage_->AmountStored(proj), 5); |
| |
| // If the size increase is too large and would take the value negative, it should be clamped to 0. |
| storage_->UpdateProjectSizeBy(proj, -8); |
| EXPECT_EQ(storage_->AmountStored(proj), 0); |
| } |
| |
| TEST_F(LocalAggregateStorageTest, MigrateDailyReportAllToSelectLast) { |
| // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for |
| // a REPORT_ALL metric. |
| uint64_t system_profile_hash_1 = uint64_t{1234}; |
| uint64_t system_profile_hash_2 = uint64_t{5678}; |
| uint32_t day_index = 1000; |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| ReportAggregate& report_agg = |
| (*agg.aggregate() |
| ->mutable_by_report_id())[kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId]; |
| report_agg.mutable_daily()->set_last_day_index(day_index); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_daily()->mutable_by_day_index())[day_index]; |
| |
| SystemProfileAggregate* system_profile_aggregate = |
| mutable_bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash_1); |
| system_profile_aggregate->set_first_seen_timestamp(9000); |
| system_profile_aggregate->set_last_seen_timestamp(9001); |
| EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code(); |
| data->add_event_codes(1); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| |
| system_profile_aggregate = mutable_bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash_2); |
| system_profile_aggregate->set_first_seen_timestamp(10000); |
| system_profile_aggregate->set_last_seen_timestamp(10001); |
| data = system_profile_aggregate->add_by_event_code(); |
| data->add_event_codes(2); |
| data->mutable_data()->mutable_at_least_once()->set_at_least_once(true); |
| |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains("1")); |
| |
| // Reload the storage, triggering a migration. |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie(); |
| const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at( |
| kOccurrenceMetricUniqueDeviceCountsReport7DaysReportId); |
| ASSERT_TRUE(report_agg.daily().by_day_index().contains(day_index)); |
| const AggregationPeriodBucket& bucket = report_agg.daily().by_day_index().at(day_index); |
| |
| ASSERT_EQ(bucket.system_profile_aggregates_size(), 1); |
| const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_2); |
| EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000); |
| EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001); |
| |
| ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2); |
| const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0); |
| ASSERT_EQ(data.event_codes_size(), 1); |
| EXPECT_EQ(data.event_codes(0), 1); |
| EXPECT_TRUE(data.data().at_least_once().at_least_once()); |
| const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1); |
| ASSERT_EQ(data2.event_codes_size(), 1); |
| EXPECT_EQ(data2.event_codes(0), 2); |
| EXPECT_TRUE(data2.data().at_least_once().at_least_once()); |
| } |
| |
| TEST_F(LocalAggregateStorageTest, MigrateHourlyReportAllToSelectFirst) { |
| // Create multiple SystemProfileAggregates in the report's aggregation, as would be created for |
| // a REPORT_ALL metric. |
| uint64_t system_profile_hash_1 = uint64_t{1234}; |
| uint64_t system_profile_hash_2 = uint64_t{5678}; |
| uint32_t hour_id = 100001; |
| { |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| ReportAggregate& report_agg = |
| (*agg.aggregate()->mutable_by_report_id())[kOccurrenceMetricHourlyDeviceHistogramsReportId]; |
| report_agg.mutable_hourly()->set_last_hour_id(hour_id); |
| AggregationPeriodBucket& mutable_bucket = |
| (*report_agg.mutable_hourly()->mutable_by_hour_id())[hour_id]; |
| |
| SystemProfileAggregate* system_profile_aggregate = |
| mutable_bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash_1); |
| system_profile_aggregate->set_first_seen_timestamp(9000); |
| system_profile_aggregate->set_last_seen_timestamp(9001); |
| EventCodesAggregateData* data = system_profile_aggregate->add_by_event_code(); |
| data->add_event_codes(1); |
| data->mutable_data()->set_count(3); |
| |
| system_profile_aggregate = mutable_bucket.add_system_profile_aggregates(); |
| system_profile_aggregate->set_system_profile_hash(system_profile_hash_2); |
| system_profile_aggregate->set_first_seen_timestamp(10000); |
| system_profile_aggregate->set_last_seen_timestamp(10001); |
| data = system_profile_aggregate->add_by_event_code(); |
| data->add_event_codes(2); |
| data->mutable_data()->set_count(5); |
| |
| ASSERT_EQ(agg.Save().error_code(), StatusCode::OK); |
| } // Scope ensures the first MetricAggregateRef gets deleted before another one is created. |
| |
| ASSERT_THAT(fs()->ListFiles(absl::StrCat(test_folder(), "/", kCustomerId, "/", kProjectId)) |
| .ConsumeValueOrDie(), |
| Contains("1")); |
| |
| // Reload the storage, triggering a migration. |
| ReplaceRegistry(); |
| |
| StatusOr<MetricAggregateRef> agg_or = |
| storage_->GetMetricAggregate(lib::CustomerIdentifier(kCustomerId) |
| .ForProject(kProjectId) |
| .ForMetric(kOccurrenceMetricMetricId)); |
| ASSERT_EQ(agg_or.status().error_code(), StatusCode::OK); |
| MetricAggregateRef migrated_metric_agg = agg_or.ConsumeValueOrDie(); |
| const ReportAggregate& report_agg = migrated_metric_agg.aggregate()->by_report_id().at( |
| kOccurrenceMetricHourlyDeviceHistogramsReportId); |
| ASSERT_TRUE(report_agg.hourly().by_hour_id().contains(hour_id)); |
| const AggregationPeriodBucket& bucket = report_agg.hourly().by_hour_id().at(hour_id); |
| |
| ASSERT_EQ(bucket.system_profile_aggregates_size(), 1); |
| const SystemProfileAggregate& system_profile_aggregate = bucket.system_profile_aggregates(0); |
| EXPECT_EQ(system_profile_aggregate.system_profile_hash(), system_profile_hash_1); |
| EXPECT_EQ(system_profile_aggregate.first_seen_timestamp(), 9000); |
| EXPECT_EQ(system_profile_aggregate.last_seen_timestamp(), 10001); |
| |
| ASSERT_EQ(system_profile_aggregate.by_event_code_size(), 2); |
| const EventCodesAggregateData& data = system_profile_aggregate.by_event_code(0); |
| ASSERT_EQ(data.event_codes_size(), 1); |
| EXPECT_EQ(data.event_codes(0), 1); |
| EXPECT_EQ(data.data().count(), 3); |
| const EventCodesAggregateData& data2 = system_profile_aggregate.by_event_code(1); |
| ASSERT_EQ(data2.event_codes_size(), 1); |
| EXPECT_EQ(data2.event_codes(0), 2); |
| EXPECT_EQ(data2.data().count(), 5); |
| } |
| |
| } // namespace cobalt::local_aggregation |