// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/ledger/sync_coordinator/impl/page_sync_impl.h"

#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>

namespace sync_coordinator {
namespace {
// Holder for a synchronization provider (cloud or peer-to-peer).
//
// This object handles communication between storage and the page synchronizer.
class SyncProviderHolderBase : public storage::PageSyncClient,
                               public storage::PageSyncDelegate {
 public:
  SyncProviderHolderBase();
  ~SyncProviderHolderBase() override;

  // storage::PageSyncClient:
  void SetSyncDelegate(storage::PageSyncDelegate* page_sync) override;

  // PageSyncDelegate:
  void GetObject(
      storage::ObjectIdentifier object_identifier,
      fit::function<void(storage::Status status,
                         storage::ChangeSource change_source,
                         storage::IsObjectSynced is_object_synced,
                         std::unique_ptr<storage::DataSource::DataChunk>)>
          callback) override;

 private:
  storage::PageSyncDelegate* page_sync_delegate_;
};

SyncProviderHolderBase::SyncProviderHolderBase() {}

SyncProviderHolderBase::~SyncProviderHolderBase() {}

void SyncProviderHolderBase::SetSyncDelegate(
    storage::PageSyncDelegate* page_sync) {
  page_sync_delegate_ = page_sync;
}

void SyncProviderHolderBase::GetObject(
    storage::ObjectIdentifier object_identifier,
    fit::function<void(storage::Status status,
                       storage::ChangeSource change_source,
                       storage::IsObjectSynced is_object_synced,
                       std::unique_ptr<storage::DataSource::DataChunk>)>
        callback) {
  page_sync_delegate_->GetObject(std::move(object_identifier),
                                 std::move(callback));
}
}  // namespace

class PageSyncImpl::CloudSyncHolder : public SyncProviderHolderBase {
 public:
  CloudSyncHolder();
  ~CloudSyncHolder() override;

  void SetCloudSync(std::unique_ptr<cloud_sync::PageSync> cloud_sync);
  cloud_sync::PageSync* GetCloudSync();

 private:
  std::unique_ptr<cloud_sync::PageSync> cloud_sync_;
};

PageSyncImpl::CloudSyncHolder::CloudSyncHolder() {}

PageSyncImpl::CloudSyncHolder::~CloudSyncHolder() {}

void PageSyncImpl::CloudSyncHolder::SetCloudSync(
    std::unique_ptr<cloud_sync::PageSync> cloud_sync) {
  FXL_DCHECK(!cloud_sync_);
  cloud_sync_ = std::move(cloud_sync);
}

cloud_sync::PageSync* PageSyncImpl::CloudSyncHolder::GetCloudSync() {
  FXL_DCHECK(cloud_sync_);
  return cloud_sync_.get();
}

class PageSyncImpl::P2PSyncHolder : public SyncProviderHolderBase {
 public:
  P2PSyncHolder();
  ~P2PSyncHolder() override;

  void SetP2PSync(std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync);
  p2p_sync::PageCommunicator* GetP2PSync();

 private:
  std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync_;
};

PageSyncImpl::P2PSyncHolder::P2PSyncHolder() {}

PageSyncImpl::P2PSyncHolder::~P2PSyncHolder() {}

void PageSyncImpl::P2PSyncHolder::SetP2PSync(
    std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync) {
  FXL_DCHECK(!p2p_sync_);
  p2p_sync_ = std::move(p2p_sync);
}

p2p_sync::PageCommunicator* PageSyncImpl::P2PSyncHolder::GetP2PSync() {
  FXL_DCHECK(p2p_sync_);
  return p2p_sync_.get();
}

PageSyncImpl::PageSyncImpl(storage::PageStorage* storage,
                           storage::PageSyncClient* sync_client)
    : storage_(storage), sync_client_(sync_client) {
  FXL_DCHECK(storage_);
  FXL_DCHECK(sync_client_);
}

PageSyncImpl::~PageSyncImpl() {}

storage::PageSyncClient* PageSyncImpl::CreateCloudSyncClient() {
  FXL_DCHECK(!cloud_sync_);
  cloud_sync_ = std::make_unique<CloudSyncHolder>();
  return cloud_sync_.get();
}

void PageSyncImpl::SetCloudSync(
    std::unique_ptr<cloud_sync::PageSync> cloud_sync) {
  FXL_DCHECK(cloud_sync_);
  if (!cloud_sync) {
    // Cloud sync failed to produce an initialized |cloud_sync| instance - e.g.
    // because cloud provider is disconnected. Unset the entire cloud sync
    // holder to disable the cloud sync logic.
    cloud_sync_.reset();
    return;
  }

  cloud_sync->SetOnUnrecoverableError([this] {
    FXL_LOG(WARNING) << "Shutting down page cloud sync.";
    // TODO(ppi): handle recovery from cloud provider disconnection, LE-567.
    cloud_sync_.reset();
  });
  cloud_sync_->SetCloudSync(std::move(cloud_sync));
}

storage::PageSyncClient* PageSyncImpl::CreateP2PSyncClient() {
  FXL_DCHECK(!p2p_sync_);
  p2p_sync_ = std::make_unique<P2PSyncHolder>();
  return p2p_sync_.get();
}

void PageSyncImpl::SetP2PSync(
    std::unique_ptr<p2p_sync::PageCommunicator> p2p_sync) {
  FXL_DCHECK(p2p_sync_);
  p2p_sync_->SetP2PSync(std::move(p2p_sync));
}

void PageSyncImpl::Start() {
  sync_client_->SetSyncDelegate(this);
  if (cloud_sync_) {
    cloud_sync_->GetCloudSync()->Start();
  }
  if (p2p_sync_) {
    p2p_sync_->GetP2PSync()->Start();
  }
}

void PageSyncImpl::SetOnIdle(fit::closure on_idle) {
  // Only handle cloud sync for now.
  if (cloud_sync_) {
    cloud_sync_->GetCloudSync()->SetOnIdle(std::move(on_idle));
  }
}

bool PageSyncImpl::IsIdle() {
  if (cloud_sync_) {
    return cloud_sync_->GetCloudSync()->IsIdle();
  }
  return true;
}

void PageSyncImpl::SetOnBacklogDownloaded(fit::closure on_backlog_downloaded) {
  if (cloud_sync_) {
    cloud_sync_->GetCloudSync()->SetOnBacklogDownloaded(
        std::move(on_backlog_downloaded));
  }
}

void PageSyncImpl::SetSyncWatcher(SyncStateWatcher* watcher) {
  watcher_ = std::make_unique<SyncWatcherConverter>(watcher);
  if (cloud_sync_) {
    cloud_sync_->GetCloudSync()->SetSyncWatcher(watcher_.get());
  }
}

void PageSyncImpl::GetObject(
    storage::ObjectIdentifier object_identifier,
    fit::function<void(storage::Status, storage::ChangeSource,
                       storage::IsObjectSynced is_object_synced,
                       std::unique_ptr<storage::DataSource::DataChunk>)>
        callback) {
  // AnyWaiter returns the first successful value to its Finalize callback. For
  // example, if P2P returns before cloud with a NOT_FOUND status, then we will
  // wait for Cloud to return; if P2P returns with an OK status, we will pass
  // the P2P-returned value immediately.
  auto waiter = fxl::MakeRefCounted<callback::AnyWaiter<
      storage::Status,
      std::tuple<storage::ChangeSource, storage::IsObjectSynced,
                 std::unique_ptr<storage::DataSource::DataChunk>>>>(
      storage::Status::OK, storage::Status::NOT_FOUND,
      std::tuple<storage::ChangeSource, storage::IsObjectSynced,
                 std::unique_ptr<storage::DataSource::DataChunk>>());
  if (cloud_sync_) {
    cloud_sync_->GetObject(
        object_identifier,
        [callback = waiter->NewCallback()](
            storage::Status status, storage::ChangeSource source,
            storage::IsObjectSynced is_object_synced,
            std::unique_ptr<storage::DataSource::DataChunk> data) {
          callback(status,
                   std::make_tuple(source, is_object_synced, std::move(data)));
        });
  }
  if (p2p_sync_) {
    p2p_sync_->GetObject(
        std::move(object_identifier),
        [callback = waiter->NewCallback()](
            storage::Status status, storage::ChangeSource source,
            storage::IsObjectSynced is_object_synced,
            std::unique_ptr<storage::DataSource::DataChunk> data) {
          callback(status,
                   std::make_tuple(source, is_object_synced, std::move(data)));
        });
  }
  waiter->Finalize(
      [callback = std::move(callback)](
          storage::Status status,
          std::tuple<storage::ChangeSource, storage::IsObjectSynced,
                     std::unique_ptr<storage::DataSource::DataChunk>>
              data) {
        callback(status, std::get<0>(data), std::get<1>(data),
                 std::move(std::get<2>(data)));
      });
}

}  // namespace sync_coordinator
