[ledger] Maintain the order of insertions/deletions in merge journals.
Test: MergingIntegrationTest.CustomConflictResolutionMergeValuesOrder
LE-691 #done
ConflictResolverClient was putting all deletions before insertions in
the merge journal. After this CL, operations in the journal match the
order of merged_values given to the resolver.
Change-Id: I0e4452165e0845f87cafd2117513d294a7cc1af8
diff --git a/peridot/bin/ledger/app/merging/conflict_resolver_client.cc b/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
index be2674e..9197d2a 100644
--- a/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
+++ b/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
@@ -98,17 +98,18 @@
}
}
-void ConflictResolverClient::OnNextMergeResult(
+void ConflictResolverClient::GetOrCreateObjectIdetifier(
const MergedValue& merged_value,
- const fxl::RefPtr<
- callback::Waiter<storage::Status, storage::ObjectIdentifier>>& waiter) {
+ fit::function<void(storage::Status, storage::ObjectIdentifier)> callback) {
+ FXL_DCHECK(merged_value.source == ValueSource::RIGHT ||
+ merged_value.source == ValueSource::NEW);
switch (merged_value.source) {
case ValueSource::RIGHT: {
std::string key = convert::ToString(merged_value.key);
storage_->GetEntryFromCommit(
*right_, key,
- [key, callback = waiter->NewCallback()](storage::Status status,
- storage::Entry entry) {
+ [key, callback = std::move(callback)](storage::Status status,
+ storage::Entry entry) {
if (status != storage::Status::OK) {
if (status == storage::Status::NOT_FOUND) {
FXL_LOG(ERROR)
@@ -127,22 +128,21 @@
storage_->AddObjectFromLocal(storage::ObjectType::BLOB,
storage::DataSource::Create(std::move(
merged_value.new_value->bytes())),
- waiter->NewCallback());
+ std::move(callback));
} else {
storage::ObjectIdentifier object_identifier;
Status status = manager_->ResolveReference(
std::move(merged_value.new_value->reference()), &object_identifier);
if (status != Status::OK) {
- waiter->NewCallback()(storage::Status::NOT_FOUND, {});
+ callback(storage::Status::NOT_FOUND, {});
return;
}
- waiter->NewCallback()(storage::Status::OK,
- std::move(object_identifier));
+ callback(storage::Status::OK, std::move(object_identifier));
}
break;
}
case ValueSource::DELETE: {
- journal_->Delete(merged_value.key);
+ // No object identifier to retrieve for deletions.
break;
}
}
@@ -275,7 +275,9 @@
callback::Waiter<storage::Status, storage::ObjectIdentifier>>(
storage::Status::OK);
for (const MergedValue& merged_value : merged_values) {
- OnNextMergeResult(merged_value, waiter);
+ if (merged_value.source != ValueSource::DELETE) {
+ GetOrCreateObjectIdetifier(merged_value, waiter->NewCallback());
+ }
}
waiter->Finalize(
[this, weak_this, merged_values = std::move(merged_values),
@@ -286,21 +288,21 @@
return;
}
- size_t j = 0;
- for (size_t i = 0; i < merged_values.size(); ++i) {
- // |object_identifiers| contains only the identifiers of objects
- // that have been inserted. Deletions have already been handled.
- // TODO(LE-691): Maintain the order of insertions/deletions.
- if (merged_values[i].source == ValueSource::DELETE) {
- continue;
+ // |object_identifiers| contains only the identifiers of objects
+ // that have been inserted.
+ size_t i = 0;
+ for (const MergedValue& merged_value : merged_values) {
+ if (merged_value.source == ValueSource::DELETE) {
+ journal_->Delete(merged_value.key);
+ } else {
+ journal_->Put(merged_value.key, object_identifiers[i],
+ merged_value.priority == Priority::EAGER
+ ? storage::KeyPriority::EAGER
+ : storage::KeyPriority::LAZY);
+ ++i;
}
- journal_->Put(merged_values[i].key, object_identifiers[j],
- merged_values[i].priority == Priority::EAGER
- ? storage::KeyPriority::EAGER
- : storage::KeyPriority::LAZY);
- ++j;
}
- FXL_DCHECK(j == object_identifiers.size());
+ FXL_DCHECK(i == object_identifiers.size());
callback(Status::OK);
});
});
diff --git a/peridot/bin/ledger/app/merging/conflict_resolver_client.h b/peridot/bin/ledger/app/merging/conflict_resolver_client.h
index cd4f123..2047999 100644
--- a/peridot/bin/ledger/app/merging/conflict_resolver_client.h
+++ b/peridot/bin/ledger/app/merging/conflict_resolver_client.h
@@ -41,15 +41,12 @@
void Cancel();
private:
- // Handles the next merge value. If the |merged_value| is an insertion, i.e.
- // if the source of |merged_value| is |RIGHT| or |NEW|, a new callback will be
- // added on the waiter, that will be called with the object identifier of the
- // inserted object. Otherwise, if the |merged_value| is a deletion, it will
- // directly update the current journal.
- void OnNextMergeResult(
+ // Gets or creates the object identifier associated to the given
+ // |merge_value|. This method can only be called on merge values whose source
+ // is either |NEW| or |RIGHT|.
+ void GetOrCreateObjectIdetifier(
const MergedValue& merged_value,
- const fxl::RefPtr<callback::Waiter<storage::Status,
- storage::ObjectIdentifier>>& waiter);
+ fit::function<void(storage::Status, storage::ObjectIdentifier)> callback);
// Rolls back journal, closes merge result provider and invokes callback_ with
// |status|. This method must be called at most once.
diff --git a/peridot/bin/ledger/tests/integration/merging_tests.cc b/peridot/bin/ledger/tests/integration/merging_tests.cc
index 545be288..6fd0c08 100644
--- a/peridot/bin/ledger/tests/integration/merging_tests.cc
+++ b/peridot/bin/ledger/tests/integration/merging_tests.cc
@@ -841,6 +841,124 @@
EXPECT_EQ("phone", convert::ExtendedStringView(final_entries[2].key));
}
+TEST_P(MergingIntegrationTest, CustomConflictResolutionMergeValuesOrder) {
+ auto instance = NewLedgerAppInstance();
+ ConflictResolverFactoryPtr resolver_factory_ptr;
+ auto resolver_factory = std::make_unique<TestConflictResolverFactory>(
+ this, MergePolicy::CUSTOM, resolver_factory_ptr.NewRequest(), nullptr);
+ LedgerPtr ledger_ptr = instance->GetTestLedger();
+ auto waiter = NewWaiter();
+ Status status;
+ ledger_ptr->SetConflictResolverFactory(
+ std::move(resolver_factory_ptr),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ PagePtr page1 = instance->GetTestPage();
+ waiter = NewWaiter();
+ PageId test_page_id;
+ page1->GetId(callback::Capture(waiter->GetCallback(), &test_page_id));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ PagePtr page2 =
+ instance->GetPage(fidl::MakeOptional(test_page_id), Status::OK);
+
+ waiter = NewWaiter();
+ page1->StartTransaction(callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+ waiter = NewWaiter();
+ page1->Put(convert::ToArray("name"), convert::ToArray("Alice"),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ waiter = NewWaiter();
+ page2->StartTransaction(callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+ waiter = NewWaiter();
+ page2->Put(convert::ToArray("email"), convert::ToArray("alice@example.org"),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ waiter = NewWaiter();
+ page1->Commit(callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+ waiter = NewWaiter();
+ page2->Commit(callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ resolver_factory->RunUntilNewConflictResolverCalled();
+
+ // We now have a conflict.
+ EXPECT_EQ(1u, resolver_factory->resolvers.size());
+ EXPECT_NE(
+ resolver_factory->resolvers.end(),
+ resolver_factory->resolvers.find(convert::ToString(test_page_id.id)));
+ ConflictResolverImpl* resolver_impl =
+ &(resolver_factory->resolvers.find(convert::ToString(test_page_id.id))
+ ->second);
+ resolver_impl->RunUntilResolveCalled();
+ ASSERT_EQ(1u, resolver_impl->requests.size());
+
+ std::vector<DiffEntry> changes;
+ ASSERT_TRUE(resolver_impl->requests[0].GetFullDiff(&changes));
+
+ EXPECT_EQ(2u, changes.size());
+ EXPECT_TRUE(ChangeMatch("email", Optional<std::string>(),
+ Optional<std::string>("alice@example.org"),
+ Optional<std::string>(), changes[0]));
+ EXPECT_TRUE(ChangeMatch("name", Optional<std::string>(),
+ Optional<std::string>(),
+ Optional<std::string>("Alice"), changes[1]));
+
+ // Common ancestor is empty.
+ PageSnapshotPtr snapshot = resolver_impl->requests[0].common_version.Bind();
+ auto entries = SnapshotGetEntries(this, &snapshot);
+ EXPECT_EQ(0u, entries.size());
+
+ // Prepare the merged values: Initially add, but then delete the entry with
+ // key "name".
+ std::vector<MergedValue> merged_values;
+ {
+ MergedValue merged_value;
+ merged_value.key = convert::ToArray("name");
+ merged_value.source = ValueSource::RIGHT;
+ merged_values.push_back(std::move(merged_value));
+ }
+ {
+ MergedValue merged_value;
+ merged_value.key = convert::ToArray("name");
+ merged_value.source = ValueSource::DELETE;
+ merged_values.push_back(std::move(merged_value));
+ }
+
+ // Watch for the change.
+ PageWatcherPtr watcher_ptr;
+ auto watcher_waiter = NewWaiter();
+ Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback());
+ PageSnapshotPtr snapshot2;
+ waiter = NewWaiter();
+ page1->GetSnapshot(snapshot2.NewRequest(), fidl::VectorPtr<uint8_t>::New(0),
+ std::move(watcher_ptr),
+ callback::Capture(waiter->GetCallback(), &status));
+ ASSERT_TRUE(waiter->RunUntilCalled());
+ EXPECT_EQ(Status::OK, status);
+
+ EXPECT_TRUE(resolver_impl->requests[0].Merge(std::move(merged_values)));
+
+ // Wait for the watcher to be called.
+ ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+
+ auto final_entries = SnapshotGetEntries(this, &watcher.last_snapshot_);
+ ASSERT_EQ(1u, final_entries.size());
+ EXPECT_EQ("email", convert::ExtendedStringView(final_entries[0].key));
+}
+
TEST_P(MergingIntegrationTest, CustomConflictResolutionGetDiffMultiPart) {
auto instance = NewLedgerAppInstance();
ConflictResolverFactoryPtr resolver_factory_ptr;