[ledger] Fuzz test Ledger P2P stack

Adds a fuzz test to our P2P synchronization code, and fixes issues
discovered by this fuzz test: lack of deep verification of incoming
flatbuffers.

To achieve this deep verification, I refactored the message parsing code
for P2P. We now have one place for verification, then the message
pointers are passed around.

Change-Id: Ie770e3b2c83f46b6df1f8341b251b51eb0b7c9ea
diff --git a/src/ledger/bin/BUILD.gn b/src/ledger/bin/BUILD.gn
index 36d1ba0..042abc8 100644
--- a/src/ledger/bin/BUILD.gn
+++ b/src/ledger/bin/BUILD.gn
@@ -71,7 +71,10 @@
 }
 
 fuzz_package("ledger_fuzzers") {
-  targets = [ "//src/ledger/bin/storage/impl/btree:encoding_fuzzer" ]
+  targets = [
+    "//src/ledger/bin/p2p_sync/impl:p2p_sync_fuzzer",
+    "//src/ledger/bin/storage/impl/btree:encoding_fuzzer",
+  ]
   sanitizers = [
     "asan",
     "ubsan",
diff --git a/src/ledger/bin/p2p_sync/impl/BUILD.gn b/src/ledger/bin/p2p_sync/impl/BUILD.gn
index cec55e3..a6aeeb2 100644
--- a/src/ledger/bin/p2p_sync/impl/BUILD.gn
+++ b/src/ledger/bin/p2p_sync/impl/BUILD.gn
@@ -2,6 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import("//build/fuzzing/fuzzer.gni")
 import("//third_party/flatbuffers/flatbuffer.gni")
 
 visibility = [ "//src/ledger/*" ]
@@ -11,6 +12,8 @@
     "commit_batch.cc",
     "commit_batch.h",
     "device_mesh.h",
+    "encoding.cc",
+    "encoding.h",
     "flatbuffer_message_factory.cc",
     "flatbuffer_message_factory.h",
     "ledger_communicator_impl.cc",
@@ -51,12 +54,14 @@
   testonly = true
 
   sources = [
+    "encoding_unittest.cc",
     "page_communicator_impl_unittest.cc",
     "user_communicator_impl_unittest.cc",
   ]
 
   deps = [
     ":impl",
+    ":message",
     "//garnet/public/lib/fsl",
     "//sdk/lib/fidl/cpp",
     "//src/ledger/bin/p2p_provider/impl",
@@ -73,3 +78,14 @@
 
   configs += [ "//src/ledger:ledger_config" ]
 }
+
+fuzz_target("p2p_sync_fuzzer") {
+  sources = [
+    "user_communicator_impl_fuzztest.cc",
+  ]
+  deps = [
+    ":impl",
+    "//src/ledger/bin/p2p_provider/public",
+    "//src/ledger/bin/storage/testing",
+  ]
+}
diff --git a/src/ledger/bin/p2p_sync/impl/encoding.cc b/src/ledger/bin/p2p_sync/impl/encoding.cc
new file mode 100644
index 0000000..2f4a241
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding.cc
@@ -0,0 +1,193 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+
+#include "flatbuffers/flatbuffers.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
+#include "src/lib/fxl/strings/string_view.h"
+
+namespace p2p_sync {
+namespace {
+
+bool IsValidCommitRequest(flatbuffers::Verifier& verifier,
+                          const CommitRequest* request) {
+  return request && request->Verify(verifier) && request->commit_ids();
+}
+
+bool IsValidObjectRequest(flatbuffers::Verifier& verifier,
+                          const ObjectRequest* request) {
+  return request && request->Verify(verifier) && request->object_ids();
+}
+
+bool IsValidWatchStartRequest(flatbuffers::Verifier& verifier,
+                              const WatchStartRequest* request) {
+  return request && request->Verify(verifier);
+}
+
+bool IsValidWatchStopRequest(flatbuffers::Verifier& verifier,
+                             const WatchStopRequest* request) {
+  return request && request->Verify(verifier);
+}
+
+bool IsValidRequest(flatbuffers::Verifier& verifier, const Request* request) {
+  if (!request || !request->Verify(verifier)) {
+    return false;
+  }
+
+  const NamespacePageId* namespace_page_id = request->namespace_page();
+  if (!namespace_page_id || !namespace_page_id->Verify(verifier)) {
+    return false;
+  }
+
+  if (!namespace_page_id->namespace_id() || !namespace_page_id->page_id()) {
+    return false;
+  }
+
+  switch (request->request_type()) {
+    case RequestMessage_NONE:
+      return false;
+    case RequestMessage_CommitRequest:
+      return IsValidCommitRequest(
+          verifier, static_cast<const CommitRequest*>(request->request()));
+    case RequestMessage_ObjectRequest:
+      return IsValidObjectRequest(
+          verifier, static_cast<const ObjectRequest*>(request->request()));
+    case RequestMessage_WatchStartRequest:
+      return IsValidWatchStartRequest(
+          verifier, static_cast<const WatchStartRequest*>(request->request()));
+    case RequestMessage_WatchStopRequest:
+      return IsValidWatchStopRequest(
+          verifier, static_cast<const WatchStopRequest*>(request->request()));
+  }
+}
+
+bool IsValidCommitResponse(flatbuffers::Verifier& verifier,
+                           const CommitResponse* response) {
+  if (!response || !response->Verify(verifier)) {
+    return false;
+  }
+
+  if (!response->commits()) {
+    return false;
+  }
+
+  for (const Commit* commit : *response->commits()) {
+    if (!commit || !commit->Verify(verifier)) {
+      return false;
+    }
+
+    const CommitId* id = commit->id();
+    if (!id || !id->Verify(verifier) || !id->id()) {
+      return false;
+    }
+
+    const Data* data = commit->commit();
+    if (!data || !data->Verify(verifier) || !data->bytes()) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool IsValidObjectResponse(flatbuffers::Verifier& verifier,
+                           const ObjectResponse* response) {
+  if (!response || !response->Verify(verifier)) {
+    return false;
+  }
+
+  if (!response->objects()) {
+    return false;
+  }
+
+  for (const Object* object : *response->objects()) {
+    if (!object || !object->Verify(verifier)) {
+      return false;
+    }
+
+    const ObjectId* id = object->id();
+    if (!id || !id->Verify(verifier) || !id->digest()) {
+      return false;
+    }
+
+    const Data* data = object->data();
+    // No data is a valid response: it means the object was not found.
+    if (data && (!data->Verify(verifier) || !data->bytes())) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool IsValidResponse(flatbuffers::Verifier& verifier,
+                     const Response* response) {
+  if (!response) {
+    return false;
+  }
+
+  if (!response->Verify(verifier)) {
+    return false;
+  }
+
+  const NamespacePageId* namespace_page_id = response->namespace_page();
+  if (!namespace_page_id || !namespace_page_id->Verify(verifier)) {
+    return false;
+  }
+
+  if (!namespace_page_id->namespace_id() || !namespace_page_id->page_id()) {
+    return false;
+  }
+
+  switch (response->response_type()) {
+    case ResponseMessage_NONE:
+      // Response returned in case of unknown namespace or page.
+      return true;
+    case ResponseMessage_CommitResponse:
+      return IsValidCommitResponse(
+          verifier, static_cast<const CommitResponse*>(response->response()));
+    case ResponseMessage_ObjectResponse:
+      return IsValidObjectResponse(
+          verifier, static_cast<const ObjectResponse*>(response->response()));
+  }
+}
+
+}  // namespace
+
+const Message* ParseMessage(convert::ExtendedStringView data) {
+  flatbuffers::Verifier verifier(
+      reinterpret_cast<const unsigned char*>(data.data()), data.size());
+
+  if (!VerifyMessageBuffer(verifier)) {
+    return nullptr;
+  }
+
+  const Message* message = GetMessage(data.data());
+
+  if (!message || !message->Verify(verifier)) {
+    return nullptr;
+  }
+
+  switch (message->message_type()) {
+    case MessageUnion_NONE:
+      return nullptr;
+    case MessageUnion_Request:
+      if (!IsValidRequest(verifier,
+                          static_cast<const Request*>(message->message()))) {
+        return nullptr;
+      }
+      break;
+    case MessageUnion_Response:
+      if (!IsValidResponse(verifier,
+                           static_cast<const Response*>(message->message()))) {
+        return nullptr;
+      }
+      break;
+  }
+
+  return message;
+}
+
+}  // namespace p2p_sync
diff --git a/src/ledger/bin/p2p_sync/impl/encoding.h b/src/ledger/bin/p2p_sync/impl/encoding.h
new file mode 100644
index 0000000..aedf805
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding.h
@@ -0,0 +1,19 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
+#define SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
+
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
+
+namespace p2p_sync {
+
+// Parses |data| into a |Message| object. Returns nullptr if the message is
+// malformed. The returned pointer is valid as long as |data|'s data is valid.
+const Message* ParseMessage(convert::ExtendedStringView data);
+
+}  // namespace p2p_sync
+
+#endif  // SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
diff --git a/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc b/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc
new file mode 100644
index 0000000..a76bee9
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc
@@ -0,0 +1,83 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+
+#include "gtest/gtest.h"
+#include "message_generated.h"
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h"
+#include "src/lib/fxl/macros.h"
+
+namespace p2p_sync {
+namespace {
+
+TEST(ParseMessageTest, Valid_WatchStart) {
+  flatbuffers::FlatBufferBuilder buffer;
+  flatbuffers::Offset<WatchStartRequest> watch_start =
+      CreateWatchStartRequest(buffer);
+  flatbuffers::Offset<NamespacePageId> namespace_page_id =
+      CreateNamespacePageId(
+          buffer, convert::ToFlatBufferVector(&buffer, "namespace_id"),
+          convert::ToFlatBufferVector(&buffer, "page_id"));
+  flatbuffers::Offset<Request> request =
+      CreateRequest(buffer, namespace_page_id, RequestMessage_WatchStartRequest,
+                    watch_start.Union());
+  flatbuffers::Offset<Message> message =
+      CreateMessage(buffer, MessageUnion_Request, request.Union());
+  buffer.Finish(message);
+  const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+  EXPECT_NE(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Invalid_NoNamespace) {
+  flatbuffers::FlatBufferBuilder buffer;
+  flatbuffers::Offset<WatchStartRequest> watch_start =
+      CreateWatchStartRequest(buffer);
+  flatbuffers::Offset<NamespacePageId> namespace_page_id;
+  flatbuffers::Offset<Request> request =
+      CreateRequest(buffer, namespace_page_id, RequestMessage_WatchStartRequest,
+                    watch_start.Union());
+  flatbuffers::Offset<Message> message =
+      CreateMessage(buffer, MessageUnion_Request, request.Union());
+  buffer.Finish(message);
+  const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+  EXPECT_EQ(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Valid_ObjectResponseNoObject) {
+  flatbuffers::FlatBufferBuilder buffer;
+  flatbuffers::Offset<NamespacePageId> namespace_page_id =
+      CreateNamespacePageId(
+          buffer, convert::ToFlatBufferVector(&buffer, "namespace_id"),
+          convert::ToFlatBufferVector(&buffer, "page_id"));
+  flatbuffers::Offset<ObjectId> fb_object_id = CreateObjectId(
+      buffer, 1, 3, convert::ToFlatBufferVector(&buffer, "digest"));
+  std::vector<flatbuffers::Offset<Object>> fb_objects;
+  fb_objects.emplace_back(
+      CreateObject(buffer, fb_object_id, ObjectStatus_UNKNOWN_OBJECT));
+  flatbuffers::Offset<ObjectResponse> object_response =
+      CreateObjectResponse(buffer, buffer.CreateVector(fb_objects));
+  flatbuffers::Offset<Response> response =
+      CreateResponse(buffer, ResponseStatus_OK, namespace_page_id,
+                     ResponseMessage_ObjectResponse, object_response.Union());
+  flatbuffers::Offset<Message> message =
+      CreateMessage(buffer, MessageUnion_Response, response.Union());
+  buffer.Finish(message);
+
+  const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+  EXPECT_NE(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Valid_ResponseUnknownNamespace) {
+  flatbuffers::FlatBufferBuilder buffer;
+  CreateUnknownResponseMessage(&buffer, "namespace", "page",
+                               ResponseStatus_UNKNOWN_NAMESPACE);
+
+  const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+  EXPECT_NE(message_ptr, nullptr);
+}
+
+}  // namespace
+}  // namespace p2p_sync
diff --git a/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h b/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
index 6d85bce..c52b72f 100644
--- a/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
+++ b/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
@@ -12,6 +12,8 @@
 
 namespace p2p_sync {
 
+// Fill |buffer| with a new |Message| containing a |Response| for an unknown
+// namespace or page.
 void CreateUnknownResponseMessage(flatbuffers::FlatBufferBuilder* buffer,
                                   fxl::StringView namespace_id,
                                   fxl::StringView page_id,
diff --git a/src/ledger/bin/p2p_sync/impl/message_holder.h b/src/ledger/bin/p2p_sync/impl/message_holder.h
index add1a49..5f4bc43 100644
--- a/src/ledger/bin/p2p_sync/impl/message_holder.h
+++ b/src/ledger/bin/p2p_sync/impl/message_holder.h
@@ -8,9 +8,12 @@
 #include <lib/fit/function.h>
 
 #include <functional>
+#include <optional>
 #include <utility>
 #include <vector>
 
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
 #include "src/lib/fxl/strings/string_view.h"
 
 namespace p2p_sync {
@@ -20,14 +23,8 @@
 class MessageHolder {
  public:
   // Create a new MessageHolder from the provided data and a parser.
-  MessageHolder(fxl::StringView data,
-                fit::function<const M*(const uint8_t*)> get_message)
-      : data_(data.begin(), data.end()), message_(get_message(data_.data())) {}
-
-  // Create a new MessageHolder from the provided data and a parser.
-  MessageHolder(std::vector<uint8_t> data,
-                fit::function<const M*(const uint8_t*)> get_message)
-      : data_(std::move(data)), message_(get_message(data_.data())) {}
+  MessageHolder(std::unique_ptr<std::vector<uint8_t>> data, const M* message)
+      : data_(std::move(data)), message_(message) {}
 
   // Create a new MessageHolder from the current object and a function to
   // specialize the message. The current message holder is destroyed in the
@@ -71,10 +68,27 @@
     other.message_ = nullptr;
   }
 
-  std::vector<uint8_t> data_;
+  std::unique_ptr<std::vector<uint8_t>> data_;
   const M* message_;
 };
 
+// Creates a new MessageHolder to contained a message, or nullopt if no message
+// can be obtained.
+template <class M>
+std::optional<MessageHolder<M>> CreateMessageHolder(
+    fxl::StringView data,
+    fit::function<const M*(convert::ExtendedStringView)> get_message) {
+  std::unique_ptr<std::vector<uint8_t>> data_vec =
+      std::make_unique<std::vector<uint8_t>>(data.begin(), data.end());
+
+  const M* message = get_message(*data_vec);
+
+  if (!message) {
+    return std::nullopt;
+  }
+
+  return MessageHolder<M>(std::move(data_vec), message);
+}
 }  // namespace p2p_sync
 
 #endif  // SRC_LEDGER_BIN_P2P_SYNC_IMPL_MESSAGE_HOLDER_H_
diff --git a/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc b/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
index 1f794d9..cf927cf 100644
--- a/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
+++ b/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
@@ -11,6 +11,7 @@
 #include "peridot/lib/convert/convert.h"
 #include "src/ledger/bin/p2p_sync/impl/message_generated.h"
 #include "src/ledger/bin/storage/public/read_data_source.h"
+#include "src/ledger/bin/storage/public/types.h"
 #include "src/ledger/lib/coroutine/coroutine_waiter.h"
 #include "src/lib/fxl/memory/ref_ptr.h"
 
@@ -391,12 +392,15 @@
 
 void PageCommunicatorImpl::BuildWatchStartBuffer(
     flatbuffers::FlatBufferBuilder* buffer) {
+  flatbuffers::Offset<WatchStartRequest> watch_start =
+      CreateWatchStartRequest(*buffer);
   flatbuffers::Offset<NamespacePageId> namespace_page_id =
       CreateNamespacePageId(*buffer,
                             convert::ToFlatBufferVector(buffer, namespace_id_),
                             convert::ToFlatBufferVector(buffer, page_id_));
-  flatbuffers::Offset<Request> request = CreateRequest(
-      *buffer, namespace_page_id, RequestMessage_WatchStartRequest);
+  flatbuffers::Offset<Request> request =
+      CreateRequest(*buffer, namespace_page_id,
+                    RequestMessage_WatchStartRequest, watch_start.Union());
   flatbuffers::Offset<Message> message =
       CreateMessage(*buffer, MessageUnion_Request, request.Union());
   buffer->Finish(message);
@@ -404,12 +408,15 @@
 
 void PageCommunicatorImpl::BuildWatchStopBuffer(
     flatbuffers::FlatBufferBuilder* buffer) {
+  flatbuffers::Offset<WatchStopRequest> watch_stop =
+      CreateWatchStopRequest(*buffer);
   flatbuffers::Offset<NamespacePageId> namespace_page_id =
       CreateNamespacePageId(*buffer,
                             convert::ToFlatBufferVector(buffer, namespace_id_),
                             convert::ToFlatBufferVector(buffer, page_id_));
-  flatbuffers::Offset<Request> request = CreateRequest(
-      *buffer, namespace_page_id, RequestMessage_WatchStopRequest);
+  flatbuffers::Offset<Request> request =
+      CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest,
+                    watch_stop.Union());
   flatbuffers::Offset<Message> message =
       CreateMessage(*buffer, MessageUnion_Request, request.Union());
   buffer->Finish(message);
diff --git a/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc b/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
index 2c6c52f..ca4e81a 100644
--- a/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
+++ b/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
@@ -22,6 +22,8 @@
 #include "gmock/gmock.h"
 #include "peridot/lib/convert/convert.h"
 #include "src/ledger/bin/p2p_sync/impl/device_mesh.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
 #include "src/ledger/bin/storage/fake/fake_object.h"
 #include "src/ledger/bin/storage/testing/commit_empty_impl.h"
 #include "src/ledger/bin/storage/testing/page_storage_empty_impl.h"
@@ -202,12 +204,15 @@
 void BuildWatchStartBuffer(flatbuffers::FlatBufferBuilder* buffer,
                            fxl::StringView namespace_id,
                            fxl::StringView page_id) {
+  flatbuffers::Offset<WatchStartRequest> watch_start =
+      CreateWatchStartRequest(*buffer);
   flatbuffers::Offset<NamespacePageId> namespace_page_id =
       CreateNamespacePageId(*buffer,
                             convert::ToFlatBufferVector(buffer, namespace_id),
                             convert::ToFlatBufferVector(buffer, page_id));
-  flatbuffers::Offset<Request> request = CreateRequest(
-      *buffer, namespace_page_id, RequestMessage_WatchStartRequest);
+  flatbuffers::Offset<Request> request =
+      CreateRequest(*buffer, namespace_page_id,
+                    RequestMessage_WatchStartRequest, watch_start.Union());
   flatbuffers::Offset<Message> message =
       CreateMessage(*buffer, MessageUnion_Request, request.Union());
   buffer->Finish(message);
@@ -216,12 +221,15 @@
 void BuildWatchStopBuffer(flatbuffers::FlatBufferBuilder* buffer,
                           fxl::StringView namespace_id,
                           fxl::StringView page_id) {
+  flatbuffers::Offset<WatchStopRequest> watch_stop =
+      CreateWatchStopRequest(*buffer);
   flatbuffers::Offset<NamespacePageId> namespace_page_id =
       CreateNamespacePageId(*buffer,
                             convert::ToFlatBufferVector(buffer, namespace_id),
                             convert::ToFlatBufferVector(buffer, page_id));
-  flatbuffers::Offset<Request> request = CreateRequest(
-      *buffer, namespace_page_id, RequestMessage_WatchStopRequest);
+  flatbuffers::Offset<Request> request =
+      CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest,
+                    watch_stop.Union());
   flatbuffers::Offset<Message> message =
       CreateMessage(*buffer, MessageUnion_Request, request.Union());
   buffer->Finish(message);
@@ -409,8 +417,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
-                                            &GetMessage);
+  MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(new_device_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -464,8 +472,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
-                                            &GetMessage);
+  MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   // If storage fails to mark the page as synced to a peer, the mesh should not
   // be updated.
   storage.mark_synced_to_peer_status = storage::Status::IO_ERROR;
@@ -502,8 +510,8 @@
   BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
                            {MakeObjectIdentifier("object_digest"),
                             MakeObjectIdentifier("object_digest2")});
-  MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
-                                         &GetMessage);
+  MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(request_buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(request_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -558,8 +566,8 @@
   flatbuffers::FlatBufferBuilder request_buffer;
   BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
                            {MakeObjectIdentifier("object_digest")});
-  MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
-                                         &GetMessage);
+  MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(request_buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(request_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -607,8 +615,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
-                                            &GetMessage);
+  MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(new_device_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -635,8 +643,8 @@
       &response_buffer, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false),
        std::make_tuple(MakeObjectIdentifier("bar"), "bar_data", false)});
-  MessageHolder<Message> response_message(
-      convert::ToStringView(response_buffer), &GetMessage);
+  MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer), &ParseMessage);
   page_communicator.OnNewResponse(
       "device2", std::move(response_message)
                      .TakeAndMap<Response>([](const Message* message) {
@@ -658,8 +666,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
-                                            &GetMessage);
+  MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(new_device_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -685,8 +693,8 @@
   BuildObjectResponseBuffer(
       &response_buffer, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", true)});
-  MessageHolder<Message> response_message(
-      convert::ToStringView(response_buffer), &GetMessage);
+  MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer), &ParseMessage);
   page_communicator.OnNewResponse(
       "device2", std::move(response_message)
                      .TakeAndMap<Response>([](const Message* message) {
@@ -708,7 +716,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -734,8 +743,8 @@
   BuildObjectResponseBuffer(
       &response_buffer, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
-  MessageHolder<Message> response_message(
-      convert::ToStringView(response_buffer), &GetMessage);
+  MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer), &ParseMessage);
   page_communicator.OnNewResponse(
       "device2", std::move(response_message)
                      .TakeAndMap<Response>([](const Message* message) {
@@ -756,16 +765,18 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
         return static_cast<const Request*>(message->message());
       }));
-  message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message_copy = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device3",
-      std::move(message).TakeAndMap<Request>([](const Message* message) {
+      std::move(message_copy).TakeAndMap<Request>([](const Message* message) {
         return static_cast<const Request*>(message->message());
       }));
 
@@ -786,8 +797,8 @@
   BuildObjectResponseBuffer(
       &response_buffer_1, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
-  MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
-                                   &GetMessage);
+  MessageHolder<Message> message_1 = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer_1), &ParseMessage);
   page_communicator.OnNewResponse(
       "device2",
       std::move(message_1).TakeAndMap<Response>([](const Message* message) {
@@ -799,8 +810,8 @@
   BuildObjectResponseBuffer(
       &response_buffer_2, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false)});
-  MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
-                                   &GetMessage);
+  MessageHolder<Message> message_2 = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer_2), &ParseMessage);
   page_communicator.OnNewResponse(
       "device3",
       std::move(message_2).TakeAndMap<Response>([](const Message* message) {
@@ -823,16 +834,18 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
         return static_cast<const Request*>(message->message());
       }));
-  message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message_copy = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device3",
-      std::move(message).TakeAndMap<Request>([](const Message* message) {
+      std::move(message_copy).TakeAndMap<Request>([](const Message* message) {
         return static_cast<const Request*>(message->message());
       }));
 
@@ -853,8 +866,8 @@
   BuildObjectResponseBuffer(
       &response_buffer_1, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
-  MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
-                                   &GetMessage);
+  MessageHolder<Message> message_1 = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer_1), &ParseMessage);
   page_communicator.OnNewResponse(
       "device2",
       std::move(message_1).TakeAndMap<Response>([](const Message* message) {
@@ -866,8 +879,8 @@
   BuildObjectResponseBuffer(
       &response_buffer_2, "ledger", "page",
       {std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
-  MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
-                                   &GetMessage);
+  MessageHolder<Message> message_2 = *CreateMessageHolder<Message>(
+      convert::ToStringView(response_buffer_2), &ParseMessage);
   page_communicator.OnNewResponse(
       "device3",
       std::move(message_2).TakeAndMap<Response>([](const Message* message) {
@@ -888,7 +901,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator_1.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -924,12 +938,8 @@
   ASSERT_EQ(1u, mesh.messages_.size());
   EXPECT_EQ("device2", mesh.messages_[0].first);
 
-  flatbuffers::Verifier verifier(
-      reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
-      mesh.messages_[0].second.size());
-  ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
-  MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
+  MessageHolder<Message> reply_message =
+      *CreateMessageHolder<Message>(mesh.messages_[0].second, &ParseMessage);
   ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
   MessageHolder<Response> response =
       std::move(reply_message).TakeAndMap<Response>([](const Message* message) {
@@ -969,7 +979,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -1007,8 +1018,8 @@
 
   flatbuffers::FlatBufferBuilder stop_buffer;
   BuildWatchStopBuffer(&stop_buffer, "ledger", "page");
-  MessageHolder<Message> watch_stop_message(convert::ToStringView(stop_buffer),
-                                            &GetMessage);
+  MessageHolder<Message> watch_stop_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(stop_buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(watch_stop_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -1053,8 +1064,8 @@
   BuildCommitRequestBuffer(&request_buffer, "ledger", "page",
                            {storage::CommitId(commit_1.GetId()),
                             storage::CommitId("missing_commit")});
-  MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
-                                         &GetMessage);
+  MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+      convert::ToStringView(request_buffer), &ParseMessage);
   page_communicator.OnNewRequest(
       "device2", std::move(request_message)
                      .TakeAndMap<Request>([](const Message* message) {
@@ -1106,7 +1117,8 @@
 
   flatbuffers::FlatBufferBuilder buffer;
   BuildWatchStartBuffer(&buffer, "ledger", "page");
-  MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+  MessageHolder<Message> message = *CreateMessageHolder<Message>(
+      convert::ToStringView(buffer), &ParseMessage);
   page_communicator_1.OnNewRequest(
       "device2",
       std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -1133,12 +1145,8 @@
   EXPECT_EQ("device2", mesh.messages_[0].first);
 
   {
-    flatbuffers::Verifier verifier(
-        reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
-        mesh.messages_[0].second.size());
-    ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
-    MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
+    MessageHolder<Message> reply_message =
+        *CreateMessageHolder<Message>(mesh.messages_[0].second, &ParseMessage);
     ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
     MessageHolder<Response> response =
         std::move(reply_message)
@@ -1170,13 +1178,8 @@
   EXPECT_EQ("device1", mesh.messages_[1].first);
 
   {
-    flatbuffers::Verifier verifier(
-        reinterpret_cast<const unsigned char*>(mesh.messages_[1].second.data()),
-        mesh.messages_[0].second.size());
-    ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
-    MessageHolder<Message> request_message(mesh.messages_[1].second,
-                                           &GetMessage);
+    MessageHolder<Message> request_message =
+        *CreateMessageHolder<Message>(mesh.messages_[1].second, &ParseMessage);
     ASSERT_EQ(MessageUnion_Request, request_message->message_type());
     MessageHolder<Request> request =
         std::move(request_message)
@@ -1201,12 +1204,8 @@
   EXPECT_EQ("device2", mesh.messages_[2].first);
 
   {
-    flatbuffers::Verifier verifier(
-        reinterpret_cast<const unsigned char*>(mesh.messages_[2].second.data()),
-        mesh.messages_[0].second.size());
-    ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
-    MessageHolder<Message> reply_message(mesh.messages_[2].second, &GetMessage);
+    MessageHolder<Message> reply_message =
+        *CreateMessageHolder<Message>(mesh.messages_[2].second, &ParseMessage);
     ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
     MessageHolder<Response> response =
         std::move(reply_message)
diff --git a/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc b/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
index 78b80c4..1e20343a 100644
--- a/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
+++ b/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
@@ -5,6 +5,7 @@
 #include "src/ledger/bin/p2p_sync/impl/user_communicator_impl.h"
 
 #include "peridot/lib/ledger_client/constants.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
 #include "src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h"
 #include "src/ledger/bin/p2p_sync/impl/ledger_communicator_impl.h"
 #include "src/ledger/bin/p2p_sync/impl/message_generated.h"
@@ -48,16 +49,15 @@
 
 void UserCommunicatorImpl::OnNewMessage(fxl::StringView source,
                                         fxl::StringView data) {
-  flatbuffers::Verifier verifier(
-      reinterpret_cast<const unsigned char*>(data.data()), data.size());
-  if (!VerifyMessageBuffer(verifier)) {
+  std::optional<MessageHolder<Message>> message =
+      CreateMessageHolder<Message>(data, &ParseMessage);
+  if (!message) {
     // Wrong serialization, abort.
     FXL_LOG(ERROR) << "The message received is malformed.";
     return;
   };
-  MessageHolder<Message> message(data, &GetMessage);
-  const NamespacePageId* namespace_page_id;
-  switch (message->message_type()) {
+
+  switch ((*message)->message_type()) {
     case MessageUnion_NONE:
       FXL_LOG(ERROR) << "The message received is unexpected at this point.";
       return;
@@ -65,10 +65,10 @@
 
     case MessageUnion_Request: {
       MessageHolder<Request> request =
-          std::move(message).TakeAndMap<Request>([](const Message* message) {
+          std::move(*message).TakeAndMap<Request>([](const Message* message) {
             return static_cast<const Request*>(message->message());
           });
-      namespace_page_id = request->namespace_page();
+      const NamespacePageId* namespace_page_id = request->namespace_page();
 
       std::string namespace_id(namespace_page_id->namespace_id()->begin(),
                                namespace_page_id->namespace_id()->end());
@@ -89,10 +89,10 @@
 
     case MessageUnion_Response: {
       MessageHolder<Response> response =
-          std::move(message).TakeAndMap<Response>([](const Message* message) {
+          std::move(*message).TakeAndMap<Response>([](const Message* message) {
             return static_cast<const Response*>(message->message());
           });
-      namespace_page_id = response->namespace_page();
+      const NamespacePageId* namespace_page_id = response->namespace_page();
       std::string namespace_id(namespace_page_id->namespace_id()->begin(),
                                namespace_page_id->namespace_id()->end());
       std::string page_id(namespace_page_id->page_id()->begin(),
@@ -100,10 +100,10 @@
 
       const auto& it = ledgers_.find(namespace_id);
       if (it == ledgers_.end()) {
-        // We are receiving a response for a ledger that no longer exists. This
-        // can happen in normal operation, and we cannot do anything with this
-        // message: we can't send it to a ledger, and we don't send responses to
-        // responses. So we just drop it here.
+        // We are receiving a response for a ledger that no longer exists.
+        // This can happen in normal operation, and we cannot do anything with
+        // this message: we can't send it to a ledger, and we don't send
+        // responses to responses. So we just drop it here.
         return;
       }
       it->second->OnNewResponse(source, page_id, std::move(response));
diff --git a/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc b/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc
new file mode 100644
index 0000000..9368b69
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc
@@ -0,0 +1,74 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <lib/fidl/cpp/binding.h>
+#include <lib/fit/function.h>
+
+#include <algorithm>
+#include <string>
+
+#include "src/ledger/bin/p2p_provider/impl/p2p_provider_impl.h"
+#include "src/ledger/bin/p2p_provider/public/p2p_provider.h"
+#include "src/ledger/bin/p2p_sync/impl/user_communicator_impl.h"
+#include "src/ledger/bin/storage/public/page_sync_client.h"
+#include "src/ledger/bin/storage/testing/page_storage_empty_impl.h"
+#include "src/ledger/lib/coroutine/coroutine_impl.h"
+#include "src/lib/fxl/macros.h"
+
+namespace p2p_sync {
+namespace {
+
+class TestPageStorage : public storage::PageStorageEmptyImpl {
+ public:
+  TestPageStorage() = default;
+  ~TestPageStorage() {}
+
+  storage::PageId GetId() override { return "page"; }
+
+  void SetSyncDelegate(storage::PageSyncDelegate* page_sync) override {
+    return;
+  }
+};
+
+class FuzzingP2PProvider : public p2p_provider::P2PProvider {
+ public:
+  FuzzingP2PProvider() = default;
+
+  void Start(Client* client) override { client_ = client; }
+
+  bool SendMessage(fxl::StringView destination, fxl::StringView data) override {
+    FXL_NOTIMPLEMENTED();
+    return false;
+  }
+
+  Client* client_;
+};
+
+// Fuzz the peer-to-peer messages received by a |UserCommunicatorImpl|.
+extern "C" int LLVMFuzzerTestOneInput(const uint8_t* Data, size_t Size) {
+  std::string bytes(reinterpret_cast<const char*>(Data), Size);
+
+  coroutine::CoroutineServiceImpl coroutine_service;
+  auto provider = std::make_unique<FuzzingP2PProvider>();
+  FuzzingP2PProvider* provider_ptr = provider.get();
+
+  UserCommunicatorImpl user_communicator(std::move(provider),
+                                         &coroutine_service);
+  user_communicator.Start();
+  auto ledger_communicator = user_communicator.GetLedgerCommunicator("ledger");
+
+  storage::PageStorageEmptyImpl page_storage;
+
+  auto page_communicator =
+      ledger_communicator->GetPageCommunicator(&page_storage, &page_storage);
+
+  provider_ptr->client_->OnDeviceChange("device",
+                                        p2p_provider::DeviceChangeType::NEW);
+  provider_ptr->client_->OnNewMessage("device", bytes);
+
+  return 0;
+}
+
+}  // namespace
+}  // namespace p2p_sync