blob: 025c049f5ce53962135e2d201752f5341cfe4f81 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_
#define PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_
#include <flatbuffers/flatbuffers.h>
#include <lib/callback/auto_cleanable.h>
#include <lib/callback/cancellable.h>
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/weak_ptr.h>
#include "peridot/bin/ledger/coroutine/coroutine.h"
#include "peridot/bin/ledger/coroutine/coroutine_manager.h"
#include "peridot/bin/ledger/p2p_provider/public/types.h"
#include "peridot/bin/ledger/p2p_sync/impl/commit_batch.h"
#include "peridot/bin/ledger/p2p_sync/impl/device_mesh.h"
#include "peridot/bin/ledger/p2p_sync/impl/message_generated.h"
#include "peridot/bin/ledger/p2p_sync/impl/message_holder.h"
#include "peridot/bin/ledger/p2p_sync/public/page_communicator.h"
#include "peridot/bin/ledger/storage/public/commit_watcher.h"
#include "peridot/bin/ledger/storage/public/page_storage.h"
#include "peridot/bin/ledger/storage/public/page_sync_client.h"
#include "peridot/bin/ledger/storage/public/page_sync_delegate.h"
#include "peridot/lib/convert/convert.h"
namespace p2p_sync {
// Forward declaration only: PageCommunicatorImpl names this class as a friend,
// so it must be declared before the class definition. Judging by its name it
// is a test-only inspector; its definition is not in this header.
class PageCommunicatorImplInspectorForTest;
// Implementation of PageCommunicator.
//
// In addition to the PageCommunicator interface, this class implements:
//  - storage::PageSyncDelegate (GetObject),
//  - storage::CommitWatcher (OnNewCommits),
//  - CommitBatch::Delegate (RequestCommits).
//
// An instance is constructed with a single |namespace_id| / |page_id| pair.
// It keeps raw pointers to the PageStorage, PageSyncClient and DeviceMesh it
// is given.
// NOTE(review): those pointers look non-owning, so the pointees presumably
// have to outlive this object - confirm against the implementation file.
//
// The class is neither copyable nor assignable.
class PageCommunicatorImpl : public PageCommunicator,
public storage::PageSyncDelegate,
public storage::CommitWatcher,
public CommitBatch::Delegate {
public:
// |coroutine_service|, |storage|, |sync_client| and |mesh| are passed as raw
// pointers. |namespace_id| and |page_id| are taken by value.
// NOTE(review): |coroutine_service| is presumably what |coroutine_manager_|
// is built from - confirm in the implementation file.
PageCommunicatorImpl(coroutine::CoroutineService* coroutine_service,
storage::PageStorage* storage,
storage::PageSyncClient* sync_client,
std::string namespace_id, std::string page_id,
DeviceMesh* mesh);
~PageCommunicatorImpl() override;
// Registers the |on_delete| closure.
// NOTE(review): presumably stored in |on_delete_| and run when this object is
// destroyed - confirm in the implementation file.
void set_on_delete(fit::closure on_delete);
// OnDeviceChange is called each time a device connects or disconnects.
// |remote_device| names the device and |change_type| describes the change.
void OnDeviceChange(fxl::StringView remote_device,
p2p_provider::DeviceChangeType change_type);
// Called when a new request arrived for this page from device |source|.
void OnNewRequest(fxl::StringView source, MessageHolder<Request> message);
// Called when a new response arrived for this page from device |source|.
void OnNewResponse(fxl::StringView source, MessageHolder<Response> message);
// PageCommunicator:
void Start() override;
// storage::PageSyncDelegate:
// The outcome for |object_identifier| is reported through |callback|, which
// receives a status, a change source, a synced-state flag and a data chunk.
void GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) override;
// storage::CommitWatcher:
// Receives a batch of |commits| together with the |source| of the change.
void OnNewCommits(
const std::vector<std::unique_ptr<const storage::Commit>>& commits,
storage::ChangeSource source) override;
private:
// Grants the test inspector access to private state.
friend class PageCommunicatorImplInspectorForTest;
// Helper types declared here and defined outside this header.
// PendingObjectRequestHolder is the mapped type of |pending_object_requests_|;
// ObjectResponseHolder is the element type taken by
// |BuildObjectResponseBuffer|.
class PendingObjectRequestHolder;
struct ObjectResponseHolder;
// NOTE(review): presumably the CommitBatch::Delegate override, as it is the
// only base interface without a labelled method above - confirm.
// Takes the target |device| and the |ids| of the commits to request.
void RequestCommits(fxl::StringView device,
std::vector<storage::CommitId> ids) override;
// These methods build the flatbuffer message corresponding to their name.
// Each one writes into the caller-provided |buffer|.
void BuildWatchStartBuffer(flatbuffers::FlatBufferBuilder* buffer);
void BuildWatchStopBuffer(flatbuffers::FlatBufferBuilder* buffer);
void BuildObjectRequestBuffer(flatbuffers::FlatBufferBuilder* buffer,
storage::ObjectIdentifier object_identifier);
void BuildCommitBuffer(
flatbuffers::FlatBufferBuilder* buffer,
const std::vector<std::unique_ptr<const storage::Commit>>& commits);
void BuildObjectResponseBuffer(
flatbuffers::FlatBufferBuilder* buffer,
std::list<ObjectResponseHolder> object_responses);
// Processes an incoming CommitRequest object from device |source|.
// Unlike |ProcessObjectRequest|, |source| is taken as an owning std::string
// rather than a string view.
void ProcessCommitRequest(std::string source,
MessageHolder<CommitRequest> request);
// Builds a CommitResponse buffer in response to an incoming CommitRequest.
// This is different from |BuildCommitBuffer| which builds CommitResponse for
// a remote watcher. In particular, |BuildCommitBuffer|'s commits always
// exist. In this method, the pair's second element for a commit will be null
// if the commit does not exist on this device.
void BuildCommitResponseBuffer(
flatbuffers::FlatBufferBuilder* buffer,
const std::vector<
std::pair<storage::CommitId, std::unique_ptr<const storage::Commit>>>&
commits);
// Processes an incoming ObjectRequest object from device |source|.
void ProcessObjectRequest(fxl::StringView source,
MessageHolder<ObjectRequest> request);
// Marks the PageStorage as synced to a peer. If successful, on the following
// call to MarkSyncedToPeer, the given |callback| will be called immediately.
void MarkSyncedToPeer(fit::function<void(storage::Status)> callback);
// NOTE: non-static data members are initialized in declaration order and
// destroyed in reverse order, so do not reorder the members below without
// checking the constructor and destructor.
// Map of pending requests for objects, keyed by object identifier.
callback::AutoCleanableMap<storage::ObjectIdentifier,
PendingObjectRequestHolder>
pending_object_requests_;
// Map of pending commit batch insertions, keyed by string.
// NOTE(review): the key is presumably the name of the originating device -
// confirm in the implementation file.
callback::AutoCleanableMap<std::string, CommitBatch,
convert::StringViewComparator>
pending_commit_batches_;
// Set of devices we know are interested in this page.
std::set<std::string, convert::StringViewComparator> interested_devices_;
// Set of devices we know are not interested in this page.
std::set<std::string, convert::StringViewComparator> not_interested_devices_;
// Closure registered through |set_on_delete|.
fit::closure on_delete_;
// State flags, all initially false.
// NOTE(review): presumably set by MarkSyncedToPeer(), Start() and the
// destructor respectively - confirm in the implementation file.
bool marked_as_synced_to_peer_ = false;
bool started_ = false;
bool in_destructor_ = false;
// Commit upload: we queue commits to upload here while we check if a
// conflict exists. If one exists, we wait until it is resolved before
// uploading.
std::vector<std::unique_ptr<const storage::Commit>> commits_to_upload_;
coroutine::CoroutineManager coroutine_manager_;
// Identity of the page this communicator was created for; immutable.
const std::string namespace_id_;
const std::string page_id_;
// Collaborators supplied at construction. The pointers themselves are const;
// the pointees are not.
DeviceMesh* const mesh_;
storage::PageStorage* const storage_;
storage::PageSyncClient* const sync_client_;
// This must be the last member of the class.
fxl::WeakPtrFactory<PageCommunicatorImpl> weak_factory_;
FXL_DISALLOW_COPY_AND_ASSIGN(PageCommunicatorImpl);
};
} // namespace p2p_sync
#endif // PERIDOT_BIN_LEDGER_P2P_SYNC_IMPL_PAGE_COMMUNICATOR_IMPL_H_