blob: a3ff9fa882a60d658cb45a3cad783a576523f9d6 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/app/merging/merge_resolver.h"
#include <string>
#include <utility>
#include <lib/async/cpp/task.h>
#include <lib/backoff/testing/test_backoff.h>
#include <lib/callback/cancellable_helper.h>
#include <lib/callback/capture.h>
#include <lib/callback/set_when_called.h>
#include <lib/fit/function.h>
#include <lib/fxl/files/scoped_temp_dir.h>
#include <lib/fxl/macros.h>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "peridot/bin/ledger/app/constants.h"
#include "peridot/bin/ledger/app/merging/last_one_wins_merge_strategy.h"
#include "peridot/bin/ledger/app/merging/test_utils.h"
#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/bin/ledger/coroutine/coroutine_impl.h"
#include "peridot/bin/ledger/encryption/primitives/hash.h"
#include "peridot/bin/ledger/storage/fake/fake_page_storage.h"
#include "peridot/bin/ledger/storage/public/constants.h"
#include "peridot/bin/ledger/storage/public/page_storage.h"
namespace ledger {
namespace {
using ::testing::UnorderedElementsAre;
class FakePageStorageImpl : public storage::PageStorageEmptyImpl {
public:
FakePageStorageImpl(std::unique_ptr<storage::PageStorage> page_storage)
: storage_(std::move(page_storage)) {}
void MarkCommitContentsUnavailable(storage::CommitIdView commit_id) {
removed_commit_ids_.insert(commit_id.ToString());
}
void GetHeadCommitIds(
fit::function<void(storage::Status, std::vector<storage::CommitId>)>
callback) override {
storage_->GetHeadCommitIds(std::move(callback));
}
void GetCommit(storage::CommitIdView commit_id,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Commit>)>
callback) override {
storage_->GetCommit(commit_id, std::move(callback));
}
storage::Status AddCommitWatcher(storage::CommitWatcher* watcher) override {
return storage_->AddCommitWatcher(watcher);
}
storage::Status RemoveCommitWatcher(
storage::CommitWatcher* watcher) override {
return storage_->RemoveCommitWatcher(watcher);
}
void GetObject(storage::ObjectIdentifier object_identifier, Location location,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Object>)>
callback) override {
storage_->GetObject(std::move(object_identifier), location,
std::move(callback));
}
void StartCommit(
const storage::CommitId& commit_id, storage::JournalType journal_type,
fit::function<void(storage::Status, std::unique_ptr<storage::Journal>)>
callback) override {
storage_->StartCommit(commit_id, journal_type, std::move(callback));
}
void StartMergeCommit(
const storage::CommitId& left, const storage::CommitId& right,
fit::function<void(storage::Status, std::unique_ptr<storage::Journal>)>
callback) override {
storage_->StartMergeCommit(left, right, std::move(callback));
}
void CommitJournal(std::unique_ptr<storage::Journal> journal,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Commit>)>
callback) override {
storage_->CommitJournal(std::move(journal), std::move(callback));
}
void RollbackJournal(std::unique_ptr<storage::Journal> journal,
fit::function<void(storage::Status)> callback) override {
storage_->RollbackJournal(std::move(journal), std::move(callback));
}
void AddObjectFromLocal(
storage::ObjectType object_type,
std::unique_ptr<storage::DataSource> data_source,
fit::function<void(storage::Status, storage::ObjectIdentifier)> callback)
override {
storage_->AddObjectFromLocal(object_type, std::move(data_source),
std::move(callback));
}
void GetCommitContents(
const storage::Commit& commit, std::string min_key,
fit::function<bool(storage::Entry)> on_next,
fit::function<void(storage::Status)> on_done) override {
storage_->GetCommitContents(commit, std::move(min_key), std::move(on_next),
std::move(on_done));
}
void GetCommitContentsDiff(
const storage::Commit& base_commit, const storage::Commit& other_commit,
std::string min_key,
fit::function<bool(storage::EntryChange)> on_next_diff,
fit::function<void(storage::Status)> on_done) override {
if (removed_commit_ids_.find(base_commit.GetId()) !=
removed_commit_ids_.end() ||
removed_commit_ids_.find(other_commit.GetId()) !=
removed_commit_ids_.end()) {
on_done(storage::Status::NOT_CONNECTED_ERROR);
return;
}
storage_->GetCommitContentsDiff(base_commit, other_commit,
std::move(min_key), std::move(on_next_diff),
std::move(on_done));
}
private:
std::set<storage::CommitId> removed_commit_ids_;
std::unique_ptr<storage::PageStorage> storage_;
FXL_DISALLOW_COPY_AND_ASSIGN(FakePageStorageImpl);
};
class RecordingTestStrategy : public MergeStrategy {
public:
RecordingTestStrategy() {}
~RecordingTestStrategy() override {}
void SetOnError(fit::closure on_error) override {
this->on_error = std::move(on_error);
}
void SetOnMerge(fit::closure on_merge) { on_merge_ = std::move(on_merge); }
void Merge(storage::PageStorage* /*storage*/, PageManager* /*page_manager*/,
std::unique_ptr<const storage::Commit> /*head_1*/,
std::unique_ptr<const storage::Commit> /*head_2*/,
std::unique_ptr<const storage::Commit> /*ancestor*/,
fit::function<void(Status)> callback) override {
this->callback = std::move(callback);
merge_calls++;
if (on_merge_) {
on_merge_();
}
}
void Cancel() override { cancel_calls++; }
fit::closure on_error;
fit::function<void(Status)> callback;
uint32_t merge_calls = 0;
uint32_t cancel_calls = 0;
private:
fit::closure on_merge_;
};
class MergeResolverTest : public TestWithPageStorage {
public:
MergeResolverTest() {}
~MergeResolverTest() override {}
protected:
storage::PageStorage* page_storage() override { return page_storage_.get(); }
void SetUp() override {
TestWithPageStorage::SetUp();
std::unique_ptr<storage::PageStorage> storage;
ASSERT_TRUE(CreatePageStorage(&storage));
page_storage_ = std::make_unique<FakePageStorageImpl>(std::move(storage));
}
storage::CommitId CreateCommit(
storage::CommitIdView parent_id,
fit::function<void(storage::Journal*)> contents) {
return CreateCommit(page_storage_.get(), parent_id, std::move(contents));
}
storage::CommitId CreateCommit(
storage::PageStorage* storage, storage::CommitIdView parent_id,
fit::function<void(storage::Journal*)> contents) {
bool called;
storage::Status status;
std::unique_ptr<storage::Journal> journal;
storage->StartCommit(
parent_id.ToString(), storage::JournalType::IMPLICIT,
callback::Capture(callback::SetWhenCalled(&called), &status, &journal));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
contents(journal.get());
std::unique_ptr<const storage::Commit> commit;
storage->CommitJournal(
std::move(journal),
callback::Capture(callback::SetWhenCalled(&called), &status, &commit));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
return commit->GetId();
}
storage::CommitId CreateMergeCommit(
storage::CommitIdView parent_id1, storage::CommitIdView parent_id2,
fit::function<void(storage::Journal*)> contents) {
return CreateMergeCommit(page_storage_.get(), parent_id1, parent_id2,
std::move(contents));
}
storage::CommitId CreateMergeCommit(
storage::PageStorage* storage, storage::CommitIdView parent_id1,
storage::CommitIdView parent_id2,
fit::function<void(storage::Journal*)> contents) {
bool called;
storage::Status status;
std::unique_ptr<storage::Journal> journal;
storage->StartMergeCommit(
parent_id1.ToString(), parent_id2.ToString(),
callback::Capture(callback::SetWhenCalled(&called), &status, &journal));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
contents(journal.get());
storage::Status actual_status;
std::unique_ptr<const storage::Commit> actual_commit;
storage->CommitJournal(std::move(journal),
callback::Capture(callback::SetWhenCalled(&called),
&actual_status, &actual_commit));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, actual_status);
return actual_commit->GetId();
}
std::vector<storage::Entry> GetCommitContents(const storage::Commit& commit) {
storage::Status status;
std::vector<storage::Entry> result;
auto on_next = [&result](storage::Entry e) {
result.push_back(e);
return true;
};
bool called;
page_storage_->GetCommitContents(
commit, "", std::move(on_next),
callback::Capture(callback::SetWhenCalled(&called), &status));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
return result;
}
std::unique_ptr<FakePageStorageImpl> page_storage_;
private:
FXL_DISALLOW_COPY_AND_ASSIGN(MergeResolverTest);
};
TEST_F(MergeResolverTest, Empty) {
// Set up conflict
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "bar"));
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(QuitLoopClosure());
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, ids.size());
}
class CapturingMergeStrategy : public MergeStrategy {
public:
CapturingMergeStrategy() {}
~CapturingMergeStrategy() override {}
void SetOnError(fit::function<void()> /*on_error*/) override {}
void Merge(storage::PageStorage* /*storage*/, PageManager* /*page_manager*/,
std::unique_ptr<const storage::Commit> merge_head_1,
std::unique_ptr<const storage::Commit> merge_head_2,
std::unique_ptr<const storage::Commit> merge_ancestor,
fit::function<void(Status)> merge_callback) override {
head_1 = std::move(merge_head_1);
head_2 = std::move(merge_head_2);
ancestor = std::move(merge_ancestor);
callback = std::move(merge_callback);
}
void Cancel() override{};
std::unique_ptr<const storage::Commit> head_1;
std::unique_ptr<const storage::Commit> head_2;
std::unique_ptr<const storage::Commit> ancestor;
fit::function<void(Status)> callback;
private:
FXL_DISALLOW_ASSIGN(CapturingMergeStrategy);
};
TEST_F(MergeResolverTest, CommonAncestor) {
// Add commits forming the following history graph:
// (root) -> (1) -> (2) -> (3)
// \
// -> (4) -> (5)
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
RunLoopUntilIdle();
// Set a merge strategy to capture the requested merge.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<CapturingMergeStrategy> strategy =
std::make_unique<CapturingMergeStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Verify that the strategy is asked to merge commits 5 and 3, with 2 as the
// common ancestor.
EXPECT_EQ(commit_3, strategy_ptr->head_1->GetId());
EXPECT_EQ(commit_5, strategy_ptr->head_2->GetId());
EXPECT_EQ(commit_2, strategy_ptr->ancestor->GetId());
// Resolve the conflict.
CreateMergeCommit(strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId(),
AddKeyValueToJournal("key_foo", "abc"));
strategy_ptr->callback(Status::OK);
strategy_ptr->callback = nullptr;
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
}
TEST_F(MergeResolverTest, LastOneWins) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_3));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_5));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(callback::SetWhenCalled(&called));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, ids.size());
std::unique_ptr<const storage::Commit> commit;
page_storage_->GetCommit(
ids[0],
::callback::Capture(callback::SetWhenCalled(&called), &status, &commit));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_TRUE(commit);
std::vector<storage::Entry> content_vector = GetCommitContents(*commit);
// Entries are ordered by keys
ASSERT_EQ(2u, content_vector.size());
EXPECT_EQ("key2", content_vector[0].key);
std::string value;
EXPECT_TRUE(GetValue(content_vector[0].object_identifier, &value));
EXPECT_EQ("val2.1", value);
EXPECT_EQ("key3", content_vector[1].key);
EXPECT_TRUE(GetValue(content_vector[1].object_identifier, &value));
EXPECT_EQ("val3.0", value);
}
TEST_F(MergeResolverTest, LastOneWinsDiffNotAvailable) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_THAT(ids, UnorderedElementsAre(commit_3, commit_5));
page_storage_->MarkCommitContentsUnavailable(commit_2);
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(callback::SetWhenCalled(&called));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
}
TEST_F(MergeResolverTest, None) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_3));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_5));
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.set_on_empty(QuitLoopClosure());
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
}
TEST_F(MergeResolverTest, UpdateMidResolution) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_1, AddKeyValueToJournal("key3", "val3.0"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_2));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_3));
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.set_on_empty(callback::SetWhenCalled(&called));
resolver.SetMergeStrategy(std::make_unique<LastOneWinsMergeStrategy>());
async::PostTask(dispatcher(), [&resolver] {
resolver.SetMergeStrategy(std::make_unique<LastOneWinsMergeStrategy>());
});
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, ids.size());
}
// Merge of merges backoff is only triggered when commits are coming from sync.
// To test this, we need to create conflicts and make it as if they are not
// created locally. This is done by preventing commit notifications for new
// commits, then issuing manually a commit notification "from sync". As this
// implies using a fake PageStorage, we don't test the resolution itself, only
// that backoff is triggered correctly.
TEST_F(MergeResolverTest, WaitOnMergeOfMerges) {
storage::fake::FakePageStorage page_storage(&environment_, "page_id");
bool on_empty_called;
auto backoff = std::make_unique<backoff::TestBackoff>();
auto backoff_ptr = backoff.get();
MergeResolver resolver([] {}, &environment_, &page_storage,
std::move(backoff));
resolver.set_on_empty(callback::SetWhenCalled(&on_empty_called));
auto strategy = std::make_unique<RecordingTestStrategy>();
strategy->SetOnMerge(QuitLoopClosure());
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
EXPECT_TRUE(on_empty_called);
page_storage.SetDropCommitNotifications(true);
// Set up conflict
storage::CommitId commit_0 = CreateCommit(
&page_storage, storage::kFirstPageCommitId, [](storage::Journal*) {});
storage::CommitId commit_1 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_3 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId merge_1 =
CreateMergeCommit(&page_storage, commit_1, commit_3,
AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId merge_2 =
CreateMergeCommit(&page_storage, commit_2, commit_3,
AddKeyValueToJournal("key3", "val3.0"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage.GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), merge_1));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), merge_2));
page_storage.SetDropCommitNotifications(false);
storage::CommitWatcher* watcher = &resolver;
watcher->OnNewCommits({}, storage::ChangeSource::CLOUD);
// Note we can't use "RunLoopUntilIdle()" because the FakePageStorage delays
// before inserting tasks into the message loop.
RunLoopFor(zx::sec(5));
EXPECT_GT(backoff_ptr->get_next_count, 0);
}
TEST_F(MergeResolverTest, AutomaticallyMergeIdenticalCommits) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_1));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_2));
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.set_on_empty(callback::SetWhenCalled(&called));
auto merge_strategy = std::make_unique<RecordingTestStrategy>();
auto merge_strategy_ptr = merge_strategy.get();
resolver.SetMergeStrategy(std::move(merge_strategy));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, ids.size());
EXPECT_EQ(0u, merge_strategy_ptr->merge_calls);
}
TEST_F(MergeResolverTest, NoConflictCallback_ConflictsResolved) {
// Set up conflict.
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "bar"));
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(MakeQuitTaskOnce());
bool called;
storage::Status status;
std::vector<storage::CommitId> ids;
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, ids.size());
size_t callback_calls = 0;
auto conflicts_resolved_callback = [&resolver, &callback_calls]() {
EXPECT_TRUE(resolver.IsEmpty());
callback_calls++;
};
ConflictResolutionWaitStatus wait_status;
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
// Check that the callback was called 2 times.
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
EXPECT_EQ(2u, callback_calls);
EXPECT_EQ(ConflictResolutionWaitStatus::CONFLICTS_RESOLVED, wait_status);
ids.clear();
page_storage_->GetHeadCommitIds(
callback::Capture(callback::SetWhenCalled(&called), &status, &ids));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, ids.size());
callback_calls = 0;
CreateCommit(ids[0], AddKeyValueToJournal("foo", "baw"));
CreateCommit(ids[0], AddKeyValueToJournal("foo", "bat"));
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
// Check that callback wasn't called (callback queue cleared after all the
// callbacks in it were called).
RunLoopFor(zx::sec(10));
EXPECT_EQ(0u, callback_calls);
}
TEST_F(MergeResolverTest, NoConflictCallback_NoConflicts) {
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(MakeQuitTaskOnce());
size_t callback_calls = 0;
auto conflicts_resolved_callback = [&resolver, &callback_calls]() {
EXPECT_TRUE(resolver.IsEmpty());
callback_calls++;
};
ConflictResolutionWaitStatus wait_status;
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
// Check that the callback was called 1 times.
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
EXPECT_EQ(1u, callback_calls);
EXPECT_EQ(ConflictResolutionWaitStatus::NO_CONFLICTS, wait_status);
}
TEST_F(MergeResolverTest, HasUnfinishedMerges) {
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<CapturingMergeStrategy> strategy =
std::make_unique<CapturingMergeStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
EXPECT_FALSE(resolver.HasUnfinishedMerges());
// Set up a conflict and verify that HasUnfinishedMerges() returns true.
storage::CommitId commit_1 = CreateCommit(storage::kFirstPageCommitId,
AddKeyValueToJournal("foo", "bar"));
storage::CommitId commit_2 = CreateCommit(storage::kFirstPageCommitId,
AddKeyValueToJournal("foo", "baz"));
RunLoopUntilIdle();
EXPECT_TRUE(resolver.HasUnfinishedMerges());
// Resolve the conflict and verify that HasUnfinishedMerges() returns false.
ASSERT_TRUE(strategy_ptr->head_1);
ASSERT_TRUE(strategy_ptr->head_2);
ASSERT_TRUE(strategy_ptr->ancestor);
ASSERT_TRUE(strategy_ptr->callback);
CreateMergeCommit(strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId(),
AddKeyValueToJournal("key3", "val3.0"));
strategy_ptr->callback(Status::OK);
strategy_ptr->callback = nullptr;
RunLoopUntilIdle();
EXPECT_FALSE(resolver.HasUnfinishedMerges());
}
} // namespace
} // namespace ledger