blob: 8ddb8c1f8eab39262ff6c5bbf00f469b666b8b9b [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/sync_coordinator/impl/page_sync_impl.h"
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>
namespace sync_coordinator {
namespace {
// Holder for a synchronization provider (cloud or peer-to-peer).
//
// This object handles communication between storage and the page synchronizer.
class SyncProviderHolderBase : public storage::PageSyncClient,
public storage::PageSyncDelegate {
public:
SyncProviderHolderBase();
~SyncProviderHolderBase() override;
// storage::PageSyncClient:
void SetSyncDelegate(storage::PageSyncDelegate* page_sync) override;
// PageSyncDelegate:
void GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status status,
storage::ChangeSource change_source,
storage::IsObjectSynced is_object_synced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) override;
private:
storage::PageSyncDelegate* page_sync_delegate_;
};
SyncProviderHolderBase::SyncProviderHolderBase() {}
SyncProviderHolderBase::~SyncProviderHolderBase() {}
void SyncProviderHolderBase::SetSyncDelegate(
storage::PageSyncDelegate* page_sync) {
page_sync_delegate_ = page_sync;
}
void SyncProviderHolderBase::GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status status,
storage::ChangeSource change_source,
storage::IsObjectSynced is_object_synced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
page_sync_delegate_->GetObject(std::move(object_identifier),
std::move(callback));
}
} // namespace
class PageSyncImpl::CloudSyncHolder : public SyncProviderHolderBase {
public:
CloudSyncHolder();
~CloudSyncHolder() override;
void SetCloudSync(std::unique_ptr<cloud_sync::PageSync> cloud_sync);
cloud_sync::PageSync* GetCloudSync();
private:
std::unique_ptr<cloud_sync::PageSync> cloud_sync_;
};
PageSyncImpl::CloudSyncHolder::CloudSyncHolder() {}
PageSyncImpl::CloudSyncHolder::~CloudSyncHolder() {}
void PageSyncImpl::CloudSyncHolder::SetCloudSync(
std::unique_ptr<cloud_sync::PageSync> cloud_sync) {
FXL_DCHECK(!cloud_sync_);
cloud_sync_ = std::move(cloud_sync);
}
cloud_sync::PageSync* PageSyncImpl::CloudSyncHolder::GetCloudSync() {
FXL_DCHECK(cloud_sync_);
return cloud_sync_.get();
}
class PageSyncImpl::P2PSyncHolder : public SyncProviderHolderBase {
public:
P2PSyncHolder();
~P2PSyncHolder() override;
void SetP2PSync(std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync);
p2p_sync::PageCommunicator* GetP2PSync();
private:
std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync_;
};
PageSyncImpl::P2PSyncHolder::P2PSyncHolder() {}
PageSyncImpl::P2PSyncHolder::~P2PSyncHolder() {}
void PageSyncImpl::P2PSyncHolder::SetP2PSync(
std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync) {
FXL_DCHECK(!p2p_sync_);
p2p_sync_ = std::move(p2p_sync);
}
p2p_sync::PageCommunicator* PageSyncImpl::P2PSyncHolder::GetP2PSync() {
FXL_DCHECK(p2p_sync_);
return p2p_sync_.get();
}
PageSyncImpl::PageSyncImpl(storage::PageStorage* storage,
storage::PageSyncClient* sync_client)
: storage_(storage), sync_client_(sync_client) {
FXL_DCHECK(storage_);
FXL_DCHECK(sync_client_);
}
PageSyncImpl::~PageSyncImpl() {}
storage::PageSyncClient* PageSyncImpl::CreateCloudSyncClient() {
FXL_DCHECK(!cloud_sync_);
cloud_sync_ = std::make_unique<CloudSyncHolder>();
return cloud_sync_.get();
}
void PageSyncImpl::SetCloudSync(
std::unique_ptr<cloud_sync::PageSync> cloud_sync) {
FXL_DCHECK(cloud_sync_);
if (!cloud_sync) {
// Cloud sync failed to produce an initialized |cloud_sync| instance - e.g.
// because cloud provider is disconnected. Unset the entire cloud sync
// holder to disable the cloud sync logic.
cloud_sync_.reset();
return;
}
cloud_sync->SetOnUnrecoverableError([this] {
FXL_LOG(WARNING) << "Shutting down page cloud sync.";
// TODO(ppi): handle recovery from cloud provider disconnection, LE-567.
cloud_sync_.reset();
});
cloud_sync_->SetCloudSync(std::move(cloud_sync));
}
storage::PageSyncClient* PageSyncImpl::CreateP2PSyncClient() {
FXL_DCHECK(!p2p_sync_);
p2p_sync_ = std::make_unique<P2PSyncHolder>();
return p2p_sync_.get();
}
void PageSyncImpl::SetP2PSync(
std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync) {
FXL_DCHECK(p2p_sync_);
p2p_sync_->SetP2PSync(std::move(p2p_sync));
}
void PageSyncImpl::Start() {
sync_client_->SetSyncDelegate(this);
if (cloud_sync_) {
cloud_sync_->GetCloudSync()->Start();
}
if (p2p_sync_) {
p2p_sync_->GetP2PSync()->Start();
}
}
void PageSyncImpl::SetOnIdle(fit::closure on_idle) {
// Only handle cloud sync for now.
if (cloud_sync_) {
cloud_sync_->GetCloudSync()->SetOnIdle(std::move(on_idle));
}
}
bool PageSyncImpl::IsIdle() {
if (cloud_sync_) {
return cloud_sync_->GetCloudSync()->IsIdle();
}
return true;
}
void PageSyncImpl::SetOnBacklogDownloaded(fit::closure on_backlog_downloaded) {
if (cloud_sync_) {
cloud_sync_->GetCloudSync()->SetOnBacklogDownloaded(
std::move(on_backlog_downloaded));
}
}
void PageSyncImpl::SetSyncWatcher(SyncStateWatcher* watcher) {
watcher_ = std::make_unique<SyncWatcherConverter>(watcher);
if (cloud_sync_) {
cloud_sync_->GetCloudSync()->SetSyncWatcher(watcher_.get());
}
}
void PageSyncImpl::GetObject(
storage::ObjectIdentifier object_identifier,
fit::function<void(storage::Status, storage::ChangeSource,
storage::IsObjectSynced is_object_synced,
std::unique_ptr<storage::DataSource::DataChunk>)>
callback) {
// AnyWaiter returns the first successful value to its Finalize callback. For
// example, if P2P returns before cloud with a NOT_FOUND status, then we will
// wait for Cloud to return; if P2P returns with an OK status, we will pass
// the P2P-returned value immediately.
auto waiter = fxl::MakeRefCounted<callback::AnyWaiter<
storage::Status,
std::tuple<storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>>>>(
storage::Status::OK, storage::Status::NOT_FOUND,
std::tuple<storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>>());
if (cloud_sync_) {
cloud_sync_->GetObject(
object_identifier,
[callback = waiter->NewCallback()](
storage::Status status, storage::ChangeSource source,
storage::IsObjectSynced is_object_synced,
std::unique_ptr<storage::DataSource::DataChunk> data) {
callback(status,
std::make_tuple(source, is_object_synced, std::move(data)));
});
}
if (p2p_sync_) {
p2p_sync_->GetObject(
std::move(object_identifier),
[callback = waiter->NewCallback()](
storage::Status status, storage::ChangeSource source,
storage::IsObjectSynced is_object_synced,
std::unique_ptr<storage::DataSource::DataChunk> data) {
callback(status,
std::make_tuple(source, is_object_synced, std::move(data)));
});
}
waiter->Finalize(
[callback = std::move(callback)](
storage::Status status,
std::tuple<storage::ChangeSource, storage::IsObjectSynced,
std::unique_ptr<storage::DataSource::DataChunk>>
data) {
callback(status, std::get<0>(data), std::get<1>(data),
std::move(std::get<2>(data)));
});
}
} // namespace sync_coordinator