[ledger] Fuzz test Ledger P2P stack
Adds a fuzz test to our P2P synchronization code, and fixes issues
discovered by this fuzz test: lack of deep verification of incoming
flatbuffers.
To achieve this deep verification, I refactored the message parsing code
for P2P. We now have one place for verification, then the message
pointers are passed around.
Change-Id: Ie770e3b2c83f46b6df1f8341b251b51eb0b7c9ea
diff --git a/src/ledger/bin/BUILD.gn b/src/ledger/bin/BUILD.gn
index 36d1ba0..042abc8 100644
--- a/src/ledger/bin/BUILD.gn
+++ b/src/ledger/bin/BUILD.gn
@@ -71,7 +71,10 @@
}
fuzz_package("ledger_fuzzers") {
- targets = [ "//src/ledger/bin/storage/impl/btree:encoding_fuzzer" ]
+ targets = [
+ "//src/ledger/bin/p2p_sync/impl:p2p_sync_fuzzer",
+ "//src/ledger/bin/storage/impl/btree:encoding_fuzzer",
+ ]
sanitizers = [
"asan",
"ubsan",
diff --git a/src/ledger/bin/p2p_sync/impl/BUILD.gn b/src/ledger/bin/p2p_sync/impl/BUILD.gn
index cec55e3..a6aeeb2 100644
--- a/src/ledger/bin/p2p_sync/impl/BUILD.gn
+++ b/src/ledger/bin/p2p_sync/impl/BUILD.gn
@@ -2,6 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import("//build/fuzzing/fuzzer.gni")
import("//third_party/flatbuffers/flatbuffer.gni")
visibility = [ "//src/ledger/*" ]
@@ -11,6 +12,8 @@
"commit_batch.cc",
"commit_batch.h",
"device_mesh.h",
+ "encoding.cc",
+ "encoding.h",
"flatbuffer_message_factory.cc",
"flatbuffer_message_factory.h",
"ledger_communicator_impl.cc",
@@ -51,12 +54,14 @@
testonly = true
sources = [
+ "encoding_unittest.cc",
"page_communicator_impl_unittest.cc",
"user_communicator_impl_unittest.cc",
]
deps = [
":impl",
+ ":message",
"//garnet/public/lib/fsl",
"//sdk/lib/fidl/cpp",
"//src/ledger/bin/p2p_provider/impl",
@@ -73,3 +78,14 @@
configs += [ "//src/ledger:ledger_config" ]
}
+
+fuzz_target("p2p_sync_fuzzer") {
+ sources = [
+ "user_communicator_impl_fuzztest.cc",
+ ]
+ deps = [
+ ":impl",
+ "//src/ledger/bin/p2p_provider/public",
+ "//src/ledger/bin/storage/testing",
+ ]
+}
diff --git a/src/ledger/bin/p2p_sync/impl/encoding.cc b/src/ledger/bin/p2p_sync/impl/encoding.cc
new file mode 100644
index 0000000..2f4a241
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding.cc
@@ -0,0 +1,193 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+
+#include "flatbuffers/flatbuffers.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
+#include "src/lib/fxl/strings/string_view.h"
+
+namespace p2p_sync {
+namespace {
+
+bool IsValidCommitRequest(flatbuffers::Verifier& verifier,
+ const CommitRequest* request) {
+ return request && request->Verify(verifier) && request->commit_ids();
+}
+
+bool IsValidObjectRequest(flatbuffers::Verifier& verifier,
+ const ObjectRequest* request) {
+ return request && request->Verify(verifier) && request->object_ids();
+}
+
+bool IsValidWatchStartRequest(flatbuffers::Verifier& verifier,
+ const WatchStartRequest* request) {
+ return request && request->Verify(verifier);
+}
+
+bool IsValidWatchStopRequest(flatbuffers::Verifier& verifier,
+ const WatchStopRequest* request) {
+ return request && request->Verify(verifier);
+}
+
+bool IsValidRequest(flatbuffers::Verifier& verifier, const Request* request) {
+ if (!request || !request->Verify(verifier)) {
+ return false;
+ }
+
+ const NamespacePageId* namespace_page_id = request->namespace_page();
+ if (!namespace_page_id || !namespace_page_id->Verify(verifier)) {
+ return false;
+ }
+
+ if (!namespace_page_id->namespace_id() || !namespace_page_id->page_id()) {
+ return false;
+ }
+
+ switch (request->request_type()) {
+ case RequestMessage_NONE:
+ return false;
+ case RequestMessage_CommitRequest:
+ return IsValidCommitRequest(
+ verifier, static_cast<const CommitRequest*>(request->request()));
+ case RequestMessage_ObjectRequest:
+ return IsValidObjectRequest(
+ verifier, static_cast<const ObjectRequest*>(request->request()));
+ case RequestMessage_WatchStartRequest:
+ return IsValidWatchStartRequest(
+ verifier, static_cast<const WatchStartRequest*>(request->request()));
+ case RequestMessage_WatchStopRequest:
+ return IsValidWatchStopRequest(
+ verifier, static_cast<const WatchStopRequest*>(request->request()));
+ }
+}
+
+bool IsValidCommitResponse(flatbuffers::Verifier& verifier,
+ const CommitResponse* response) {
+ if (!response || !response->Verify(verifier)) {
+ return false;
+ }
+
+ if (!response->commits()) {
+ return false;
+ }
+
+ for (const Commit* commit : *response->commits()) {
+ if (!commit || !commit->Verify(verifier)) {
+ return false;
+ }
+
+ const CommitId* id = commit->id();
+ if (!id || !id->Verify(verifier) || !id->id()) {
+ return false;
+ }
+
+ const Data* data = commit->commit();
+ if (!data || !data->Verify(verifier) || !data->bytes()) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool IsValidObjectResponse(flatbuffers::Verifier& verifier,
+ const ObjectResponse* response) {
+ if (!response || !response->Verify(verifier)) {
+ return false;
+ }
+
+ if (!response->objects()) {
+ return false;
+ }
+
+ for (const Object* object : *response->objects()) {
+ if (!object || !object->Verify(verifier)) {
+ return false;
+ }
+
+ const ObjectId* id = object->id();
+ if (!id || !id->Verify(verifier) || !id->digest()) {
+ return false;
+ }
+
+ const Data* data = object->data();
+ // No data is a valid response: it means the object was not found.
+ if (data && (!data->Verify(verifier) || !data->bytes())) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool IsValidResponse(flatbuffers::Verifier& verifier,
+ const Response* response) {
+ if (!response) {
+ return false;
+ }
+
+ if (!response->Verify(verifier)) {
+ return false;
+ }
+
+ const NamespacePageId* namespace_page_id = response->namespace_page();
+ if (!namespace_page_id || !namespace_page_id->Verify(verifier)) {
+ return false;
+ }
+
+ if (!namespace_page_id->namespace_id() || !namespace_page_id->page_id()) {
+ return false;
+ }
+
+ switch (response->response_type()) {
+ case ResponseMessage_NONE:
+ // Response returned in case of unknown namespace or page.
+ return true;
+ case ResponseMessage_CommitResponse:
+ return IsValidCommitResponse(
+ verifier, static_cast<const CommitResponse*>(response->response()));
+ case ResponseMessage_ObjectResponse:
+ return IsValidObjectResponse(
+ verifier, static_cast<const ObjectResponse*>(response->response()));
+ }
+}
+
+} // namespace
+
+const Message* ParseMessage(convert::ExtendedStringView data) {
+ flatbuffers::Verifier verifier(
+ reinterpret_cast<const unsigned char*>(data.data()), data.size());
+
+ if (!VerifyMessageBuffer(verifier)) {
+ return nullptr;
+ }
+
+ const Message* message = GetMessage(data.data());
+
+ if (!message || !message->Verify(verifier)) {
+ return nullptr;
+ }
+
+ switch (message->message_type()) {
+ case MessageUnion_NONE:
+ return nullptr;
+ case MessageUnion_Request:
+ if (!IsValidRequest(verifier,
+ static_cast<const Request*>(message->message()))) {
+ return nullptr;
+ }
+ break;
+ case MessageUnion_Response:
+ if (!IsValidResponse(verifier,
+ static_cast<const Response*>(message->message()))) {
+ return nullptr;
+ }
+ break;
+ }
+
+ return message;
+}
+
+} // namespace p2p_sync
diff --git a/src/ledger/bin/p2p_sync/impl/encoding.h b/src/ledger/bin/p2p_sync/impl/encoding.h
new file mode 100644
index 0000000..aedf805
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding.h
@@ -0,0 +1,19 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
+#define SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
+
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
+
+namespace p2p_sync {
+
+// Parses |data| into a |Message| object. Returns nullptr if the message is
+// malformed. The returned pointer is valid as long as |data|'s data is valid.
+const Message* ParseMessage(convert::ExtendedStringView data);
+
+} // namespace p2p_sync
+
+#endif // SRC_LEDGER_BIN_P2P_SYNC_IMPL_ENCODING_H_
diff --git a/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc b/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc
new file mode 100644
index 0000000..a76bee9
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/encoding_unittest.cc
@@ -0,0 +1,83 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+
+#include "gtest/gtest.h"
+#include "message_generated.h"
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h"
+#include "src/lib/fxl/macros.h"
+
+namespace p2p_sync {
+namespace {
+
+TEST(ParseMessageTest, Valid_WatchStart) {
+ flatbuffers::FlatBufferBuilder buffer;
+ flatbuffers::Offset<WatchStartRequest> watch_start =
+ CreateWatchStartRequest(buffer);
+ flatbuffers::Offset<NamespacePageId> namespace_page_id =
+ CreateNamespacePageId(
+ buffer, convert::ToFlatBufferVector(&buffer, "namespace_id"),
+ convert::ToFlatBufferVector(&buffer, "page_id"));
+ flatbuffers::Offset<Request> request =
+ CreateRequest(buffer, namespace_page_id, RequestMessage_WatchStartRequest,
+ watch_start.Union());
+ flatbuffers::Offset<Message> message =
+ CreateMessage(buffer, MessageUnion_Request, request.Union());
+ buffer.Finish(message);
+ const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+ EXPECT_NE(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Invalid_NoNamespace) {
+ flatbuffers::FlatBufferBuilder buffer;
+ flatbuffers::Offset<WatchStartRequest> watch_start =
+ CreateWatchStartRequest(buffer);
+ flatbuffers::Offset<NamespacePageId> namespace_page_id;
+ flatbuffers::Offset<Request> request =
+ CreateRequest(buffer, namespace_page_id, RequestMessage_WatchStartRequest,
+ watch_start.Union());
+ flatbuffers::Offset<Message> message =
+ CreateMessage(buffer, MessageUnion_Request, request.Union());
+ buffer.Finish(message);
+ const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+ EXPECT_EQ(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Valid_ObjectResponseNoObject) {
+ flatbuffers::FlatBufferBuilder buffer;
+ flatbuffers::Offset<NamespacePageId> namespace_page_id =
+ CreateNamespacePageId(
+ buffer, convert::ToFlatBufferVector(&buffer, "namespace_id"),
+ convert::ToFlatBufferVector(&buffer, "page_id"));
+ flatbuffers::Offset<ObjectId> fb_object_id = CreateObjectId(
+ buffer, 1, 3, convert::ToFlatBufferVector(&buffer, "digest"));
+ std::vector<flatbuffers::Offset<Object>> fb_objects;
+ fb_objects.emplace_back(
+ CreateObject(buffer, fb_object_id, ObjectStatus_UNKNOWN_OBJECT));
+ flatbuffers::Offset<ObjectResponse> object_response =
+ CreateObjectResponse(buffer, buffer.CreateVector(fb_objects));
+ flatbuffers::Offset<Response> response =
+ CreateResponse(buffer, ResponseStatus_OK, namespace_page_id,
+ ResponseMessage_ObjectResponse, object_response.Union());
+ flatbuffers::Offset<Message> message =
+ CreateMessage(buffer, MessageUnion_Response, response.Union());
+ buffer.Finish(message);
+
+ const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+ EXPECT_NE(message_ptr, nullptr);
+}
+
+TEST(ParseMessageTest, Valid_ResponseUnknownNamespace) {
+ flatbuffers::FlatBufferBuilder buffer;
+ CreateUnknownResponseMessage(&buffer, "namespace", "page",
+ ResponseStatus_UNKNOWN_NAMESPACE);
+
+ const Message* message_ptr = ParseMessage(convert::ToStringView(buffer));
+ EXPECT_NE(message_ptr, nullptr);
+}
+
+} // namespace
+} // namespace p2p_sync
diff --git a/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h b/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
index 6d85bce..c52b72f 100644
--- a/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
+++ b/src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h
@@ -12,6 +12,8 @@
namespace p2p_sync {
+// Fill |buffer| with a new |Message| containing a |Response| for an unknown
+// namespace or page.
void CreateUnknownResponseMessage(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id,
diff --git a/src/ledger/bin/p2p_sync/impl/message_holder.h b/src/ledger/bin/p2p_sync/impl/message_holder.h
index add1a49..5f4bc43 100644
--- a/src/ledger/bin/p2p_sync/impl/message_holder.h
+++ b/src/ledger/bin/p2p_sync/impl/message_holder.h
@@ -8,9 +8,12 @@
#include <lib/fit/function.h>
#include <functional>
+#include <optional>
#include <utility>
#include <vector>
+#include "peridot/lib/convert/convert.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
#include "src/lib/fxl/strings/string_view.h"
namespace p2p_sync {
@@ -20,14 +23,8 @@
class MessageHolder {
public:
// Create a new MessageHolder from the provided data and a parser.
- MessageHolder(fxl::StringView data,
- fit::function<const M*(const uint8_t*)> get_message)
- : data_(data.begin(), data.end()), message_(get_message(data_.data())) {}
-
- // Create a new MessageHolder from the provided data and a parser.
- MessageHolder(std::vector<uint8_t> data,
- fit::function<const M*(const uint8_t*)> get_message)
- : data_(std::move(data)), message_(get_message(data_.data())) {}
+ MessageHolder(std::unique_ptr<std::vector<uint8_t>> data, const M* message)
+ : data_(std::move(data)), message_(message) {}
// Create a new MessageHolder from the current object and a function to
// specialize the message. The current message holder is destroyed in the
@@ -71,10 +68,27 @@
other.message_ = nullptr;
}
- std::vector<uint8_t> data_;
+ std::unique_ptr<std::vector<uint8_t>> data_;
const M* message_;
};
+// Creates a new MessageHolder to contained a message, or nullopt if no message
+// can be obtained.
+template <class M>
+std::optional<MessageHolder<M>> CreateMessageHolder(
+ fxl::StringView data,
+ fit::function<const M*(convert::ExtendedStringView)> get_message) {
+ std::unique_ptr<std::vector<uint8_t>> data_vec =
+ std::make_unique<std::vector<uint8_t>>(data.begin(), data.end());
+
+ const M* message = get_message(*data_vec);
+
+ if (!message) {
+ return std::nullopt;
+ }
+
+ return MessageHolder<M>(std::move(data_vec), message);
+}
} // namespace p2p_sync
#endif // SRC_LEDGER_BIN_P2P_SYNC_IMPL_MESSAGE_HOLDER_H_
diff --git a/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc b/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
index 1f794d9..cf927cf 100644
--- a/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
+++ b/src/ledger/bin/p2p_sync/impl/page_communicator_impl.cc
@@ -11,6 +11,7 @@
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
#include "src/ledger/bin/storage/public/read_data_source.h"
+#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/coroutine/coroutine_waiter.h"
#include "src/lib/fxl/memory/ref_ptr.h"
@@ -391,12 +392,15 @@
void PageCommunicatorImpl::BuildWatchStartBuffer(
flatbuffers::FlatBufferBuilder* buffer) {
+ flatbuffers::Offset<WatchStartRequest> watch_start =
+ CreateWatchStartRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
- flatbuffers::Offset<Request> request = CreateRequest(
- *buffer, namespace_page_id, RequestMessage_WatchStartRequest);
+ flatbuffers::Offset<Request> request =
+ CreateRequest(*buffer, namespace_page_id,
+ RequestMessage_WatchStartRequest, watch_start.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
@@ -404,12 +408,15 @@
void PageCommunicatorImpl::BuildWatchStopBuffer(
flatbuffers::FlatBufferBuilder* buffer) {
+ flatbuffers::Offset<WatchStopRequest> watch_stop =
+ CreateWatchStopRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
- flatbuffers::Offset<Request> request = CreateRequest(
- *buffer, namespace_page_id, RequestMessage_WatchStopRequest);
+ flatbuffers::Offset<Request> request =
+ CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest,
+ watch_stop.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
diff --git a/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc b/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
index 2c6c52f..ca4e81a 100644
--- a/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
+++ b/src/ledger/bin/p2p_sync/impl/page_communicator_impl_unittest.cc
@@ -22,6 +22,8 @@
#include "gmock/gmock.h"
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/p2p_sync/impl/device_mesh.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
+#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
#include "src/ledger/bin/storage/fake/fake_object.h"
#include "src/ledger/bin/storage/testing/commit_empty_impl.h"
#include "src/ledger/bin/storage/testing/page_storage_empty_impl.h"
@@ -202,12 +204,15 @@
void BuildWatchStartBuffer(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id) {
+ flatbuffers::Offset<WatchStartRequest> watch_start =
+ CreateWatchStartRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
- flatbuffers::Offset<Request> request = CreateRequest(
- *buffer, namespace_page_id, RequestMessage_WatchStartRequest);
+ flatbuffers::Offset<Request> request =
+ CreateRequest(*buffer, namespace_page_id,
+ RequestMessage_WatchStartRequest, watch_start.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
@@ -216,12 +221,15 @@
void BuildWatchStopBuffer(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id) {
+ flatbuffers::Offset<WatchStopRequest> watch_stop =
+ CreateWatchStopRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
- flatbuffers::Offset<Request> request = CreateRequest(
- *buffer, namespace_page_id, RequestMessage_WatchStopRequest);
+ flatbuffers::Offset<Request> request =
+ CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest,
+ watch_stop.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
@@ -409,8 +417,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
- &GetMessage);
+ MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -464,8 +472,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
- &GetMessage);
+ MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
// If storage fails to mark the page as synced to a peer, the mesh should not
// be updated.
storage.mark_synced_to_peer_status = storage::Status::IO_ERROR;
@@ -502,8 +510,8 @@
BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
{MakeObjectIdentifier("object_digest"),
MakeObjectIdentifier("object_digest2")});
- MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
- &GetMessage);
+ MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(request_buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -558,8 +566,8 @@
flatbuffers::FlatBufferBuilder request_buffer;
BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
{MakeObjectIdentifier("object_digest")});
- MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
- &GetMessage);
+ MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(request_buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -607,8 +615,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
- &GetMessage);
+ MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -635,8 +643,8 @@
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false),
std::make_tuple(MakeObjectIdentifier("bar"), "bar_data", false)});
- MessageHolder<Message> response_message(
- convert::ToStringView(response_buffer), &GetMessage);
+ MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer), &ParseMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
@@ -658,8 +666,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
- &GetMessage);
+ MessageHolder<Message> new_device_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -685,8 +693,8 @@
BuildObjectResponseBuffer(
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", true)});
- MessageHolder<Message> response_message(
- convert::ToStringView(response_buffer), &GetMessage);
+ MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer), &ParseMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
@@ -708,7 +716,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -734,8 +743,8 @@
BuildObjectResponseBuffer(
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
- MessageHolder<Message> response_message(
- convert::ToStringView(response_buffer), &GetMessage);
+ MessageHolder<Message> response_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer), &ParseMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
@@ -756,16 +765,18 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
- message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message_copy = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device3",
- std::move(message).TakeAndMap<Request>([](const Message* message) {
+ std::move(message_copy).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
@@ -786,8 +797,8 @@
BuildObjectResponseBuffer(
&response_buffer_1, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
- MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
- &GetMessage);
+ MessageHolder<Message> message_1 = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer_1), &ParseMessage);
page_communicator.OnNewResponse(
"device2",
std::move(message_1).TakeAndMap<Response>([](const Message* message) {
@@ -799,8 +810,8 @@
BuildObjectResponseBuffer(
&response_buffer_2, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false)});
- MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
- &GetMessage);
+ MessageHolder<Message> message_2 = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer_2), &ParseMessage);
page_communicator.OnNewResponse(
"device3",
std::move(message_2).TakeAndMap<Response>([](const Message* message) {
@@ -823,16 +834,18 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
- message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message_copy = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device3",
- std::move(message).TakeAndMap<Request>([](const Message* message) {
+ std::move(message_copy).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
@@ -853,8 +866,8 @@
BuildObjectResponseBuffer(
&response_buffer_1, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
- MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
- &GetMessage);
+ MessageHolder<Message> message_1 = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer_1), &ParseMessage);
page_communicator.OnNewResponse(
"device2",
std::move(message_1).TakeAndMap<Response>([](const Message* message) {
@@ -866,8 +879,8 @@
BuildObjectResponseBuffer(
&response_buffer_2, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
- MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
- &GetMessage);
+ MessageHolder<Message> message_2 = *CreateMessageHolder<Message>(
+ convert::ToStringView(response_buffer_2), &ParseMessage);
page_communicator.OnNewResponse(
"device3",
std::move(message_2).TakeAndMap<Response>([](const Message* message) {
@@ -888,7 +901,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator_1.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -924,12 +938,8 @@
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
- flatbuffers::Verifier verifier(
- reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
- mesh.messages_[0].second.size());
- ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
- MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
+ MessageHolder<Message> reply_message =
+ *CreateMessageHolder<Message>(mesh.messages_[0].second, &ParseMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message).TakeAndMap<Response>([](const Message* message) {
@@ -969,7 +979,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -1007,8 +1018,8 @@
flatbuffers::FlatBufferBuilder stop_buffer;
BuildWatchStopBuffer(&stop_buffer, "ledger", "page");
- MessageHolder<Message> watch_stop_message(convert::ToStringView(stop_buffer),
- &GetMessage);
+ MessageHolder<Message> watch_stop_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(stop_buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(watch_stop_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -1053,8 +1064,8 @@
BuildCommitRequestBuffer(&request_buffer, "ledger", "page",
{storage::CommitId(commit_1.GetId()),
storage::CommitId("missing_commit")});
- MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
- &GetMessage);
+ MessageHolder<Message> request_message = *CreateMessageHolder<Message>(
+ convert::ToStringView(request_buffer), &ParseMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
@@ -1106,7 +1117,8 @@
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
- MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
+ MessageHolder<Message> message = *CreateMessageHolder<Message>(
+ convert::ToStringView(buffer), &ParseMessage);
page_communicator_1.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
@@ -1133,12 +1145,8 @@
EXPECT_EQ("device2", mesh.messages_[0].first);
{
- flatbuffers::Verifier verifier(
- reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
- mesh.messages_[0].second.size());
- ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
- MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
+ MessageHolder<Message> reply_message =
+ *CreateMessageHolder<Message>(mesh.messages_[0].second, &ParseMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message)
@@ -1170,13 +1178,8 @@
EXPECT_EQ("device1", mesh.messages_[1].first);
{
- flatbuffers::Verifier verifier(
- reinterpret_cast<const unsigned char*>(mesh.messages_[1].second.data()),
- mesh.messages_[0].second.size());
- ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
- MessageHolder<Message> request_message(mesh.messages_[1].second,
- &GetMessage);
+ MessageHolder<Message> request_message =
+ *CreateMessageHolder<Message>(mesh.messages_[1].second, &ParseMessage);
ASSERT_EQ(MessageUnion_Request, request_message->message_type());
MessageHolder<Request> request =
std::move(request_message)
@@ -1201,12 +1204,8 @@
EXPECT_EQ("device2", mesh.messages_[2].first);
{
- flatbuffers::Verifier verifier(
- reinterpret_cast<const unsigned char*>(mesh.messages_[2].second.data()),
- mesh.messages_[0].second.size());
- ASSERT_TRUE(VerifyMessageBuffer(verifier));
-
- MessageHolder<Message> reply_message(mesh.messages_[2].second, &GetMessage);
+ MessageHolder<Message> reply_message =
+ *CreateMessageHolder<Message>(mesh.messages_[2].second, &ParseMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message)
diff --git a/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc b/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
index 78b80c4..1e20343a 100644
--- a/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
+++ b/src/ledger/bin/p2p_sync/impl/user_communicator_impl.cc
@@ -5,6 +5,7 @@
#include "src/ledger/bin/p2p_sync/impl/user_communicator_impl.h"
#include "peridot/lib/ledger_client/constants.h"
+#include "src/ledger/bin/p2p_sync/impl/encoding.h"
#include "src/ledger/bin/p2p_sync/impl/flatbuffer_message_factory.h"
#include "src/ledger/bin/p2p_sync/impl/ledger_communicator_impl.h"
#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
@@ -48,16 +49,15 @@
void UserCommunicatorImpl::OnNewMessage(fxl::StringView source,
fxl::StringView data) {
- flatbuffers::Verifier verifier(
- reinterpret_cast<const unsigned char*>(data.data()), data.size());
- if (!VerifyMessageBuffer(verifier)) {
+ std::optional<MessageHolder<Message>> message =
+ CreateMessageHolder<Message>(data, &ParseMessage);
+ if (!message) {
// Wrong serialization, abort.
FXL_LOG(ERROR) << "The message received is malformed.";
return;
};
- MessageHolder<Message> message(data, &GetMessage);
- const NamespacePageId* namespace_page_id;
- switch (message->message_type()) {
+
+ switch ((*message)->message_type()) {
case MessageUnion_NONE:
FXL_LOG(ERROR) << "The message received is unexpected at this point.";
return;
@@ -65,10 +65,10 @@
case MessageUnion_Request: {
MessageHolder<Request> request =
- std::move(message).TakeAndMap<Request>([](const Message* message) {
+ std::move(*message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
});
- namespace_page_id = request->namespace_page();
+ const NamespacePageId* namespace_page_id = request->namespace_page();
std::string namespace_id(namespace_page_id->namespace_id()->begin(),
namespace_page_id->namespace_id()->end());
@@ -89,10 +89,10 @@
case MessageUnion_Response: {
MessageHolder<Response> response =
- std::move(message).TakeAndMap<Response>([](const Message* message) {
+ std::move(*message).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
});
- namespace_page_id = response->namespace_page();
+ const NamespacePageId* namespace_page_id = response->namespace_page();
std::string namespace_id(namespace_page_id->namespace_id()->begin(),
namespace_page_id->namespace_id()->end());
std::string page_id(namespace_page_id->page_id()->begin(),
@@ -100,10 +100,10 @@
const auto& it = ledgers_.find(namespace_id);
if (it == ledgers_.end()) {
- // We are receiving a response for a ledger that no longer exists. This
- // can happen in normal operation, and we cannot do anything with this
- // message: we can't send it to a ledger, and we don't send responses to
- // responses. So we just drop it here.
+ // We are receiving a response for a ledger that no longer exists.
+ // This can happen in normal operation, and we cannot do anything with
+ // this message: we can't send it to a ledger, and we don't send
+ // responses to responses. So we just drop it here.
return;
}
it->second->OnNewResponse(source, page_id, std::move(response));
diff --git a/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc b/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc
new file mode 100644
index 0000000..9368b69
--- /dev/null
+++ b/src/ledger/bin/p2p_sync/impl/user_communicator_impl_fuzztest.cc
@@ -0,0 +1,74 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <lib/fidl/cpp/binding.h>
+#include <lib/fit/function.h>
+
+#include <algorithm>
+#include <string>
+
+#include "src/ledger/bin/p2p_provider/impl/p2p_provider_impl.h"
+#include "src/ledger/bin/p2p_provider/public/p2p_provider.h"
+#include "src/ledger/bin/p2p_sync/impl/user_communicator_impl.h"
+#include "src/ledger/bin/storage/public/page_sync_client.h"
+#include "src/ledger/bin/storage/testing/page_storage_empty_impl.h"
+#include "src/ledger/lib/coroutine/coroutine_impl.h"
+#include "src/lib/fxl/macros.h"
+
+namespace p2p_sync {
+namespace {
+
+class TestPageStorage : public storage::PageStorageEmptyImpl {
+ public:
+ TestPageStorage() = default;
+ ~TestPageStorage() {}
+
+ storage::PageId GetId() override { return "page"; }
+
+ void SetSyncDelegate(storage::PageSyncDelegate* page_sync) override {
+ return;
+ }
+};
+
+class FuzzingP2PProvider : public p2p_provider::P2PProvider {
+ public:
+ FuzzingP2PProvider() = default;
+
+ void Start(Client* client) override { client_ = client; }
+
+ bool SendMessage(fxl::StringView destination, fxl::StringView data) override {
+ FXL_NOTIMPLEMENTED();
+ return false;
+ }
+
+ Client* client_;
+};
+
+// Fuzz the peer-to-peer messages received by a |UserCommunicatorImpl|.
+extern "C" int LLVMFuzzerTestOneInput(const uint8_t* Data, size_t Size) {
+ std::string bytes(reinterpret_cast<const char*>(Data), Size);
+
+ coroutine::CoroutineServiceImpl coroutine_service;
+ auto provider = std::make_unique<FuzzingP2PProvider>();
+ FuzzingP2PProvider* provider_ptr = provider.get();
+
+ UserCommunicatorImpl user_communicator(std::move(provider),
+ &coroutine_service);
+ user_communicator.Start();
+ auto ledger_communicator = user_communicator.GetLedgerCommunicator("ledger");
+
+ storage::PageStorageEmptyImpl page_storage;
+
+ auto page_communicator =
+ ledger_communicator->GetPageCommunicator(&page_storage, &page_storage);
+
+ provider_ptr->client_->OnDeviceChange("device",
+ p2p_provider::DeviceChangeType::NEW);
+ provider_ptr->client_->OnNewMessage("device", bytes);
+
+ return 0;
+}
+
+} // namespace
+} // namespace p2p_sync