| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"

#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <thread>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/registry/cobalt_registry.pb.h"
| |
| namespace cobalt::local_aggregation { |
| using lib::statusor::StatusOr; |
| using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef; |
| |
| const std::chrono::seconds kMaxWait = std::chrono::seconds(5); |
| |
| namespace { |
| |
| std::unique_ptr<CobaltRegistry> GetRegistry() { |
| std::string bytes; |
| if (!absl::Base64Unescape(kCobaltRegistryBase64, &bytes)) { |
| LOG(ERROR) << "Unable to decode Base64 String"; |
| return nullptr; |
| } |
| |
| auto registry = std::make_unique<CobaltRegistry>(); |
| if (!registry->ParseFromString(bytes)) { |
| LOG(ERROR) << "Unable to parse registry from bytes"; |
| return nullptr; |
| } |
| |
| return registry; |
| } |
| |
| } // namespace |
| |
| class DelayedLocalAggregateStorageTest : public util::testing::TestWithFiles { |
| private: |
| void SetUp() override { |
| MakeTestFolder(); |
| InitStorage(); |
| } |
| |
| public: |
| void InitStorage(std::unique_ptr<CobaltRegistry> registry = GetRegistry(), |
| std::chrono::milliseconds writeback_delay = std::chrono::milliseconds(200)) { |
| storage_ = nullptr; |
| global_project_context_factory_ = |
| std::make_unique<logger::ProjectContextFactory>(std::move(registry)); |
| storage_ = std::make_unique<DelayedLocalAggregateStorage>( |
| test_folder(), fs(), global_project_context_factory_.get(), writeback_delay); |
| } |
| |
| protected: |
| std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_; |
| std::unique_ptr<DelayedLocalAggregateStorage> storage_; |
| }; |
| |
| TEST_F(DelayedLocalAggregateStorageTest, CanReadAlreadyWrittenFiles) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| agg.aggregate()->set_version(100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| storage_->WaitUntilSave(kMaxWait); |
| |
| InitStorage(); |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| ASSERT_EQ(agg.aggregate()->version(), 100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| storage_->WaitUntilSave(kMaxWait); |
| } |
| |
| TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldMetrics) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| storage_->WaitUntilSave(kMaxWait); |
| |
| auto registry = GetRegistry(); |
| google::protobuf::RepeatedPtrField<MetricDefinition> *metrics = |
| registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics(); |
| metrics->erase(metrics->begin() + kOccurrenceMetricMetricIndex); |
| InitStorage(std::move(registry)); |
| |
| ASSERT_FALSE(storage_->GetMetricAggregate(123, 100, 1).ok()); |
| } |
| |
| TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldProjects) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| storage_->WaitUntilSave(kMaxWait); |
| |
| auto registry = GetRegistry(); |
| registry->mutable_customers(0)->mutable_projects()->RemoveLast(); |
| InitStorage(std::move(registry)); |
| |
| ASSERT_FALSE(storage_->GetMetricAggregate(123, 100, 1).ok()); |
| } |
| |
| TEST_F(DelayedLocalAggregateStorageTest, DeleteDataWorks) { |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| agg.aggregate()->set_version(100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| storage_->WaitUntilSave(kMaxWait); |
| |
| storage_->DeleteData(); |
| |
| ASSERT_FALSE(storage_->HasMetricAggregate(123, 100, 1)); |
| } |
| |
| TEST_F(DelayedLocalAggregateStorageTest, ShutDownIsFast) { |
| // Construct a storage with an extremely long writeback frequency. |
| InitStorage(GetRegistry(), std::chrono::hours(99999)); |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| agg.aggregate()->set_version(100); |
| ASSERT_TRUE(agg.Save().ok()); |
| } |
| // Destroy the storage. Immediate writeback should occur. |
| storage_ = nullptr; |
| |
| InitStorage(GetRegistry(), std::chrono::hours(99999)); |
| { |
| StatusOr<MetricAggregateRef> agg_or = storage_->GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| ASSERT_EQ(agg.aggregate()->version(), 100); |
| } |
| } |
| |
| class DelayedLocalAggregateStorageTestDeadlock : public util::testing::TestWithFiles {}; |
| TEST_F(DelayedLocalAggregateStorageTestDeadlock, NoDeadlock) { |
| std::promise<bool> complete; |
| // Run the test in a thread in case it hangs. |
| auto thread = std::thread([this, &complete]() { |
| logger::ProjectContextFactory global_project_context_factory(GetRegistry()); |
| DelayedLocalAggregateStorage storage(test_folder(), fs(), &global_project_context_factory, |
| std::chrono::milliseconds(100)); |
| |
| StatusOr<MetricAggregateRef> agg_or = storage.GetMetricAggregate(123, 100, 1); |
| ASSERT_TRUE(agg_or.ok()); |
| MetricAggregateRef agg = agg_or.ConsumeValueOrDie(); |
| |
| ASSERT_TRUE(agg.Save().ok()); |
| ASSERT_TRUE(storage.WaitUntilSaveStart(kMaxWait)); |
| ASSERT_TRUE(agg.Save().ok()); // Save while storage_ is attempting to save should not hang. |
| |
| complete.set_value(true); |
| }); |
| |
| // If the promise isn't resolved after 5 seconds, a deadlock has likely occurred. |
| if (complete.get_future().wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { |
| ADD_FAILURE() << "Deadlock found"; |
| thread.detach(); |
| return; |
| } |
| |
| thread.join(); |
| } |
| |
| } // namespace cobalt::local_aggregation |