blob: 01ddadddb3637419cabb47baba7b2ae610596803 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/local_aggregation_1_1/local_aggregate_storage/delayed_local_aggregate_storage.h"
#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "src/lib/statusor/statusor.h"
#include "src/lib/util/testing/test_with_files.h"
#include "src/local_aggregation_1_1/local_aggregate_storage/local_aggregate_storage.h"
#include "src/local_aggregation_1_1/testing/test_registry.cb.h"
#include "src/logger/project_context_factory.h"
#include "src/logging.h"
#include "src/registry/cobalt_registry.pb.h"
namespace cobalt::local_aggregation {
using lib::statusor::StatusOr;
using MetricAggregateRef = LocalAggregateStorage::MetricAggregateRef;
// Upper bound the tests pass to the storage's WaitUntilSave()/WaitUntilSaveStart() calls.
// constexpr guarantees compile-time initialization and allows use in constant expressions.
constexpr std::chrono::seconds kMaxWait{5};
namespace {
// Builds the CobaltRegistry used by these tests from the base64-encoded
// |kCobaltRegistryBase64| constant.
//
// Returns the parsed registry, or nullptr (after logging an error) if either the
// base64 decode or the proto parse fails.
std::unique_ptr<CobaltRegistry> GetRegistry() {
  std::string serialized_registry;
  const bool decoded = absl::Base64Unescape(kCobaltRegistryBase64, &serialized_registry);
  if (!decoded) {
    LOG(ERROR) << "Unable to decode Base64 String";
    return nullptr;
  }

  auto parsed_registry = std::make_unique<CobaltRegistry>();
  if (parsed_registry->ParseFromString(serialized_registry)) {
    return parsed_registry;
  }

  LOG(ERROR) << "Unable to parse registry from bytes";
  return nullptr;
}
} // namespace
class DelayedLocalAggregateStorageTest : public util::testing::TestWithFiles {
private:
void SetUp() override {
MakeTestFolder();
InitStorage();
}
public:
void InitStorage(std::unique_ptr<CobaltRegistry> registry = GetRegistry(),
std::chrono::milliseconds writeback_delay = std::chrono::milliseconds(200)) {
storage_ = nullptr;
global_project_context_factory_ =
std::make_unique<logger::ProjectContextFactory>(std::move(registry));
storage_ = std::make_unique<DelayedLocalAggregateStorage>(
test_folder(), fs(), global_project_context_factory_.get(), writeback_delay);
}
protected:
std::unique_ptr<logger::ProjectContextFactory> global_project_context_factory_;
std::unique_ptr<DelayedLocalAggregateStorage> storage_;
};
// Checks that an aggregate saved through one storage instance is visible to a
// new storage instance created over the same test folder.
TEST_F(DelayedLocalAggregateStorageTest, CanReadAlreadyWrittenFiles) {
  constexpr int kStoredVersion = 100;

  // Phase 1: stamp the aggregate with a recognizable version and save it. The
  // scope ends the lifetime of the aggregate ref before waiting.
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef written = lookup.ConsumeValueOrDie();
    written.aggregate()->set_version(kStoredVersion);
    ASSERT_TRUE(written.Save().ok());
  }
  // Wait (bounded by kMaxWait) for the save to take place.
  storage_->WaitUntilSave(kMaxWait);

  // Phase 2: rebuild the storage and confirm the version is read back.
  InitStorage();
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef reloaded = lookup.ConsumeValueOrDie();
    ASSERT_EQ(reloaded.aggregate()->version(), kStoredVersion);
    ASSERT_TRUE(reloaded.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);
}
// Checks that, after a metric is removed from the registry, a storage rebuilt
// from that registry no longer returns the previously saved aggregate.
TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldMetrics) {
  // Save an aggregate while the metric is still part of the registry.
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef saved = lookup.ConsumeValueOrDie();
    saved.aggregate()->set_version(100);
    ASSERT_TRUE(saved.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Drop the metric at kOccurrenceMetricMetricIndex from the first project of
  // the first customer, then rebuild the storage from the trimmed registry.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  auto *project_metrics =
      trimmed_registry->mutable_customers(0)->mutable_projects(0)->mutable_metrics();
  project_metrics->erase(project_metrics->begin() + kOccurrenceMetricMetricIndex);
  InitStorage(std::move(trimmed_registry));

  ASSERT_FALSE(storage_->GetMetricAggregate(123, 100, 1).ok());
}
// Checks that, after a project is removed from the registry, a storage rebuilt
// from that registry no longer returns the previously saved aggregate.
TEST_F(DelayedLocalAggregateStorageTest, CleansUpOldProjects) {
  // Save an aggregate while the project is still part of the registry.
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef saved = lookup.ConsumeValueOrDie();
    saved.aggregate()->set_version(100);
    ASSERT_TRUE(saved.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  // Remove the last project of the first customer and rebuild the storage.
  std::unique_ptr<CobaltRegistry> trimmed_registry = GetRegistry();
  auto *first_customer = trimmed_registry->mutable_customers(0);
  first_customer->mutable_projects()->RemoveLast();
  InitStorage(std::move(trimmed_registry));

  ASSERT_FALSE(storage_->GetMetricAggregate(123, 100, 1).ok());
}
// Checks that DeleteData() removes an aggregate that was previously saved.
TEST_F(DelayedLocalAggregateStorageTest, DeleteDataWorks) {
  // Save an aggregate so there is something to delete.
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef saved = lookup.ConsumeValueOrDie();
    saved.aggregate()->set_version(100);
    ASSERT_TRUE(saved.Save().ok());
  }
  storage_->WaitUntilSave(kMaxWait);

  storage_->DeleteData();

  const bool still_present = storage_->HasMetricAggregate(123, 100, 1);
  ASSERT_FALSE(still_present);
}
// Checks that destroying the storage does not wait out the writeback delay and
// that data saved just before destruction is still readable afterwards.
TEST_F(DelayedLocalAggregateStorageTest, ShutDownIsFast) {
  constexpr int kStoredVersion = 100;
  // An extremely long writeback frequency: a delayed write would never be
  // reached within the lifetime of the test.
  const auto never_reached_delay = std::chrono::hours(99999);

  InitStorage(GetRegistry(), never_reached_delay);
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef written = lookup.ConsumeValueOrDie();
    written.aggregate()->set_version(kStoredVersion);
    ASSERT_TRUE(written.Save().ok());
  }

  // Destroy the storage. Immediate writeback should occur.
  storage_.reset();

  // A fresh storage must see the version that was saved before destruction.
  InitStorage(GetRegistry(), never_reached_delay);
  {
    StatusOr<MetricAggregateRef> lookup = storage_->GetMetricAggregate(123, 100, 1);
    ASSERT_TRUE(lookup.ok());
    MetricAggregateRef reloaded = lookup.ConsumeValueOrDie();
    ASSERT_EQ(reloaded.aggregate()->version(), kStoredVersion);
  }
}
class DelayedLocalAggregateStorageTestDeadlock : public util::testing::TestWithFiles {};
TEST_F(DelayedLocalAggregateStorageTestDeadlock, NoDeadlock) {
std::promise<bool> complete;
// Run the test in a thread in case it hangs.
auto thread = std::thread([this, &complete]() {
logger::ProjectContextFactory global_project_context_factory(GetRegistry());
DelayedLocalAggregateStorage storage(test_folder(), fs(), &global_project_context_factory,
std::chrono::milliseconds(100));
StatusOr<MetricAggregateRef> agg_or = storage.GetMetricAggregate(123, 100, 1);
ASSERT_TRUE(agg_or.ok());
MetricAggregateRef agg = agg_or.ConsumeValueOrDie();
ASSERT_TRUE(agg.Save().ok());
ASSERT_TRUE(storage.WaitUntilSaveStart(kMaxWait));
ASSERT_TRUE(agg.Save().ok()); // Save while storage_ is attempting to save should not hang.
complete.set_value(true);
});
// If the promise isn't resolved after 5 seconds, a deadlock has likely occurred.
if (complete.get_future().wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
ADD_FAILURE() << "Deadlock found";
thread.detach();
return;
}
thread.join();
}
} // namespace cobalt::local_aggregation