// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/app/merging/merge_resolver.h"
#include <lib/async/cpp/task.h>
#include <lib/backoff/testing/test_backoff.h>
#include <lib/callback/cancellable_helper.h>
#include <lib/callback/capture.h>
#include <lib/callback/set_when_called.h>
#include <lib/fit/function.h>
#include <algorithm>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "src/ledger/bin/app/constants.h"
#include "src/ledger/bin/app/merging/last_one_wins_merge_strategy.h"
#include "src/ledger/bin/app/merging/test_utils.h"
#include "src/ledger/bin/app/page_utils.h"
#include "src/ledger/bin/encryption/primitives/hash.h"
#include "src/ledger/bin/storage/fake/fake_page_storage.h"
#include "src/ledger/bin/storage/public/constants.h"
#include "src/ledger/bin/storage/public/page_storage.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/lib/fxl/macros.h"
namespace ledger {
namespace {
using ::testing::AnyOf;
using ::testing::Eq;
using ::testing::UnorderedElementsAre;
std::vector<storage::CommitId> ToCommitIds(
const std::vector<std::unique_ptr<const storage::Commit>>& commits) {
std::vector<storage::CommitId> ids;
ids.reserve(commits.size());
for (const auto& commit : commits) {
ids.push_back(commit->GetId());
}
return ids;
}
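// Wrapper around a real PageStorage that delegates most calls to it, but can
// simulate commits whose contents are unavailable (for example, not yet
// synced): GetCommitContentsDiff fails with NETWORK_ERROR for such commits.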
class FakePageStorageImpl : public storage::PageStorageEmptyImpl {
public:
FakePageStorageImpl(std::unique_ptr<storage::PageStorage> page_storage)
: storage_(std::move(page_storage)) {}
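// Marks the contents of |commit_id| as unavailable: any subsequent
// GetCommitContentsDiff involving this commit fails with NETWORK_ERROR.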
void MarkCommitContentsUnavailable(storage::CommitIdView commit_id) {
removed_commit_ids_.insert(commit_id.ToString());
}
storage::Status GetHeadCommits(
std::vector<std::unique_ptr<const storage::Commit>>* head_commits)
override {
return storage_->GetHeadCommits(head_commits);
}
void GetMergeCommitIds(
storage::CommitIdView parent1, storage::CommitIdView parent2,
fit::function<void(storage::Status, std::vector<storage::CommitId>)>
callback) override {
storage_->GetMergeCommitIds(parent1, parent2, std::move(callback));
}
void GetCommit(storage::CommitIdView commit_id,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Commit>)>
callback) override {
storage_->GetCommit(commit_id, std::move(callback));
}
void AddCommitWatcher(storage::CommitWatcher* watcher) override {
storage_->AddCommitWatcher(watcher);
}
void RemoveCommitWatcher(storage::CommitWatcher* watcher) override {
storage_->RemoveCommitWatcher(watcher);
}
void GetObject(storage::ObjectIdentifier object_identifier, Location location,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Object>)>
callback) override {
storage_->GetObject(std::move(object_identifier), location,
std::move(callback));
}
std::unique_ptr<storage::Journal> StartCommit(
std::unique_ptr<const storage::Commit> commit) override {
return storage_->StartCommit(std::move(commit));
}
std::unique_ptr<storage::Journal> StartMergeCommit(
std::unique_ptr<const storage::Commit> left,
std::unique_ptr<const storage::Commit> right) override {
return storage_->StartMergeCommit(std::move(left), std::move(right));
}
void CommitJournal(std::unique_ptr<storage::Journal> journal,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Commit>)>
callback) override {
storage_->CommitJournal(std::move(journal), std::move(callback));
}
void AddObjectFromLocal(
storage::ObjectType object_type,
std::unique_ptr<storage::DataSource> data_source,
storage::ObjectReferencesAndPriority tree_references,
fit::function<void(storage::Status, storage::ObjectIdentifier)> callback)
override {
storage_->AddObjectFromLocal(object_type, std::move(data_source),
std::move(tree_references),
std::move(callback));
}
void GetCommitContents(
const storage::Commit& commit, std::string min_key,
fit::function<bool(storage::Entry)> on_next,
fit::function<void(storage::Status)> on_done) override {
storage_->GetCommitContents(commit, std::move(min_key), std::move(on_next),
std::move(on_done));
}
void GetCommitContentsDiff(
const storage::Commit& base_commit, const storage::Commit& other_commit,
std::string min_key,
fit::function<bool(storage::EntryChange)> on_next_diff,
fit::function<void(storage::Status)> on_done) override {
if (removed_commit_ids_.find(base_commit.GetId()) !=
removed_commit_ids_.end() ||
removed_commit_ids_.find(other_commit.GetId()) !=
removed_commit_ids_.end()) {
on_done(storage::Status::NETWORK_ERROR);
return;
}
storage_->GetCommitContentsDiff(base_commit, other_commit,
std::move(min_key), std::move(on_next_diff),
std::move(on_done));
}
private:
std::set<storage::CommitId> removed_commit_ids_;
std::unique_ptr<storage::PageStorage> storage_;
FXL_DISALLOW_COPY_AND_ASSIGN(FakePageStorageImpl);
};
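// MergeStrategy that records the arguments of each Merge() call so that tests
// can inspect the requested merge and complete (or forward) it manually.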
class RecordingTestStrategy : public MergeStrategy {
public:
RecordingTestStrategy() {}
~RecordingTestStrategy() override {}
void SetOnError(fit::closure on_error) override {
this->on_error = std::move(on_error);
}
void SetOnMerge(fit::closure on_merge) { on_merge_ = std::move(on_merge); }
void Merge(storage::PageStorage* storage, PageManager* page_manager,
std::unique_ptr<const storage::Commit> merge_head_1,
std::unique_ptr<const storage::Commit> merge_head_2,
std::unique_ptr<const storage::Commit> merge_ancestor,
fit::function<void(storage::Status)> merge_callback) override {
EXPECT_TRUE(storage::Commit::TimestampOrdered(merge_head_1, merge_head_2));
storage_ = storage;
page_manager_ = page_manager;
callback = std::move(merge_callback);
head_1 = std::move(merge_head_1);
head_2 = std::move(merge_head_2);
ancestor = std::move(merge_ancestor);
merge_calls++;
if (on_merge_) {
on_merge_();
}
}
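// Replays the recorded merge request on |strategy|, e.g. to delegate the
// actual resolution to a LastOneWinsMergeStrategy. The recorded heads,
// ancestor and callback are consumed by this call.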
void Forward(MergeStrategy* strategy) {
strategy->Merge(storage_, page_manager_, std::move(head_1),
std::move(head_2), std::move(ancestor),
std::move(callback));
}
void Cancel() override { cancel_calls++; }
fit::closure on_error;
std::unique_ptr<const storage::Commit> head_1;
std::unique_ptr<const storage::Commit> head_2;
std::unique_ptr<const storage::Commit> ancestor;
fit::function<void(storage::Status)> callback;
uint32_t merge_calls = 0;
uint32_t cancel_calls = 0;
private:
storage::PageStorage* storage_;
PageManager* page_manager_;
fit::closure on_merge_;
};
class MergeResolverTest : public TestWithPageStorage {
public:
MergeResolverTest() {}
~MergeResolverTest() override {}
protected:
storage::PageStorage* page_storage() override { return page_storage_.get(); }
void SetUp() override {
TestWithPageStorage::SetUp();
std::unique_ptr<storage::PageStorage> storage;
ASSERT_TRUE(CreatePageStorage(&storage));
page_storage_ = std::make_unique<FakePageStorageImpl>(std::move(storage));
}
storage::CommitId CreateCommit(
storage::CommitIdView parent_id,
fit::function<void(storage::Journal*)> contents) {
return CreateCommit(page_storage_.get(), parent_id, std::move(contents));
}
storage::CommitId CreateCommit(
storage::PageStorage* storage, storage::CommitIdView parent_id,
fit::function<void(storage::Journal*)> contents) {
storage::Status status;
bool called;
std::unique_ptr<const storage::Commit> base;
storage->GetCommit(
parent_id,
callback::Capture(callback::SetWhenCalled(&called), &status, &base));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
std::unique_ptr<storage::Journal> journal =
storage->StartCommit(std::move(base));
contents(journal.get());
std::unique_ptr<const storage::Commit> commit;
storage->CommitJournal(
std::move(journal),
callback::Capture(callback::SetWhenCalled(&called), &status, &commit));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
return commit->GetId();
}
storage::CommitId CreateMergeCommit(
storage::CommitIdView parent_id1, storage::CommitIdView parent_id2,
fit::function<void(storage::Journal*)> contents) {
return CreateMergeCommit(page_storage_.get(), parent_id1, parent_id2,
std::move(contents));
}
storage::CommitId CreateMergeCommit(
storage::PageStorage* storage, storage::CommitIdView parent_id1,
storage::CommitIdView parent_id2,
fit::function<void(storage::Journal*)> contents) {
storage::Status status;
bool called;
std::unique_ptr<const storage::Commit> base1;
storage->GetCommit(
parent_id1,
callback::Capture(callback::SetWhenCalled(&called), &status, &base1));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
std::unique_ptr<const storage::Commit> base2;
storage->GetCommit(
parent_id2,
callback::Capture(callback::SetWhenCalled(&called), &status, &base2));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
std::unique_ptr<storage::Journal> journal =
storage->StartMergeCommit(std::move(base1), std::move(base2));
contents(journal.get());
storage::Status actual_status;
std::unique_ptr<const storage::Commit> actual_commit;
storage->CommitJournal(std::move(journal),
callback::Capture(callback::SetWhenCalled(&called),
&actual_status, &actual_commit));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, actual_status);
return actual_commit->GetId();
}
std::vector<storage::Entry> GetCommitContents(const storage::Commit& commit) {
storage::Status status;
std::vector<storage::Entry> result;
auto on_next = [&result](storage::Entry e) {
result.push_back(e);
return true;
};
bool called;
page_storage_->GetCommitContents(
commit, "", std::move(on_next),
callback::Capture(callback::SetWhenCalled(&called), &status));
RunLoopUntilIdle();
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
return result;
}
// Checks that a string represents a valid set of changes: it is sorted and
// does not contain duplicates.
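// For example, "abc" is a valid set, while "aab" (duplicate) and "ba" (not
// sorted) are not.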
bool ValidSet(std::string state) {
return std::is_sorted(state.begin(), state.end()) &&
std::adjacent_find(state.begin(), state.end()) == state.end();
}
// Merges two sets of changes, represented by sorted strings. Assuming that
// each change is represented by a unique letter, this checks that the base
// contains exactly the changes common to left and right, and returns a version
// that includes all the changes of left and right.
// This is exactly the property we expect merging to satisfy.
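// For example, MergeAsSets("abd", "bce", "b") checks that "b" is exactly the
// set of common changes and returns "abcde", the union of both sets.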
std::string MergeAsSets(std::string left, std::string right,
std::string base) {
std::string out;
std::string expected_base;
EXPECT_TRUE(ValidSet(base));
EXPECT_TRUE(ValidSet(left));
EXPECT_TRUE(ValidSet(right));
std::set_intersection(left.begin(), left.end(), right.begin(), right.end(),
std::back_inserter(expected_base));
EXPECT_EQ(expected_base, base)
<< " when merging " << left << " and " << right;
std::set_union(left.begin(), left.end(), right.begin(), right.end(),
std::back_inserter(out));
EXPECT_TRUE(ValidSet(out));
return out;
}
std::string GetKeyOrEmpty(const storage::Commit& commit, std::string key) {
std::vector<storage::Entry> entries = GetCommitContents(commit);
auto it =
std::find_if(entries.begin(), entries.end(),
[&key](storage::Entry entry) { return entry.key == key; });
if (it == entries.end()) {
return "";
}
std::string value;
EXPECT_TRUE(GetValue(it->object_identifier, &value));
return value;
}
void MergeCommitsAsSets(const storage::Commit& left,
const storage::Commit& right,
const storage::Commit& base) {
std::string merge =
MergeAsSets(GetKeyOrEmpty(left, "k"), GetKeyOrEmpty(right, "k"),
GetKeyOrEmpty(base, "k"));
CreateMergeCommit(left.GetId(), right.GetId(),
AddKeyValueToJournal("k", merge));
}
std::unique_ptr<FakePageStorageImpl> page_storage_;
private:
FXL_DISALLOW_COPY_AND_ASSIGN(MergeResolverTest);
};
TEST_F(MergeResolverTest, Empty) {
// Set up conflict
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "bar"));
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(QuitLoopClosure());
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, commits.size());
}
TEST_F(MergeResolverTest, CommonAncestor) {
// Add commits forming the following history graph:
// (root) -> (1) -> (2) -> (3)
//                    \
//                     -> (4) -> (5)
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
RunLoopUntilIdle();
// Set a merge strategy to capture the requested merge.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Verify that the strategy is asked to merge commits 5 and 3, with 2 as the
// common ancestor.
EXPECT_EQ(commit_3, strategy_ptr->head_1->GetId());
EXPECT_EQ(commit_5, strategy_ptr->head_2->GetId());
EXPECT_EQ(commit_2, strategy_ptr->ancestor->GetId());
// Resolve the conflict.
CreateMergeCommit(strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId(),
AddKeyValueToJournal("key_foo", "abc"));
strategy_ptr->callback(storage::Status::OK);
strategy_ptr->callback = nullptr;
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
}
TEST_F(MergeResolverTest, LastOneWins) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
auto ids = ToCommitIds(commits);
EXPECT_THAT(ids, UnorderedElementsAre(commit_3, commit_5));
bool called;
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(callback::SetWhenCalled(&called));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, commits.size());
std::vector<storage::Entry> content_vector = GetCommitContents(*commits[0]);
// Entries are ordered by keys
ASSERT_EQ(2u, content_vector.size());
EXPECT_EQ("key2", content_vector[0].key);
std::string value;
EXPECT_TRUE(GetValue(content_vector[0].object_identifier, &value));
EXPECT_EQ("val2.1", value);
EXPECT_EQ("key3", content_vector[1].key);
EXPECT_TRUE(GetValue(content_vector[1].object_identifier, &value));
EXPECT_EQ("val3.0", value);
}
TEST_F(MergeResolverTest, LastOneWinsDiffNotAvailable) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_THAT(ToCommitIds(commits), UnorderedElementsAre(commit_3, commit_5));
page_storage_->MarkCommitContentsUnavailable(commit_2);
bool called;
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(callback::SetWhenCalled(&called));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
}
TEST_F(MergeResolverTest, None) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_2, AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId commit_4 =
CreateCommit(commit_2, DeleteKeyFromJournal("key1"));
storage::CommitId commit_5 =
CreateCommit(commit_4, AddKeyValueToJournal("key2", "val2.1"));
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
std::vector<storage::CommitId> ids = ToCommitIds(commits);
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_3));
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), commit_5));
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.set_on_empty(QuitLoopClosure());
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
}
TEST_F(MergeResolverTest, UpdateMidResolution) {
// Set up conflict
storage::CommitId commit_1 = CreateCommit(
storage::kFirstPageCommitId, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 =
CreateCommit(commit_1, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId commit_3 =
CreateCommit(commit_1, AddKeyValueToJournal("key3", "val3.0"));
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
EXPECT_THAT(ToCommitIds(commits), UnorderedElementsAre(commit_2, commit_3));
bool called;
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.set_on_empty(callback::SetWhenCalled(&called));
resolver.SetMergeStrategy(std::make_unique<LastOneWinsMergeStrategy>());
async::PostTask(dispatcher(), [&resolver] {
resolver.SetMergeStrategy(std::make_unique<LastOneWinsMergeStrategy>());
});
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_TRUE(resolver.IsEmpty());
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, commits.size());
}
// The merge-of-merges backoff is only triggered when commits come from sync.
// To test this, we need to create conflicts and make them appear as if they
// were not created locally. This is done by suppressing commit notifications
// for new commits, then manually issuing a commit notification "from sync". As
// this requires using a fake PageStorage, we don't test the resolution itself,
// only that backoff is triggered correctly.
TEST_F(MergeResolverTest, WaitOnMergeOfMerges) {
storage::fake::FakePageStorage page_storage(&environment_, "page_id");
bool on_empty_called;
auto backoff = std::make_unique<backoff::TestBackoff>();
auto backoff_ptr = backoff.get();
MergeResolver resolver([] {}, &environment_, &page_storage,
std::move(backoff));
resolver.set_on_empty(callback::SetWhenCalled(&on_empty_called));
auto strategy = std::make_unique<RecordingTestStrategy>();
strategy->SetOnMerge(QuitLoopClosure());
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
EXPECT_TRUE(on_empty_called);
page_storage.SetDropCommitNotifications(true);
// Set up conflict
storage::CommitId commit_0 = CreateCommit(
&page_storage, storage::kFirstPageCommitId, [](storage::Journal*) {});
storage::CommitId commit_1 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_2 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key1", "val1.0"));
storage::CommitId commit_3 = CreateCommit(
&page_storage, commit_0, AddKeyValueToJournal("key2", "val2.0"));
storage::CommitId merge_1 =
CreateMergeCommit(&page_storage, commit_1, commit_3,
AddKeyValueToJournal("key3", "val3.0"));
storage::CommitId merge_2 =
CreateMergeCommit(&page_storage, commit_2, commit_3,
AddKeyValueToJournal("key3", "val3.0"));
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage.GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
EXPECT_THAT(ToCommitIds(commits), UnorderedElementsAre(merge_1, merge_2));
page_storage.SetDropCommitNotifications(false);
storage::CommitWatcher* watcher = &resolver;
watcher->OnNewCommits({}, storage::ChangeSource::CLOUD);
// Note we can't use "RunLoopUntilIdle()" because the FakePageStorage delays
// before inserting tasks into the message loop.
RunLoopFor(zx::sec(5));
EXPECT_GT(backoff_ptr->get_next_count, 0);
}
TEST_F(MergeResolverTest, NoConflictCallback_ConflictsResolved) {
// Set up conflict.
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "bar"));
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(MakeQuitTaskOnce());
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(2u, commits.size());
RunLoopUntilIdle();
size_t callback_calls = 0;
auto conflicts_resolved_callback = [&resolver, &callback_calls]() {
EXPECT_TRUE(resolver.IsEmpty());
callback_calls++;
};
ConflictResolutionWaitStatus wait_status;
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
// Check that the callback was called twice.
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
EXPECT_EQ(2u, callback_calls);
EXPECT_EQ(ConflictResolutionWaitStatus::CONFLICTS_RESOLVED, wait_status);
commits.clear();
status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, commits.size());
callback_calls = 0;
CreateCommit(commits[0]->GetId(), AddKeyValueToJournal("foo", "baw"));
CreateCommit(commits[0]->GetId(), AddKeyValueToJournal("foo", "bat"));
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
// Check that the callbacks were not called again (the callback queue is
// cleared after all the callbacks in it have been called).
RunLoopFor(zx::sec(10));
EXPECT_EQ(0u, callback_calls);
}
TEST_F(MergeResolverTest, NoConflictCallback_NoConflicts) {
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("foo", "baz"));
std::unique_ptr<LastOneWinsMergeStrategy> strategy =
std::make_unique<LastOneWinsMergeStrategy>();
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
resolver.SetMergeStrategy(std::move(strategy));
resolver.set_on_empty(MakeQuitTaskOnce());
size_t callback_calls = 0;
auto conflicts_resolved_callback = [&resolver, &callback_calls]() {
EXPECT_TRUE(resolver.IsEmpty());
callback_calls++;
};
ConflictResolutionWaitStatus wait_status;
resolver.RegisterNoConflictCallback(
callback::Capture(conflicts_resolved_callback, &wait_status));
// Check that the callback was called once.
RunLoopUntilIdle();
EXPECT_TRUE(resolver.IsEmpty());
EXPECT_EQ(1u, callback_calls);
EXPECT_EQ(ConflictResolutionWaitStatus::NO_CONFLICTS, wait_status);
}
TEST_F(MergeResolverTest, HasUnfinishedMerges) {
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
EXPECT_FALSE(resolver.HasUnfinishedMerges());
// Set up a conflict and verify that HasUnfinishedMerges() returns true.
storage::CommitId commit_1 = CreateCommit(storage::kFirstPageCommitId,
AddKeyValueToJournal("foo", "bar"));
storage::CommitId commit_2 = CreateCommit(storage::kFirstPageCommitId,
AddKeyValueToJournal("foo", "baz"));
RunLoopUntilIdle();
EXPECT_TRUE(resolver.HasUnfinishedMerges());
// Resolve the conflict and verify that HasUnfinishedMerges() returns false.
ASSERT_TRUE(strategy_ptr->head_1);
ASSERT_TRUE(strategy_ptr->head_2);
ASSERT_TRUE(strategy_ptr->ancestor);
ASSERT_TRUE(strategy_ptr->callback);
CreateMergeCommit(strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId(),
AddKeyValueToJournal("key3", "val3.0"));
strategy_ptr->callback(storage::Status::OK);
strategy_ptr->callback = nullptr;
RunLoopUntilIdle();
EXPECT_FALSE(resolver.HasUnfinishedMerges());
}
// The commit graph is as follows:
//     (root)
//    /  |  \
//  (A) (B) (C)
//   |  X   \ /
//   | / \  (E)
//   |/   \  |
//  (D)    \ |
//          (F)
// (D) and (F) are both heads, with (D) containing the changes of (A) and (B),
// and (F) containing those of (A), (B) and (C). This should merge to the
// content of (F) without invoking the conflict resolver.
TEST_F(MergeResolverTest, MergeSubsets) {
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "a"));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "b"));
storage::CommitId commit_c =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "c"));
storage::CommitId commit_d =
CreateMergeCommit(commit_a, commit_b, AddKeyValueToJournal("k", "d"));
storage::CommitId commit_e =
CreateMergeCommit(commit_b, commit_c, AddKeyValueToJournal("k", "e"));
storage::CommitId commit_f =
CreateMergeCommit(commit_a, commit_e, AddKeyValueToJournal("k", "f"));
RunLoopUntilIdle();
// Set a merge strategy to check that no merge is requested
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Verify that the strategy has not been called
EXPECT_FALSE(strategy_ptr->callback);
// Verify there is only one head with the content of commit F
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_EQ(1u, commits.size());
bool called;
std::unique_ptr<const storage::Commit> commitptr_f;
page_storage_->GetCommit(
commit_f, callback::Capture(callback::SetWhenCalled(&called), &status,
&commitptr_f));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_TRUE(commitptr_f);
EXPECT_EQ(commits[0]->GetRootIdentifier(), commitptr_f->GetRootIdentifier());
}
// Check that two equivalent commits are merged to a commit with the content of
// one of the two. The commit graph is as follows:
//  (root)
//   |   |
//  (A) (B)
//   | \/ |
//   | /\ |
//  (C)  (D)
TEST_F(MergeResolverTest, MergeEquivalents) {
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "a"));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "b"));
storage::CommitId commit_c =
CreateMergeCommit(commit_a, commit_b, AddKeyValueToJournal("k", "c"));
storage::CommitId commit_d =
CreateMergeCommit(commit_a, commit_b, AddKeyValueToJournal("k", "d"));
RunLoopUntilIdle();
// Set a merge strategy to check that no merge is requested
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Verify that the strategy has not been called
EXPECT_FALSE(strategy_ptr->callback);
// Verify there is only one head with the content of commit C or D
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_EQ(1u, commits.size());
bool called;
std::unique_ptr<const storage::Commit> commitptr_c;
page_storage_->GetCommit(
commit_c, callback::Capture(callback::SetWhenCalled(&called), &status,
&commitptr_c));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_TRUE(commitptr_c);
std::unique_ptr<const storage::Commit> commitptr_d;
page_storage_->GetCommit(
commit_d, callback::Capture(callback::SetWhenCalled(&called), &status,
&commitptr_d));
RunLoopUntilIdle();
ASSERT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_TRUE(commitptr_d);
EXPECT_THAT(commits[0]->GetRootIdentifier(),
AnyOf(Eq(commitptr_c->GetRootIdentifier()),
Eq(commitptr_d->GetRootIdentifier())));
}
// Tests that already existing merges are reused.
// In this test, the commits have the following structure:
//      (root)
//      /   \
//    (A)   (B)
//     | \   / |
//    (C)  \/ (D)
//     |   /\  |
//     |  /  \ |
//    (E)     (F)
//     |  (G)
//     |  /
//    (H)
// and (G) is a merge of (A) and (B).
// Then merging (F) and (H) should be done using (G) as a base.
TEST_F(MergeResolverTest, ReuseExistingMerge) {
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "a"));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "b"));
storage::CommitId commit_c =
CreateCommit(commit_a, AddKeyValueToJournal("k", "c"));
storage::CommitId commit_d =
CreateCommit(commit_b, AddKeyValueToJournal("k", "d"));
storage::CommitId commit_e =
CreateMergeCommit(commit_c, commit_b, AddKeyValueToJournal("k", "e"));
storage::CommitId commit_f =
CreateMergeCommit(commit_a, commit_d, AddKeyValueToJournal("k", "f"));
storage::CommitId commit_g =
CreateMergeCommit(commit_a, commit_b, AddKeyValueToJournal("k", "g"));
// Commit (H) is necessary because otherwise (G) would be a head.
storage::CommitId commit_h =
CreateMergeCommit(commit_e, commit_g, AddKeyValueToJournal("k", "h"));
RunLoopUntilIdle();
// Set a merge strategy.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// The merge strategy is called once, to merge F and H with G as a base.
ASSERT_TRUE(strategy_ptr->callback);
EXPECT_EQ(commit_g, strategy_ptr->ancestor->GetId());
EXPECT_THAT((std::vector<storage::CommitId>{strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId()}),
UnorderedElementsAre(commit_f, commit_h));
// Create the merge
CreateMergeCommit(strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId(),
AddKeyValueToJournal("k", "merge"));
strategy_ptr->callback(storage::Status::OK);
RunLoopUntilIdle();
// There is only one head now
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ(1u, commits.size());
}
// Tests that recursive merges work correctly: they terminate and produce a
// commit that integrates each change once.
// The commit graph is the following:
//      (root)
//     /  |  \
//   (A) (B) (C)
//    | \/ \/ |
//   (D)/\ /\(E)
//    |/  X  \|
//   (F) / \ (G)
//    | /   \ |
//   (H)     (I)
// Then a merge of (H) and (I) will use (A), (B), (C) as a base.
// The merge can proceed in different ways, but will always call the strategy
// three times. The conflict resolver computes left+right-base on sets
// represented as strings. The final state should be equivalent to "abcde".
TEST_F(MergeResolverTest, RecursiveMerge) {
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "a"));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "b"));
storage::CommitId commit_c =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "c"));
storage::CommitId commit_d =
CreateCommit(commit_a, AddKeyValueToJournal("k", "ad"));
storage::CommitId commit_e =
CreateCommit(commit_c, AddKeyValueToJournal("k", "ce"));
storage::CommitId commit_f =
CreateMergeCommit(commit_b, commit_d, AddKeyValueToJournal("k", "abd"));
storage::CommitId commit_g =
CreateMergeCommit(commit_b, commit_e, AddKeyValueToJournal("k", "bce"));
storage::CommitId commit_h =
CreateMergeCommit(commit_f, commit_c, AddKeyValueToJournal("k", "abcd"));
storage::CommitId commit_i =
CreateMergeCommit(commit_a, commit_g, AddKeyValueToJournal("k", "abce"));
RunLoopUntilIdle();
// Set up a merge strategy.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Do three merges, merging values as sets.
for (int i = 0; i < 3; i++) {
EXPECT_TRUE(strategy_ptr->callback);
if (strategy_ptr->callback) {
MergeCommitsAsSets(*strategy_ptr->head_1, *strategy_ptr->head_2,
*strategy_ptr->ancestor);
strategy_ptr->callback(storage::Status::OK);
}
strategy_ptr->callback = nullptr;
RunLoopUntilIdle();
}
EXPECT_FALSE(strategy_ptr->callback);
std::vector<std::unique_ptr<const storage::Commit>> commits;
storage::Status status = page_storage_->GetHeadCommits(&commits);
EXPECT_EQ(storage::Status::OK, status);
ASSERT_EQ(1u, commits.size());
// Check the value of k in the commit.
EXPECT_EQ("abcde", GetKeyOrEmpty(*commits[0], "k"));
}
// Check that merges are done in timestamp order: in a merge with three bases,
// the two commits with the lowest timestamps are merged first. The commit
// graph is the following: we add the commits (U) and (V) to ensure that (B)
// and (C) have a higher generation than (A), so we can detect if merging is
// done in generation order instead of timestamp order.
//      (root)
//     /  |  \
//    |  (U) (V)
//    |   |   |
//   (A) (B) (C)
//    | \/ \/ |
//   (D)/\ /\(E)
//    |/  X  \|
//   (F) / \ (G)
//    | /   \ |
//   (H)     (I)
// We do not test the order of subsequent merges.
TEST_F(MergeResolverTest, RecursiveMergeOrder) {
storage::CommitId commit_u =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "u"));
storage::CommitId commit_v =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "v"));
// Commit a, b and c can be done in any order.
storage::CommitId commit_b =
CreateCommit(commit_u, AddKeyValueToJournal("k", "bu"));
// Ensure time advances between the commits
RunLoopFor(zx::duration(1));
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "a"));
RunLoopFor(zx::duration(1));
storage::CommitId commit_c =
CreateCommit(commit_v, AddKeyValueToJournal("k", "cv"));
storage::CommitId commit_d =
CreateCommit(commit_a, AddKeyValueToJournal("k", "ad"));
storage::CommitId commit_e =
CreateCommit(commit_c, AddKeyValueToJournal("k", "cev"));
storage::CommitId commit_f =
CreateMergeCommit(commit_b, commit_d, AddKeyValueToJournal("k", "abdu"));
storage::CommitId commit_g =
CreateMergeCommit(commit_b, commit_e, AddKeyValueToJournal("k", "bcev"));
storage::CommitId commit_h = CreateMergeCommit(
commit_f, commit_c, AddKeyValueToJournal("k", "abcduv"));
storage::CommitId commit_i = CreateMergeCommit(
commit_a, commit_g, AddKeyValueToJournal("k", "abceuv"));
RunLoopUntilIdle();
// Set up a merge strategy
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Inspect the first merge. It should be between b and a.
ASSERT_TRUE(strategy_ptr->callback);
EXPECT_EQ(storage::kFirstPageCommitId, strategy_ptr->ancestor->GetId());
EXPECT_EQ(strategy_ptr->head_1->GetId(), commit_b);
EXPECT_EQ(strategy_ptr->head_2->GetId(), commit_a);
}
// Checks that last-one-wins picks up changes in the right order for recursive
// merges. When doing recursive merges, the set of commits to be merged to
// construct the base is known in advance, so the order should be completely
// deterministic: keys coming from newer commits always win against keys from
// older commits, even with intermediate merges.
//
// The commit graph is the following. The goal is to observe the construction of
// the merge base (we are not interested in the final merge), so we construct
// commits (H) and (I) whose set of common ancestors is {(A), (B), (C)}.
//
//      (root)
//     /  |  \
//   (A) (B) (C)
//    | \/ \/ |
//   (D)/\ /\(E)
//    |/  X  \|
//   (F) / \ (G)
//    | /   \ |
//   (H)     (I)
//
// The merge can proceed in different ways: there may be intervening merges
// that are done without calling the conflict resolver because one commit
// contains a subset of the changes of the other. This test only checks the
// merges that involve the conflict resolver. There are three such merges: one
// between A and B, one between a merge of A and B, and C, and one between
// commits equivalent to H and I.
//
// At the time of writing this comment, the actual sequence of merges is the
// following (assuming D < E in timestamp order):
//  - Try to merge H and I. The set of ancestors is {A, B, C}.
//    - Merge A and B to J, calling the LastOneWinsStrategy.
//  - Try to merge J and H (they are the two oldest heads). This is an
//    automatic merge to K, with the same content as H.
//  - Try to merge K and I. The set of ancestors is still {A, B, C}.
//    - A and B are already merged to J.
//    - Merge J and C to L, calling the LastOneWinsStrategy.
//  - Try to merge K and L. This is an automatic merge to M, with the same
//    content as H.
//  - Try to merge M and I. The set of ancestors is {A, B, C}.
//    - A and B are already merged to J.
//    - J and C are already merged to L.
//    - Merge M and I (identical to H and I) with ancestor L.
TEST_F(MergeResolverTest, RecursiveMergeLastOneWins) {
// Ensure that A, B, C are in chronological order
// We insert a key k1 in A, B and C. The value in C should win.
// We also insert a key k2 in A and B. If A and C are merged first, the value
// in A will be "refreshed" and be considered as recent as C, and will win
// against the value in B. We check that this does not happen.
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, [this](auto journal) {
AddKeyValueToJournal("k1", "a")(journal);
AddKeyValueToJournal("k2", "a")(journal);
});
RunLoopFor(zx::duration(1));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, [this](auto journal) {
AddKeyValueToJournal("k1", "b")(journal);
AddKeyValueToJournal("k2", "b")(journal);
});
RunLoopFor(zx::duration(1));
storage::CommitId commit_c = CreateCommit(storage::kFirstPageCommitId,
AddKeyValueToJournal("k1", "c"));
// Build the rest of the graph. We add values to generate changes.
storage::CommitId commit_d =
CreateCommit(commit_a, AddKeyValueToJournal("k", "d"));
storage::CommitId commit_e =
CreateCommit(commit_c, AddKeyValueToJournal("k", "e"));
storage::CommitId commit_f =
CreateMergeCommit(commit_b, commit_d, [](auto journal) {});
storage::CommitId commit_g =
CreateMergeCommit(commit_b, commit_e, [](auto journal) {});
storage::CommitId commit_h =
CreateMergeCommit(commit_f, commit_c, [](auto journal) {});
storage::CommitId commit_i =
CreateMergeCommit(commit_a, commit_g, [](auto journal) {});
RunLoopUntilIdle();
// Set up a merge strategy.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
// Set up a last one wins strategy to forward merges to.
LastOneWinsMergeStrategy last_one_wins_strategy;
// Do two merges using last-one-wins. Check that they are merges of A and B
// (generating a commit AB whose id we cannot recover), then of AB and C.
RunLoopUntilIdle();
ASSERT_TRUE(strategy_ptr->callback);
EXPECT_EQ(strategy_ptr->head_1->GetId(), commit_a);
EXPECT_EQ(strategy_ptr->head_2->GetId(), commit_b);
EXPECT_EQ(strategy_ptr->ancestor->GetId(), storage::kFirstPageCommitId);
strategy_ptr->Forward(&last_one_wins_strategy);
RunLoopUntilIdle();
ASSERT_TRUE(strategy_ptr->callback);
EXPECT_EQ(strategy_ptr->head_2->GetId(), commit_c);
EXPECT_EQ(strategy_ptr->ancestor->GetId(),
storage::kFirstPageCommitId.ToString());
// Check that the first head for the second merge holds the correct values.
EXPECT_EQ("b", GetKeyOrEmpty(*strategy_ptr->head_1, "k1"));
EXPECT_EQ("b", GetKeyOrEmpty(*strategy_ptr->head_1, "k2"));
strategy_ptr->Forward(&last_one_wins_strategy);
// Inspect the last merge: its base is the merge of A, B and C.
RunLoopUntilIdle();
ASSERT_TRUE(strategy_ptr->callback);
// Check if the ancestor is the one we expect.
EXPECT_EQ("c", GetKeyOrEmpty(*strategy_ptr->ancestor, "k1"));
EXPECT_EQ("b", GetKeyOrEmpty(*strategy_ptr->ancestor, "k2"));
}
// Commits with identical changes should not be considered equivalent.
// This creates two commits with identical contents, and checks that the
// conflict resolver is called anyway.
TEST_F(MergeResolverTest, DoNotAutoMergeIdenticalCommits) {
storage::CommitId commit_a =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "v"));
storage::CommitId commit_b =
CreateCommit(storage::kFirstPageCommitId, AddKeyValueToJournal("k", "v"));
// Set up a merge strategy.
MergeResolver resolver([] {}, &environment_, page_storage_.get(),
std::make_unique<backoff::TestBackoff>());
std::unique_ptr<RecordingTestStrategy> strategy =
std::make_unique<RecordingTestStrategy>();
auto strategy_ptr = strategy.get();
resolver.SetMergeStrategy(std::move(strategy));
RunLoopUntilIdle();
// Inspect the first merge
ASSERT_TRUE(strategy_ptr->callback);
EXPECT_EQ(storage::kFirstPageCommitId, strategy_ptr->ancestor->GetId());
EXPECT_THAT((std::vector<storage::CommitId>{strategy_ptr->head_1->GetId(),
strategy_ptr->head_2->GetId()}),
UnorderedElementsAre(commit_a, commit_b));
}
} // namespace
} // namespace ledger