blob: 0481681bb1253cb2dcbe038de4c3a28282a9bd05 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/p2p_sync/impl/page_communicator_impl.h"
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <lib/async/cpp/task.h>
#include <lib/fit/function.h>
// gtest matchers are in gmock and we cannot include the specific header file
// directly as it is private to the library.
#include <lib/callback/capture.h>
#include <lib/callback/set_when_called.h>
#include <lib/gtest/test_loop_fixture.h>
#include "gmock/gmock.h"
#include "peridot/bin/ledger/coroutine/coroutine_impl.h"
#include "peridot/bin/ledger/p2p_sync/impl/device_mesh.h"
#include "peridot/bin/ledger/storage/fake/fake_object.h"
#include "peridot/bin/ledger/storage/testing/commit_empty_impl.h"
#include "peridot/bin/ledger/storage/testing/page_storage_empty_impl.h"
#include "peridot/lib/convert/convert.h"
using testing::IsEmpty;
namespace p2p_sync {
namespace {
// Creates a dummy object identifier.
// |object_digest| need not be valid (wrt. internal storage constraints) as it
// is only used as an opaque identifier for p2p.
storage::ObjectIdentifier MakeObjectIdentifier(std::string object_digest) {
return storage::ObjectIdentifier(
0, 0, storage::ObjectDigest(std::move(object_digest)));
}
class FakeCommit : public storage::CommitEmptyImpl {
public:
FakeCommit(std::string id, std::string data,
std::vector<storage::CommitId> parents = {})
: id_(std::move(id)),
data_(std::move(data)),
parents_(std::move(parents)) {}
const storage::CommitId& GetId() const override { return id_; }
std::vector<storage::CommitIdView> GetParentIds() const override {
std::vector<storage::CommitIdView> parent_ids;
for (const storage::CommitId& id : parents_) {
parent_ids.emplace_back(id);
}
return parent_ids;
}
fxl::StringView GetStorageBytes() const override { return data_; }
std::unique_ptr<const storage::Commit> Clone() const override {
return std::make_unique<FakeCommit>(id_, data_);
}
private:
const std::string id_;
const std::string data_;
const std::vector<storage::CommitId> parents_;
};
class FakePageStorage : public storage::PageStorageEmptyImpl {
public:
explicit FakePageStorage(async_dispatcher_t* dispatcher, std::string page_id)
: dispatcher_(dispatcher), page_id_(std::move(page_id)) {}
~FakePageStorage() override {}
storage::PageId GetId() override { return page_id_; }
void GetHeadCommitIds(
fit::function<void(storage::Status, std::vector<storage::CommitId>)>
callback) override {
callback(storage::Status::OK, {"commit_id"});
}
const FakeCommit& AddCommit(std::string id, std::string data) {
auto commit =
commits_.emplace(std::piecewise_construct, std::forward_as_tuple(id),
std::forward_as_tuple(id, std::move(data)));
return commit.first->second;
}
void GetCommit(storage::CommitIdView commit_id,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Commit>)>
callback) override {
auto it = commits_.find(commit_id);
if (it == commits_.end()) {
callback(storage::Status::NOT_FOUND, nullptr);
return;
}
callback(storage::Status::OK, it->second.Clone());
}
void GetPiece(storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status,
std::unique_ptr<const storage::Object>)>
callback) override {
async::PostTask(dispatcher_, [this, object_identifier,
callback = std::move(callback)]() {
const auto& it = objects_.find(object_identifier);
if (it == objects_.end()) {
callback(storage::Status::NOT_FOUND, nullptr);
return;
}
callback(storage::Status::OK, std::make_unique<storage::fake::FakeObject>(
object_identifier, it->second));
});
}
void SetPiece(storage::ObjectIdentifier object_identifier,
std::string contents, bool is_synced = false) {
objects_[object_identifier] = std::move(contents);
if (is_synced) {
synced_objects_.insert(std::move(object_identifier));
}
}
void IsPieceSynced(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status, bool)> callback) override {
async::PostTask(dispatcher_, [this, object_identifier,
callback = std::move(callback)]() {
const auto& it = objects_.find(object_identifier);
if (it == objects_.end()) {
callback(storage::Status::NOT_FOUND, false);
return;
}
callback(storage::Status::OK, synced_objects_.find(object_identifier) !=
synced_objects_.end());
});
}
void AddCommitsFromSync(
std::vector<storage::PageStorage::CommitIdAndBytes> ids_and_bytes,
const storage::ChangeSource /*source*/,
fit::function<void(storage::Status, std::vector<storage::CommitId>)>
callback) override {
commits_from_sync_.emplace_back(
std::piecewise_construct,
std::forward_as_tuple(std::move(ids_and_bytes)),
std::forward_as_tuple(std::move(callback)));
}
storage::Status AddCommitWatcher(storage::CommitWatcher* watcher) override {
FXL_DCHECK(!watcher_);
watcher_ = watcher;
return storage::Status::OK;
}
void MarkSyncedToPeer(
fit::function<void(storage::Status)> callback) override {
callback(mark_synced_to_peer_status);
}
storage::CommitWatcher* watcher_ = nullptr;
std::vector<std::pair<
std::vector<storage::PageStorage::CommitIdAndBytes>,
fit::function<void(storage::Status, std::vector<storage::CommitId>)>>>
commits_from_sync_;
storage::Status mark_synced_to_peer_status = storage::Status::OK;
private:
async_dispatcher_t* const dispatcher_;
const std::string page_id_;
std::map<storage::ObjectIdentifier, std::string> objects_;
std::set<storage::ObjectIdentifier> synced_objects_;
std::map<storage::CommitId, FakeCommit, convert::StringViewComparator>
commits_;
};
class FakeDeviceMesh : public DeviceMesh {
public:
FakeDeviceMesh() {}
~FakeDeviceMesh() override {}
DeviceSet GetDeviceList() override { return devices_; }
void Send(fxl::StringView device_name,
convert::ExtendedStringView data) override {
messages_.emplace_back(
std::forward_as_tuple(device_name.ToString(), data.ToString()));
}
DeviceSet devices_;
std::vector<std::pair<std::string, std::string>> messages_;
};
void BuildWatchStartBuffer(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
flatbuffers::Offset<Request> request = CreateRequest(
*buffer, namespace_page_id, RequestMessage_WatchStartRequest);
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
}
void BuildWatchStopBuffer(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
flatbuffers::Offset<Request> request = CreateRequest(
*buffer, namespace_page_id, RequestMessage_WatchStopRequest);
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
}
void BuildObjectRequestBuffer(
flatbuffers::FlatBufferBuilder* buffer, fxl::StringView namespace_id,
fxl::StringView page_id,
std::vector<storage::ObjectIdentifier> object_ids) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
std::vector<flatbuffers::Offset<ObjectId>> fb_object_ids;
fb_object_ids.reserve(object_ids.size());
for (const storage::ObjectIdentifier& object_id : object_ids) {
fb_object_ids.emplace_back(CreateObjectId(
*buffer, object_id.key_index(), object_id.deletion_scope_id(),
convert::ToFlatBufferVector(buffer,
object_id.object_digest().Serialize())));
}
flatbuffers::Offset<ObjectRequest> object_request =
CreateObjectRequest(*buffer, buffer->CreateVector(fb_object_ids));
flatbuffers::Offset<Request> fb_request =
CreateRequest(*buffer, namespace_page_id, RequestMessage_ObjectRequest,
object_request.Union());
flatbuffers::Offset<Message> fb_message =
CreateMessage(*buffer, MessageUnion_Request, fb_request.Union());
buffer->Finish(fb_message);
}
void BuildObjectResponseBuffer(
flatbuffers::FlatBufferBuilder* buffer, fxl::StringView namespace_id,
fxl::StringView page_id,
std::vector<std::tuple<storage::ObjectIdentifier, std::string, bool>>
data) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
std::vector<flatbuffers::Offset<Object>> fb_objects;
for (const auto& object_tuple : data) {
const storage::ObjectIdentifier& object_identifier =
std::get<0>(object_tuple);
const std::string& data = std::get<1>(object_tuple);
bool is_synced = std::get<2>(object_tuple);
flatbuffers::Offset<ObjectId> fb_object_id = CreateObjectId(
*buffer, object_identifier.key_index(),
object_identifier.deletion_scope_id(),
convert::ToFlatBufferVector(
buffer, object_identifier.object_digest().Serialize()));
if (!data.empty()) {
flatbuffers::Offset<Data> fb_data =
CreateData(*buffer, convert::ToFlatBufferVector(buffer, data));
fb_objects.emplace_back(
CreateObject(*buffer, fb_object_id, ObjectStatus_OK, fb_data,
is_synced ? ObjectSyncStatus_SYNCED_TO_CLOUD
: ObjectSyncStatus_UNSYNCED));
} else {
fb_objects.emplace_back(
CreateObject(*buffer, fb_object_id, ObjectStatus_UNKNOWN_OBJECT));
}
}
flatbuffers::Offset<ObjectResponse> object_response =
CreateObjectResponse(*buffer, buffer->CreateVector(fb_objects));
flatbuffers::Offset<Response> response =
CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id,
ResponseMessage_ObjectResponse, object_response.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Response, response.Union());
buffer->Finish(message);
}
void BuildCommitRequestBuffer(flatbuffers::FlatBufferBuilder* buffer,
fxl::StringView namespace_id,
fxl::StringView page_id,
std::vector<storage::CommitId> commit_ids) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id),
convert::ToFlatBufferVector(buffer, page_id));
std::vector<flatbuffers::Offset<CommitId>> fb_commit_ids;
fb_commit_ids.reserve(commit_ids.size());
for (const storage::CommitId& commit_id : commit_ids) {
fb_commit_ids.emplace_back(CreateCommitId(
*buffer, convert::ToFlatBufferVector(buffer, commit_id)));
}
flatbuffers::Offset<CommitRequest> commit_request =
CreateCommitRequest(*buffer, buffer->CreateVector(fb_commit_ids));
flatbuffers::Offset<Request> fb_request =
CreateRequest(*buffer, namespace_page_id, RequestMessage_CommitRequest,
commit_request.Union());
flatbuffers::Offset<Message> fb_message =
CreateMessage(*buffer, MessageUnion_Request, fb_request.Union());
buffer->Finish(fb_message);
}
class PageCommunicatorImplTest : public gtest::TestLoopFixture {
public:
PageCommunicatorImplTest() {}
~PageCommunicatorImplTest() override {}
protected:
void SetUp() override { ::testing::Test::SetUp(); }
coroutine::CoroutineServiceImpl coroutine_service_;
private:
FXL_DISALLOW_COPY_AND_ASSIGN(PageCommunicatorImplTest);
};
TEST_F(PageCommunicatorImplTest, ConnectToExistingMesh) {
FakeDeviceMesh mesh;
mesh.devices_.emplace("device2");
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
EXPECT_TRUE(mesh.messages_.empty());
page_communicator.Start();
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
if (!VerifyMessageBuffer(verifier)) {
// Wrong serialization, abort.
FXL_LOG(ERROR) << "The message received is malformed.";
return;
};
const Message* message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Request, message->message_type());
const Request* request = static_cast<const Request*>(message->message());
const NamespacePageId* namespace_page_id = request->namespace_page();
EXPECT_EQ("ledger",
convert::ExtendedStringView(namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(namespace_page_id->page_id()));
EXPECT_EQ(RequestMessage_WatchStartRequest, request->request_type());
}
TEST_F(PageCommunicatorImplTest, ConnectToNewMeshParticipant) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
EXPECT_TRUE(mesh.messages_.empty());
mesh.devices_.emplace("device2");
page_communicator.OnDeviceChange("device2",
p2p_provider::DeviceChangeType::NEW);
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
if (!VerifyMessageBuffer(verifier)) {
// Wrong serialization, abort.
FXL_LOG(ERROR) << "The message received is malformed.";
return;
};
const Message* message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Request, message->message_type());
const Request* request = static_cast<const Request*>(message->message());
const NamespacePageId* namespace_page_id = request->namespace_page();
EXPECT_EQ("ledger",
convert::ExtendedStringView(namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(namespace_page_id->page_id()));
EXPECT_EQ(RequestMessage_WatchStartRequest, request->request_type());
}
TEST_F(PageCommunicatorImplTest, GetObject) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
// Verify the message sent to request the object.
const Message* message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Request, message->message_type());
const Request* request = static_cast<const Request*>(message->message());
const NamespacePageId* namespace_page_id = request->namespace_page();
EXPECT_EQ("ledger",
convert::ExtendedStringView(namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(namespace_page_id->page_id()));
EXPECT_EQ(RequestMessage_ObjectRequest, request->request_type());
const ObjectRequest* object_request =
static_cast<const ObjectRequest*>(request->request());
EXPECT_EQ(1u, object_request->object_ids()->size());
EXPECT_EQ(0u, object_request->object_ids()->begin()->key_index());
EXPECT_EQ(0u, object_request->object_ids()->begin()->deletion_scope_id());
EXPECT_EQ("foo", convert::ExtendedStringView(
object_request->object_ids()->begin()->digest()));
}
TEST_F(PageCommunicatorImplTest, DontGetObjectsIfMarkPageSyncedToPeerFailed) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
&GetMessage);
// If storage fails to mark the page as synced to a peer, the mesh should not
// be updated.
storage.mark_synced_to_peer_status = storage::Status::IO_ERROR;
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
EXPECT_THAT(mesh.messages_, IsEmpty());
}
TEST_F(PageCommunicatorImplTest, ObjectRequest) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
storage.SetPiece(MakeObjectIdentifier("object_digest"), "some data");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
// Send request to PageCommunicator. We request two objects: |object_digest|
// and |object_digest2|. Only |object_digest| will be present in storage.
flatbuffers::FlatBufferBuilder request_buffer;
BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
{MakeObjectIdentifier("object_digest"),
MakeObjectIdentifier("object_digest2")});
MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
// Verify the response.
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
const Message* reply_message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
const Response* response =
static_cast<const Response*>(reply_message->message());
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page",
convert::ExtendedStringView(response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_ObjectResponse, response->response_type());
const ObjectResponse* object_response =
static_cast<const ObjectResponse*>(response->response());
ASSERT_EQ(2u, object_response->objects()->size());
auto it = object_response->objects()->begin();
EXPECT_EQ("object_digest", convert::ExtendedStringView(it->id()->digest()));
EXPECT_EQ(ObjectStatus_OK, it->status());
EXPECT_EQ("some data", convert::ExtendedStringView(it->data()->bytes()));
EXPECT_EQ(ObjectSyncStatus_UNSYNCED, it->sync_status());
it++;
EXPECT_EQ("object_digest2", convert::ExtendedStringView(it->id()->digest()));
EXPECT_EQ(ObjectStatus_UNKNOWN_OBJECT, it->status());
}
TEST_F(PageCommunicatorImplTest, ObjectRequestSynced) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
storage.SetPiece(MakeObjectIdentifier("object_digest"), "some data", true);
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
// Send request to PageCommunicator. We request two objects: |object_digest|
// and |object_digest2|. Only |object_digest| will be present in storage.
flatbuffers::FlatBufferBuilder request_buffer;
BuildObjectRequestBuffer(&request_buffer, "ledger", "page",
{MakeObjectIdentifier("object_digest")});
MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
// Verify the response.
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
const Message* reply_message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
const Response* response =
static_cast<const Response*>(reply_message->message());
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page",
convert::ExtendedStringView(response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_ObjectResponse, response->response_type());
const ObjectResponse* object_response =
static_cast<const ObjectResponse*>(response->response());
ASSERT_EQ(1u, object_response->objects()->size());
auto it = object_response->objects()->begin();
EXPECT_EQ("object_digest", convert::ExtendedStringView(it->id()->digest()));
EXPECT_EQ(ObjectStatus_OK, it->status());
EXPECT_EQ("some data", convert::ExtendedStringView(it->data()->bytes()));
EXPECT_EQ(ObjectSyncStatus_SYNCED_TO_CLOUD, it->sync_status());
}
TEST_F(PageCommunicatorImplTest, GetObjectProcessResponseSuccess) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::FlatBufferBuilder response_buffer;
BuildObjectResponseBuffer(
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false),
std::make_tuple(MakeObjectIdentifier("bar"), "bar_data", false)});
MessageHolder<Message> response_message(
convert::ToStringView(response_buffer), &GetMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ("foo_data", data->Get());
EXPECT_EQ(storage::IsObjectSynced::NO, is_object_synced);
}
TEST_F(PageCommunicatorImplTest, GetObjectProcessResponseSynced) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> new_device_message(convert::ToStringView(buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(new_device_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::FlatBufferBuilder response_buffer;
BuildObjectResponseBuffer(
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", true)});
MessageHolder<Message> response_message(
convert::ToStringView(response_buffer), &GetMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ("foo_data", data->Get());
EXPECT_EQ(storage::IsObjectSynced::YES, is_object_synced);
}
TEST_F(PageCommunicatorImplTest, GetObjectProcessResponseFail) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::FlatBufferBuilder response_buffer;
BuildObjectResponseBuffer(
&response_buffer, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
MessageHolder<Message> response_message(
convert::ToStringView(response_buffer), &GetMessage);
page_communicator.OnNewResponse(
"device2", std::move(response_message)
.TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::NOT_FOUND, status);
EXPECT_FALSE(data);
}
TEST_F(PageCommunicatorImplTest, GetObjectProcessResponseMultiDeviceSuccess) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device3",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
EXPECT_EQ(2u, mesh.messages_.size());
flatbuffers::FlatBufferBuilder response_buffer_1;
BuildObjectResponseBuffer(
&response_buffer_1, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
&GetMessage);
page_communicator.OnNewResponse(
"device2",
std::move(message_1).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_FALSE(called);
flatbuffers::FlatBufferBuilder response_buffer_2;
BuildObjectResponseBuffer(
&response_buffer_2, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "foo_data", false)});
MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
&GetMessage);
page_communicator.OnNewResponse(
"device3",
std::move(message_2).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::OK, status);
EXPECT_EQ("foo_data", data->Get());
EXPECT_EQ(storage::ChangeSource::P2P, source);
EXPECT_EQ(storage::IsObjectSynced::NO, is_object_synced);
}
TEST_F(PageCommunicatorImplTest, GetObjectProcessResponseMultiDeviceFail) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
message = MessageHolder<Message>(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device3",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called;
storage::Status status;
storage::ChangeSource source;
storage::IsObjectSynced is_object_synced;
std::unique_ptr<storage::DataSource::DataChunk> data;
page_communicator.GetObject(
MakeObjectIdentifier("foo"),
callback::Capture(callback::SetWhenCalled(&called), &status, &source,
&is_object_synced, &data));
RunLoopUntilIdle();
EXPECT_FALSE(called);
EXPECT_EQ(2u, mesh.messages_.size());
flatbuffers::FlatBufferBuilder response_buffer_1;
BuildObjectResponseBuffer(
&response_buffer_1, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
MessageHolder<Message> message_1(convert::ToStringView(response_buffer_1),
&GetMessage);
page_communicator.OnNewResponse(
"device2",
std::move(message_1).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_FALSE(called);
flatbuffers::FlatBufferBuilder response_buffer_2;
BuildObjectResponseBuffer(
&response_buffer_2, "ledger", "page",
{std::make_tuple(MakeObjectIdentifier("foo"), "", false)});
MessageHolder<Message> message_2(convert::ToStringView(response_buffer_2),
&GetMessage);
page_communicator.OnNewResponse(
"device3",
std::move(message_2).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
}));
EXPECT_TRUE(called);
EXPECT_EQ(storage::Status::NOT_FOUND, status);
EXPECT_FALSE(data);
}
TEST_F(PageCommunicatorImplTest, CommitUpdate) {
FakeDeviceMesh mesh;
FakePageStorage storage_1(dispatcher(), "page");
PageCommunicatorImpl page_communicator_1(&coroutine_service_, &storage_1,
&storage_1, "ledger", "page", &mesh);
page_communicator_1.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator_1.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
FakePageStorage storage_2(dispatcher(), "page");
PageCommunicatorImpl page_communicator_2(&coroutine_service_, &storage_2,
&storage_2, "ledger", "page", &mesh);
page_communicator_2.Start();
std::vector<std::unique_ptr<const storage::Commit>> commits;
commits.emplace_back(std::make_unique<FakeCommit>("id 1", "data 1"));
commits.emplace_back(std::make_unique<FakeCommit>("id 2", "data 2"));
ASSERT_NE(nullptr, storage_1.watcher_);
storage_1.watcher_->OnNewCommits(commits, storage::ChangeSource::CLOUD);
RunLoopUntilIdle();
// No new message is sent on commits from CLOUD.
ASSERT_EQ(0u, mesh.messages_.size());
storage_1.watcher_->OnNewCommits(commits, storage::ChangeSource::P2P);
RunLoopUntilIdle();
// No new message is sent on commits from P2P either.
ASSERT_EQ(0u, mesh.messages_.size());
storage_1.watcher_->OnNewCommits(commits, storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
// Local commit: a message is sent.
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message).TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
});
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page",
convert::ExtendedStringView(response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_CommitResponse, response->response_type());
// Send it to the other side.
page_communicator_2.OnNewResponse("device1", std::move(response));
RunLoopUntilIdle();
// The other side's storage has the commit.
ASSERT_EQ(1u, storage_2.commits_from_sync_.size());
ASSERT_EQ(2u, storage_2.commits_from_sync_[0].first.size());
EXPECT_EQ("id 1", storage_2.commits_from_sync_[0].first[0].id);
EXPECT_EQ("data 1", storage_2.commits_from_sync_[0].first[0].bytes);
EXPECT_EQ("id 2", storage_2.commits_from_sync_[0].first[1].id);
EXPECT_EQ("data 2", storage_2.commits_from_sync_[0].first[1].bytes);
// Verify we don't crash on response from storage
storage_2.commits_from_sync_[0].second(storage::Status::OK, {});
RunLoopUntilIdle();
}
TEST_F(PageCommunicatorImplTest, GetObjectDisconnect) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
bool called1, called2, called3, called4;
storage::Status status1, status2, status3, status4;
storage::ChangeSource source1, source2, source3, source4;
storage::IsObjectSynced is_object_synced1, is_object_synced2,
is_object_synced3, is_object_synced4;
std::unique_ptr<storage::DataSource::DataChunk> data1, data2, data3, data4;
page_communicator.GetObject(
MakeObjectIdentifier("foo1"),
callback::Capture(callback::SetWhenCalled(&called1), &status1, &source1,
&is_object_synced1, &data1));
page_communicator.GetObject(
MakeObjectIdentifier("foo2"),
callback::Capture(callback::SetWhenCalled(&called2), &status2, &source2,
&is_object_synced2, &data2));
page_communicator.GetObject(
MakeObjectIdentifier("foo3"),
callback::Capture(callback::SetWhenCalled(&called3), &status3, &source3,
&is_object_synced3, &data3));
page_communicator.GetObject(
MakeObjectIdentifier("foo4"),
callback::Capture(callback::SetWhenCalled(&called4), &status4, &source4,
&is_object_synced4, &data4));
RunLoopUntilIdle();
EXPECT_FALSE(called1);
EXPECT_FALSE(called2);
EXPECT_FALSE(called3);
EXPECT_FALSE(called4);
EXPECT_EQ(4u, mesh.messages_.size());
flatbuffers::FlatBufferBuilder stop_buffer;
BuildWatchStopBuffer(&stop_buffer, "ledger", "page");
MessageHolder<Message> watch_stop_message(convert::ToStringView(stop_buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(watch_stop_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
// All requests are terminated with a not found status.
EXPECT_TRUE(called1);
EXPECT_EQ(storage::Status::NOT_FOUND, status1);
EXPECT_EQ(storage::ChangeSource::P2P, source1);
EXPECT_FALSE(data1);
EXPECT_TRUE(called2);
EXPECT_EQ(storage::Status::NOT_FOUND, status2);
EXPECT_EQ(storage::ChangeSource::P2P, source2);
EXPECT_FALSE(data2);
EXPECT_TRUE(called3);
EXPECT_EQ(storage::Status::NOT_FOUND, status3);
EXPECT_EQ(storage::ChangeSource::P2P, source3);
EXPECT_FALSE(data3);
EXPECT_TRUE(called4);
EXPECT_EQ(storage::Status::NOT_FOUND, status4);
EXPECT_EQ(storage::ChangeSource::P2P, source4);
EXPECT_FALSE(data4);
}
TEST_F(PageCommunicatorImplTest, CommitRequest) {
FakeDeviceMesh mesh;
FakePageStorage storage(dispatcher(), "page");
const storage::Commit& commit_1 = storage.AddCommit("commit1", "data1");
PageCommunicatorImpl page_communicator(&coroutine_service_, &storage,
&storage, "ledger", "page", &mesh);
page_communicator.Start();
// Send request to PageCommunicator. We request two objects: |object_digest|
// and |object_digest2|. Only |object_digest| will be present in storage.
flatbuffers::FlatBufferBuilder request_buffer;
BuildCommitRequestBuffer(&request_buffer, "ledger", "page",
{storage::CommitId(commit_1.GetId()),
storage::CommitId("missing_commit")});
MessageHolder<Message> request_message(convert::ToStringView(request_buffer),
&GetMessage);
page_communicator.OnNewRequest(
"device2", std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
// Verify the response.
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
const Message* reply_message = GetMessage(mesh.messages_[0].second.data());
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
const Response* response =
static_cast<const Response*>(reply_message->message());
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page",
convert::ExtendedStringView(response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_CommitResponse, response->response_type());
const CommitResponse* commit_response =
static_cast<const CommitResponse*>(response->response());
ASSERT_EQ(2u, commit_response->commits()->size());
auto it = commit_response->commits()->begin();
EXPECT_EQ("commit1", convert::ExtendedStringView(it->id()->id()));
EXPECT_EQ(CommitStatus_OK, it->status());
EXPECT_EQ("data1", convert::ExtendedStringView(it->commit()->bytes()));
it++;
EXPECT_EQ("missing_commit", convert::ExtendedStringView(it->id()->id()));
EXPECT_EQ(CommitStatus_UNKNOWN_COMMIT, it->status());
}
// Sends an update for new commits that triggers a backlog sync.
TEST_F(PageCommunicatorImplTest, CommitBatchUpdate) {
FakeDeviceMesh mesh;
FakePageStorage storage_1(dispatcher(), "page");
storage_1.AddCommit("id 0", "data 0");
PageCommunicatorImpl page_communicator_1(&coroutine_service_, &storage_1,
&storage_1, "ledger", "page", &mesh);
page_communicator_1.Start();
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer, "ledger", "page");
MessageHolder<Message> message(convert::ToStringView(buffer), &GetMessage);
page_communicator_1.OnNewRequest(
"device2",
std::move(message).TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
}));
RunLoopUntilIdle();
FakePageStorage storage_2(dispatcher(), "page");
PageCommunicatorImpl page_communicator_2(&coroutine_service_, &storage_2,
&storage_2, "ledger", "page", &mesh);
page_communicator_2.Start();
std::vector<std::unique_ptr<const storage::Commit>> commits;
commits.emplace_back(std::make_unique<FakeCommit>(
"id 1", "data 1", std::vector<storage::CommitId>({"id 0"})));
commits.emplace_back(std::make_unique<FakeCommit>(
"id 2", "data 2", std::vector<storage::CommitId>({"id 1"})));
storage_1.watcher_->OnNewCommits(commits, storage::ChangeSource::LOCAL);
RunLoopUntilIdle();
// Local commit: a message is sent.
ASSERT_EQ(1u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[0].first);
{
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[0].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
MessageHolder<Message> reply_message(mesh.messages_[0].second, &GetMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message)
.TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
});
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(
response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_CommitResponse, response->response_type());
// Send it to the other side.
page_communicator_2.OnNewResponse("device1", std::move(response));
}
RunLoopUntilIdle();
// PageCommunicator should have tried to add the commit.
ASSERT_EQ(1u, storage_2.commits_from_sync_.size());
EXPECT_EQ(2u, storage_2.commits_from_sync_[0].first.size());
// Return that we miss one commit
storage_2.commits_from_sync_[0].second(storage::Status::NOT_FOUND, {"id 0"});
// |page_communicator_2| should ask for the base, "id 0" commit.
ASSERT_EQ(2u, mesh.messages_.size());
EXPECT_EQ("device1", mesh.messages_[1].first);
{
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[1].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
MessageHolder<Message> request_message(mesh.messages_[1].second,
&GetMessage);
ASSERT_EQ(MessageUnion_Request, request_message->message_type());
MessageHolder<Request> request =
std::move(request_message)
.TakeAndMap<Request>([](const Message* message) {
return static_cast<const Request*>(message->message());
});
const NamespacePageId* request_namespace_page_id =
request->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
request_namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(
request_namespace_page_id->page_id()));
EXPECT_EQ(RequestMessage_CommitRequest, request->request_type());
// Send it to the other side.
page_communicator_1.OnNewRequest("device2", std::move(request));
}
RunLoopUntilIdle();
// |page_communicator_1| sends commit "id 0" to device 2.
ASSERT_EQ(3u, mesh.messages_.size());
EXPECT_EQ("device2", mesh.messages_[2].first);
{
flatbuffers::Verifier verifier(
reinterpret_cast<const unsigned char*>(mesh.messages_[2].second.data()),
mesh.messages_[0].second.size());
ASSERT_TRUE(VerifyMessageBuffer(verifier));
MessageHolder<Message> reply_message(mesh.messages_[2].second, &GetMessage);
ASSERT_EQ(MessageUnion_Response, reply_message->message_type());
MessageHolder<Response> response =
std::move(reply_message)
.TakeAndMap<Response>([](const Message* message) {
return static_cast<const Response*>(message->message());
});
const NamespacePageId* response_namespace_page_id =
response->namespace_page();
EXPECT_EQ("ledger", convert::ExtendedStringView(
response_namespace_page_id->namespace_id()));
EXPECT_EQ("page", convert::ExtendedStringView(
response_namespace_page_id->page_id()));
EXPECT_EQ(ResponseMessage_CommitResponse, response->response_type());
// Send it to the other side.
page_communicator_2.OnNewResponse("device1", std::move(response));
}
RunLoopUntilIdle();
// Verify that we are truely adding the whole commit batch.
ASSERT_EQ(2u, storage_2.commits_from_sync_.size());
EXPECT_EQ(3u, storage_2.commits_from_sync_[1].first.size());
EXPECT_EQ("id 0", storage_2.commits_from_sync_[1].first[0].id);
EXPECT_EQ("data 0", storage_2.commits_from_sync_[1].first[0].bytes);
EXPECT_EQ("id 1", storage_2.commits_from_sync_[1].first[1].id);
EXPECT_EQ("data 1", storage_2.commits_from_sync_[1].first[1].bytes);
EXPECT_EQ("id 2", storage_2.commits_from_sync_[1].first[2].id);
EXPECT_EQ("data 2", storage_2.commits_from_sync_[1].first[2].bytes);
// Verify we don't crash on response from storage
storage_2.commits_from_sync_[1].second(storage::Status::OK, {});
}
} // namespace
} // namespace p2p_sync