[sessionmgr][refactor] Implement Ledger-backed storage.

This stores only those fields in StoryModel in use right now:
runtime_state and visibility_state. They are stored in a device-local
prefix of the page.

Note that this will replace most if not all of StoryStorage entirely
once all of its data has been migrated into StoryModel.

TEST=ledger_story_model_storage_unittest
MF-152 #comment [sessionmgr][refactor] Implement Ledger-backed storage.

Change-Id: Ia96e581aaa633abb3b9c90e907be18dac1772e75
diff --git a/bin/sessionmgr/story/model/BUILD.gn b/bin/sessionmgr/story/model/BUILD.gn
index cb0518f..db4f5b1 100644
--- a/bin/sessionmgr/story/model/BUILD.gn
+++ b/bin/sessionmgr/story/model/BUILD.gn
@@ -17,6 +17,7 @@
   deps = [
     ":apply_mutations_unittest",
     ":story_model_owner_unittest",
+    ":ledger_story_model_storage_unittest",
   ]
 }
 
@@ -98,6 +99,47 @@
   ]
 }
 
+source_set("ledger_story_model_storage") {
+  sources = [
+    "ledger_story_model_storage.h",
+    "ledger_story_model_storage.cc",
+  ]
+
+  public_deps = [
+    "//peridot/public/fidl/fuchsia.modular.storymodel",
+  ]
+
+  deps = [
+    ":apply_mutations",
+    ":story_model_storage",
+    "//garnet/public/lib/fsl",
+    "//garnet/public/lib/fxl",
+    "//peridot/lib/ledger_client:page_client",
+    "//peridot/lib/ledger_client:promise",
+    "//peridot/lib/fidl:array_to_string",
+    "//peridot/public/fidl/fuchsia.ledger",
+  ]
+}
+
+executable("ledger_story_model_storage_unittest") {
+  testonly = true
+
+  sources = [
+    "ledger_story_model_storage_unittest.cc",
+  ]
+
+  deps = [
+    ":apply_mutations",
+    ":ledger_story_model_storage",
+    "testing",
+    "//garnet/public/lib/async_promise",
+    "//peridot/lib/ledger_client:page_client",
+    "//peridot/lib/testing:test_with_ledger",
+    "//third_party/googletest:gtest",
+    "//third_party/googletest:gtest_main",
+  ]
+}
+
 source_set("story_model_owner") {
   sources = [
     "story_model_owner.cc",
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage.cc b/bin/sessionmgr/story/model/ledger_story_model_storage.cc
new file mode 100644
index 0000000..d91fa0f
--- /dev/null
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage.cc
@@ -0,0 +1,379 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "peridot/bin/sessionmgr/story/model/ledger_story_model_storage.h"
+
+#include <lib/fit/bridge.h>
+#include <lib/fit/promise.h>
+#include <vector>
+
+#include "lib/fidl/cpp/object_coding.h"  // for EncodeObject()/DecodeObject()
+#include "lib/fsl/vmo/vector.h"
+#include "lib/fxl/logging.h"
+#include "peridot/bin/sessionmgr/story/model/apply_mutations.h"
+#include "peridot/lib/ledger_client/promise.h"  // for fit::promise<> wrappers
+
+using fuchsia::modular::storymodel::StoryModel;
+using fuchsia::modular::storymodel::StoryModelMutation;
+
+namespace modular {
+
+namespace {
+// Synopsis of Ledger page structure:
+//
+// storymodel/                        - base prefix for all data for this story
+//   device/<device id>               - key for device data for <device id>
+//   shared/                          - prefix for data shared across devices
+
+const char kStoryModelKeyPrefix[] = "storymodel/";
+const char kDeviceKeyPrefix[] = "storymodel/device/";
+// const char kSharedKeyPrefix[] = "shared/";
+
+std::string MakeDeviceKey(const std::string& device_id) {
+  return kDeviceKeyPrefix + device_id;
+}
+}  // namespace
+
+namespace {
+// Encodes a FIDL table into a byte representation safe for persisting to
+// storage.
+template <class T>
+std::vector<uint8_t> EncodeForStorage(T* table) {
+  std::vector<uint8_t> encoded;
+  // This can only fail if |table| contains handles. StoryModel and its fields
+  // do not.
+  FXL_CHECK(fidl::EncodeObject(table, &encoded, nullptr /* error_msg_out */) ==
+            ZX_OK);
+  return encoded;
+}
+
+// Decodes bytes encoded by EncodeForStorage() into their corresponding FIDL
+// table.
+template <class T>
+void DecodeFromStorage(std::vector<uint8_t> encoded, T* table) {
+  // DecodeObject() takes a non-const pointer, even though it doesn't
+  // modify the data.
+  FXL_CHECK(fidl::DecodeObject(encoded.data(), encoded.size(), table,
+                               nullptr /* error_msg_out */) == ZX_OK);
+}
+}  // namespace
+
+LedgerStoryModelStorage::LedgerStoryModelStorage(
+    LedgerClient* const ledger_client, fuchsia::ledger::PageId page_id,
+    std::string device_id)
+    : PageClient("LedgerStoryModelStorage", ledger_client, std::move(page_id),
+                 kStoryModelKeyPrefix),
+      device_id_(std::move(device_id)) {}
+
+LedgerStoryModelStorage::~LedgerStoryModelStorage() = default;
+
+// Helper functions to support OnPageChange() and OnPageDelete().
+namespace {
+// Returns a list of StoryModelMutation objects that, when applied to a
+// StoryModel, reflect the device state in |device_state_bytes|.
+std::vector<StoryModelMutation> GenerateObservedMutationsForDeviceState(
+    std::vector<uint8_t> device_state_bytes) {
+  StoryModel model;
+  DecodeFromStorage(std::move(device_state_bytes), &model);
+
+  std::vector<StoryModelMutation> commands;
+  if (model.has_runtime_state()) {
+    commands.resize(commands.size() + 1);
+    commands.back().set_set_runtime_state(*model.runtime_state());
+  }
+  if (model.has_visibility_state()) {
+    commands.resize(commands.size() + 1);
+    commands.back().set_set_visibility_state(*model.visibility_state());
+  }
+  return commands;
+}
+
+std::vector<StoryModelMutation> GenerateObservedMutationsForDeviceState(
+    const fuchsia::mem::Buffer& buffer) {
+  std::vector<uint8_t> bytes;
+  FXL_CHECK(fsl::VectorFromVmo(buffer, &bytes));
+  return GenerateObservedMutationsForDeviceState(std::move(bytes));
+}
+}  // namespace
+
+void LedgerStoryModelStorage::OnPageChange(const std::string& key,
+                                           fuchsia::mem::BufferPtr value) {
+  // TODO(MF-157): PageClient breaks a single change notification for multiple
+  // keys into one call to OnPageChange() per key. This breaks the semantic
+  // meaning of a single transaction. This, like OnPageConflict(), should be
+  // changed to preserve the transaction.
+  if (key == MakeDeviceKey(device_id_)) {
+    FXL_CHECK(value) << key;
+    // Read the value and generate equivalent StoryModelMutation commands.
+    Observe(GenerateObservedMutationsForDeviceState(*value));
+  } else if (key.find(kDeviceKeyPrefix) == 0) {
+    // This is device data from another device!
+    // TODO(thatguy): Store it in the local StoryModel when we care about
+    // observing these data.
+  } else {
+    FXL_LOG(FATAL) << "LedgerStoryModelStorage::OnPageChange(): key " << key
+                   << " unexpected in the Ledger.";
+  }
+}
+
+void LedgerStoryModelStorage::OnPageDelete(const std::string& key) {}
+
+void LedgerStoryModelStorage::OnPageConflict(Conflict* conflict) {
+  // The default merge policy in LedgerClient is LEFT, meaning whatever value
+  // was in the left branch for each key is taken.
+  //
+  // TODO(MF-157): LedgerClient breaks a single merge conflict for multiple
+  // keys into on OnPageConflict() call per key. For a more advanced conflict
+  // resolution policy, it is likely necessary to look at the conflict in full.
+}
+
+// Helper functions to support task construction in Execute().
+namespace {
+// Partitions |commands| into two vectors:
+//
+//   1) Those that mutate state that is device-local (ie, runtime state of the
+//   story)
+//
+//   2) Those that mutate state that is shared among all devices (ie, the set
+//   of mods)
+struct PartitionedCommands {
+  // These commands represent mutations that apply only to device-local state.
+  std::vector<StoryModelMutation> device_commands;
+  // And these apply to shared (cross-device) state.
+  std::vector<StoryModelMutation> shared_commands;
+};
+PartitionedCommands PartitionCommandsForDeviceAndShared(
+    std::vector<StoryModelMutation> commands) {
+  PartitionedCommands partitioned_commands;
+
+  for (auto& i : commands) {
+    switch (i.Which()) {
+      case StoryModelMutation::Tag::kSetRuntimeState:
+      case StoryModelMutation::Tag::kSetVisibilityState:
+        partitioned_commands.device_commands.push_back(std::move(i));
+        break;
+      case StoryModelMutation::Tag::Invalid:
+        FXL_LOG(FATAL) << "Encountered invalid StoryModelMutation.";
+        break;
+    }
+  }
+
+  return partitioned_commands;
+}
+
+// TODO(thatguy): Move these functions to ledger_client/promise.h
+
+// Reads the value in the given key and returns an object of type T. If |key|
+// does not have a value, returns a default-constructed T.
+template <class T>
+fit::promise<T> ReadObjectFromKey(fuchsia::ledger::PageSnapshot* snapshot,
+                                  const std::string& key) {
+  return PageSnapshotPromise::GetInline(snapshot, key)
+      .and_then([](const std::unique_ptr<std::vector<uint8_t>>& value) {
+        if (!value) {
+          return fit::ok(T());
+        }
+
+        T object;
+        DecodeFromStorage(std::move(*value), &object);
+        return fit::ok(std::move(object));
+      });
+}
+
+// Writes |value| to |key|.
+template <class T>
+fit::promise<> WriteObjectToKey(fuchsia::ledger::Page* page,
+                                const std::string& key, T value) {
+  auto bytes = EncodeForStorage(&value);
+  // TODO(thatguy): Calculate if this value is too big for a FIDL message.  If
+  // so, fall back on Page.CreateReferenceFromBuffer() and Page.PutReference().
+  return PagePromise::Put(page, key, std::move(bytes));
+}
+
+// Reads the latest device-local state, applies |commands| to it, and then
+// writes it back to the Ledger.
+//
+// Store all the device-local state under a single key, and re-use
+// a sparsely populated StoryModel table as our data structure for simplicity.
+//
+// The returned promise is resolved once calls to mutate the Page have
+// returned.
+fit::promise<> UpdateDeviceState(fuchsia::ledger::Page* page,
+                                 fuchsia::ledger::PageSnapshot* snapshot,
+                                 const std::string& device_id,
+                                 std::vector<StoryModelMutation> commands) {
+  // Task synopsis:
+  //
+  // 1) Read the current contents at |key| from the page snapshot.
+  // 2) Apply |commands| to those contents.
+  // 3) Write the new contents back to |key|.
+  auto key = MakeDeviceKey(device_id);
+  return ReadObjectFromKey<StoryModel>(snapshot, key)
+      .and_then([page, key, commands = std::move(commands)](
+                    const StoryModel& current_value) {
+        auto new_value = ApplyMutations(current_value, commands);
+        return WriteObjectToKey(page, key, std::move(new_value));
+      });
+}
+
+// Updates the shared state section of the ledger based on |commands|.
+//
+// The returned promise is resolved once calls to mutate the Page have
+// returned.
+fit::promise<> UpdateSharedState(fuchsia::ledger::Page* page,
+                                 fuchsia::ledger::PageSnapshot* snapshot,
+                                 std::vector<StoryModelMutation> commands) {
+  // There is no shared state yet.
+  return fit::make_promise([] { return fit::ok(); });
+}
+}  // namespace
+
+fit::promise<> LedgerStoryModelStorage::Load() {
+  // Synopsis of Load() task:
+  //
+  // 1) Read from device-local state and build commands.
+  // 2) Scan the shared state and build commands.
+  // 3) Wait for the above tasks and then issue all of the commands to
+  // Observe().
+  //
+  // NOTE: currently we don't have any shared state, so we skip (2).
+
+  struct State {
+    fuchsia::ledger::PageSnapshotPtr page_snapshot;
+    std::vector<StoryModelMutation> commands;
+  };
+  auto state = std::make_unique<State>();
+
+  return fit::make_promise([this, state = state.get()](fit::context& c) {
+           // Get a snapshot. Join on the result later and take advantage of
+           // pipelining instead.
+           auto get_snapshot_promise = PagePromise::GetSnapshot(
+               page(), state->page_snapshot.NewRequest());
+
+           auto key = MakeDeviceKey(device_id_);
+           auto read_promise =
+               PageSnapshotPromise::GetInline(state->page_snapshot.get(), key)
+                   .and_then([state](
+                                 const std::unique_ptr<std::vector<uint8_t>>&
+                                     device_state_bytes) {
+                     if (device_state_bytes) {
+                       auto commands = GenerateObservedMutationsForDeviceState(
+                           std::move(*device_state_bytes));
+                       state->commands.insert(
+                           state->commands.end(),
+                           std::make_move_iterator(commands.begin()),
+                           std::make_move_iterator(commands.end()));
+                     }
+                     return fit::ok();
+                   });
+
+           return fit::join_promises(std::move(get_snapshot_promise),
+                                     std::move(read_promise));
+         })
+      .and_then([this, state = state.get()](
+                    std::tuple<fit::result<>, fit::result<>> results)
+                    -> fit::result<> {
+        auto [get_snapshot_result, read_result] = results;
+        if (get_snapshot_result.is_error() || read_result.is_error()) {
+          return fit::error();
+        }
+
+        Observe(std::move(state->commands));
+        return fit::ok();
+      })
+      // Keep |state| alive until execution reaches here.
+      .inspect([state = std::move(state)](fit::result<>& r) {})
+      .wrap_with(scope_);
+}
+
+fit::promise<> LedgerStoryModelStorage::Flush() {
+  // The returned promise will block until all pending mutation opertaions have
+  // resolved. These pending operations are also wrapped with |sequencer_| (in
+  // Execute()), which applies this sequential behavior to promises it wraps.
+  return fit::make_promise([] { return fit::ok(); }).wrap_with(sequencer_);
+}
+
+fit::promise<> LedgerStoryModelStorage::Execute(
+    std::vector<StoryModelMutation> commands) {
+  // Synopsis of the Execute() task:
+  //
+  // 1) Start a Page transaction.
+  // 2) Get a PageSnapshot.
+  // 3) Partition |commands| into those affecting per-device state and shared
+  // state and then update each partition in storage in parallel.
+  // 4) Commit() if successful, and Rollback() if not.
+  //
+  // To take maximum advantage of FIDL pipelining and concurrency, do (1), (2),
+  // and (3). Before (4), join on all the results and fail if
+  // any of 1-3 failed.
+
+  // Some state must outlast several of the fit::promise callbacks below.
+  // Capture it in a struct on the heap, and then move ownership to a point
+  // late enough in our promise by calling
+  // fit::promise.inspect().
+  struct State {
+    fuchsia::ledger::PageSnapshotPtr page_snapshot;
+  };
+  auto state = std::make_unique<State>();
+
+  return fit::make_promise(
+             [this, state = state.get(),
+              commands = std::move(commands)]() mutable -> fit::promise<> {
+               // Start the transaction, but don't block on its result. Rather,
+               // join it later to ensure that a failed StartTransaction()
+               // triggers a failure of the overall task.
+               auto start_transaction_promise =
+                   PagePromise::StartTransaction(page());
+
+               // Get a snapshot. As with StartTransaction(), join on the
+               // result later and take advantage of pipelining instead.
+               auto get_snapshot_promise = PagePromise::GetSnapshot(
+                   page(), state->page_snapshot.NewRequest());
+
+               // Partition up the commands into those that affect device-only
+               // state, and those that affect shared (among all devices) state.
+               auto [device_commands, shared_commands] =
+                   PartitionCommandsForDeviceAndShared(std::move(commands));
+
+               // Dispatch the update commands.
+               auto update_device_state_promise =
+                   UpdateDeviceState(page(), state->page_snapshot.get(),
+                                     device_id_, std::move(device_commands));
+               auto update_shared_state_promise =
+                   UpdateSharedState(page(), state->page_snapshot.get(),
+                                     std::move(shared_commands));
+
+               // Wait on all four pending promises. Fail if any one of them
+               // result in an error.
+               return fit::join_promises(std::move(start_transaction_promise),
+                                         std::move(get_snapshot_promise),
+                                         std::move(update_device_state_promise),
+                                         std::move(update_shared_state_promise))
+                   .and_then([](std::tuple<fit::result<>, fit::result<>,
+                                           fit::result<>, fit::result<>>
+                                    results) -> fit::result<> {
+                     auto [start_transaction_result, get_snapshot_result,
+                           device_result, shared_result] = results;
+                     if (start_transaction_result.is_error() ||
+                         get_snapshot_result.is_error() ||
+                         device_result.is_error() || shared_result.is_error()) {
+                       return fit::error();
+                     }
+                     return fit::ok();
+                   });
+             })
+      // Keep |state| alive until execution reaches here. It is not needed in
+      // any subsequent continuation functions.
+      .inspect([state = std::move(state)](fit::result<>& r) {})
+      .and_then([page = page()] { return PagePromise::Commit(page); })
+      .or_else([page = page()] {
+        // Even if RollbackTransaction() succeeds, fail the overall task.
+        return PagePromise::Rollback(page).and_then(
+            [] { return fit::error(); });
+      })
+      .wrap_with(sequencer_)  // Waits until last Execute() is done.
+      .wrap_with(scope_);     // Aborts if |this| is destroyed.
+}
+
+}  // namespace modular
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage.h b/bin/sessionmgr/story/model/ledger_story_model_storage.h
new file mode 100644
index 0000000..05fcdf1
--- /dev/null
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage.h
@@ -0,0 +1,80 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef PERIDOT_BIN_SESSIONMGR_STORY_MODEL_LEDGER_STORY_MODEL_STORAGE_H_
+#define PERIDOT_BIN_SESSIONMGR_STORY_MODEL_LEDGER_STORY_MODEL_STORAGE_H_
+
+#include <fuchsia/modular/storymodel/cpp/fidl.h>
+#include <lib/fit/scope.h>
+#include <lib/fit/sequencer.h>
+#include <lib/fxl/macros.h>
+
+#include "peridot/bin/sessionmgr/story/model/story_model_storage.h"
+
+#include "peridot/lib/ledger_client/page_client.h"
+
+namespace modular {
+
+class LedgerClient;
+
+// LedgerStoryModelStorage writes a StoryModel into a Ledger Page instance. It partitions
+// the StoryModel into two sections:
+//
+// 1) Values that are scoped to this device (such as the Story's runtime state)
+// 2) Values that are shared among all devices (such as the list of mod URLs)
+//
+// The two sections are stored in separate prefixes of the Ledger: (1) is
+// prefixed using the device's id, and (2) is prefixed in a shared location.
+class LedgerStoryModelStorage : public StoryModelStorage, PageClient {
+ public:
+  // Constructs a new instance which stores all data in |page_id| within
+  // |ledger_client|'s Ledger. Scopes device-local state to a key namespace
+  // therein with |device_id|.
+  LedgerStoryModelStorage(LedgerClient* ledger_client,
+                          fuchsia::ledger::PageId page_id,
+                          std::string device_id);
+  ~LedgerStoryModelStorage() override;
+
+ private:
+  // |PageClient|
+  void OnPageChange(const std::string& key,
+                    fuchsia::mem::BufferPtr value) override;
+
+  // |PageClient|
+  void OnPageDelete(const std::string& key) override;
+
+  // |PageClient|
+  void OnPageConflict(Conflict* conflict) override;
+
+  // |StoryModelStorage|
+  fit::promise<> Load() override;
+
+  // |StoryModelStorage|
+  fit::promise<> Flush() override;
+
+  // |StoryModelStorage|
+  fit::promise<> Execute(
+      std::vector<fuchsia::modular::storymodel::StoryModelMutation> commands)
+      override;
+
+  const std::string device_id_;
+
+  // With |scope_| is destroyed (which is when |this| is destructed), all
+  // fit::promises created in Mutate() will be abandoned. This is important
+  // because those promises capture |this| in their handler functions.
+  fit::scope scope_;
+
+  // All of the writes to the Ledger are sequenced: the fuchsia.ledger.Page API
+  // dictates that only one transaction may be ongoing at a time. Each call to
+  // Execute() results in a promise that calls StartTransaction() and Commit()
+  // at its end. |sequencer_| is used to ensure that no subsequent Execute()
+  // task begins before the previous has completed.
+  fit::sequencer sequencer_;
+
+  FXL_DISALLOW_COPY_AND_ASSIGN(LedgerStoryModelStorage);
+};
+
+}  // namespace modular
+
+#endif  // PERIDOT_BIN_SESSIONMGR_STORY_MODEL_LEDGER_STORY_MODEL_STORAGE_H_
diff --git a/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc b/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc
new file mode 100644
index 0000000..ff895d5
--- /dev/null
+++ b/bin/sessionmgr/story/model/ledger_story_model_storage_unittest.cc
@@ -0,0 +1,206 @@
+// Copyright 2018 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <fuchsia/modular/storymodel/cpp/fidl.h>
+#include <lib/async_promise/executor.h>
+#include <lib/fit/bridge.h>
+#include <lib/fit/function.h>
+#include <lib/fit/single_threaded_executor.h>
+
+#include "gtest/gtest.h"
+#include "peridot/bin/sessionmgr/story/model/apply_mutations.h"
+#include "peridot/bin/sessionmgr/story/model/ledger_story_model_storage.h"
+#include "peridot/bin/sessionmgr/story/model/testing/mutation_matchers.h"
+#include "peridot/lib/ledger_client/ledger_client.h"
+#include "peridot/lib/ledger_client/page_id.h"
+#include "peridot/lib/testing/test_with_ledger.h"
+
+using fuchsia::modular::StoryState;
+using fuchsia::modular::StoryVisibilityState;
+using fuchsia::modular::storymodel::StoryModel;
+using fuchsia::modular::storymodel::StoryModelMutation;
+
+namespace modular {
+namespace {
+
+// TODO: there is no good candidate for testing conflict resolution in the
+// StoryModel as of yet. What would be good is, e.g.: setting a value on a
+// ModuleModel while simultaneously deleting the entire entry.
+
+class LedgerStoryModelStorageTest : public testing::TestWithLedger {
+ public:
+  async::Executor executor;
+
+  LedgerStoryModelStorageTest() : executor(dispatcher()) {}
+
+  // Creates a new LedgerStoryModelStorage instance and returns:
+  //
+  // 1) A unique_ptr to the new instance.
+  // 2) A ptr to a vector of lists of StoryModelMutations observed from that
+  // instance.
+  // 3) A ptr to a StoryModel updated with the observed commands.
+  std::tuple<std::unique_ptr<StoryModelStorage>,
+             std::vector<std::vector<StoryModelMutation>>*, StoryModel*>
+  Create(std::string page_id, std::string device_id) {
+    auto storage = std::make_unique<LedgerStoryModelStorage>(
+        ledger_client(), MakePageId(page_id), device_id);
+
+    auto observed_commands =
+        observed_mutations_.emplace(observed_mutations_.end());
+    auto observed_model = observed_models_.emplace(observed_models_.end());
+    storage->SetObserveCallback([=](std::vector<StoryModelMutation> commands) {
+      *observed_model = ApplyMutations(*observed_model, commands);
+      observed_commands->push_back(std::move(commands));
+    });
+    return std::make_tuple(std::move(storage), &*observed_commands,
+                           &*observed_model);
+  }
+
+  // This is broken out into its own function because we use C++ structured
+  // bindings to capture the result of Create() above. These cannot be
+  // implicitly captured in lambdas without more verbose syntax. This function
+  // converts the binding into a real variable which is possible to capture.
+  void RunLoopUntilNumMutationsObserved(
+      std::vector<std::vector<StoryModelMutation>>* observed_mutations,
+      uint32_t n) {
+    RunLoopUntil([&] { return observed_mutations->size() >= n; });
+  }
+
+ private:
+  // A list (per StoryModelStorage instance) of the commands issued to each call
+  // to StoryModelStorage.Observe().
+  std::list<std::vector<std::vector<StoryModelMutation>>> observed_mutations_;
+  std::list<StoryModel> observed_models_;
+};
+
+// Store some device-local values (runtime state, visibility state), and
+// observe the values coming back to us.
+TEST_F(LedgerStoryModelStorageTest, DeviceLocal_RoundTrip) {
+  auto [storage, observed_mutations, observed_model] =
+      Create("page1", "device1");
+
+  std::vector<StoryModelMutation> commands(2);
+  commands[0].set_set_runtime_state(StoryState::RUNNING);
+  commands[1].set_set_visibility_state(StoryVisibilityState::IMMERSIVE);
+
+  fit::result<> result;
+  executor.schedule_task(
+      storage->Execute(std::move(commands)).then([&](fit::result<>& r) {
+        result = std::move(r);
+      }));
+  RunLoopUntil([&] { return !!result; });
+  EXPECT_TRUE(result.is_ok());
+
+  // We expect to see these values resulting in a notification from the ledger
+  // eventually.
+  RunLoopUntilNumMutationsObserved(observed_mutations, 1);
+  EXPECT_EQ(1lu, observed_mutations->size());
+  EXPECT_THAT(observed_mutations->at(0),
+              ::testing::ElementsAre(
+                  IsSetRuntimeStateMutation(StoryState::RUNNING),
+                  IsSetVisibilityMutation(StoryVisibilityState::IMMERSIVE)));
+
+  // Now change only StoryState. We should see the result of our previous
+  // change to StoryVisibilityState preserved.
+  commands.resize(1);
+  commands[0].set_set_runtime_state(StoryState::STOPPED);
+
+  result = fit::result<>();
+  executor.schedule_task(
+      storage->Execute(std::move(commands)).then([&](fit::result<>& r) {
+        result = std::move(r);
+      }));
+  RunLoopUntil([&] { return !!result; });
+  EXPECT_TRUE(result.is_ok());
+
+  RunLoopUntilNumMutationsObserved(observed_mutations, 2);
+  EXPECT_EQ(2lu, observed_mutations->size());
+  EXPECT_THAT(observed_mutations->at(1),
+              ::testing::ElementsAre(
+                  IsSetRuntimeStateMutation(StoryState::STOPPED),
+                  IsSetVisibilityMutation(StoryVisibilityState::IMMERSIVE)));
+}
+
+// Show that when we store values for two different device IDs in the same
+// Ledger page, they do not cause any conflicts.
+TEST_F(LedgerStoryModelStorageTest, DeviceLocal_DeviceIsolation) {
+  auto [storage1, observed_mutations1, observed_model1] =
+      Create("page1", "device1");
+  auto [storage2, observed_mutations2, observed_model2] =
+      Create("page1", "device2");
+
+  // Set runtime state to RUNNING on device1, and set visibility state to
+  // IMMERSIVE on device2.
+  {
+    std::vector<StoryModelMutation> commands(1);
+    commands[0].set_set_runtime_state(StoryState::RUNNING);
+    executor.schedule_task(storage1->Execute(std::move(commands)));
+  }
+  {
+    std::vector<StoryModelMutation> commands(1);
+    commands[0].set_set_visibility_state(StoryVisibilityState::IMMERSIVE);
+    executor.schedule_task(storage2->Execute(std::move(commands)));
+  }
+
+  RunLoopUntilNumMutationsObserved(observed_mutations1, 1);
+  RunLoopUntilNumMutationsObserved(observed_mutations2, 1);
+
+  EXPECT_TRUE(observed_model1->has_runtime_state());
+  EXPECT_FALSE(observed_model1->has_visibility_state());
+  EXPECT_TRUE(observed_model2->has_visibility_state());
+  EXPECT_FALSE(observed_model2->has_runtime_state());
+}
+
+// Create two update tasks but schedule them out of order. We expect them to
+// run in order.
+TEST_F(LedgerStoryModelStorageTest, UpdatesAreSequential) {
+  auto [storage, observed_mutations, observed_model] = Create("page", "device");
+
+  std::vector<StoryModelMutation> commands(1);
+  commands[0].set_set_runtime_state(StoryState::RUNNING);
+  auto promise1 = storage->Execute(std::move(commands));
+
+  commands.resize(1);
+  commands[0].set_set_runtime_state(StoryState::STOPPING);
+  auto promise2 = storage->Execute(std::move(commands));
+
+  executor.schedule_task(std::move(promise2));
+  RunLoopUntilIdle();  // For good measure.
+  executor.schedule_task(std::move(promise1));
+
+  RunLoopUntilNumMutationsObserved(observed_mutations, 2);
+  EXPECT_EQ(StoryState::STOPPING, *observed_model->runtime_state());
+}
+
+// When Load() is called, read what is stored in the Ledger back out and
+// expect to see commands that represent that state through the storage
+// observer.
+TEST_F(LedgerStoryModelStorageTest, Load) {
+  StoryModel expected_model;
+  {
+    auto [storage, observed_mutations, observed_model] =
+        Create("page", "device");
+
+    std::vector<StoryModelMutation> commands(2);
+    commands[0].set_set_runtime_state(StoryState::RUNNING);
+    commands[1].set_set_visibility_state(StoryVisibilityState::IMMERSIVE);
+    // TODO(thatguy): As we add more StoryModelMutations, add more lines here.
+    executor.schedule_task(storage->Execute(std::move(commands)));
+    bool done{false};
+    executor.schedule_task(storage->Flush().and_then([&] { done = true; }));
+    RunLoopUntil([&] { return done; });
+    expected_model = std::move(*observed_model);
+  }
+
+  auto [storage, observed_mutations, observed_model] = Create("page", "device");
+
+  bool done{false};
+  executor.schedule_task(
+      storage->Load().then([&](fit::result<>&) { done = true; }));
+  RunLoopUntil([&] { return done; });
+  EXPECT_EQ(expected_model, *observed_model);
+}
+
+}  // namespace
+}  // namespace modular
diff --git a/bin/sessionmgr/story/model/meta/ledger_story_model_storage_unittest.cmx b/bin/sessionmgr/story/model/meta/ledger_story_model_storage_unittest.cmx
index bee4b96..bb0807c 100644
--- a/bin/sessionmgr/story/model/meta/ledger_story_model_storage_unittest.cmx
+++ b/bin/sessionmgr/story/model/meta/ledger_story_model_storage_unittest.cmx
@@ -3,6 +3,8 @@
     "binary": "test/ledger_story_model_storage_unittest"
   },
   "sandbox": {
-    "services": []
+    "services": [
+      "fuchsia.sys.Launcher"
+    ]
   }
 }