| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_ |
| #define PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_ |
| |
| #include <flatbuffers/flatbuffers.h> |
| #include <lib/callback/auto_cleanable.h> |
| #include <lib/callback/cancellable.h> |
| #include <lib/callback/waiter.h> |
| #include <lib/fit/function.h> |
| #include <lib/fxl/memory/weak_ptr.h> |
| |
| #include "peridot/bin/ledger/coroutine/coroutine.h" |
| #include "peridot/bin/ledger/coroutine/coroutine_manager.h" |
| #include "peridot/bin/ledger/p2p_provider/public/types.h" |
| #include "peridot/bin/ledger/p2p_sync/impl/commit_batch.h" |
| #include "peridot/bin/ledger/p2p_sync/impl/device_mesh.h" |
| #include "peridot/bin/ledger/p2p_sync/impl/message_generated.h" |
| #include "peridot/bin/ledger/p2p_sync/impl/message_holder.h" |
| #include "peridot/bin/ledger/p2p_sync/public/page_communicator.h" |
| #include "peridot/bin/ledger/storage/public/commit_watcher.h" |
| #include "peridot/bin/ledger/storage/public/page_storage.h" |
| #include "peridot/bin/ledger/storage/public/page_sync_client.h" |
| #include "peridot/bin/ledger/storage/public/page_sync_delegate.h" |
| #include "peridot/lib/convert/convert.h" |
| |
| namespace p2p_sync { |
| class PageCommunicatorImplInspectorForTest; |
| |
| // PageCommunicator implementation for one page, identified by the |
| // |namespace_id| and |page_id| given at construction. It also implements |
| // storage::PageSyncDelegate, storage::CommitWatcher and |
| // CommitBatch::Delegate. |
| class PageCommunicatorImpl : public PageCommunicator, |
|                              public storage::PageSyncDelegate, |
|                              public storage::CommitWatcher, |
|                              public CommitBatch::Delegate { |
|  public: |
|   PageCommunicatorImpl(coroutine::CoroutineService* coroutine_service, |
|                        storage::PageStorage* storage, |
|                        storage::PageSyncClient* sync_client, |
|                        std::string namespace_id, std::string page_id, |
|                        DeviceMesh* mesh); |
|   ~PageCommunicatorImpl() override; |
| |
|   // NOTE(review): presumably |on_delete| is run when this object is |
|   // destroyed; confirm against the implementation file. |
|   void set_on_delete(fit::closure on_delete); |
| |
|   // PageCommunicator: |
|   void Start() override; |
| |
|   // storage::PageSyncDelegate: |
|   void GetObject( |
|       storage::ObjectIdentifier object_identifier, |
|       fit::function<void(storage::Status, storage::ChangeSource, |
|                          storage::IsObjectSynced, |
|                          std::unique_ptr<storage::DataSource::DataChunk>)> |
|           callback) override; |
| |
|   // storage::CommitWatcher: |
|   void OnNewCommits( |
|       const std::vector<std::unique_ptr<const storage::Commit>>& commits, |
|       storage::ChangeSource source) override; |
| |
|   // Invoked every time a device connects or disconnects. |remote_device| is |
|   // the device concerned and |change_type| describes the change. |
|   void OnDeviceChange(fxl::StringView remote_device, |
|                       p2p_provider::DeviceChangeType change_type); |
| |
|   // Invoked when device |source| sent a new request concerning this page. |
|   void OnNewRequest(fxl::StringView source, MessageHolder<Request> message); |
| |
|   // Invoked when device |source| sent a new response concerning this page. |
|   void OnNewResponse(fxl::StringView source, MessageHolder<Response> message); |
| |
|  private: |
|   friend class PageCommunicatorImplInspectorForTest; |
|   class PendingObjectRequestHolder; |
|   struct ObjectResponseHolder; |
| |
|   // Override of an inherited virtual method. NOTE(review): presumably declared |
|   // by CommitBatch::Delegate; the declaring base is not visible in this file. |
|   void RequestCommits(fxl::StringView device, |
|                       std::vector<storage::CommitId> ids) override; |
| |
|   // Handles a CommitRequest received from device |source|. |
|   void ProcessCommitRequest(std::string source, |
|                             MessageHolder<CommitRequest> request); |
| |
|   // Handles a received ObjectRequest. |
|   void ProcessObjectRequest(fxl::StringView source, |
|                             MessageHolder<ObjectRequest> request); |
| |
|   // Marks the PageStorage as synced to a peer. Once a call has succeeded, the |
|   // next call to MarkSyncedToPeer invokes its |callback| immediately. |
|   void MarkSyncedToPeer(fit::function<void(storage::Status)> callback); |
| |
|   // Flatbuffer builders: each method below builds the flatbuffer message that |
|   // its name refers to. |
|   void BuildWatchStartBuffer(flatbuffers::FlatBufferBuilder* buffer); |
|   void BuildWatchStopBuffer(flatbuffers::FlatBufferBuilder* buffer); |
|   void BuildObjectRequestBuffer(flatbuffers::FlatBufferBuilder* buffer, |
|                                 storage::ObjectIdentifier object_identifier); |
|   void BuildCommitBuffer( |
|       flatbuffers::FlatBufferBuilder* buffer, |
|       const std::vector<std::unique_ptr<const storage::Commit>>& commits); |
|   void BuildObjectResponseBuffer( |
|       flatbuffers::FlatBufferBuilder* buffer, |
|       std::list<ObjectResponseHolder> object_responses); |
| |
|   // Builds the CommitResponse buffer that answers an incoming CommitRequest. |
|   // Not to be confused with |BuildCommitBuffer|, which builds the |
|   // CommitResponse for a remote watcher and whose commits always exist. Here, |
|   // the second element of a pair is null when the corresponding commit does |
|   // not exist on this device. |
|   void BuildCommitResponseBuffer( |
|       flatbuffers::FlatBufferBuilder* buffer, |
|       const std::vector< |
|           std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>& |
|           commits); |
| |
|   // Data members. Their relative order is kept as declared: it determines the |
|   // order of construction and destruction. |
| |
|   // Pending object requests, keyed by object identifier. |
|   callback::AutoCleanableMap<storage::ObjectIdentifier, |
|                              PendingObjectRequestHolder> |
|       pending_object_requests_; |
|   // Pending commit batch insertions. |
|   callback::AutoCleanableMap<std::string, CommitBatch, |
|                              convert::StringViewComparator> |
|       pending_commit_batches_; |
|   // Set of devices known to be interested in this page. |
|   std::set<std::string, convert::StringViewComparator> interested_devices_; |
|   // Set of devices known not to be interested in this page. |
|   std::set<std::string, convert::StringViewComparator> not_interested_devices_; |
|   fit::closure on_delete_; |
|   bool marked_as_synced_to_peer_ = false; |
|   bool started_ = false; |
|   bool in_destructor_ = false; |
| |
|   // Commit upload queue: commits wait here while we check whether a conflict |
|   // exists. If one exists, uploading is deferred until it is resolved. |
|   std::vector<std::unique_ptr<const storage::Commit>> commits_to_upload_; |
| |
|   coroutine::CoroutineManager coroutine_manager_; |
|   const std::string namespace_id_; |
|   const std::string page_id_; |
|   DeviceMesh* const mesh_; |
|   storage::PageStorage* const storage_; |
|   storage::PageSyncClient* const sync_client_; |
| |
|   // Keep this as the last member of the class. |
|   fxl::WeakPtrFactory<PageCommunicatorImpl> weak_factory_; |
| |
|   FXL_DISALLOW_COPY_AND_ASSIGN(PageCommunicatorImpl); |
| }; |
| |
| } // namespace p2p_sync |
| |
| #endif // PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_ |