[ledger] Implement PageSnapshot new API.

Also implement deprecated API in term of the new API.

LE-618

Change-Id: I083c22c16ec2ccb7902bd168f0ff4a3d02e3389d
diff --git a/src/ledger/bin/app/page_snapshot_impl.cc b/src/ledger/bin/app/page_snapshot_impl.cc
index b9b0a00..211e82d 100644
--- a/src/ledger/bin/app/page_snapshot_impl.cc
+++ b/src/ledger/bin/app/page_snapshot_impl.cc
@@ -28,16 +28,6 @@
 namespace ledger {
 namespace {
 
-// Transform a SizedVmo to an optional Buffer. Returns null when
-// status is not OK, or a not-null transport otherwise.
-fuchsia::mem::BufferPtr ToOptionalTransport(storage::Status status,
-                                            fsl::SizedVmo vmo) {
-  if (status != storage::Status::OK) {
-    return nullptr;
-  }
-  return fidl::MakeOptional(std::move(vmo).ToTransport());
-}
-
 template <typename EntryType>
 EntryType CreateEntry(const storage::Entry& entry) {
   EntryType result;
@@ -106,7 +96,8 @@
     storage::PageStorage* page_storage, const std::string& key_prefix,
     const storage::Commit* commit, std::vector<uint8_t> key_start,
     std::unique_ptr<Token> token,
-    fit::function<void(Status, std::vector<EntryType>, std::unique_ptr<Token>)>
+    fit::function<void(Status, IterationStatus, std::vector<EntryType>,
+                       std::unique_ptr<Token>)>
         callback) {
   // |token| represents the first key to be returned in the list of entries.
   // Initially, all entries starting from |token| are requested from storage.
@@ -173,110 +164,128 @@
     return true;
   };
 
-  auto on_done = [waiter, context = std::move(context),
-                  callback = std::move(timed_callback)](
-                     storage::Status status) mutable {
-    if (status != storage::Status::OK) {
-      FXL_LOG(ERROR) << "Error while reading: " << status;
-      callback(Status::IO_ERROR, std::vector<EntryType>(), nullptr);
-      return;
-    }
-    fit::function<void(storage::Status,
-                       std::vector<std::unique_ptr<const storage::Object>>)>
-        result_callback =
-            [callback = std::move(callback), context = std::move(context)](
-                storage::Status status,
-                std::vector<std::unique_ptr<const storage::Object>>
-                    results) mutable {
-              if (status != storage::Status::OK) {
-                FXL_LOG(ERROR) << "Error while reading: " << status;
-                callback(Status::IO_ERROR, std::vector<EntryType>(), nullptr);
-                return;
-              }
-              FXL_DCHECK(context->entries.size() == results.size());
-              size_t real_size = 0;
-              size_t i = 0;
-              for (; i < results.size(); i++) {
-                EntryType& entry = context->entries.at(i);
-                size_t next_token_size =
-                    i + 1 >= results.size()
-                        ? 0
-                        : fidl_serialization::GetByteVectorSize(
-                              context->entries.at(i + 1).key.size());
-                if (!results[i]) {
-                  size_t entry_size = ComputeEntrySize(entry);
-                  if (real_size + entry_size + next_token_size >
-                      fidl_serialization::kMaxInlineDataSize) {
-                    break;
+  auto on_done =
+      [waiter, context = std::move(context),
+       callback = std::move(timed_callback)](storage::Status status) mutable {
+        if (status != storage::Status::OK) {
+          FXL_LOG(ERROR) << "Error while reading: " << status;
+          callback(Status::IO_ERROR, IterationStatus::OK,
+                   std::vector<EntryType>(), nullptr);
+          return;
+        }
+        fit::function<void(storage::Status,
+                           std::vector<std::unique_ptr<const storage::Object>>)>
+            result_callback =
+                [callback = std::move(callback), context = std::move(context)](
+                    storage::Status status,
+                    std::vector<std::unique_ptr<const storage::Object>>
+                        results) mutable {
+                  if (status != storage::Status::OK) {
+                    FXL_LOG(ERROR) << "Error while reading: " << status;
+                    callback(Status::IO_ERROR, IterationStatus::OK,
+                             std::vector<EntryType>(), nullptr);
+                    return;
                   }
-                  real_size += entry_size;
-                  // We don't have the object locally, but we decided not to
-                  // abort. This means this object is a value of a lazy key and
-                  // the client should ask to retrieve it over the network if
-                  // they need it. Here, we just leave the value part of the
-                  // entry null.
-                  continue;
-                }
+                  FXL_DCHECK(context->entries.size() == results.size());
+                  size_t real_size = 0;
+                  size_t i = 0;
+                  for (; i < results.size(); i++) {
+                    EntryType& entry = context->entries.at(i);
+                    size_t next_token_size =
+                        i + 1 >= results.size()
+                            ? 0
+                            : fidl_serialization::GetByteVectorSize(
+                                  context->entries.at(i + 1).key.size());
+                    if (!results[i]) {
+                      size_t entry_size = ComputeEntrySize(entry);
+                      if (real_size + entry_size + next_token_size >
+                          fidl_serialization::kMaxInlineDataSize) {
+                        break;
+                      }
+                      real_size += entry_size;
+                      // We don't have the object locally, but we decided not to
+                      // abort. This means this object is a value of a lazy key
+                      // and the client should ask to retrieve it over the
+                      // network if they need it. Here, we just leave the value
+                      // part of the entry null.
+                      continue;
+                    }
 
-                storage::Status read_status =
-                    FillSingleEntry(*results[i], &entry);
-                if (read_status != storage::Status::OK) {
-                  callback(PageUtils::ConvertStatus(read_status),
-                           std::vector<EntryType>(), nullptr);
-                  return;
-                }
-                size_t entry_size = ComputeEntrySize(entry);
-                if (real_size + entry_size + next_token_size >
-                    fidl_serialization::kMaxInlineDataSize) {
-                  break;
-                }
-                real_size += entry_size;
-              }
-              if (i != results.size()) {
-                if (i == 0) {
-                  callback(Status::VALUE_TOO_LARGE, std::vector<EntryType>(),
-                           nullptr);
-                  return;
-                }
-                // We had to bail out early because the result would be too big
-                // otherwise.
-                context->next_token = std::make_unique<Token>();
-                context->next_token->opaque_id =
-                    std::move(context->entries.at(i).key);
-                context->entries.resize(i);
-              }
-              if (context->next_token) {
-                callback(Status::PARTIAL_RESULT, std::move(context->entries),
-                         std::move(context->next_token));
-                return;
-              }
-              callback(Status::OK, std::move(context->entries), nullptr);
-            };
-    waiter->Finalize(std::move(result_callback));
-  };
+                    storage::Status read_status =
+                        FillSingleEntry(*results[i], &entry);
+                    if (read_status != storage::Status::OK) {
+                      callback(PageUtils::ConvertStatus(read_status),
+                               IterationStatus::OK, std::vector<EntryType>(),
+                               nullptr);
+                      return;
+                    }
+                    size_t entry_size = ComputeEntrySize(entry);
+                    if (real_size + entry_size + next_token_size >
+                        fidl_serialization::kMaxInlineDataSize) {
+                      break;
+                    }
+                    real_size += entry_size;
+                  }
+                  if (i != results.size()) {
+                    if (i == 0) {
+                      callback(Status::VALUE_TOO_LARGE, IterationStatus::OK,
+                               std::vector<EntryType>(), nullptr);
+                      return;
+                    }
+                    // We had to bail out early because the result would be too
+                    // big otherwise.
+                    context->next_token = std::make_unique<Token>();
+                    context->next_token->opaque_id =
+                        std::move(context->entries.at(i).key);
+                    context->entries.resize(i);
+                  }
+                  if (context->next_token) {
+                    callback(Status::OK, IterationStatus::PARTIAL_RESULT,
+                             std::move(context->entries),
+                             std::move(context->next_token));
+                    return;
+                  }
+                  callback(Status::OK, IterationStatus::OK,
+                           std::move(context->entries), nullptr);
+                };
+        waiter->Finalize(std::move(result_callback));
+      };
   page_storage->GetCommitContents(*commit, std::move(start), std::move(on_next),
                                   std::move(on_done));
 }
 
 // Adapt callback for the error notifier API.
 template <typename... A>
-static fit::function<void(Status, A...)> AdaptCallback(
+fit::function<void(Status, IterationStatus, A...)> AdaptCallback(
     fit::function<void(Status, Status, A...)> callback) {
-  return [callback = std::move(callback)](Status status, A... args) {
-    Status disconnect_status;
-    switch (status) {
-      case Status::PARTIAL_RESULT:
-      case Status::KEY_NOT_FOUND:
-      case Status::NEEDS_FETCH:
-      case Status::NETWORK_ERROR:
-        disconnect_status = Status::OK;
-        break;
-      default:
-        disconnect_status = status;
-    }
-    callback(disconnect_status, status, std::forward<A>(args)...);
+  return [callback = std::move(callback)](
+             Status status, IterationStatus iteration_status, A... args) {
+    callback(status,
+             iteration_status == IterationStatus::OK ? Status::OK
+                                                     : Status::PARTIAL_RESULT,
+             std::forward<A>(args)...);
   };
 }
+
+Status ToStatus(fuchsia::ledger::Error error) {
+  switch (error) {
+    case fuchsia::ledger::Error::KEY_NOT_FOUND:
+      return Status::KEY_NOT_FOUND;
+    case fuchsia::ledger::Error::NEEDS_FETCH:
+      return Status::NEEDS_FETCH;
+    case fuchsia::ledger::Error::NETWORK_ERROR:
+      return Status::NETWORK_ERROR;
+  }
+  FXL_DCHECK(false);
+  return Status::OK;
+}
+
+template <typename Result>
+Result ToErrorResult(fuchsia::ledger::Error error) {
+  Result result;
+  result.set_err(error);
+  return result;
+}
 }  // namespace
 
 PageSnapshotImpl::PageSnapshotImpl(
@@ -293,9 +302,8 @@
     fit::function<void(Status, Status, std::vector<Entry>,
                        std::unique_ptr<Token>)>
         callback) {
-  FillEntries<Entry>(page_storage_, key_prefix_, commit_.get(),
-                     std::move(key_start), std::move(token),
-                     AdaptCallback(std::move(callback)));
+  GetEntriesNew(std::move(key_start), std::move(token),
+                AdaptCallback(std::move(callback)));
 }
 
 void PageSnapshotImpl::GetEntriesInline(
@@ -303,9 +311,8 @@
     fit::function<void(Status, Status, std::vector<InlinedEntry>,
                        std::unique_ptr<Token>)>
         callback) {
-  FillEntries<InlinedEntry>(page_storage_, key_prefix_, commit_.get(),
-                            std::move(key_start), std::move(token),
-                            AdaptCallback(std::move(callback)));
+  GetEntriesInlineNew(std::move(key_start), std::move(token),
+                      AdaptCallback(std::move(callback)));
 }
 
 void PageSnapshotImpl::GetKeys(
@@ -313,6 +320,122 @@
     fit::function<void(Status, Status, std::vector<std::vector<uint8_t>>,
                        std::unique_ptr<Token>)>
         callback) {
+  GetKeysNew(std::move(key_start), std::move(token),
+             AdaptCallback(std::move(callback)));
+}
+
+void PageSnapshotImpl::Get(
+    std::vector<uint8_t> key,
+    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+        callback) {
+  GetNew(
+      std::move(key),
+      [callback = std::move(callback)](
+          Status status, fuchsia::ledger::PageSnapshot_GetNew_Result result) {
+        if (status != Status::OK) {
+          callback(status, status, nullptr);
+          return;
+        }
+        if (result.is_err()) {
+          callback(Status::OK, ToStatus(result.err()), nullptr);
+          return;
+        }
+        callback(Status::OK, Status::OK,
+                 fidl::MakeOptional(std::move(result.response().buffer)));
+      });
+}
+
+void PageSnapshotImpl::GetInline(
+    std::vector<uint8_t> key,
+    fit::function<void(Status, Status, std::unique_ptr<InlinedValue>)>
+        callback) {
+  GetInlineNew(std::move(key),
+               [callback = std::move(callback)](
+                   Status status,
+                   fuchsia::ledger::PageSnapshot_GetInlineNew_Result result) {
+                 if (status != Status::OK) {
+                   callback(status, status, nullptr);
+                   return;
+                 }
+                 if (result.is_err()) {
+                   callback(Status::OK, ToStatus(result.err()), nullptr);
+                   return;
+                 }
+                 callback(
+                     Status::OK, Status::OK,
+                     fidl::MakeOptional(std::move(result.response().value)));
+               });
+}
+
+void PageSnapshotImpl::Fetch(
+    std::vector<uint8_t> key,
+    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+        callback) {
+  FetchNew(
+      std::move(key),
+      [callback = std::move(callback)](
+          Status status, fuchsia::ledger::PageSnapshot_FetchNew_Result result) {
+        if (status != Status::OK) {
+          callback(status, status, nullptr);
+          return;
+        }
+        if (result.is_err()) {
+          callback(Status::OK, ToStatus(result.err()), nullptr);
+          return;
+        }
+        callback(Status::OK, Status::OK,
+                 fidl::MakeOptional(std::move(result.response().buffer)));
+      });
+}
+
+void PageSnapshotImpl::FetchPartial(
+    std::vector<uint8_t> key, int64_t offset, int64_t max_size,
+    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+        callback) {
+  FetchPartialNew(
+      std::move(key), offset, max_size,
+      [callback = std::move(callback)](
+          Status status,
+          fuchsia::ledger::PageSnapshot_FetchPartialNew_Result result) {
+        if (status != Status::OK) {
+          callback(status, status, nullptr);
+          return;
+        }
+        if (result.is_err()) {
+          callback(Status::OK, ToStatus(result.err()), nullptr);
+          return;
+        }
+        callback(Status::OK, Status::OK,
+                 fidl::MakeOptional(std::move(result.response().buffer)));
+      });
+}
+
+void PageSnapshotImpl::GetEntriesNew(
+    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
+    fit::function<void(Status, IterationStatus, std::vector<Entry>,
+                       std::unique_ptr<Token>)>
+        callback) {
+  FillEntries<Entry>(page_storage_, key_prefix_, commit_.get(),
+                     std::move(key_start), std::move(token),
+                     std::move(callback));
+}
+
+void PageSnapshotImpl::GetEntriesInlineNew(
+    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
+    fit::function<void(Status, IterationStatus, std::vector<InlinedEntry>,
+                       std::unique_ptr<Token>)>
+        callback) {
+  FillEntries<InlinedEntry>(page_storage_, key_prefix_, commit_.get(),
+                            std::move(key_start), std::move(token),
+                            std::move(callback));
+}
+
+void PageSnapshotImpl::GetKeysNew(
+    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
+    fit::function<void(Status, IterationStatus,
+                       std::vector<std::vector<uint8_t>>,
+                       std::unique_ptr<Token>)>
+        callback) {
   // Represents the information that needs to be shared between on_next and
   // on_done callbacks.
   struct Context {
@@ -326,8 +449,8 @@
     std::unique_ptr<Token> next_token;
   };
 
-  auto timed_callback = TRACE_CALLBACK(AdaptCallback(std::move(callback)),
-                                       "ledger", "snapshot_get_keys");
+  auto timed_callback =
+      TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_keys");
 
   auto context = std::make_unique<Context>();
   auto on_next = [this, context = context.get()](storage::Entry entry) {
@@ -348,14 +471,16 @@
                       std::move(timed_callback)](storage::Status status) {
     if (status != storage::Status::OK) {
       FXL_LOG(ERROR) << "Error while reading: " << status;
-      callback(Status::IO_ERROR, std::vector<std::vector<uint8_t>>(), nullptr);
+      callback(Status::IO_ERROR, IterationStatus::OK,
+               std::vector<std::vector<uint8_t>>(), nullptr);
       return;
     }
     if (context->next_token) {
-      callback(Status::PARTIAL_RESULT, std::move(context->keys),
-               std::move(context->next_token));
+      callback(Status::OK, IterationStatus::PARTIAL_RESULT,
+               std::move(context->keys), std::move(context->next_token));
     } else {
-      callback(Status::OK, std::move(context->keys), nullptr);
+      callback(Status::OK, IterationStatus::OK, std::move(context->keys),
+               nullptr);
     }
   };
   if (token) {
@@ -369,19 +494,26 @@
   }
 }
 
-void PageSnapshotImpl::Get(
+void PageSnapshotImpl::GetNew(
     std::vector<uint8_t> key,
-    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+    fit::function<void(Status, fuchsia::ledger::PageSnapshot_GetNew_Result)>
         callback) {
-  auto timed_callback = TRACE_CALLBACK(AdaptCallback(std::move(callback)),
-                                       "ledger", "snapshot_get");
+  auto timed_callback =
+      TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get");
 
   page_storage_->GetEntryFromCommit(
       *commit_, convert::ToString(key),
       [this, callback = std::move(timed_callback)](
           storage::Status status, storage::Entry entry) mutable {
+        if (status == storage::Status::KEY_NOT_FOUND) {
+          callback(Status::OK,
+                   ToErrorResult<fuchsia::ledger::PageSnapshot_GetNew_Result>(
+                       fuchsia::ledger::Error::KEY_NOT_FOUND));
+          return;
+        }
         if (status != storage::Status::OK) {
-          callback(PageUtils::ConvertStatus(status), nullptr);
+          callback(PageUtils::ConvertStatus(status),
+                   fuchsia::ledger::PageSnapshot_GetNew_Result());
           return;
         }
         PageUtils::ResolveObjectIdentifierAsBuffer(
@@ -390,30 +522,47 @@
             storage::PageStorage::Location::LOCAL,
             [callback = std::move(callback)](storage::Status status,
                                              fsl::SizedVmo data) {
-              Status ledger_status =
-                  status == storage::Status::INTERNAL_NOT_FOUND
-                      ? Status::NEEDS_FETCH
-                      : PageUtils::ConvertStatus(status);
-
-              callback(ledger_status,
-                       ToOptionalTransport(status, std::move(data)));
+              if (status == storage::Status::INTERNAL_NOT_FOUND) {
+                callback(
+                    Status::OK,
+                    ToErrorResult<fuchsia::ledger::PageSnapshot_GetNew_Result>(
+                        fuchsia::ledger::Error::NEEDS_FETCH));
+                return;
+              }
+              if (status != storage::Status::OK) {
+                callback(PageUtils::ConvertStatus(status),
+                         fuchsia::ledger::PageSnapshot_GetNew_Result());
+                return;
+              }
+              fuchsia::ledger::PageSnapshot_GetNew_Result result;
+              result.response().buffer = std::move(data).ToTransport();
+              callback(Status::OK, std::move(result));
             });
       });
 }
 
-void PageSnapshotImpl::GetInline(
+void PageSnapshotImpl::GetInlineNew(
     std::vector<uint8_t> key,
-    fit::function<void(Status, Status, std::unique_ptr<InlinedValue>)>
+    fit::function<void(Status,
+                       fuchsia::ledger::PageSnapshot_GetInlineNew_Result)>
         callback) {
-  auto timed_callback = TRACE_CALLBACK(AdaptCallback(std::move(callback)),
-                                       "ledger", "snapshot_get_inline");
+  auto timed_callback =
+      TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_get_inline");
 
   page_storage_->GetEntryFromCommit(
       *commit_, convert::ToString(key),
       [this, callback = std::move(timed_callback)](
           storage::Status status, storage::Entry entry) mutable {
+        if (status == storage::Status::KEY_NOT_FOUND) {
+          callback(
+              Status::OK,
+              ToErrorResult<fuchsia::ledger::PageSnapshot_GetInlineNew_Result>(
+                  fuchsia::ledger::Error::KEY_NOT_FOUND));
+          return;
+        }
         if (status != storage::Status::OK) {
-          callback(PageUtils::ConvertStatus(status), nullptr);
+          callback(PageUtils::ConvertStatus(status),
+                   fuchsia::ledger::PageSnapshot_GetInlineNew_Result());
           return;
         }
         PageUtils::ResolveObjectIdentifierAsStringView(
@@ -421,70 +570,77 @@
             storage::PageStorage::Location::LOCAL,
             [callback = std::move(callback)](storage::Status status,
                                              fxl::StringView data_view) {
-              Status ledger_status =
-                  status == storage::Status::INTERNAL_NOT_FOUND
-                      ? Status::NEEDS_FETCH
-                      : PageUtils::ConvertStatus(status);
-              if (ledger_status != Status::OK) {
-                callback(ledger_status, nullptr);
+              if (status == storage::Status::INTERNAL_NOT_FOUND) {
+                callback(Status::OK,
+                         ToErrorResult<
+                             fuchsia::ledger::PageSnapshot_GetInlineNew_Result>(
+                             fuchsia::ledger::Error::NEEDS_FETCH));
+                return;
+              }
+              if (status != storage::Status::OK) {
+                callback(PageUtils::ConvertStatus(status),
+                         fuchsia::ledger::PageSnapshot_GetInlineNew_Result());
                 return;
               }
               if (fidl_serialization::GetByteVectorSize(data_view.size()) +
                       fidl_serialization::kStatusEnumSize >
                   fidl_serialization::kMaxInlineDataSize) {
-                callback(Status::VALUE_TOO_LARGE, nullptr);
+                callback(Status::VALUE_TOO_LARGE,
+                         fuchsia::ledger::PageSnapshot_GetInlineNew_Result());
                 return;
               }
-              auto inlined_value = std::make_unique<InlinedValue>();
-              inlined_value->value = convert::ToArray(data_view);
-              callback(Status::OK, std::move(inlined_value));
+              fuchsia::ledger::PageSnapshot_GetInlineNew_Result result;
+              result.response().value.value = convert::ToArray(data_view);
+              callback(Status::OK, std::move(result));
             });
       });
 }
 
-void PageSnapshotImpl::Fetch(
+void PageSnapshotImpl::FetchNew(
     std::vector<uint8_t> key,
-    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+    fit::function<void(Status, fuchsia::ledger::PageSnapshot_FetchNew_Result)>
         callback) {
-  auto timed_callback = TRACE_CALLBACK(AdaptCallback(std::move(callback)),
-                                       "ledger", "snapshot_fetch");
-
-  page_storage_->GetEntryFromCommit(
-      *commit_, convert::ToString(key),
-      [this, callback = std::move(timed_callback)](
-          storage::Status status, storage::Entry entry) mutable {
-        if (status != storage::Status::OK) {
-          callback(PageUtils::ConvertStatus(status), nullptr);
+  FetchPartialNew(
+      std::move(key), 0, -1,
+      [callback = std::move(callback)](
+          Status status,
+          fuchsia::ledger::PageSnapshot_FetchPartialNew_Result result) {
+        if (status != Status::OK) {
+          callback(status, fuchsia::ledger::PageSnapshot_FetchNew_Result());
           return;
         }
-        PageUtils::ResolveObjectIdentifierAsBuffer(
-            page_storage_, entry.object_identifier, 0u,
-            std::numeric_limits<int64_t>::max(),
-            storage::PageStorage::Location::NETWORK,
-            [callback = std::move(callback)](storage::Status status,
-                                             fsl::SizedVmo data) {
-              callback(PageUtils::ConvertStatus(status),
-                       ToOptionalTransport(status, std::move(data)));
-            });
+        fuchsia::ledger::PageSnapshot_FetchNew_Result new_result;
+        if (result.is_err()) {
+          new_result.set_err(result.err());
+        } else {
+          new_result.response().buffer = std::move(result.response().buffer);
+        }
+        callback(Status::OK, std::move(new_result));
       });
 }
 
-void PageSnapshotImpl::FetchPartial(
+void PageSnapshotImpl::FetchPartialNew(
     std::vector<uint8_t> key, int64_t offset, int64_t max_size,
-    fit::function<void(Status, Status, std::unique_ptr<fuchsia::mem::Buffer>)>
+    fit::function<void(Status,
+                       fuchsia::ledger::PageSnapshot_FetchPartialNew_Result)>
         callback) {
-  auto timed_callback = TRACE_CALLBACK(AdaptCallback(std::move(callback)),
-                                       "ledger", "snapshot_fetch_partial");
+  auto timed_callback =
+      TRACE_CALLBACK(std::move(callback), "ledger", "snapshot_fetch_partial");
 
   page_storage_->GetEntryFromCommit(
       *commit_, convert::ToString(key),
       [this, offset, max_size, callback = std::move(timed_callback)](
           storage::Status status, storage::Entry entry) mutable {
-        Status ledger_status = status == storage::Status::INTERNAL_NOT_FOUND
-                                   ? Status::NEEDS_FETCH
-                                   : PageUtils::ConvertStatus(status);
-        if (ledger_status != Status::OK) {
-          callback(ledger_status, nullptr);
+        if (status == storage::Status::KEY_NOT_FOUND) {
+          callback(Status::OK,
+                   ToErrorResult<
+                       fuchsia::ledger::PageSnapshot_FetchPartialNew_Result>(
+                       fuchsia::ledger::Error::KEY_NOT_FOUND));
+          return;
+        }
+        if (status != storage::Status::OK) {
+          callback(PageUtils::ConvertStatus(status),
+                   fuchsia::ledger::PageSnapshot_FetchPartialNew_Result());
           return;
         }
 
@@ -493,65 +649,25 @@
             storage::PageStorage::Location::NETWORK,
             [callback = std::move(callback)](storage::Status status,
                                              fsl::SizedVmo data) {
-              callback(PageUtils::ConvertStatus(status),
-                       ToOptionalTransport(status, std::move(data)));
+              if (status == storage::Status::NETWORK_ERROR) {
+                callback(
+                    Status::OK,
+                    ToErrorResult<
+                        fuchsia::ledger::PageSnapshot_FetchPartialNew_Result>(
+                        fuchsia::ledger::Error::NETWORK_ERROR));
+                return;
+              }
+              if (status != storage::Status::OK) {
+                callback(
+                    PageUtils::ConvertStatus(status),
+                    fuchsia::ledger::PageSnapshot_FetchPartialNew_Result());
+                return;
+              }
+              fuchsia::ledger::PageSnapshot_FetchPartialNew_Result result;
+              result.response().buffer = std::move(data).ToTransport();
+              callback(Status::OK, std::move(result));
             });
       });
 }
 
-void PageSnapshotImpl::GetEntriesNew(
-    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
-    fit::function<void(Status, IterationStatus, std::vector<Entry>,
-                       std::unique_ptr<Token>)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::GetEntriesInlineNew(
-    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
-    fit::function<void(Status, IterationStatus, std::vector<InlinedEntry>,
-                       std::unique_ptr<Token>)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::GetKeysNew(
-    std::vector<uint8_t> key_start, std::unique_ptr<Token> token,
-    fit::function<void(Status, IterationStatus,
-                       std::vector<std::vector<uint8_t>>,
-                       std::unique_ptr<Token>)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::GetNew(
-    std::vector<uint8_t> key,
-    fit::function<void(Status, fuchsia::ledger::PageSnapshot_GetNew_Result)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::GetInlineNew(
-    std::vector<uint8_t> key,
-    fit::function<void(Status,
-                       fuchsia::ledger::PageSnapshot_GetInlineNew_Result)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::FetchNew(
-    std::vector<uint8_t> key,
-    fit::function<void(Status, fuchsia::ledger::PageSnapshot_FetchNew_Result)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
-void PageSnapshotImpl::FetchPartialNew(
-    std::vector<uint8_t> key, int64_t offset, int64_t max_size,
-    fit::function<void(Status,
-                       fuchsia::ledger::PageSnapshot_FetchPartialNew_Result)>
-        callback) {
-  FXL_NOTIMPLEMENTED();
-}
-
 }  // namespace ledger