[ledger] Ledger ChildrenManager integration

This is our second use of the new ChildrenManager API and enables
Ledger, given one or more connected repositories, to describe to
Inspect unconnected pages within the ledgers within those
repositories.

Change-Id: I75f23c9684e7d34dd9838fcc91c97d5dfa73d5ad
diff --git a/src/ledger/bin/app/ledger_manager.cc b/src/ledger/bin/app/ledger_manager.cc
index 93cb3df..d1fb17d 100644
--- a/src/ledger/bin/app/ledger_manager.cc
+++ b/src/ledger/bin/app/ledger_manager.cc
@@ -24,6 +24,7 @@
 #include "src/ledger/bin/app/page_utils.h"
 #include "src/ledger/bin/app/types.h"
 #include "src/ledger/bin/fidl/include/types.h"
+#include "src/ledger/bin/inspect/inspect.h"
 #include "src/ledger/bin/p2p_sync/public/page_communicator.h"
 #include "src/ledger/bin/storage/public/page_storage.h"
 #include "src/lib/fxl/logging.h"
@@ -47,6 +48,9 @@
       merge_manager_(environment_),
       page_usage_listener_(page_usage_listener),
       inspect_node_(std::move(inspect_node)),
+      pages_node_(
+          inspect_node_.CreateChild(kPagesInspectPathComponent.ToString())),
+      children_manager_retainer_(pages_node_.SetChildrenManager(this)),
       weak_factory_(this) {
   bindings_.set_on_empty([this] { CheckEmpty(); });
   page_managers_.set_on_empty([this] { CheckEmpty(); });
@@ -93,6 +97,35 @@
                                            std::move(callback));
 }
 
+void LedgerManager::GetNames(
+    fit::function<void(std::vector<std::string>)> callback) {
+  storage_->ListPages([callback = std::move(callback)](
+                          storage::Status status,
+                          std::set<storage::PageId> page_ids) {
+    if (status != storage::Status::OK) {
+      FXL_LOG(WARNING) << "Status wasn't OK; rather it was " << status << "!";
+    }
+    std::vector<std::string> display_names;
+    display_names.reserve(page_ids.size());
+    for (const auto& page_id : page_ids) {
+      display_names.push_back(PageIdToDisplayName(page_id));
+    }
+    callback(display_names);
+  });
+}
+
+void LedgerManager::Attach(std::string name,
+                           fit::function<void(fit::closure)> callback) {
+  storage::PageId page_id;
+  if (!PageDisplayNameToPageId(name, &page_id)) {
+    FXL_DCHECK(false) << "Page display name \"" << name
+                      << "\" not convertable into a PageId!";
+    callback([]() {});
+    return;
+  }
+  callback(GetOrCreatePageManager(page_id)->CreateDetacher());
+};
+
 PageManager* LedgerManager::GetOrCreatePageManager(
     convert::ExtendedStringView page_id) {
   auto it = page_managers_.find(page_id);
@@ -102,9 +135,10 @@
 
   auto ret = page_managers_.emplace(
       std::piecewise_construct, std::forward_as_tuple(page_id.ToString()),
-      std::forward_as_tuple(environment_, ledger_name_, page_id.ToString(),
-                            page_usage_listener_, storage_.get(),
-                            ledger_sync_.get(), &merge_manager_));
+      std::forward_as_tuple(
+          environment_, ledger_name_, page_id.ToString(), page_usage_listener_,
+          storage_.get(), ledger_sync_.get(), &merge_manager_,
+          pages_node_.CreateChild(PageIdToDisplayName(page_id.ToString()))));
   FXL_DCHECK(ret.second);
   return &ret.first->second;
 }
diff --git a/src/ledger/bin/app/ledger_manager.h b/src/ledger/bin/app/ledger_manager.h
index 7e57ca6..315e95e 100644
--- a/src/ledger/bin/app/ledger_manager.h
+++ b/src/ledger/bin/app/ledger_manager.h
@@ -41,7 +41,7 @@
 // LedgerManager owns all per-ledger-instance objects: LedgerStorage and a FIDL
 // LedgerImpl. It is safe to delete it at any point - this closes all channels,
 // deletes the LedgerImpl and tears down the storage.
-class LedgerManager : public LedgerImpl::Delegate {
+class LedgerManager : public LedgerImpl::Delegate, inspect::ChildrenManager {
  public:
   LedgerManager(
       Environment* environment, std::string ledger_name,
@@ -83,6 +83,12 @@
   void SetConflictResolverFactory(
       fidl::InterfaceHandle<ConflictResolverFactory> factory) override;
 
+  // inspect::ChildrenManager:
+  void GetNames(
+      fit::function<void(std::vector<std::string>)> callback) override;
+  void Attach(std::string name,
+              fit::function<void(fit::closure)> callback) override;
+
   void set_on_empty(fit::closure on_empty_callback) {
     on_empty_callback_ = std::move(on_empty_callback);
   }
@@ -139,6 +145,9 @@
   // The static Inspect object maintaining in Inspect a representation of this
   // LedgerManager.
   inspect::Node inspect_node_;
+  // The static Inspect object to which this LedgerManager's pages are attached.
+  inspect::Node pages_node_;
+  fit::deferred_callback children_manager_retainer_;
 
   // Must be the last member.
   fxl::WeakPtrFactory<LedgerManager> weak_factory_;
diff --git a/src/ledger/bin/app/ledger_manager_unittest.cc b/src/ledger/bin/app/ledger_manager_unittest.cc
index c14c28d..84c41f4 100644
--- a/src/ledger/bin/app/ledger_manager_unittest.cc
+++ b/src/ledger/bin/app/ledger_manager_unittest.cc
@@ -11,6 +11,7 @@
 #include <lib/fidl/cpp/optional.h>
 #include <lib/fit/function.h>
 #include <lib/inspect/inspect.h>
+#include <lib/inspect/testing/inspect.h>
 #include <zircon/errors.h>
 #include <zircon/syscalls.h>
 
@@ -20,25 +21,53 @@
 #include <utility>
 #include <vector>
 
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 #include "peridot/lib/convert/convert.h"
+#include "peridot/lib/scoped_tmpfs/scoped_tmpfs.h"
 #include "src/ledger/bin/app/constants.h"
 #include "src/ledger/bin/app/disk_cleanup_manager_impl.h"
 #include "src/ledger/bin/encryption/fake/fake_encryption_service.h"
 #include "src/ledger/bin/environment/environment.h"
 #include "src/ledger/bin/fidl/include/types.h"
+#include "src/ledger/bin/inspect/inspect.h"
+#include "src/ledger/bin/storage/fake/fake_db_factory.h"
 #include "src/ledger/bin/storage/fake/fake_page_storage.h"
+#include "src/ledger/bin/storage/impl/ledger_storage_impl.h"
 #include "src/ledger/bin/storage/public/ledger_storage.h"
 #include "src/ledger/bin/sync_coordinator/public/ledger_sync.h"
 #include "src/ledger/bin/testing/fake_disk_cleanup_manager.h"
+#include "src/ledger/bin/testing/inspect.h"
 #include "src/ledger/bin/testing/test_with_environment.h"
 #include "src/lib/fxl/macros.h"
 #include "src/lib/fxl/memory/ref_ptr.h"
+#include "src/lib/fxl/strings/string_view.h"
 
 namespace ledger {
 namespace {
 
-constexpr char kLedgerName[] = "ledger_under_test";
+using ::inspect::testing::ChildrenMatch;
+using ::inspect::testing::NameMatches;
+using ::inspect::testing::NodeMatches;
+using ::testing::AllOf;
+using ::testing::ElementsAre;
+using ::testing::ElementsAreArray;
+
+constexpr fxl::StringView kLedgerName = "ledger_under_test";
+constexpr fxl::StringView kTestTopLevelNodeName = "top-level-of-test node";
+constexpr fxl::StringView kInspectPathComponent = "test_ledger";
+
+PageId RandomId(const Environment& environment) {
+  PageId result;
+  environment.random()->Draw(&result.id);
+  return result;
+}
+
+PageId SpecificId(const storage::PageId& page_id) {
+  PageId fidl_page_id;
+  convert::ToArray(page_id, &fidl_page_id.id);
+  return fidl_page_id;
+}
 
 class DelayingCallbacksManager {
  public:
@@ -238,9 +267,11 @@
     storage_ptr = storage.get();
     std::unique_ptr<FakeLedgerSync> sync = std::make_unique<FakeLedgerSync>();
     sync_ptr = sync.get();
+    top_level_node_ = inspect::Node(kTestTopLevelNodeName.ToString());
     disk_cleanup_manager_ = std::make_unique<FakeDiskCleanupManager>();
     ledger_manager_ = std::make_unique<LedgerManager>(
-        &environment_, kLedgerName, inspect::Node(),
+        &environment_, kLedgerName.ToString(),
+        top_level_node_.CreateChild(kInspectPathComponent.ToString()),
         std::make_unique<encryption::FakeEncryptionService>(dispatcher()),
         std::move(storage), std::move(sync), disk_cleanup_manager_.get());
     ResetLedgerPtr();
@@ -248,15 +279,10 @@
 
   void ResetLedgerPtr() { ledger_manager_->BindLedger(ledger_.NewRequest()); }
 
-  PageId RandomId() {
-    PageId result;
-    environment_.random()->Draw(&result.id);
-    return result;
-  }
-
  protected:
   FakeLedgerStorage* storage_ptr;
   FakeLedgerSync* sync_ptr;
+  inspect::Node top_level_node_;
   std::unique_ptr<FakeDiskCleanupManager> disk_cleanup_manager_;
   std::unique_ptr<LedgerManager> ledger_manager_;
   LedgerPtr ledger_;
@@ -314,7 +340,7 @@
   storage_ptr->ClearCalls();
 
   ResetLedgerPtr();
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
   RunLoopUntilIdle();
   EXPECT_EQ(1u, storage_ptr->create_page_calls.size());
@@ -381,7 +407,7 @@
   bool on_empty_called;
   ledger_manager_->set_on_empty(callback::SetWhenCalled(&on_empty_called));
 
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   bool delete_page_called;
   Status delete_page_status;
   ledger_manager_->DeletePageStorage(
@@ -408,7 +434,7 @@
   Status status;
   PagePredicateResult is_closed_and_synced;
 
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   // Check for a page that doesn't exist.
   storage_ptr->should_get_page_fail = true;
@@ -428,7 +454,7 @@
 
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -464,7 +490,7 @@
 TEST_F(LedgerManagerTest, PageIsClosedAndSyncedCallOnEmpty) {
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -513,7 +539,7 @@
 
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -542,7 +568,7 @@
 
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -585,7 +611,7 @@
 TEST_F(LedgerManagerTest, PageIsClosedAndSyncedConcurrentCalls) {
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -641,7 +667,7 @@
   Status status;
   PagePredicateResult is_closed_offline_empty;
 
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   // Check for a page that doesn't exist.
   storage_ptr->should_get_page_fail = true;
@@ -659,7 +685,7 @@
 
   storage_ptr->should_get_page_fail = false;
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -696,7 +722,7 @@
   PagePredicateResult is_closed_offline_empty;
   bool delete_page_called = false;
   Status delete_page_status;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   // The page is closed, offline and empty. Try to delete the page storage in
   // the callback.
@@ -728,7 +754,7 @@
 
 // Verifies that two successive calls to GetPage do not create 2 storages.
 TEST_F(LedgerManagerTest, CallGetPageTwice) {
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   PagePtr page1;
   ledger_->GetPage(fidl::MakeOptional(id), page1.NewRequest());
@@ -746,7 +772,7 @@
   zx_status_t status;
 
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   bool called;
   storage_ptr->ClearCalls();
   // Get the root page.
@@ -794,7 +820,7 @@
 TEST_F(LedgerManagerTest, OnPageOpenedClosedCalls) {
   PagePtr page1;
   PagePtr page2;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   EXPECT_EQ(0, disk_cleanup_manager_->page_opened_count);
   EXPECT_EQ(0, disk_cleanup_manager_->page_closed_count);
@@ -832,7 +858,7 @@
 
 TEST_F(LedgerManagerTest, OnPageOpenedClosedCallInternalRequest) {
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   EXPECT_EQ(0, disk_cleanup_manager_->page_opened_count);
   EXPECT_EQ(0, disk_cleanup_manager_->page_closed_count);
@@ -866,7 +892,7 @@
 
 TEST_F(LedgerManagerTest, OnPageOpenedClosedUnused) {
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   storage::PageIdView storage_page_id = convert::ExtendedStringView(id.id);
 
   EXPECT_EQ(0, disk_cleanup_manager_->page_opened_count);
@@ -928,7 +954,7 @@
 
 TEST_F(LedgerManagerTest, DeletePageStorageWhenPageOpenFails) {
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
   bool called;
 
   ledger_->GetPage(fidl::MakeOptional(id), page.NewRequest());
@@ -946,7 +972,7 @@
 
 TEST_F(LedgerManagerTest, OpenPageWithDeletePageStorageInProgress) {
   PagePtr page;
-  PageId id = RandomId();
+  PageId id = RandomId(environment_);
 
   // Start deleting the page.
   bool delete_called;
@@ -1008,5 +1034,175 @@
   EXPECT_FALSE(factory2.disconnected);
 }
 
+// Constructs a Matcher to be matched against a test-owned Inspect object (the
+// Inspect object to which the LedgerManager under test attaches a child) that
+// validates that the matched object has a hierarchy with a node for the
+// LedgerManager under test, a node named |kPagesInspectPathComponent|
+// under that, and a node for each of the given |page_ids| under that.
+testing::Matcher<const inspect::ObjectHierarchy&> HierarchyMatcher(
+    const std::vector<storage::PageId>& page_ids) {
+  auto page_expectations =
+      std::vector<testing::Matcher<const inspect::ObjectHierarchy&>>();
+  std::set<storage::PageId> sorted_and_unique_page_ids;
+  for (const storage::PageId& page_id : page_ids) {
+    sorted_and_unique_page_ids.insert(page_id);
+  }
+  for (const storage::PageId& page_id : sorted_and_unique_page_ids) {
+    page_expectations.push_back(
+        NodeMatches(NameMatches(PageIdToDisplayName(page_id))));
+  }
+  return ChildrenMatch(ElementsAre(ChildrenMatch(ElementsAre(
+      AllOf(NodeMatches(NameMatches(kPagesInspectPathComponent.ToString())),
+            ChildrenMatch(ElementsAreArray(page_expectations)))))));
+}
+
+// TODO(https://fuchsia.atlassian.net/browse/LE-800): Make FakeLedgerStorage
+// usable as a real LedgerStorage and unify this class with LedgerManagerTest.
+class LedgerManagerWithRealStorageTest : public TestWithEnvironment {
+ public:
+  LedgerManagerWithRealStorageTest() = default;
+  ~LedgerManagerWithRealStorageTest() override = default;
+
+  // gtest::TestWithEnvironment:
+  void SetUp() override {
+    TestWithEnvironment::SetUp();
+    auto encryption_service =
+        std::make_unique<encryption::FakeEncryptionService>(dispatcher());
+    db_factory_ = std::make_unique<storage::fake::FakeDbFactory>(dispatcher());
+    auto ledger_storage = std::make_unique<storage::LedgerStorageImpl>(
+        &environment_, encryption_service.get(),
+
+        db_factory_.get(), DetachedPath(tmpfs_.root_fd()));
+    std::unique_ptr<FakeLedgerSync> sync = std::make_unique<FakeLedgerSync>();
+    top_level_node_ = inspect::Node(kTestTopLevelNodeName.ToString());
+    attachment_node_ = top_level_node_.CreateChild(
+        kSystemUnderTestAttachmentPointPathComponent.ToString());
+    disk_cleanup_manager_ = std::make_unique<FakeDiskCleanupManager>();
+    ledger_manager_ = std::make_unique<LedgerManager>(
+        &environment_, kLedgerName.ToString(),
+        attachment_node_.CreateChild(kInspectPathComponent.ToString()),
+        std::move(encryption_service), std::move(std::move(ledger_storage)),
+        std::move(sync), disk_cleanup_manager_.get());
+  }
+
+ protected:
+  testing::AssertionResult MutatePage(const PagePtr& page_ptr) {
+    page_ptr->Put(convert::ToArray("Hello."),
+                  convert::ToArray("Is it me for whom you are looking?"));
+    bool sync_callback_called;
+    page_ptr->Sync(
+        callback::Capture(callback::SetWhenCalled(&sync_callback_called)));
+    RunLoopUntilIdle();
+    if (!sync_callback_called) {
+      return testing::AssertionFailure() << "Sync callback wasn't called!";
+    }
+    if (!page_ptr.is_bound()) {
+      return testing::AssertionFailure() << "Page pointer came unbound!";
+    }
+    return testing::AssertionSuccess();
+  }
+
+  scoped_tmpfs::ScopedTmpFS tmpfs_;
+  std::unique_ptr<storage::fake::FakeDbFactory> db_factory_;
+  // TODO(nathaniel): Because we use the ChildrenManager API, we need to do our
+  // reads using FIDL, and because we want to use inspect::ReadFromFidl for our
+  // reads, we need to have these two objects (one parent, one child, both part
+  // of the test, and with the system under test attaching to the child) rather
+  // than just one. Even though this is test code this is still a layer of
+  // indirection that should be eliminable in Inspect's upcoming "VMO-World".
+  inspect::Node top_level_node_;
+  inspect::Node attachment_node_;
+  std::unique_ptr<FakeDiskCleanupManager> disk_cleanup_manager_;
+  std::unique_ptr<LedgerManager> ledger_manager_;
+
+ private:
+  FXL_DISALLOW_COPY_AND_ASSIGN(LedgerManagerWithRealStorageTest);
+};
+
+TEST_F(LedgerManagerWithRealStorageTest, InspectAPIDisconnectedPagePresence) {
+  // These PageIds are similar to those seen in use by real components.
+  PageId first_page_id = SpecificId("first_page_id___");
+  PageId second_page_id = SpecificId("second_page_id__");
+  // Real components also use random PageIds.
+  PageId third_page_id = RandomId(environment_);
+  std::vector<storage::PageId> storage_page_ids(
+      {convert::ToString(first_page_id.id),
+       convert::ToString(second_page_id.id),
+       convert::ToString(third_page_id.id)});
+  LedgerPtr ledger_ptr;
+  ledger_manager_->BindLedger(ledger_ptr.NewRequest());
+
+  // When nothing has yet requested a page, check that the Inspect hierarchy
+  // is as expected with no nodes representing pages.
+  inspect::ObjectHierarchy zeroth_hierarchy;
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &zeroth_hierarchy));
+  EXPECT_THAT(zeroth_hierarchy, HierarchyMatcher({}));
+
+  // When one page has been created in the ledger, check that the Inspect
+  // hierarchy is as expected with a node for that one page.
+  PagePtr first_page_ptr;
+  ledger_ptr->GetPage(fidl::MakeOptional(first_page_id),
+                      first_page_ptr.NewRequest());
+  RunLoopUntilIdle();
+  inspect::ObjectHierarchy hierarchy_after_one_connection;
+  ASSERT_TRUE(
+      Inspect(&top_level_node_, &test_loop(), &hierarchy_after_one_connection));
+  EXPECT_THAT(hierarchy_after_one_connection,
+              HierarchyMatcher({storage_page_ids[0]}));
+
+  // When two ledgers have been created in the repository, check that the
+  // Inspect hierarchy is as expected with nodes for both ledgers.
+  PagePtr second_page_ptr;
+  ledger_ptr->GetPage(fidl::MakeOptional(second_page_id),
+                      second_page_ptr.NewRequest());
+  RunLoopUntilIdle();
+  inspect::ObjectHierarchy hierarchy_after_two_connections;
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_two_connections));
+  EXPECT_THAT(hierarchy_after_two_connections,
+              HierarchyMatcher({storage_page_ids[0], storage_page_ids[1]}));
+
+  // Unbind the first page, but only after having written to it, because it is
+  // not guaranteed that the LedgerManager under test will maintain an empty
+  // page resident on disk to be described in a later inspection.
+  EXPECT_TRUE(MutatePage(first_page_ptr));
+  first_page_ptr.Unbind();
+  RunLoopUntilIdle();
+
+  // When one of the two pages has been disconnected, check that an inspection
+  // still finds both.
+  inspect::ObjectHierarchy hierarchy_after_one_disconnection;
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_one_disconnection));
+  EXPECT_THAT(hierarchy_after_one_disconnection,
+              HierarchyMatcher({storage_page_ids[0], storage_page_ids[1]}));
+
+  // Check that after a third page connection is made, all three pages appear in
+  // an inspection.
+  PagePtr third_page_ptr;
+  ledger_ptr->GetPage(fidl::MakeOptional(third_page_id),
+                      third_page_ptr.NewRequest());
+  RunLoopUntilIdle();
+  inspect::ObjectHierarchy hierarchy_with_second_and_third_connection;
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_with_second_and_third_connection));
+  EXPECT_THAT(hierarchy_with_second_and_third_connection,
+              HierarchyMatcher(storage_page_ids));
+
+  // Check that after all pages are mutated and unbound all three pages still
+  // appear in an inspection.
+  EXPECT_TRUE(MutatePage(second_page_ptr));
+  second_page_ptr.Unbind();
+  EXPECT_TRUE(MutatePage(third_page_ptr));
+  third_page_ptr.Unbind();
+  RunLoopUntilIdle();
+
+  inspect::ObjectHierarchy hierarchy_after_three_disconnections;
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_three_disconnections));
+  EXPECT_THAT(hierarchy_after_three_disconnections,
+              HierarchyMatcher(storage_page_ids));
+}
+
 }  // namespace
 }  // namespace ledger
diff --git a/src/ledger/bin/app/ledger_repository_impl_unittest.cc b/src/ledger/bin/app/ledger_repository_impl_unittest.cc
index 5768b2d..fb1b912 100644
--- a/src/ledger/bin/app/ledger_repository_impl_unittest.cc
+++ b/src/ledger/bin/app/ledger_repository_impl_unittest.cc
@@ -14,7 +14,6 @@
 #include <lib/inspect/deprecated/expose.h>
 #include <lib/inspect/hierarchy.h>
 #include <lib/inspect/inspect.h>
-#include <lib/inspect/reader.h>
 #include <lib/inspect/testing/inspect.h>
 
 #include <vector>
@@ -29,6 +28,7 @@
 #include "src/ledger/bin/storage/fake/fake_db_factory.h"
 #include "src/ledger/bin/storage/public/types.h"
 #include "src/ledger/bin/testing/fake_disk_cleanup_manager.h"
+#include "src/ledger/bin/testing/inspect.h"
 #include "src/ledger/bin/testing/test_with_environment.h"
 #include "src/lib/fxl/macros.h"
 #include "src/lib/fxl/strings/string_view.h"
@@ -36,8 +36,6 @@
 namespace ledger {
 namespace {
 
-constexpr char kSystemUnderTestAttachmentPointPathComponent[] =
-    "attachment_point";
 constexpr char kInspectPathComponent[] = "test_repository";
 constexpr char kTestTopLevelNodeName[] = "top-level-of-test node";
 
@@ -77,7 +75,7 @@
     disk_cleanup_manager_ = fake_page_eviction_manager.get();
     top_level_node_ = inspect::Node(kTestTopLevelNodeName);
     attachment_node_ = top_level_node_.CreateChild(
-        kSystemUnderTestAttachmentPointPathComponent);
+        kSystemUnderTestAttachmentPointPathComponent.ToString());
 
     repository_ = std::make_unique<LedgerRepositoryImpl>(
         DetachedPath(tmpfs_.root_fd()), &environment_,
@@ -89,8 +87,6 @@
   ~LedgerRepositoryImplTest() override {}
 
  protected:
-  testing::AssertionResult Read(inspect::ObjectHierarchy* hierarchy);
-
   scoped_tmpfs::ScopedTmpFS tmpfs_;
   FakeDiskCleanupManager* disk_cleanup_manager_;
   // TODO(nathaniel): Because we use the ChildrenManager API, we need to do our
@@ -107,45 +103,6 @@
   FXL_DISALLOW_COPY_AND_ASSIGN(LedgerRepositoryImplTest);
 };
 
-testing::AssertionResult LedgerRepositoryImplTest::Read(
-    inspect::ObjectHierarchy* hierarchy) {
-  bool callback_called;
-  bool success;
-  fidl::InterfaceHandle<fuchsia::inspect::Inspect> inspect_handle;
-  top_level_node_.object_dir().object()->OpenChild(
-      kSystemUnderTestAttachmentPointPathComponent, inspect_handle.NewRequest(),
-      callback::Capture(callback::SetWhenCalled(&callback_called), &success));
-  RunLoopUntilIdle();
-  if (!callback_called) {
-    return ::testing::AssertionFailure()
-           << "Callback passed to OpenChild not called!";
-  }
-  if (!success) {
-    return ::testing::AssertionFailure() << "OpenChild not successful!";
-  }
-
-  callback_called = false;
-  async::Executor executor(dispatcher());
-  fit::result<inspect::ObjectHierarchy> hierarchy_result;
-  auto hierarchy_promise =
-      inspect::ReadFromFidl(inspect::ObjectReader(std::move(inspect_handle)))
-          .then([&](fit::result<inspect::ObjectHierarchy>&
-                        then_hierarchy_result) {
-            callback_called = true;
-            hierarchy_result = std::move(then_hierarchy_result);
-          });
-  executor.schedule_task(std::move(hierarchy_promise));
-  RunLoopUntilIdle();
-  if (!callback_called) {
-    return ::testing::AssertionFailure()
-           << "Callback passed to ReadFromFidl(<...>).then not called!";
-  } else if (!hierarchy_result.is_ok()) {
-    return ::testing::AssertionFailure() << "Hierarchy result not okay!";
-  }
-  *hierarchy = hierarchy_result.take_value();
-  return ::testing::AssertionSuccess();
-}
-
 TEST_F(LedgerRepositoryImplTest, ConcurrentCalls) {
   // Make a first call to DiskCleanUp.
   bool callback_called1 = false;
@@ -180,7 +137,7 @@
   // When nothing has bound to the repository, check that the "requests" metric
   // is present and is zero.
   inspect::ObjectHierarchy zeroth_hierarchy;
-  ASSERT_TRUE(Read(&zeroth_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &zeroth_hierarchy));
   EXPECT_THAT(
       zeroth_hierarchy,
       ChildrenMatch(Contains(NodeMatches(MetricList(Contains(
@@ -191,7 +148,7 @@
   ledger_internal::LedgerRepositoryPtr first_ledger_repository_ptr;
   repository_->BindRepository(first_ledger_repository_ptr.NewRequest());
   inspect::ObjectHierarchy first_hierarchy;
-  ASSERT_TRUE(Read(&first_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &first_hierarchy));
   EXPECT_THAT(
       first_hierarchy,
       ChildrenMatch(Contains(NodeMatches(MetricList(Contains(
@@ -202,7 +159,7 @@
   ledger_internal::LedgerRepositoryPtr second_ledger_repository_ptr;
   repository_->BindRepository(second_ledger_repository_ptr.NewRequest());
   inspect::ObjectHierarchy second_hierarchy;
-  ASSERT_TRUE(Read(&second_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &second_hierarchy));
   EXPECT_THAT(
       second_hierarchy,
       ChildrenMatch(Contains(NodeMatches(MetricList(Contains(
@@ -218,7 +175,7 @@
   // When nothing has requested a ledger, check that the Inspect hierarchy is as
   // expected with no nodes representing ledgers.
   inspect::ObjectHierarchy zeroth_hierarchy;
-  ASSERT_TRUE(Read(&zeroth_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &zeroth_hierarchy));
   EXPECT_THAT(zeroth_hierarchy, HierarchyMatcher({}));
 
   // When one ledger has been created in the repository, check that the Inspect
@@ -228,7 +185,7 @@
                                    first_ledger_ptr.NewRequest());
   RunLoopUntilIdle();
   inspect::ObjectHierarchy first_hierarchy;
-  ASSERT_TRUE(Read(&first_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &first_hierarchy));
   EXPECT_THAT(first_hierarchy, HierarchyMatcher({first_ledger_name}));
 
   // When two ledgers have been created in the repository, check that the
@@ -238,7 +195,7 @@
                                    second_ledger_ptr.NewRequest());
   RunLoopUntilIdle();
   inspect::ObjectHierarchy second_hierarchy;
-  ASSERT_TRUE(Read(&second_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &second_hierarchy));
   EXPECT_THAT(second_hierarchy,
               HierarchyMatcher({first_ledger_name, second_ledger_name}));
 }
@@ -252,7 +209,7 @@
   // When nothing has yet requested a ledger, check that the Inspect hierarchy
   // is as expected with no nodes representing ledgers.
   inspect::ObjectHierarchy zeroth_hierarchy;
-  ASSERT_TRUE(Read(&zeroth_hierarchy));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(), &zeroth_hierarchy));
   EXPECT_THAT(zeroth_hierarchy, HierarchyMatcher({}));
 
   // When one ledger has been created in the repository, check that the Inspect
@@ -262,7 +219,8 @@
                                    first_ledger_ptr.NewRequest());
   RunLoopUntilIdle();
   inspect::ObjectHierarchy hierarchy_after_one_connection;
-  ASSERT_TRUE(Read(&hierarchy_after_one_connection));
+  ASSERT_TRUE(
+      Inspect(&top_level_node_, &test_loop(), &hierarchy_after_one_connection));
   EXPECT_THAT(hierarchy_after_one_connection,
               HierarchyMatcher({first_ledger_name}));
 
@@ -273,7 +231,8 @@
                                    second_ledger_ptr.NewRequest());
   RunLoopUntilIdle();
   inspect::ObjectHierarchy hierarchy_after_two_connections;
-  ASSERT_TRUE(Read(&hierarchy_after_two_connections));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_two_connections));
   EXPECT_THAT(hierarchy_after_two_connections,
               HierarchyMatcher({first_ledger_name, second_ledger_name}));
 
@@ -283,7 +242,8 @@
   // When one of the two ledgers has been disconnected, check that an inspection
   // still finds both.
   inspect::ObjectHierarchy hierarchy_after_one_disconnection;
-  ASSERT_TRUE(Read(&hierarchy_after_one_disconnection));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_one_disconnection));
   EXPECT_THAT(hierarchy_after_one_disconnection,
               HierarchyMatcher({first_ledger_name, second_ledger_name}));
 
@@ -293,7 +253,8 @@
   // When both of the ledgers have been disconnected, check that an inspection
   // still finds both.
   inspect::ObjectHierarchy hierarchy_after_two_disconnections;
-  ASSERT_TRUE(Read(&hierarchy_after_two_disconnections));
+  ASSERT_TRUE(Inspect(&top_level_node_, &test_loop(),
+                      &hierarchy_after_two_disconnections));
   EXPECT_THAT(hierarchy_after_two_disconnections,
               HierarchyMatcher({first_ledger_name, second_ledger_name}));
 }
diff --git a/src/ledger/bin/app/page_manager.cc b/src/ledger/bin/app/page_manager.cc
index 6dbac196..93d7a7c 100644
--- a/src/ledger/bin/app/page_manager.cc
+++ b/src/ledger/bin/app/page_manager.cc
@@ -10,6 +10,7 @@
 #include <lib/fidl/cpp/interface_request.h>
 #include <lib/fit/defer.h>
 #include <lib/fit/function.h>
+#include <lib/inspect/inspect.h>
 #include <trace/event.h>
 
 #include <string>
@@ -24,6 +25,7 @@
 #include "src/ledger/bin/app/page_utils.h"
 #include "src/ledger/bin/app/types.h"
 #include "src/ledger/bin/fidl/include/types.h"
+#include "src/ledger/bin/inspect/inspect.h"
 #include "src/ledger/bin/p2p_sync/public/page_communicator.h"
 #include "src/ledger/bin/storage/public/page_storage.h"
 #include "src/lib/fxl/logging.h"
@@ -36,7 +38,8 @@
                          PageUsageListener* page_usage_listener,
                          storage::LedgerStorage* ledger_storage,
                          sync_coordinator::LedgerSync* ledger_sync,
-                         LedgerMergeManager* ledger_merge_manager)
+                         LedgerMergeManager* ledger_merge_manager,
+                         inspect::Node inspect_node)
     : environment_(environment),
       ledger_name_(std::move(ledger_name)),
       page_id_(std::move(page_id)),
@@ -44,12 +47,24 @@
       ledger_storage_(ledger_storage),
       ledger_sync_(ledger_sync),
       ledger_merge_manager_(ledger_merge_manager),
+      inspect_node_(std::move(inspect_node)),
+      commits_node_(
+          inspect_node_.CreateChild(kCommitsInspectPathComponent.ToString())),
       weak_factory_(this) {
   page_availability_manager_.set_on_empty([this] { CheckEmpty(); });
 }
 
 PageManager::~PageManager() {}
 
+fit::closure PageManager::CreateDetacher() {
+  outstanding_detachers_++;
+  return [this]() {
+    outstanding_detachers_--;
+    FXL_DCHECK(outstanding_detachers_ >= 0);
+    CheckEmpty();
+  };
+}
+
 void PageManager::PageIsClosedAndSynced(
     fit::function<void(storage::Status, PagePredicateResult)> callback) {
   auto is_synced = [](ActivePageManager* active_page_manager,
@@ -308,7 +323,8 @@
 
 void PageManager::CheckEmpty() {
   if (on_empty_callback_ && !active_page_manager_container_ &&
-      outstanding_operations_ == 0 && page_availability_manager_.IsEmpty()) {
+      outstanding_operations_ == 0 && page_availability_manager_.IsEmpty() &&
+      outstanding_detachers_ == 0) {
     on_empty_callback_();
   }
 }
diff --git a/src/ledger/bin/app/page_manager.h b/src/ledger/bin/app/page_manager.h
index 83b386a..207f1ce 100644
--- a/src/ledger/bin/app/page_manager.h
+++ b/src/ledger/bin/app/page_manager.h
@@ -7,6 +7,7 @@
 
 #include <lib/fidl/cpp/interface_request.h>
 #include <lib/fit/function.h>
+#include <lib/inspect/inspect.h>
 
 #include <optional>
 #include <string>
@@ -41,7 +42,8 @@
               storage::PageId page_id, PageUsageListener* page_usage_listener,
               storage::LedgerStorage* ledger_storage,
               sync_coordinator::LedgerSync* ledger_sync,
-              LedgerMergeManager* ledger_merge_manager);
+              LedgerMergeManager* ledger_merge_manager,
+              inspect::Node inspect_node);
   ~PageManager();
 
   // Checks whether the given page is closed and synced. The result returned in
@@ -68,6 +70,12 @@
                fidl::InterfaceRequest<Page> page_request,
                fit::function<void(storage::Status)> callback);
 
+  // Registers "interest" in this |PageManager| for which this |PageManager|
+  // will remain non-empty and returns a closure that when called will
+  // deregister the "interest" in this |PageManager| (and potentially cause this
+  // |PageManager|'s on_empty_callback_ to be called).
+  fit::closure CreateDetacher();
+
   void set_on_empty(fit::closure on_empty_callback) {
     on_empty_callback_ = std::move(on_empty_callback);
   }
@@ -147,6 +155,20 @@
   // The super manager is not empty until all operations have completed.
   uint64_t outstanding_operations_ = 0;
 
+  // The static Inspect object maintaining in Inspect a representation of this
+  // |PageManager|.
+  inspect::Node inspect_node_;
+  // The static Inspect object to which this |PageManager|'s commits are
+  // attached.
+  inspect::Node commits_node_;
+  fit::deferred_callback children_manager_retainer_;
+
+  // A nonnegative count of the number of "registered interests" for this
+  // |PageManager|. This field is incremented by calls to |CreateDetacher| and
+  // decremented by calls to the closures returned by calls to |CreateDetacher|.
+  // This |PageManager| is not considered empty while this number is positive.
+  int64_t outstanding_detachers_ = 0;
+
   // Must be the last member.
   fxl::WeakPtrFactory<PageManager> weak_factory_;
 
diff --git a/src/ledger/bin/app/page_manager_unittest.cc b/src/ledger/bin/app/page_manager_unittest.cc
index 6fe0089..66b6637 100644
--- a/src/ledger/bin/app/page_manager_unittest.cc
+++ b/src/ledger/bin/app/page_manager_unittest.cc
@@ -241,7 +241,7 @@
     page_manager_ = std::make_unique<PageManager>(
         &environment_, kLedgerName, convert::ToString(page_id_.id),
         disk_cleanup_manager_.get(), storage_.get(), sync_.get(),
-        ledger_merge_manager_.get());
+        ledger_merge_manager_.get(), inspect::Node());
   }
 
   PageId RandomId() {
@@ -284,6 +284,37 @@
   fidl::Binding<ConflictResolverFactory> binding_;
 };
 
+TEST_F(PageManagerTest, OnEmptyCalled) {
+  bool get_page_callback_called;
+  Status get_page_status;
+  bool on_empty_callback_called;
+
+  page_manager_->set_on_empty(
+      callback::Capture(callback::SetWhenCalled(&on_empty_callback_called)));
+
+  PagePtr page;
+  page_manager_->GetPage(
+      LedgerImpl::Delegate::PageState::NEW, page.NewRequest(),
+      callback::Capture(callback::SetWhenCalled(&get_page_callback_called),
+                        &get_page_status));
+  RunLoopUntilIdle();
+  EXPECT_TRUE(get_page_callback_called);
+  EXPECT_EQ(Status::OK, get_page_status);
+  EXPECT_FALSE(on_empty_callback_called);
+
+  fit::closure detacher = page_manager_->CreateDetacher();
+  RunLoopUntilIdle();
+  EXPECT_FALSE(on_empty_callback_called);
+
+  page.Unbind();
+  RunLoopUntilIdle();
+  EXPECT_FALSE(on_empty_callback_called);
+
+  detacher();
+  RunLoopUntilIdle();
+  EXPECT_TRUE(on_empty_callback_called);
+}
+
 TEST_F(PageManagerTest, PageIsClosedAndSyncedCheckNotFound) {
   bool called;
   Status status;
diff --git a/src/ledger/bin/testing/BUILD.gn b/src/ledger/bin/testing/BUILD.gn
index a80850a..acacb74 100644
--- a/src/ledger/bin/testing/BUILD.gn
+++ b/src/ledger/bin/testing/BUILD.gn
@@ -25,6 +25,8 @@
     "fake_disk_cleanup_manager.h",
     "get_page_ensure_initialized.cc",
     "get_page_ensure_initialized.h",
+    "inspect.cc",
+    "inspect.h",
     "ledger_matcher.cc",
     "ledger_matcher.h",
     "ledger_memory_usage.cc",
@@ -44,6 +46,7 @@
   public_deps = [
     "//garnet/public/lib/fsl",
     "//garnet/public/lib/gtest",
+    "//garnet/public/lib/inspect:inspect",
     "//garnet/public/lib/rapidjson_utils",
     "//garnet/public/lib/timekeeper:testing",
     "//peridot/lib/convert",
@@ -63,6 +66,7 @@
     "//third_party/googletest:gmock",
     "//third_party/googletest:gtest",
     "//third_party/rapidjson",
+    "//zircon/public/lib/async-testing",
     "//zircon/public/lib/fit",
     "//zircon/public/lib/task-utils",
     "//zircon/public/lib/trace-provider-with-fdio",
@@ -70,8 +74,10 @@
   ]
 
   deps = [
+    "//garnet/public/lib/async_promise",
     "//garnet/public/lib/callback",
     "//garnet/public/lib/fsl",
+    "//garnet/public/lib/inspect:reader",
     "//sdk/fidl/fuchsia.net.oldhttp",
     "//src/ledger/bin/fidl",
     "//src/ledger/lib/coroutine",
diff --git a/src/ledger/bin/testing/inspect.cc b/src/ledger/bin/testing/inspect.cc
new file mode 100644
index 0000000..581e497
--- /dev/null
+++ b/src/ledger/bin/testing/inspect.cc
@@ -0,0 +1,63 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ledger/bin/testing/inspect.h"
+
+#include <lib/async-testing/test_loop.h>
+#include <lib/async_promise/executor.h>
+#include <lib/callback/capture.h>
+#include <lib/callback/set_when_called.h>
+#include <lib/fidl/cpp/interface_request.h>
+#include <lib/fit/function.h>
+#include <lib/inspect/deprecated/expose.h>
+#include <lib/inspect/hierarchy.h>
+#include <lib/inspect/inspect.h>
+#include <lib/inspect/reader.h>
+
+#include <functional>
+#include <string>
+#include <utility>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+namespace ledger {
+
+testing::AssertionResult Inspect(inspect::Node* top_level_node,
+                                 async::TestLoop* test_loop,
+                                 inspect::ObjectHierarchy* hierarchy) {
+  bool callback_called;
+  bool success;
+  fidl::InterfaceHandle<fuchsia::inspect::Inspect> inspect_handle;
+  top_level_node->object_dir().object()->OpenChild(
+      kSystemUnderTestAttachmentPointPathComponent.ToString(),
+      inspect_handle.NewRequest(),
+      callback::Capture(callback::SetWhenCalled(&callback_called), &success));
+  test_loop->RunUntilIdle();
+  if (!callback_called) {
+    return ::testing::AssertionFailure()
+           << "Callback passed to OpenChild not called!";
+  }
+  if (!success) {
+    return ::testing::AssertionFailure() << "OpenChild not successful!";
+  }
+
+  async::Executor executor(test_loop->dispatcher());
+  fit::result<inspect::ObjectHierarchy> hierarchy_result;
+  auto hierarchy_promise =
+      inspect::ReadFromFidl(inspect::ObjectReader(std::move(inspect_handle)))
+          .then([&](fit::result<inspect::ObjectHierarchy>&
+                        then_hierarchy_result) {
+            hierarchy_result = std::move(then_hierarchy_result);
+          });
+  executor.schedule_task(std::move(hierarchy_promise));
+  test_loop->RunUntilIdle();
+  if (!hierarchy_result.is_ok()) {
+    return ::testing::AssertionFailure() << "Hierarchy result not okay!";
+  }
+  *hierarchy = hierarchy_result.take_value();
+  return ::testing::AssertionSuccess();
+}
+
+}  // namespace ledger
diff --git a/src/ledger/bin/testing/inspect.h b/src/ledger/bin/testing/inspect.h
new file mode 100644
index 0000000..3308a03c
--- /dev/null
+++ b/src/ledger/bin/testing/inspect.h
@@ -0,0 +1,32 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SRC_LEDGER_BIN_TESTING_INSPECT_H_
+#define SRC_LEDGER_BIN_TESTING_INSPECT_H_
+
+#include <lib/async-testing/test_loop.h>
+#include <lib/inspect/deprecated/expose.h>
+#include <lib/inspect/hierarchy.h>
+#include <lib/inspect/inspect.h>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+#include "src/lib/fxl/strings/string_view.h"
+
+namespace ledger {
+
+inline constexpr fxl::StringView kSystemUnderTestAttachmentPointPathComponent =
+    "attachment_point";
+
+// Given an |inspect::Node| under which another |inspect::Node| is available at
+// |kSystemUnderTestAttachmentPointPathComponent|, reads the exposed Inspect
+// data of the system under test and assigned to |hierarchy| the
+// |inspect::ObjectHierarchy| of the read data.
+testing::AssertionResult Inspect(inspect::Node* top_level_node,
+                                 async::TestLoop* test_loop,
+                                 inspect::ObjectHierarchy* hierarchy);
+
+}  // namespace ledger
+
+#endif  // SRC_LEDGER_BIN_TESTING_INSPECT_H_