[ledger] Maintain the order of insertions/deletions in merge journals.

Test: MergingIntegrationTest.CustomConflictResolutionMergeValuesOrder
LE-691 #done

ConflictResolverClient was putting all deletions before insertions in
the merge journal. After this CL, operations in the journal match the
order of merged_values given to the resolver.

Change-Id: I0e4452165e0845f87cafd2117513d294a7cc1af8
diff --git a/peridot/bin/ledger/app/merging/conflict_resolver_client.cc b/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
index be2674e..9197d2a 100644
--- a/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
+++ b/peridot/bin/ledger/app/merging/conflict_resolver_client.cc
@@ -98,17 +98,18 @@
   }
 }
 
-void ConflictResolverClient::OnNextMergeResult(
+void ConflictResolverClient::GetOrCreateObjectIdetifier(
     const MergedValue& merged_value,
-    const fxl::RefPtr<
-        callback::Waiter<storage::Status, storage::ObjectIdentifier>>& waiter) {
+    fit::function<void(storage::Status, storage::ObjectIdentifier)> callback) {
+  FXL_DCHECK(merged_value.source == ValueSource::RIGHT ||
+             merged_value.source == ValueSource::NEW);
   switch (merged_value.source) {
     case ValueSource::RIGHT: {
       std::string key = convert::ToString(merged_value.key);
       storage_->GetEntryFromCommit(
           *right_, key,
-          [key, callback = waiter->NewCallback()](storage::Status status,
-                                                  storage::Entry entry) {
+          [key, callback = std::move(callback)](storage::Status status,
+                                                storage::Entry entry) {
             if (status != storage::Status::OK) {
               if (status == storage::Status::NOT_FOUND) {
                 FXL_LOG(ERROR)
@@ -127,22 +128,21 @@
         storage_->AddObjectFromLocal(storage::ObjectType::BLOB,
                                      storage::DataSource::Create(std::move(
                                          merged_value.new_value->bytes())),
-                                     waiter->NewCallback());
+                                     std::move(callback));
       } else {
         storage::ObjectIdentifier object_identifier;
         Status status = manager_->ResolveReference(
             std::move(merged_value.new_value->reference()), &object_identifier);
         if (status != Status::OK) {
-          waiter->NewCallback()(storage::Status::NOT_FOUND, {});
+          callback(storage::Status::NOT_FOUND, {});
           return;
         }
-        waiter->NewCallback()(storage::Status::OK,
-                              std::move(object_identifier));
+        callback(storage::Status::OK, std::move(object_identifier));
       }
       break;
     }
     case ValueSource::DELETE: {
-      journal_->Delete(merged_value.key);
+      // No object identifier to retrieve for deletions.
       break;
     }
   }
@@ -275,7 +275,9 @@
             callback::Waiter<storage::Status, storage::ObjectIdentifier>>(
             storage::Status::OK);
         for (const MergedValue& merged_value : merged_values) {
-          OnNextMergeResult(merged_value, waiter);
+          if (merged_value.source != ValueSource::DELETE) {
+            GetOrCreateObjectIdetifier(merged_value, waiter->NewCallback());
+          }
         }
         waiter->Finalize(
             [this, weak_this, merged_values = std::move(merged_values),
@@ -286,21 +288,21 @@
                 return;
               }
 
-              size_t j = 0;
-              for (size_t i = 0; i < merged_values.size(); ++i) {
-                // |object_identifiers| contains only the identifiers of objects
-                // that have been inserted. Deletions have already been handled.
-                // TODO(LE-691): Maintain the order of insertions/deletions.
-                if (merged_values[i].source == ValueSource::DELETE) {
-                  continue;
+              // |object_identifiers| contains only the identifiers of objects
+              // that have been inserted.
+              size_t i = 0;
+              for (const MergedValue& merged_value : merged_values) {
+                if (merged_value.source == ValueSource::DELETE) {
+                  journal_->Delete(merged_value.key);
+                } else {
+                  journal_->Put(merged_value.key, object_identifiers[i],
+                                merged_value.priority == Priority::EAGER
+                                    ? storage::KeyPriority::EAGER
+                                    : storage::KeyPriority::LAZY);
+                  ++i;
                 }
-                journal_->Put(merged_values[i].key, object_identifiers[j],
-                              merged_values[i].priority == Priority::EAGER
-                                  ? storage::KeyPriority::EAGER
-                                  : storage::KeyPriority::LAZY);
-                ++j;
               }
-              FXL_DCHECK(j == object_identifiers.size());
+              FXL_DCHECK(i == object_identifiers.size());
               callback(Status::OK);
             });
       });
diff --git a/peridot/bin/ledger/app/merging/conflict_resolver_client.h b/peridot/bin/ledger/app/merging/conflict_resolver_client.h
index cd4f123..2047999 100644
--- a/peridot/bin/ledger/app/merging/conflict_resolver_client.h
+++ b/peridot/bin/ledger/app/merging/conflict_resolver_client.h
@@ -41,15 +41,12 @@
   void Cancel();
 
  private:
-  // Handles the next merge value. If the |merged_value| is an insertion, i.e.
-  // if the source of |merged_value| is |RIGHT| or |NEW|, a new callback will be
-  // added on the waiter, that will be called with the object identifier of the
-  // inserted object. Otherwise, if the |merged_value| is a deletion, it will
-  // directly update the current journal.
-  void OnNextMergeResult(
+  // Gets or creates the object identifier associated to the given
+  // |merge_value|. This method can only be called on merge values whose source
+  // is either |NEW| or |RIGHT|.
+  void GetOrCreateObjectIdetifier(
       const MergedValue& merged_value,
-      const fxl::RefPtr<callback::Waiter<storage::Status,
-                                         storage::ObjectIdentifier>>& waiter);
+      fit::function<void(storage::Status, storage::ObjectIdentifier)> callback);
 
   // Rolls back journal, closes merge result provider and invokes callback_ with
   // |status|. This method must be called at most once.
diff --git a/peridot/bin/ledger/tests/integration/merging_tests.cc b/peridot/bin/ledger/tests/integration/merging_tests.cc
index 545be288..6fd0c08 100644
--- a/peridot/bin/ledger/tests/integration/merging_tests.cc
+++ b/peridot/bin/ledger/tests/integration/merging_tests.cc
@@ -841,6 +841,124 @@
   EXPECT_EQ("phone", convert::ExtendedStringView(final_entries[2].key));
 }
 
+TEST_P(MergingIntegrationTest, CustomConflictResolutionMergeValuesOrder) {
+  auto instance = NewLedgerAppInstance();
+  ConflictResolverFactoryPtr resolver_factory_ptr;
+  auto resolver_factory = std::make_unique<TestConflictResolverFactory>(
+      this, MergePolicy::CUSTOM, resolver_factory_ptr.NewRequest(), nullptr);
+  LedgerPtr ledger_ptr = instance->GetTestLedger();
+  auto waiter = NewWaiter();
+  Status status;
+  ledger_ptr->SetConflictResolverFactory(
+      std::move(resolver_factory_ptr),
+      callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  PagePtr page1 = instance->GetTestPage();
+  waiter = NewWaiter();
+  PageId test_page_id;
+  page1->GetId(callback::Capture(waiter->GetCallback(), &test_page_id));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  PagePtr page2 =
+      instance->GetPage(fidl::MakeOptional(test_page_id), Status::OK);
+
+  waiter = NewWaiter();
+  page1->StartTransaction(callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+  waiter = NewWaiter();
+  page1->Put(convert::ToArray("name"), convert::ToArray("Alice"),
+             callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  waiter = NewWaiter();
+  page2->StartTransaction(callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+  waiter = NewWaiter();
+  page2->Put(convert::ToArray("email"), convert::ToArray("alice@example.org"),
+             callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  waiter = NewWaiter();
+  page1->Commit(callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+  waiter = NewWaiter();
+  page2->Commit(callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  resolver_factory->RunUntilNewConflictResolverCalled();
+
+  // We now have a conflict.
+  EXPECT_EQ(1u, resolver_factory->resolvers.size());
+  EXPECT_NE(
+      resolver_factory->resolvers.end(),
+      resolver_factory->resolvers.find(convert::ToString(test_page_id.id)));
+  ConflictResolverImpl* resolver_impl =
+      &(resolver_factory->resolvers.find(convert::ToString(test_page_id.id))
+            ->second);
+  resolver_impl->RunUntilResolveCalled();
+  ASSERT_EQ(1u, resolver_impl->requests.size());
+
+  std::vector<DiffEntry> changes;
+  ASSERT_TRUE(resolver_impl->requests[0].GetFullDiff(&changes));
+
+  EXPECT_EQ(2u, changes.size());
+  EXPECT_TRUE(ChangeMatch("email", Optional<std::string>(),
+                          Optional<std::string>("alice@example.org"),
+                          Optional<std::string>(), changes[0]));
+  EXPECT_TRUE(ChangeMatch("name", Optional<std::string>(),
+                          Optional<std::string>(),
+                          Optional<std::string>("Alice"), changes[1]));
+
+  // Common ancestor is empty.
+  PageSnapshotPtr snapshot = resolver_impl->requests[0].common_version.Bind();
+  auto entries = SnapshotGetEntries(this, &snapshot);
+  EXPECT_EQ(0u, entries.size());
+
+  // Prepare the merged values: Initially add, but then delete the entry with
+  // key "name".
+  std::vector<MergedValue> merged_values;
+  {
+    MergedValue merged_value;
+    merged_value.key = convert::ToArray("name");
+    merged_value.source = ValueSource::RIGHT;
+    merged_values.push_back(std::move(merged_value));
+  }
+  {
+    MergedValue merged_value;
+    merged_value.key = convert::ToArray("name");
+    merged_value.source = ValueSource::DELETE;
+    merged_values.push_back(std::move(merged_value));
+  }
+
+  // Watch for the change.
+  PageWatcherPtr watcher_ptr;
+  auto watcher_waiter = NewWaiter();
+  Watcher watcher(watcher_ptr.NewRequest(), watcher_waiter->GetCallback());
+  PageSnapshotPtr snapshot2;
+  waiter = NewWaiter();
+  page1->GetSnapshot(snapshot2.NewRequest(), fidl::VectorPtr<uint8_t>::New(0),
+                     std::move(watcher_ptr),
+                     callback::Capture(waiter->GetCallback(), &status));
+  ASSERT_TRUE(waiter->RunUntilCalled());
+  EXPECT_EQ(Status::OK, status);
+
+  EXPECT_TRUE(resolver_impl->requests[0].Merge(std::move(merged_values)));
+
+  // Wait for the watcher to be called.
+  ASSERT_TRUE(watcher_waiter->RunUntilCalled());
+
+  auto final_entries = SnapshotGetEntries(this, &watcher.last_snapshot_);
+  ASSERT_EQ(1u, final_entries.size());
+  EXPECT_EQ("email", convert::ExtendedStringView(final_entries[0].key));
+}
+
 TEST_P(MergingIntegrationTest, CustomConflictResolutionGetDiffMultiPart) {
   auto instance = NewLedgerAppInstance();
   ConflictResolverFactoryPtr resolver_factory_ptr;