// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/ledger/app/page_manager.h"

#include <algorithm>

#include <lib/callback/trace_callback.h>
#include <lib/fit/function.h>
#include <lib/fxl/logging.h>
#include <zircon/syscalls.h>

#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/bin/ledger/fidl/include/types.h"
#include "peridot/bin/ledger/storage/impl/data_serialization.h"

namespace ledger {

PageManager::PageManager(Environment* environment,
                         std::unique_ptr<storage::PageStorage> page_storage,
                         std::unique_ptr<sync_coordinator::PageSync> page_sync,
                         std::unique_ptr<MergeResolver> merge_resolver,
                         PageManager::PageStorageState state,
                         zx::duration sync_timeout)
    : environment_(environment),
      page_storage_(std::move(page_storage)),
      page_sync_(std::move(page_sync)),
      merge_resolver_(std::move(merge_resolver)),
      sync_timeout_(sync_timeout),
      task_runner_(environment->dispatcher()) {
  pages_.set_on_empty([this] { CheckEmpty(); });
  snapshots_.set_on_empty([this] { CheckEmpty(); });
  page_debug_bindings_.set_empty_set_handler([this] { CheckEmpty(); });

  if (page_sync_) {
    page_sync_->SetSyncWatcher(&watchers_);
    page_sync_->SetOnIdle([this] { CheckEmpty(); });
    page_sync_->SetOnBacklogDownloaded([this] { OnSyncBacklogDownloaded(); });
    page_sync_->Start();
    if (state == PageManager::PageStorageState::NEEDS_SYNC) {
      // The page storage was created locally. We wait a bit in order to get the
      // initial state from the network before accepting requests.
      task_runner_.PostDelayedTask(
          [this] {
            if (!sync_backlog_downloaded_) {
              FXL_LOG(INFO) << "Initial sync will continue in background, "
                            << "in the meantime binding to local page data "
                            << "(might be stale or empty).";
              OnSyncBacklogDownloaded();
            }
          },
          sync_timeout_);
    } else {
      sync_backlog_downloaded_ = true;
    }
  } else {
    sync_backlog_downloaded_ = true;
  }
  merge_resolver_->set_on_empty([this] { CheckEmpty(); });
  merge_resolver_->SetPageManager(this);
}

PageManager::~PageManager() {
  for (const auto& delaying_facade : delaying_facades_) {
    delaying_facade.second(Status::INTERNAL_ERROR);
  }
  delaying_facades_.clear();
}

void PageManager::AddPageDelayingFacade(
    std::unique_ptr<PageDelayingFacade> delaying_facade,
    fit::function<void(Status)> on_done) {
  auto traced_on_done = TRACE_CALLBACK(std::move(on_done), "ledger",
                                       "page_manager_add_page_delaying_facade");
  if (!sync_backlog_downloaded_) {
    delaying_facades_.emplace_back(std::move(delaying_facade),
                                   std::move(traced_on_done));
    return;
  }
  pages_
      .emplace(environment_->coroutine_service(), this, page_storage_.get(),
               merge_resolver_.get(), &watchers_, std::move(delaying_facade))
      .Init(std::move(traced_on_done));
}

void PageManager::BindPageDebug(fidl::InterfaceRequest<PageDebug> page_debug,
                                fit::function<void(Status)> callback) {
  page_debug_bindings_.AddBinding(this, std::move(page_debug));
  callback(Status::OK);
}

void PageManager::BindPageSnapshot(
    std::unique_ptr<const storage::Commit> commit,
    fidl::InterfaceRequest<PageSnapshot> snapshot_request,
    std::string key_prefix) {
  snapshots_.emplace(std::move(snapshot_request), page_storage_.get(),
                     std::move(commit), std::move(key_prefix));
}

Reference PageManager::CreateReference(
    storage::ObjectIdentifier object_identifier) {
  uint64_t index = environment_->random()->Draw<uint64_t>();
  FXL_DCHECK(references_.find(index) == references_.end());
  references_[index] = std::move(object_identifier);
  Reference reference;
  reference.opaque_id = convert::ToArray(storage::SerializeData(index));
  return reference;
}

Status PageManager::ResolveReference(
    Reference reference, storage::ObjectIdentifier* object_identifier) {
  if (reference.opaque_id.size() != sizeof(uint64_t)) {
    return Status::REFERENCE_NOT_FOUND;
  }
  uint64_t index = storage::DeserializeData<uint64_t>(
      convert::ToStringView(reference.opaque_id));
  auto iterator = references_.find(index);
  if (iterator == references_.end()) {
    return Status::REFERENCE_NOT_FOUND;
  }
  *object_identifier = iterator->second;
  return Status::OK;
}

void PageManager::IsSynced(fit::function<void(Status, bool)> callback) {
  page_storage_->IsSynced(
      [callback = std::move(callback)](storage::Status status, bool is_synced) {
        callback(PageUtils::ConvertStatus(status), is_synced);
      });
}

void PageManager::IsOfflineAndEmpty(
    fit::function<void(Status, bool)> callback) {
  if (page_storage_->IsOnline()) {
    callback(Status::OK, false);
    return;
  }
  // The page is offline. Check and return if it's also empty.
  page_storage_->IsEmpty(
      [callback = std::move(callback)](storage::Status status, bool is_empty) {
        callback(PageUtils::ConvertStatus(status), is_empty);
      });
}

bool PageManager::IsEmpty() {
  return pages_.empty() && snapshots_.empty() && delaying_facades_.empty() &&
         merge_resolver_->IsEmpty() && (!page_sync_ || page_sync_->IsIdle()) &&
         page_debug_bindings_.size() == 0;
}

void PageManager::CheckEmpty() {
  if (on_empty_callback_ && IsEmpty()) {
    on_empty_callback_();
  }
}

void PageManager::OnSyncBacklogDownloaded() {
  if (sync_backlog_downloaded_) {
    FXL_LOG(INFO) << "Initial sync in background finished. "
                  << "Clients will receive a change notification.";
  }
  sync_backlog_downloaded_ = true;
  for (auto& delaying_facade : delaying_facades_) {
    AddPageDelayingFacade(std::move(delaying_facade.first),
                          std::move(delaying_facade.second));
  }
  delaying_facades_.clear();
}

void PageManager::GetHeadCommitsIds(GetHeadCommitsIdsCallback callback) {
  page_storage_->GetHeadCommitIds(
      [callback = std::move(callback)](storage::Status status,
                                       std::vector<storage::CommitId> heads) {
        fidl::VectorPtr<ledger_internal::CommitId> result;
        result.resize(0);
        for (const auto& head : heads) {
          ledger_internal::CommitId commit_id;
          commit_id.id = convert::ToArray(head);
          result.push_back(std::move(commit_id));
        }

        callback(PageUtils::ConvertStatus(status, Status::INVALID_ARGUMENT),
                 std::move(result));
      });
}

void PageManager::GetSnapshot(
    ledger_internal::CommitId commit_id,
    fidl::InterfaceRequest<PageSnapshot> snapshot_request,
    GetSnapshotCallback callback) {
  page_storage_->GetCommit(
      convert::ToStringView(commit_id.id),
      [this, snapshot_request = std::move(snapshot_request),
       callback = std::move(callback)](
          storage::Status status,
          std::unique_ptr<const storage::Commit> commit) mutable {
        if (status == storage::Status::OK) {
          BindPageSnapshot(std::move(commit), std::move(snapshot_request), "");
        }
        callback(PageUtils::ConvertStatus(status, Status::INVALID_ARGUMENT));
      });
}

void PageManager::GetCommit(ledger_internal::CommitId commit_id,
                            GetCommitCallback callback) {
  page_storage_->GetCommit(
      convert::ToStringView(commit_id.id),
      [callback = std::move(callback)](
          storage::Status status,
          std::unique_ptr<const storage::Commit> commit) mutable {
        ledger_internal::CommitPtr commit_struct = nullptr;
        if (status == storage::Status::OK) {
          commit_struct = ledger_internal::Commit::New();
          commit_struct->commit_id.id = convert::ToArray(commit->GetId());
          commit_struct->parents_ids.resize(0);
          for (const storage::CommitIdView& parent : commit->GetParentIds()) {
            ledger_internal::CommitId id;
            id.id = convert::ToArray(parent);
            commit_struct->parents_ids.push_back(std::move(id));
          }
          commit_struct->timestamp = commit->GetTimestamp().get();
          commit_struct->generation = commit->GetGeneration();
        }
        callback(PageUtils::ConvertStatus(status, Status::INVALID_ARGUMENT),
                 std::move(commit_struct));
      });
}

}  // namespace ledger
