| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/p2p_sync/impl/page_communicator_impl.h" |
| |
| #include <lib/callback/scoped_callback.h> |
| #include <lib/callback/waiter.h> |
| #include <lib/fit/function.h> |
| |
| #include "peridot/lib/convert/convert.h" |
| #include "src/ledger/bin/p2p_sync/impl/message_generated.h" |
| #include "src/ledger/bin/storage/public/read_data_source.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/ledger/lib/coroutine/coroutine_waiter.h" |
| #include "src/lib/fxl/memory/ref_ptr.h" |
| |
| namespace p2p_sync { |
| namespace { |
| storage::ObjectIdentifier ToObjectIdentifier(const ObjectId* fb_object_id) { |
| uint32_t key_index = fb_object_id->key_index(); |
| uint32_t deletion_scope_id = fb_object_id->deletion_scope_id(); |
| return storage::ObjectIdentifier{ |
| key_index, deletion_scope_id, |
| storage::ObjectDigest(fb_object_id->digest())}; |
| } |
| } // namespace |
| |
| // PendingObjectRequestHolder holds state for object requests that have been |
| // sent to peers and for which we wait for an answer. |
| class PageCommunicatorImpl::PendingObjectRequestHolder { |
| public: |
| explicit PendingObjectRequestHolder( |
| fit::function<void(storage::Status, storage::ChangeSource, |
| storage::IsObjectSynced, |
| std::unique_ptr<storage::DataSource::DataChunk>)> |
| callback) |
| : callback_(std::move(callback)) {} |
| |
| void set_on_empty(fit::closure on_empty) { on_empty_ = std::move(on_empty); } |
| |
| // Registers a new pending request to device |destination|. |
| void AddNewPendingRequest(std::string destination) { |
| requests_.emplace(std::move(destination)); |
| } |
| |
| // Processes the response from device |source|. |
| void Complete(fxl::StringView source, const Object* object) { |
| auto it = requests_.find(source); |
| if (it == requests_.end()) { |
| return; |
| } |
| if (object == nullptr || object->status() == ObjectStatus_UNKNOWN_OBJECT) { |
| requests_.erase(it); |
| if (!requests_.empty()) { |
| return; |
| } |
| // All requests have returned and none is valid: return an error. |
| callback_(storage::Status::INTERNAL_NOT_FOUND, storage::ChangeSource::P2P, |
| storage::IsObjectSynced::NO, nullptr); |
| if (on_empty_) { |
| on_empty_(); |
| } |
| return; |
| } |
| |
| std::unique_ptr<storage::DataSource::DataChunk> chunk = |
| storage::DataSource::DataChunk::Create( |
| convert::ToString(object->data()->bytes())); |
| storage::IsObjectSynced is_object_synced; |
| switch (object->sync_status()) { |
| case ObjectSyncStatus_UNSYNCED: |
| is_object_synced = storage::IsObjectSynced::NO; |
| break; |
| case ObjectSyncStatus_SYNCED_TO_CLOUD: |
| is_object_synced = storage::IsObjectSynced::YES; |
| break; |
| } |
| callback_(storage::Status::OK, storage::ChangeSource::P2P, is_object_synced, |
| std::move(chunk)); |
| if (on_empty_) { |
| on_empty_(); |
| } |
| } |
| |
| private: |
| fit::function<void( |
| storage::Status, storage::ChangeSource, storage::IsObjectSynced, |
| std::unique_ptr<storage::DataSource::DataChunk>)> const callback_; |
| // Set of devices for which we are waiting an answer. |
| // We might be able to get rid of this list and just use a counter (or even |
| // nothing at all) once we have a timeout on requests. |
| std::set<std::string, convert::StringViewComparator> requests_; |
| fit::closure on_empty_; |
| }; |
| |
| // ObjectResponseHolder holds temporary data we collect in order to build |
| // ObjectResponses. |
| // This is necessary as object data (from |storage::Object|) and synchronization |
| // data come from different asynchronous calls. |
| struct PageCommunicatorImpl::ObjectResponseHolder { |
| storage::ObjectIdentifier identifier; |
| std::unique_ptr<const storage::Piece> piece; |
| bool is_synced = false; |
| |
| explicit ObjectResponseHolder(storage::ObjectIdentifier identifier) |
| : identifier(std::move(identifier)) {} |
| }; |
| |
| PageCommunicatorImpl::PageCommunicatorImpl( |
| coroutine::CoroutineService* coroutine_service, |
| storage::PageStorage* storage, storage::PageSyncClient* sync_client, |
| std::string namespace_id, std::string page_id, DeviceMesh* mesh) |
| : coroutine_manager_(coroutine_service), |
| namespace_id_(std::move(namespace_id)), |
| page_id_(std::move(page_id)), |
| mesh_(mesh), |
| storage_(storage), |
| sync_client_(sync_client), |
| weak_factory_(this) {} |
| |
| PageCommunicatorImpl::~PageCommunicatorImpl() { |
| FXL_DCHECK(!in_destructor_); |
| in_destructor_ = true; |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| if (!started_) { |
| if (on_delete_) { |
| on_delete_(); |
| } |
| return; |
| } |
| |
| BuildWatchStopBuffer(&buffer); |
| for (const auto& device : interested_devices_) { |
| mesh_->Send(device, buffer); |
| } |
| |
| if (on_delete_) { |
| on_delete_(); |
| } |
| } |
| |
| void PageCommunicatorImpl::Start() { |
| FXL_DCHECK(!started_); |
| started_ = true; |
| sync_client_->SetSyncDelegate(this); |
| storage_->AddCommitWatcher(this); |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildWatchStartBuffer(&buffer); |
| |
| for (const auto& device : mesh_->GetDeviceList()) { |
| mesh_->Send(device, buffer); |
| } |
| } |
| |
| void PageCommunicatorImpl::set_on_delete(fit::closure on_delete) { |
| FXL_DCHECK(!on_delete_) << "set_on_delete() can only be called once."; |
| on_delete_ = std::move(on_delete); |
| } |
| |
| void PageCommunicatorImpl::OnDeviceChange( |
| fxl::StringView remote_device, p2p_provider::DeviceChangeType change_type) { |
| if (!started_ || in_destructor_) { |
| return; |
| } |
| |
| if (change_type == p2p_provider::DeviceChangeType::DELETED) { |
| const auto& it = interested_devices_.find(remote_device); |
| if (it != interested_devices_.end()) { |
| interested_devices_.erase(it); |
| } |
| const auto& it2 = not_interested_devices_.find(remote_device); |
| if (it2 != not_interested_devices_.end()) { |
| not_interested_devices_.erase(it2); |
| } |
| const auto& it3 = pending_commit_batches_.find(remote_device); |
| if (it3 != pending_commit_batches_.end()) { |
| pending_commit_batches_.erase(it3); |
| } |
| return; |
| } |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildWatchStartBuffer(&buffer); |
| mesh_->Send(remote_device, buffer); |
| } |
| |
| void PageCommunicatorImpl::OnNewRequest(fxl::StringView source, |
| MessageHolder<Request> message) { |
| FXL_DCHECK(!in_destructor_); |
| switch (message->request_type()) { |
| case RequestMessage_WatchStartRequest: { |
| MarkSyncedToPeer( |
| [this, source = source.ToString()](storage::Status status) { |
| if (status != storage::Status::OK) { |
| // If we fail to mark the page storage as synced to a peer, we |
| // might end up in a situation of deleting from disk a partially |
| // synced page. Log an error and return. |
| FXL_LOG(ERROR) << "Failed to mark PageStorage as synced to peer"; |
| return; |
| } |
| if (interested_devices_.find(source) == interested_devices_.end()) { |
| interested_devices_.insert(source); |
| } |
| auto it = not_interested_devices_.find(source); |
| if (it != not_interested_devices_.end()) { |
| // The device used to be uninterested, but now wants updates. |
| // Let's contact it again. |
| not_interested_devices_.erase(it); |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildWatchStartBuffer(&buffer); |
| mesh_->Send(source, buffer); |
| } |
| }); |
| break; |
| } |
| case RequestMessage_WatchStopRequest: { |
| const auto& it = interested_devices_.find(source); |
| if (it != interested_devices_.end()) { |
| interested_devices_.erase(it); |
| } |
| // Device |source| disconnected, thus will not answer any request. We thus |
| // mark all pending requests to |source| to be finished. |
| std::vector<PendingObjectRequestHolder*> requests; |
| requests.reserve(pending_object_requests_.size()); |
| for (auto& object_request : pending_object_requests_) { |
| // We cannot call Complete here because it deletes the object, making |
| // the iterator used in this for loop invalid. |
| requests.push_back(&object_request.second); |
| } |
| for (PendingObjectRequestHolder* const request : requests) { |
| request->Complete(source, nullptr); |
| } |
| break; |
| } |
| case RequestMessage_CommitRequest: |
| ProcessCommitRequest( |
| source.ToString(), |
| std::move(message).TakeAndMap<CommitRequest>( |
| [](const Request* request) { |
| return static_cast<const CommitRequest*>(request->request()); |
| })); |
| break; |
| case RequestMessage_ObjectRequest: |
| ProcessObjectRequest( |
| source, |
| std::move(message).TakeAndMap<ObjectRequest>( |
| [](const Request* request) { |
| return static_cast<const ObjectRequest*>(request->request()); |
| })); |
| break; |
| case RequestMessage_NONE: |
| FXL_LOG(ERROR) << "The message received is malformed"; |
| break; |
| } |
| } |
| |
| void PageCommunicatorImpl::OnNewResponse(fxl::StringView source, |
| MessageHolder<Response> message) { |
| FXL_DCHECK(!in_destructor_); |
| if (message->status() != ResponseStatus_OK) { |
| // The namespace or page was unknown on the other side. We can probably do |
| // something smart with this information (for instance, stop sending |
| // requests over), but we just ignore it for now. |
| not_interested_devices_.emplace(source.ToString()); |
| return; |
| } |
| switch (message->response_type()) { |
| case ResponseMessage_ObjectResponse: { |
| const ObjectResponse* object_response = |
| static_cast<const ObjectResponse*>(message->response()); |
| for (const Object* object : *(object_response->objects())) { |
| auto object_id = ToObjectIdentifier(object->id()); |
| auto pending_request = pending_object_requests_.find(object_id); |
| if (pending_request == pending_object_requests_.end()) { |
| continue; |
| } |
| pending_request->second.Complete(source, object); |
| } |
| break; |
| } |
| case ResponseMessage_CommitResponse: { |
| const CommitResponse* commit_response = |
| static_cast<const CommitResponse*>(message->response()); |
| std::vector<storage::PageStorage::CommitIdAndBytes> commits; |
| for (const Commit* commit : *(commit_response->commits())) { |
| if (commit->status() != CommitStatus_OK) { |
| continue; |
| } |
| commits.emplace_back(convert::ToString(commit->id()->id()), |
| convert::ToString(commit->commit()->bytes())); |
| } |
| auto it = pending_commit_batches_.find(source); |
| if (it != pending_commit_batches_.end()) { |
| it->second.AddToBatch(std::move(commits)); |
| } else { |
| auto it_pair = pending_commit_batches_.emplace( |
| std::piecewise_construct, std::forward_as_tuple(source.ToString()), |
| std::forward_as_tuple(source.ToString(), this, storage_)); |
| it_pair.first->second.AddToBatch(std::move(commits)); |
| } |
| break; |
| } |
| |
| case ResponseMessage_NONE: |
| FXL_LOG(ERROR) << "The message received is malformed"; |
| return; |
| } |
| } |
| |
| void PageCommunicatorImpl::GetObject( |
| storage::ObjectIdentifier object_identifier, |
| fit::function<void(storage::Status, storage::ChangeSource, |
| storage::IsObjectSynced, |
| std::unique_ptr<storage::DataSource::DataChunk>)> |
| callback) { |
| flatbuffers::FlatBufferBuilder buffer; |
| |
| BuildObjectRequestBuffer(&buffer, object_identifier); |
| |
| auto request_holder = pending_object_requests_.emplace( |
| std::move(object_identifier), std::move(callback)); |
| |
| for (const auto& device : interested_devices_) { |
| request_holder.first->second.AddNewPendingRequest(device); |
| } |
| for (const auto& device : interested_devices_) { |
| mesh_->Send(device, buffer); |
| } |
| } |
| |
| void PageCommunicatorImpl::OnNewCommits( |
| const std::vector<std::unique_ptr<const storage::Commit>>& commits, |
| storage::ChangeSource source) { |
| if (source != storage::ChangeSource::LOCAL) { |
| // Don't propagate synced commits. |
| return; |
| } |
| for (const auto& commit : commits) { |
| commits_to_upload_.emplace_back(commit->Clone()); |
| } |
| // We need to check if we need to merge first. |
| std::vector<std::unique_ptr<const storage::Commit>> head_commits; |
| storage::Status status = storage_->GetHeadCommits(&head_commits); |
| if (status != storage::Status::OK) { |
| return; |
| } |
| if (head_commits.size() != 1) { |
| // A merge needs to happen, let's wait until we |
| // have one. |
| return; |
| } |
| if (commits_to_upload_.empty()) { |
| // Commits have already been sent. Let's stop |
| // early. |
| return; |
| } |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildCommitBuffer(&buffer, commits_to_upload_); |
| |
| for (const auto& device : interested_devices_) { |
| mesh_->Send(device, buffer); |
| } |
| commits_to_upload_.clear(); |
| } |
| |
| void PageCommunicatorImpl::RequestCommits(fxl::StringView device, |
| std::vector<storage::CommitId> ids) { |
| flatbuffers::FlatBufferBuilder buffer; |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(buffer, |
| convert::ToFlatBufferVector(&buffer, namespace_id_), |
| convert::ToFlatBufferVector(&buffer, page_id_)); |
| std::vector<flatbuffers::Offset<CommitId>> commit_ids; |
| for (const auto& id : ids) { |
| flatbuffers::Offset<CommitId> commit_id = |
| CreateCommitId(buffer, convert::ToFlatBufferVector(&buffer, id)); |
| commit_ids.push_back(commit_id); |
| } |
| flatbuffers::Offset<CommitRequest> commit_request = |
| CreateCommitRequest(buffer, buffer.CreateVector(commit_ids)); |
| flatbuffers::Offset<Request> request = |
| CreateRequest(buffer, namespace_page_id, RequestMessage_CommitRequest, |
| commit_request.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(buffer, MessageUnion_Request, request.Union()); |
| buffer.Finish(message); |
| mesh_->Send(device, buffer); |
| } |
| |
| void PageCommunicatorImpl::BuildWatchStartBuffer( |
| flatbuffers::FlatBufferBuilder* buffer) { |
| flatbuffers::Offset<WatchStartRequest> watch_start = |
| CreateWatchStartRequest(*buffer); |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| flatbuffers::Offset<Request> request = |
| CreateRequest(*buffer, namespace_page_id, |
| RequestMessage_WatchStartRequest, watch_start.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Request, request.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::BuildWatchStopBuffer( |
| flatbuffers::FlatBufferBuilder* buffer) { |
| flatbuffers::Offset<WatchStopRequest> watch_stop = |
| CreateWatchStopRequest(*buffer); |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| flatbuffers::Offset<Request> request = |
| CreateRequest(*buffer, namespace_page_id, RequestMessage_WatchStopRequest, |
| watch_stop.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Request, request.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::BuildObjectRequestBuffer( |
| flatbuffers::FlatBufferBuilder* buffer, |
| storage::ObjectIdentifier object_identifier) { |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| flatbuffers::Offset<ObjectId> object_id = CreateObjectId( |
| *buffer, object_identifier.key_index(), |
| object_identifier.deletion_scope_id(), |
| convert::ToFlatBufferVector( |
| buffer, object_identifier.object_digest().Serialize())); |
| flatbuffers::Offset<ObjectRequest> object_request = CreateObjectRequest( |
| *buffer, buffer->CreateVector( |
| std::vector<flatbuffers::Offset<ObjectId>>({object_id}))); |
| flatbuffers::Offset<Request> request = |
| CreateRequest(*buffer, namespace_page_id, RequestMessage_ObjectRequest, |
| object_request.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Request, request.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::BuildCommitBuffer( |
| flatbuffers::FlatBufferBuilder* buffer, |
| const std::vector<std::unique_ptr<const storage::Commit>>& commits) { |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| std::vector<flatbuffers::Offset<Commit>> fb_commits; |
| for (const auto& commit : commits) { |
| flatbuffers::Offset<CommitId> fb_commit_id = CreateCommitId( |
| *buffer, convert::ToFlatBufferVector(buffer, commit->GetId())); |
| flatbuffers::Offset<Data> fb_commit_data = CreateData( |
| *buffer, |
| convert::ToFlatBufferVector(buffer, commit->GetStorageBytes())); |
| fb_commits.emplace_back( |
| CreateCommit(*buffer, fb_commit_id, CommitStatus_OK, fb_commit_data)); |
| } |
| |
| flatbuffers::Offset<CommitResponse> commit_response = |
| CreateCommitResponse(*buffer, buffer->CreateVector(fb_commits)); |
| flatbuffers::Offset<Response> response = |
| CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id, |
| ResponseMessage_CommitResponse, commit_response.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Response, response.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::BuildCommitResponseBuffer( |
| flatbuffers::FlatBufferBuilder* buffer, |
| const std::vector< |
| std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>& |
| commits) { |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| std::vector<flatbuffers::Offset<Commit>> fb_commits; |
| for (const auto& commit : commits) { |
| flatbuffers::Offset<CommitId> fb_commit_id = CreateCommitId( |
| *buffer, convert::ToFlatBufferVector(buffer, commit.first)); |
| if (commit.second) { |
| flatbuffers::Offset<Data> fb_commit_data = |
| CreateData(*buffer, convert::ToFlatBufferVector( |
| buffer, commit.second->GetStorageBytes())); |
| fb_commits.emplace_back( |
| CreateCommit(*buffer, fb_commit_id, CommitStatus_OK, fb_commit_data)); |
| } else { |
| fb_commits.emplace_back( |
| CreateCommit(*buffer, fb_commit_id, CommitStatus_UNKNOWN_COMMIT)); |
| } |
| } |
| |
| flatbuffers::Offset<CommitResponse> commit_response = |
| CreateCommitResponse(*buffer, buffer->CreateVector(fb_commits)); |
| flatbuffers::Offset<Response> response = |
| CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id, |
| ResponseMessage_CommitResponse, commit_response.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Response, response.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::ProcessCommitRequest( |
| std::string source, MessageHolder<CommitRequest> request) { |
| coroutine_manager_.StartCoroutine([this, source = std::move(source), |
| request = std::move(request)]( |
| coroutine::CoroutineHandler* handler) { |
| auto commit_waiter = fxl::MakeRefCounted<callback::Waiter< |
| storage::Status, |
| std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>>( |
| storage::Status::OK); |
| for (const CommitId* id : *request->commit_ids()) { |
| storage_->GetCommit( |
| id->id(), [commit_id = convert::ToString(id->id()), |
| callback = commit_waiter->NewCallback()]( |
| storage::Status status, |
| std::unique_ptr<const storage::Commit> commit) mutable { |
| if (status == storage::Status::INTERNAL_NOT_FOUND) { |
| // Not finding an commit is okay in this context: we'll just |
| // reply we don't have it. There is not need to abort |
| // processing the request. |
| callback(storage::Status::OK, |
| std::make_pair(std::move(commit_id), nullptr)); |
| return; |
| } |
| callback(status, |
| std::make_pair(std::move(commit_id), std::move(commit))); |
| }); |
| } |
| storage::Status status; |
| std::vector< |
| std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>> |
| commits; |
| if (coroutine::Wait(handler, std::move(commit_waiter), &status, &commits) == |
| coroutine::ContinuationStatus::INTERRUPTED) { |
| return; |
| } |
| |
| if (status != storage::Status::OK) { |
| return; |
| } |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildCommitResponseBuffer(&buffer, commits); |
| mesh_->Send(source, buffer); |
| }); |
| } |
| |
| void PageCommunicatorImpl::ProcessObjectRequest( |
| fxl::StringView source, MessageHolder<ObjectRequest> request) { |
| coroutine_manager_.StartCoroutine([this, source = source.ToString(), |
| request = std::move(request)]( |
| coroutine::CoroutineHandler* handler) { |
| // We use a std::list so that we can keep a reference to an element |
| // while adding new items. |
| std::list<ObjectResponseHolder> object_responses; |
| auto response_waiter = |
| fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>( |
| storage::Status::OK); |
| for (const ObjectId* object_id : *request->object_ids()) { |
| storage::ObjectIdentifier identifier{ |
| object_id->key_index(), object_id->deletion_scope_id(), |
| storage::ObjectDigest(object_id->digest())}; |
| object_responses.emplace_back(identifier); |
| auto& response = object_responses.back(); |
| storage_->GetPiece( |
| identifier, [callback = response_waiter->NewCallback(), &response]( |
| storage::Status status, |
| std::unique_ptr<const storage::Piece> piece) mutable { |
| if (status == storage::Status::INTERNAL_NOT_FOUND) { |
| // Not finding an object is okay in this context: we'll just |
| // reply we don't have it. There is not need to abort |
| // processing the request. |
| callback(storage::Status::OK); |
| return; |
| } |
| response.piece = std::move(piece); |
| callback(status); |
| }); |
| storage_->IsPieceSynced( |
| std::move(identifier), |
| [callback = response_waiter->NewCallback(), &response]( |
| storage::Status status, bool is_synced) { |
| if (status == storage::Status::INTERNAL_NOT_FOUND) { |
| // Not finding an object is okay in this context: we'll just |
| // reply we don't have it. There is not need to abort |
| // processing the request. |
| callback(storage::Status::OK); |
| return; |
| } |
| response.is_synced = is_synced; |
| callback(status); |
| }); |
| } |
| |
| storage::Status status; |
| if (coroutine::Wait(handler, response_waiter, &status) == |
| coroutine::ContinuationStatus::INTERRUPTED) { |
| return; |
| } |
| |
| if (status != storage::Status::OK) { |
| FXL_LOG(WARNING) << "Error while retrieving objects: " << status; |
| return; |
| } |
| |
| flatbuffers::FlatBufferBuilder buffer; |
| BuildObjectResponseBuffer(&buffer, std::move(object_responses)); |
| |
| mesh_->Send(source, buffer); |
| }); |
| } |
| |
| void PageCommunicatorImpl::BuildObjectResponseBuffer( |
| flatbuffers::FlatBufferBuilder* buffer, |
| std::list<ObjectResponseHolder> object_responses) { |
| flatbuffers::Offset<NamespacePageId> namespace_page_id = |
| CreateNamespacePageId(*buffer, |
| convert::ToFlatBufferVector(buffer, namespace_id_), |
| convert::ToFlatBufferVector(buffer, page_id_)); |
| std::vector<flatbuffers::Offset<Object>> fb_objects; |
| for (const ObjectResponseHolder& object_response : object_responses) { |
| flatbuffers::Offset<ObjectId> fb_object_id = CreateObjectId( |
| *buffer, object_response.identifier.key_index(), |
| object_response.identifier.deletion_scope_id(), |
| convert::ToFlatBufferVector( |
| buffer, object_response.identifier.object_digest().Serialize())); |
| if (object_response.piece) { |
| fxl::StringView data = object_response.piece->GetData(); |
| flatbuffers::Offset<Data> fb_data = |
| CreateData(*buffer, convert::ToFlatBufferVector(buffer, data)); |
| ObjectSyncStatus sync_status = object_response.is_synced |
| ? ObjectSyncStatus_SYNCED_TO_CLOUD |
| : ObjectSyncStatus_UNSYNCED; |
| fb_objects.emplace_back(CreateObject( |
| *buffer, fb_object_id, ObjectStatus_OK, fb_data, sync_status)); |
| } else { |
| fb_objects.emplace_back( |
| CreateObject(*buffer, fb_object_id, ObjectStatus_UNKNOWN_OBJECT)); |
| } |
| } |
| flatbuffers::Offset<ObjectResponse> object_response = |
| CreateObjectResponse(*buffer, buffer->CreateVector(fb_objects)); |
| flatbuffers::Offset<Response> response = |
| CreateResponse(*buffer, ResponseStatus_OK, namespace_page_id, |
| ResponseMessage_ObjectResponse, object_response.Union()); |
| flatbuffers::Offset<Message> message = |
| CreateMessage(*buffer, MessageUnion_Response, response.Union()); |
| buffer->Finish(message); |
| } |
| |
| void PageCommunicatorImpl::MarkSyncedToPeer( |
| fit::function<void(storage::Status)> callback) { |
| if (marked_as_synced_to_peer_) { |
| callback(storage::Status::OK); |
| return; |
| } |
| storage_->MarkSyncedToPeer( |
| [this, callback = std::move(callback)](storage::Status status) { |
| if (status == storage::Status::OK) { |
| marked_as_synced_to_peer_ = true; |
| } |
| callback(status); |
| }); |
| } |
| |
| } // namespace p2p_sync |