blob: cf927cfe3cd372aebc6b9571ca37510a983d2e92 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/p2p_sync/impl/page_communicator_impl.h"
#include <lib/callback/scoped_callback.h>
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/p2p_sync/impl/message_generated.h"
#include "src/ledger/bin/storage/public/read_data_source.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/coroutine/coroutine_waiter.h"
#include "src/lib/fxl/memory/ref_ptr.h"
namespace p2p_sync {
namespace {
storage::ObjectIdentifier ToObjectIdentifier(const ObjectId* fb_object_id) {
uint32_t key_index = fb_object_id->key_index();
uint32_t deletion_scope_id = fb_object_id->deletion_scope_id();
return storage::ObjectIdentifier{
key_index, deletion_scope_id,
storage::ObjectDigest(fb_object_id->digest())};
}
} // namespace
// PendingObjectRequestHolder holds state for object requests that have been
// sent to peers and for which we wait for an answer.
class PageCommunicatorImpl::PendingObjectRequestHolder {
public:
explicit PendingObjectRequestHolder(
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback)
: callback_(std::move(callback)) {}
void set_on_empty(fit::closure on_empty) { on_empty_ = std::move(on_empty); }
// Registers a new pending request to device |destination|.
void AddNewPendingRequest(std::string destination) {
requests_.emplace(std::move(destination));
}
// Processes the response from device |source|.
void Complete(fxl::StringView source, const Object* object) {
auto it = requests_.find(source);
if (it == requests_.end()) {
return;
}
if (object == nullptr || object->status() == ObjectStatus_UNKNOWN_OBJECT) {
requests_.erase(it);
if (!requests_.empty()) {
return;
}
// All requests have returned and none is valid: return an error.
callback_(storage::Status::INTERNAL_NOT_FOUND, storage::ChangeSource::P2P,
storage::IsObjectSynced::NO, nullptr);
if (on_empty_) {
on_empty_();
}
return;
}
std::unique_ptr<storage::DataSource::DataChunk> chunk =
storage::DataSource::DataChunk::Create(
convert::ToString(object->data()->bytes()));
storage::IsObjectSynced is_object_synced;
switch (object->sync_status()) {
case ObjectSyncStatus_UNSYNCED:
is_object_synced = storage::IsObjectSynced::NO;
break;
case ObjectSyncStatus_SYNCED_TO_CLOUD:
is_object_synced = storage::IsObjectSynced::YES;
break;
}
callback_(storage::Status::OK, storage::ChangeSource::P2P, is_object_synced,
std::move(chunk));
if (on_empty_) {
on_empty_();
}
}
private:
fit::function<void(
storage::Status, storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)> const callback_;
// Set of devices for which we are waiting an answer.
// We might be able to get rid of this list and just use a counter (or even
// nothing at all) once we have a timeout on requests.
std::set<std::string, convert::StringViewComparator> requests_;
fit::closure on_empty_;
};
// ObjectResponseHolder holds temporary data we collect in order to build
// ObjectResponses.
// This is necessary as object data (from |storage::Object|) and synchronization
// data come from different asynchronous calls.
struct PageCommunicatorImpl::ObjectResponseHolder {
storage::ObjectIdentifier identifier;
std::unique_ptr<const storage::Piece> piece;
bool is_synced = false;
explicit ObjectResponseHolder(storage::ObjectIdentifier identifier)
: identifier(std::move(identifier)) {}
};
PageCommunicatorImpl::PageCommunicatorImpl(
coroutine::CoroutineService* coroutine_service,
storage::PageStorage* storage, storage::PageSyncClient* sync_client,
std::string namespace_id, std::string page_id, DeviceMesh* mesh)
: coroutine_manager_(coroutine_service),
namespace_id_(std::move(namespace_id)),
page_id_(std::move(page_id)),
mesh_(mesh),
storage_(storage),
sync_client_(sync_client),
weak_factory_(this) {}
PageCommunicatorImpl::~PageCommunicatorImpl() {
FXL_DCHECK(!in_destructor_);
in_destructor_ = true;
flatbuffers::FlatBufferBuilder buffer;
if (!started_) {
if (on_delete_) {
on_delete_();
}
return;
}
BuildWatchStopBuffer(&buffer);
for (const auto& device : interested_devices_) {
mesh_->Send(device, buffer);
}
if (on_delete_) {
on_delete_();
}
}
void PageCommunicatorImpl::Start() {
FXL_DCHECK(!started_);
started_ = true;
sync_client_->SetSyncDelegate(this);
storage_->AddCommitWatcher(this);
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer);
for (const auto& device : mesh_->GetDeviceList()) {
mesh_->Send(device, buffer);
}
}
void PageCommunicatorImpl::set_on_delete(fit::closure on_delete) {
FXL_DCHECK(!on_delete_) << "set_on_delete() can only be called once.";
on_delete_ = std::move(on_delete);
}
void PageCommunicatorImpl::OnDeviceChange(
fxl::StringView remote_device, p2p_provider::DeviceChangeType change_type) {
if (!started_ || in_destructor_) {
return;
}
if (change_type == p2p_provider::DeviceChangeType::DELETED) {
const auto& it = interested_devices_.find(remote_device);
if (it != interested_devices_.end()) {
interested_devices_.erase(it);
}
const auto& it2 = not_interested_devices_.find(remote_device);
if (it2 != not_interested_devices_.end()) {
not_interested_devices_.erase(it2);
}
const auto& it3 = pending_commit_batches_.find(remote_device);
if (it3 != pending_commit_batches_.end()) {
pending_commit_batches_.erase(it3);
}
return;
}
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer);
mesh_->Send(remote_device, buffer);
}
void PageCommunicatorImpl::OnNewRequest(fxl::StringView source,
MessageHolder<Request> message) {
FXL_DCHECK(!in_destructor_);
switch (message->request_type()) {
case RequestMessage_WatchStartRequest: {
MarkSyncedToPeer(
[this, source = source.ToString()](storage::Status status) {
if (status != storage::Status::OK) {
// If we fail to mark the page storage as synced to a peer, we
// might end up in a situation of deleting from disk a partially
// synced page. Log an error and return.
FXL_LOG(ERROR) << "Failed to mark PageStorage as synced to peer";
return;
}
if (interested_devices_.find(source) == interested_devices_.end()) {
interested_devices_.insert(source);
}
auto it = not_interested_devices_.find(source);
if (it != not_interested_devices_.end()) {
// The device used to be uninterested, but now wants updates.
// Let's contact it again.
not_interested_devices_.erase(it);
flatbuffers::FlatBufferBuilder buffer;
BuildWatchStartBuffer(&buffer);
mesh_->Send(source, buffer);
}
});
break;
}
case RequestMessage_WatchStopRequest: {
const auto& it = interested_devices_.find(source);
if (it != interested_devices_.end()) {
interested_devices_.erase(it);
}
// Device |source| disconnected, thus will not answer any request. We thus
// mark all pending requests to |source| to be finished.
std::vector<PendingObjectRequestHolder*> requests;
requests.reserve(pending_object_requests_.size());
for (auto& object_request : pending_object_requests_) {
// We cannot call Complete here because it deletes the object, making
// the iterator used in this for loop invalid.
requests.push_back(&object_request.second);
}
for (PendingObjectRequestHolder* const request : requests) {
request->Complete(source, nullptr);
}
break;
}
case RequestMessage_CommitRequest:
ProcessCommitRequest(
source.ToString(),
std::move(message).TakeAndMap<CommitRequest>(
[](const Request* request) {
return static_cast<const CommitRequest*>(request->request());
}));
break;
case RequestMessage_ObjectRequest:
ProcessObjectRequest(
source,
std::move(message).TakeAndMap<ObjectRequest>(
[](const Request* request) {
return static_cast<const ObjectRequest*>(request->request());
}));
break;
case RequestMessage_NONE:
FXL_LOG(ERROR) << "The message received is malformed";
break;
}
}
void PageCommunicatorImpl::OnNewResponse(fxl::StringView source,
MessageHolder<Response> message) {
FXL_DCHECK(!in_destructor_);
if (message->status() != ResponseStatus_OK) {
// The namespace or page was unknown on the other side. We can probably do
// something smart with this information (for instance, stop sending
// requests over), but we just ignore it for now.
not_interested_devices_.emplace(source.ToString());
return;
}
switch (message->response_type()) {
case ResponseMessage_ObjectResponse: {
const ObjectResponse* object_response =
static_cast<const ObjectResponse*>(message->response());
for (const Object* object : *(object_response->objects())) {
auto object_id = ToObjectIdentifier(object->id());
auto pending_request = pending_object_requests_.find(object_id);
if (pending_request == pending_object_requests_.end()) {
continue;
}
pending_request->second.Complete(source, object);
}
break;
}
case ResponseMessage_CommitResponse: {
const CommitResponse* commit_response =
static_cast<const CommitResponse*>(message->response());
std::vector<storage::PageStorage::CommitIdAndBytes> commits;
for (const Commit* commit : *(commit_response->commits())) {
if (commit->status() != CommitStatus_OK) {
continue;
}
commits.emplace_back(convert::ToString(commit->id()->id()),
convert::ToString(commit->commit()->bytes()));
}
auto it = pending_commit_batches_.find(source);
if (it != pending_commit_batches_.end()) {
it->second.AddToBatch(std::move(commits));
} else {
auto it_pair = pending_commit_batches_.emplace(
std::piecewise_construct, std::forward_as_tuple(source.ToString()),
std::forward_as_tuple(source.ToString(), this, storage_));
it_pair.first->second.AddToBatch(std::move(commits));
}
break;
}
case ResponseMessage_NONE:
FXL_LOG(ERROR) << "The message received is malformed";
return;
}
}
void PageCommunicatorImpl::GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
flatbuffers::FlatBufferBuilder buffer;
BuildObjectRequestBuffer(&buffer, object_identifier);
auto request_holder = pending_object_requests_.emplace(
std::move(object_identifier), std::move(callback));
for (const auto& device : interested_devices_) {
request_holder.first->second.AddNewPendingRequest(device);
}
for (const auto& device : interested_devices_) {
mesh_->Send(device, buffer);
}
}
void PageCommunicatorImpl::OnNewCommits(
const std::vector<std::unique_ptr<const storage::Commit>>& commits,
storage::ChangeSource source) {
if (source != storage::ChangeSource::LOCAL) {
// Don't propagate synced commits.
return;
}
for (const auto& commit : commits) {
commits_to_upload_.emplace_back(commit->Clone());
}
// We need to check if we need to merge first.
std::vector<std::unique_ptr<const storage::Commit>> head_commits;
storage::Status status = storage_->GetHeadCommits(&head_commits);
if (status != storage::Status::OK) {
return;
}
if (head_commits.size() != 1) {
// A merge needs to happen, let's wait until we
// have one.
return;
}
if (commits_to_upload_.empty()) {
// Commits have already been sent. Let's stop
// early.
return;
}
flatbuffers::FlatBufferBuilder buffer;
BuildCommitBuffer(&buffer, commits_to_upload_);
for (const auto& device : interested_devices_) {
mesh_->Send(device, buffer);
}
commits_to_upload_.clear();
}
void PageCommunicatorImpl::RequestCommits(fxl::StringView device,
std::vector<storage::CommitId> ids) {
flatbuffers::FlatBufferBuilder buffer;
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(buffer,
convert::ToFlatBufferVector(&buffer, namespace_id_),
convert::ToFlatBufferVector(&buffer, page_id_));
std::vector<flatbuffers::Offset<CommitId>> commit_ids;
for (const auto& id : ids) {
flatbuffers::Offset<CommitId> commit_id =
CreateCommitId(buffer, convert::ToFlatBufferVector(&buffer, id));
commit_ids.push_back(commit_id);
}
flatbuffers::Offset<CommitRequest> commit_request =
CreateCommitRequest(buffer, buffer.CreateVector(commit_ids));
flatbuffers::Offset<Request> request =
CreateRequest(buffer, namespace_page_id, RequestMessage_CommitRequest,
commit_request.Union());
flatbuffers::Offset<Message> message =
CreateMessage(buffer, MessageUnion_Request, request.Union());
buffer.Finish(message);
mesh_->Send(device, buffer);
}
void PageCommunicatorImpl::BuildWatchStartBuffer(
flatbuffers::FlatBufferBuilder* buffer) {
flatbuffers::Offset<WatchStartRequest> watch_start =
CreateWatchStartRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
flatbuffers::Offset<Request> request =
CreateRequest(*buffer, namespace_page_id,
RequestMessage_WatchStartRequest, watch_start.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::BuildWatchStopBuffer(
flatbuffers::FlatBufferBuilder* buffer) {
flatbuffers::Offset<WatchStopRequest> watch_stop =
CreateWatchStopRequest(*buffer);
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
flatbuffers::Offset<Request> request =
CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest,
watch_stop.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::BuildObjectRequestBuffer(
flatbuffers::FlatBufferBuilder* buffer,
storage::ObjectIdentifier object_identifier) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
flatbuffers::Offset<ObjectId> object_id = CreateObjectId(
*buffer, object_identifier.key_index(),
object_identifier.deletion_scope_id(),
convert::ToFlatBufferVector(
buffer, object_identifier.object_digest().Serialize()));
flatbuffers::Offset<ObjectRequest> object_request = CreateObjectRequest(
*buffer, buffer->CreateVector(
std::vector<flatbuffers::Offset<ObjectId>>({object_id})));
flatbuffers::Offset<Request> request =
CreateRequest(*buffer, namespace_page_id, RequestMessage_ObjectRequest,
object_request.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Request, request.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::BuildCommitBuffer(
flatbuffers::FlatBufferBuilder* buffer,
const std::vector<std::unique_ptr<const storage::Commit>>& commits) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
std::vector<flatbuffers::Offset<Commit>> fb_commits;
for (const auto& commit : commits) {
flatbuffers::Offset<CommitId> fb_commit_id = CreateCommitId(
*buffer, convert::ToFlatBufferVector(buffer, commit->GetId()));
flatbuffers::Offset<Data> fb_commit_data = CreateData(
*buffer,
convert::ToFlatBufferVector(buffer, commit->GetStorageBytes()));
fb_commits.emplace_back(
CreateCommit(*buffer, fb_commit_id, CommitStatus_OK, fb_commit_data));
}
flatbuffers::Offset<CommitResponse> commit_response =
CreateCommitResponse(*buffer, buffer->CreateVector(fb_commits));
flatbuffers::Offset<Response> response =
CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id,
ResponseMessage_CommitResponse, commit_response.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Response, response.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::BuildCommitResponseBuffer(
flatbuffers::FlatBufferBuilder* buffer,
const std::vector<
std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>&
commits) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
std::vector<flatbuffers::Offset<Commit>> fb_commits;
for (const auto& commit : commits) {
flatbuffers::Offset<CommitId> fb_commit_id = CreateCommitId(
*buffer, convert::ToFlatBufferVector(buffer, commit.first));
if (commit.second) {
flatbuffers::Offset<Data> fb_commit_data =
CreateData(*buffer, convert::ToFlatBufferVector(
buffer, commit.second->GetStorageBytes()));
fb_commits.emplace_back(
CreateCommit(*buffer, fb_commit_id, CommitStatus_OK, fb_commit_data));
} else {
fb_commits.emplace_back(
CreateCommit(*buffer, fb_commit_id, CommitStatus_UNKNOWN_COMMIT));
}
}
flatbuffers::Offset<CommitResponse> commit_response =
CreateCommitResponse(*buffer, buffer->CreateVector(fb_commits));
flatbuffers::Offset<Response> response =
CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id,
ResponseMessage_CommitResponse, commit_response.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Response, response.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::ProcessCommitRequest(
std::string source, MessageHolder<CommitRequest> request) {
coroutine_manager_.StartCoroutine([this, source = std::move(source),
request = std::move(request)](
coroutine::CoroutineHandler* handler) {
auto commit_waiter = fxl::MakeRefCounted<callback::Waiter<
storage::Status,
std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>>(
storage::Status::OK);
for (const CommitId* id : *request->commit_ids()) {
storage_->GetCommit(
id->id(), [commit_id = convert::ToString(id->id()),
callback = commit_waiter->NewCallback()](
storage::Status status,
std::unique_ptr<const storage::Commit> commit) mutable {
if (status == storage::Status::INTERNAL_NOT_FOUND) {
// Not finding an commit is okay in this context: we'll just
// reply we don't have it. There is not need to abort
// processing the request.
callback(storage::Status::OK,
std::make_pair(std::move(commit_id), nullptr));
return;
}
callback(status,
std::make_pair(std::move(commit_id), std::move(commit)));
});
}
storage::Status status;
std::vector<
std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>
commits;
if (coroutine::Wait(handler, std::move(commit_waiter), &status, &commits) ==
coroutine::ContinuationStatus::INTERRUPTED) {
return;
}
if (status != storage::Status::OK) {
return;
}
flatbuffers::FlatBufferBuilder buffer;
BuildCommitResponseBuffer(&buffer, commits);
mesh_->Send(source, buffer);
});
}
void PageCommunicatorImpl::ProcessObjectRequest(
fxl::StringView source, MessageHolder<ObjectRequest> request) {
coroutine_manager_.StartCoroutine([this, source = source.ToString(),
request = std::move(request)](
coroutine::CoroutineHandler* handler) {
// We use a std::list so that we can keep a reference to an element
// while adding new items.
std::list<ObjectResponseHolder> object_responses;
auto response_waiter =
fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>(
storage::Status::OK);
for (const ObjectId* object_id : *request->object_ids()) {
storage::ObjectIdentifier identifier{
object_id->key_index(), object_id->deletion_scope_id(),
storage::ObjectDigest(object_id->digest())};
object_responses.emplace_back(identifier);
auto& response = object_responses.back();
storage_->GetPiece(
identifier, [callback = response_waiter->NewCallback(), &response](
storage::Status status,
std::unique_ptr<const storage::Piece> piece) mutable {
if (status == storage::Status::INTERNAL_NOT_FOUND) {
// Not finding an object is okay in this context: we'll just
// reply we don't have it. There is not need to abort
// processing the request.
callback(storage::Status::OK);
return;
}
response.piece = std::move(piece);
callback(status);
});
storage_->IsPieceSynced(
std::move(identifier),
[callback = response_waiter->NewCallback(), &response](
storage::Status status, bool is_synced) {
if (status == storage::Status::INTERNAL_NOT_FOUND) {
// Not finding an object is okay in this context: we'll just
// reply we don't have it. There is not need to abort
// processing the request.
callback(storage::Status::OK);
return;
}
response.is_synced = is_synced;
callback(status);
});
}
storage::Status status;
if (coroutine::Wait(handler, response_waiter, &status) ==
coroutine::ContinuationStatus::INTERRUPTED) {
return;
}
if (status != storage::Status::OK) {
FXL_LOG(WARNING) << "Error while retrieving objects: " << status;
return;
}
flatbuffers::FlatBufferBuilder buffer;
BuildObjectResponseBuffer(&buffer, std::move(object_responses));
mesh_->Send(source, buffer);
});
}
void PageCommunicatorImpl::BuildObjectResponseBuffer(
flatbuffers::FlatBufferBuilder* buffer,
std::list<ObjectResponseHolder> object_responses) {
flatbuffers::Offset<NamespacePageId> namespace_page_id =
CreateNamespacePageId(*buffer,
convert::ToFlatBufferVector(buffer, namespace_id_),
convert::ToFlatBufferVector(buffer, page_id_));
std::vector<flatbuffers::Offset<Object>> fb_objects;
for (const ObjectResponseHolder& object_response : object_responses) {
flatbuffers::Offset<ObjectId> fb_object_id = CreateObjectId(
*buffer, object_response.identifier.key_index(),
object_response.identifier.deletion_scope_id(),
convert::ToFlatBufferVector(
buffer, object_response.identifier.object_digest().Serialize()));
if (object_response.piece) {
fxl::StringView data = object_response.piece->GetData();
flatbuffers::Offset<Data> fb_data =
CreateData(*buffer, convert::ToFlatBufferVector(buffer, data));
ObjectSyncStatus sync_status = object_response.is_synced
? ObjectSyncStatus_SYNCED_TO_CLOUD
: ObjectSyncStatus_UNSYNCED;
fb_objects.emplace_back(CreateObject(
*buffer, fb_object_id, ObjectStatus_OK, fb_data, sync_status));
} else {
fb_objects.emplace_back(
CreateObject(*buffer, fb_object_id, ObjectStatus_UNKNOWN_OBJECT));
}
}
flatbuffers::Offset<ObjectResponse> object_response =
CreateObjectResponse(*buffer, buffer->CreateVector(fb_objects));
flatbuffers::Offset<Response> response =
CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id,
ResponseMessage_ObjectResponse, object_response.Union());
flatbuffers::Offset<Message> message =
CreateMessage(*buffer, MessageUnion_Response, response.Union());
buffer->Finish(message);
}
void PageCommunicatorImpl::MarkSyncedToPeer(
fit::function<void(storage::Status)> callback) {
if (marked_as_synced_to_peer_) {
callback(storage::Status::OK);
return;
}
storage_->MarkSyncedToPeer(
[this, callback = std::move(callback)](storage::Status status) {
if (status == storage::Status::OK) {
marked_as_synced_to_peer_ = true;
}
callback(status);
});
}
} // namespace p2p_sync