[modular][storymodel] Un-flake ledger_story_model_storage_unittest.

The old test was flaky because it depended on the Ledger notifying the
StoryModelStorage of changes to establish what would be the expected
value of a later read from the Ledger, but it did not wait on a
condition that guaranteed the Ledger had notified it. This created a
race (against TIME!).

TEST=ledger_story_model_storage_unittest

MF-165 #done [modular][storymodel] Un-flake ledger_story_model_storage_unittest.
MF-165 #comment

Change-Id: I0d515609374922692caf2fc46491e52d7354a158
diff --git a/bin/sessionmgr/story/model/BUILD.gn b/bin/sessionmgr/story/model/BUILD.gn
index 8941256..101721f 100644
--- a/bin/sessionmgr/story/model/BUILD.gn
+++ b/bin/sessionmgr/story/model/BUILD.gn
@@ -17,8 +17,7 @@
   deps = [
     ":apply_mutations_unittest",
     ":story_model_owner_unittest",
-
-    # DISALBED: FLAKY ":ledger_story_model_storage_unittest",
+    ":ledger_story_model_storage_unittest",
   ]
 }
 
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage.cc b/bin/sessionmgr/story/model/ledger_story_model_storage.cc
index d91fa0f..25e3135 100644
--- a/bin/sessionmgr/story/model/ledger_story_model_storage.cc
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage.cc
@@ -7,6 +7,7 @@
 #include <lib/fit/bridge.h>
 #include <lib/fit/promise.h>
 #include <vector>
+#include <string>
 
 #include "lib/fidl/cpp/object_coding.h"  // for EncodeObject()/DecodeObject()
 #include "lib/fsl/vmo/vector.h"
@@ -70,54 +71,89 @@
 
 // Helper functions to support OnPageChange() and OnPageDelete().
 namespace {
-// Returns a list of StoryModelMutation objects that, when applied to a
+// Appends to |commands| StoryModelMutation objects that, when applied to a
 // StoryModel, reflect the device state in |device_state_bytes|.
-std::vector<StoryModelMutation> GenerateObservedMutationsForDeviceState(
-    std::vector<uint8_t> device_state_bytes) {
+void GenerateObservedMutationsForDeviceState(
+    std::vector<uint8_t> device_state_bytes,
+    std::vector<StoryModelMutation>* commands) {
   StoryModel model;
   DecodeFromStorage(std::move(device_state_bytes), &model);
 
-  std::vector<StoryModelMutation> commands;
   if (model.has_runtime_state()) {
-    commands.resize(commands.size() + 1);
-    commands.back().set_set_runtime_state(*model.runtime_state());
+    commands->resize(commands->size() + 1);
+    commands->back().set_set_runtime_state(*model.runtime_state());
   }
   if (model.has_visibility_state()) {
-    commands.resize(commands.size() + 1);
-    commands.back().set_set_visibility_state(*model.visibility_state());
+    commands->resize(commands->size() + 1);
+    commands->back().set_set_visibility_state(*model.visibility_state());
   }
-  return commands;
 }
 
-std::vector<StoryModelMutation> GenerateObservedMutationsForDeviceState(
-    const fuchsia::mem::Buffer& buffer) {
+void GenerateObservedMutationsForDeviceState(
+    const fuchsia::mem::Buffer& buffer,
+    std::vector<StoryModelMutation>* commands) {
   std::vector<uint8_t> bytes;
   FXL_CHECK(fsl::VectorFromVmo(buffer, &bytes));
-  return GenerateObservedMutationsForDeviceState(std::move(bytes));
+  GenerateObservedMutationsForDeviceState(std::move(bytes), commands);
 }
 }  // namespace
 
-void LedgerStoryModelStorage::OnPageChange(const std::string& key,
-                                           fuchsia::mem::BufferPtr value) {
-  // TODO(MF-157): PageClient breaks a single change notification for multiple
-  // keys into one call to OnPageChange() per key. This breaks the semantic
-  // meaning of a single transaction. This, like OnPageConflict(), should be
-  // changed to preserve the transaction.
-  if (key == MakeDeviceKey(device_id_)) {
-    FXL_CHECK(value) << key;
-    // Read the value and generate equivalent StoryModelMutation commands.
-    Observe(GenerateObservedMutationsForDeviceState(*value));
-  } else if (key.find(kDeviceKeyPrefix) == 0) {
-    // This is device data from another device!
-    // TODO(thatguy): Store it in the local StoryModel when we care about
-    // observing these data.
-  } else {
-    FXL_LOG(FATAL) << "LedgerStoryModelStorage::OnPageChange(): key " << key
-                   << " unexpected in the Ledger.";
+void LedgerStoryModelStorage::OnChange(
+    fuchsia::ledger::PageChange page_change,
+    fuchsia::ledger::ResultState result_state, OnChangeCallback callback) {
+  switch (result_state) {
+    case fuchsia::ledger::ResultState::COMPLETED:
+      ProcessCompletePageChange(std::move(page_change));
+      break;
+    case fuchsia::ledger::ResultState::PARTIAL_STARTED:
+      partial_page_change_ = fuchsia::ledger::PageChange();
+      // Continue to merging of values with |partial_page_change_|.
+    case fuchsia::ledger::ResultState::PARTIAL_CONTINUED:
+    case fuchsia::ledger::ResultState::PARTIAL_COMPLETED:
+      // Merge the entries in |page_change| with |partial_page_change_|.
+      partial_page_change_.changed_entries.insert(
+          partial_page_change_.changed_entries.end(),
+          std::make_move_iterator(page_change.changed_entries.begin()),
+          std::make_move_iterator(page_change.changed_entries.end()));
+      partial_page_change_.deleted_keys.insert(
+          partial_page_change_.deleted_keys.end(),
+          std::make_move_iterator(page_change.deleted_keys.begin()),
+          std::make_move_iterator(page_change.deleted_keys.end()));
+
+      if (result_state == fuchsia::ledger::ResultState::PARTIAL_COMPLETED) {
+        ProcessCompletePageChange(std::move(partial_page_change_));
+      }
   }
+
+  callback(nullptr /* snapshot request */);
 }
 
-void LedgerStoryModelStorage::OnPageDelete(const std::string& key) {}
+void LedgerStoryModelStorage::ProcessCompletePageChange(
+    fuchsia::ledger::PageChange page_change) {
+  std::vector<StoryModelMutation> commands;
+
+  for (auto& entry : page_change.changed_entries) {
+    auto key = to_string(entry.key);
+    if (key == MakeDeviceKey(device_id_)) {
+      FXL_CHECK(entry.value)
+          << key << ": This key should never be deleted. Something is wrong.";
+      // Read the value and generate equivalent StoryModelMutation commands.
+      GenerateObservedMutationsForDeviceState(std::move(*entry.value),
+                                              &commands);
+    } else if (key.find(kDeviceKeyPrefix) == 0) {
+      // This is device data from another device!
+      // TODO(thatguy): Store it in the local StoryModel when we care about
+      // observing these data.
+    } else {
+      FXL_LOG(FATAL) << "LedgerStoryModelStorage::OnPageChange(): key " << key
+                     << " unexpected in the Ledger.";
+    }
+  }
+
+  Observe(std::move(commands));
+
+  // Don't do anything with deleted keys at this time.
+}
 
 void LedgerStoryModelStorage::OnPageConflict(Conflict* conflict) {
   // The default merge policy in LedgerClient is LEFT, meaning whatever value
@@ -258,12 +294,8 @@
                                  const std::unique_ptr<std::vector<uint8_t>>&
                                      device_state_bytes) {
                      if (device_state_bytes) {
-                       auto commands = GenerateObservedMutationsForDeviceState(
-                           std::move(*device_state_bytes));
-                       state->commands.insert(
-                           state->commands.end(),
-                           std::make_move_iterator(commands.begin()),
-                           std::make_move_iterator(commands.end()));
+                       GenerateObservedMutationsForDeviceState(
+                           std::move(*device_state_bytes), &state->commands);
                      }
                      return fit::ok();
                    });
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage.h b/bin/sessionmgr/story/model/ledger_story_model_storage.h
index 05fcdf1..690bafd 100644
--- a/bin/sessionmgr/story/model/ledger_story_model_storage.h
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage.h
@@ -18,8 +18,8 @@
 
 class LedgerClient;
 
-// LedgerStoryModelStorage writes a StoryModel into a Ledger Page instance. It partitions
-// the StoryModel into two sections:
+// LedgerStoryModelStorage writes a StoryModel into a Ledger Page instance. It
+// partitions the StoryModel into two sections:
 //
 // 1) Values that are scoped to this device (such as the Story's runtime state)
 // 2) Values that are shared among all devices (such as the list of mod URLs)
@@ -37,12 +37,10 @@
   ~LedgerStoryModelStorage() override;
 
  private:
-  // |PageClient|
-  void OnPageChange(const std::string& key,
-                    fuchsia::mem::BufferPtr value) override;
-
-  // |PageClient|
-  void OnPageDelete(const std::string& key) override;
+  // |PageWatcher|
+  void OnChange(fuchsia::ledger::PageChange page,
+                fuchsia::ledger::ResultState result_state,
+                OnChangeCallback callback) override;
 
   // |PageClient|
   void OnPageConflict(Conflict* conflict) override;
@@ -58,8 +56,18 @@
       std::vector<fuchsia::modular::storymodel::StoryModelMutation> commands)
       override;
 
+  // Called from OnChange().
+  void ProcessCompletePageChange(fuchsia::ledger::PageChange page_change);
+
   const std::string device_id_;
 
+  // For very large changes to the Ledger page, OnChange() may be called
+  // multiple times, each time with a partial representation of the change. The
+  // changes are accumulated in |partial_page_change_| until OnChange() is
+  // called with the final set (where |result_state| ==
+  // ResultState::PARTIAL_COMPLETE).
+  fuchsia::ledger::PageChange partial_page_change_;
+
   // With |scope_| is destroyed (which is when |this| is destructed), all
   // fit::promises created in Mutate() will be abandoned. This is important
   // because those promises capture |this| in their handler functions.
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc b/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc
index ff895d5..b5f4db1 100644
--- a/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc
@@ -187,9 +187,7 @@
     commands[1].set_set_visibility_state(StoryVisibilityState::IMMERSIVE);
     // TODO(thatguy): As we add more StoryModelMutations, add more lines here.
     executor.schedule_task(storage->Execute(std::move(commands)));
-    bool done{false};
-    executor.schedule_task(storage->Flush().and_then([&] { done = true; }));
-    RunLoopUntil([&] { return done; });
+    RunLoopUntilNumMutationsObserved(observed_mutations, 1);
     expected_model = std::move(*observed_model);
   }