// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/ledger/bin/storage/impl/journal_impl.h"

#include <lib/callback/waiter.h>
#include <lib/fit/function.h>

#include <functional>
#include <map>
#include <string>
#include <utility>

#include "src/ledger/bin/storage/impl/btree/builder.h"
#include "src/ledger/bin/storage/impl/btree/tree_node.h"
#include "src/ledger/bin/storage/impl/commit_impl.h"
#include "src/ledger/bin/storage/public/commit.h"
#include "src/lib/fxl/memory/ref_ptr.h"

namespace storage {

JournalImpl::JournalImpl(Token /* token */, ledger::Environment* environment,
                         PageStorageImpl* page_storage,
                         std::unique_ptr<const storage::Commit> base)
    : environment_(environment),
      page_storage_(page_storage),
      base_(std::move(base)),
      committed_(false) {}

JournalImpl::~JournalImpl() {}

std::unique_ptr<Journal> JournalImpl::Simple(
    ledger::Environment* environment, PageStorageImpl* page_storage,
    std::unique_ptr<const storage::Commit> base) {
  FXL_DCHECK(base);

  return std::make_unique<JournalImpl>(Token(), environment, page_storage,
                                       std::move(base));
}

std::unique_ptr<Journal> JournalImpl::Merge(
    ledger::Environment* environment, PageStorageImpl* page_storage,
    std::unique_ptr<const storage::Commit> base,
    std::unique_ptr<const storage::Commit> other) {
  FXL_DCHECK(base);
  FXL_DCHECK(other);
  auto journal = std::make_unique<JournalImpl>(Token(), environment,
                                               page_storage, std::move(base));
  journal->other_ = std::move(other);
  return journal;
}

void JournalImpl::Commit(
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  FXL_DCHECK(!committed_);
  committed_ = true;

  std::vector<std::unique_ptr<const storage::Commit>> parents;
  if (other_) {
    parents.reserve(2);
    parents.push_back(std::move(base_));
    parents.push_back(std::move(other_));
  } else {
    parents.reserve(1);
    parents.push_back(std::move(base_));
  }

  std::vector<storage::EntryChange> changes;
  for (auto [key, entry_change] : journal_entries_) {
    changes.push_back(std::move(entry_change));
  }

  if (cleared_ == JournalContainsClearOperation::NO) {
    // The journal doesn't contain the clear operation. The changes
    // recorded on the journal need to be executed over the content of
    // the first parent.
    ObjectIdentifier root_identifier = parents[0]->GetRootIdentifier();
    CreateCommitFromChanges(std::move(parents), std::move(root_identifier),
                            std::move(changes), std::move(callback));
    return;
  }

  // The journal contains the clear operation. The changes recorded on the
  // journal need to be executed over an empty page.
  btree::TreeNode::Empty(
      page_storage_,
      [this, parents = std::move(parents), changes = std::move(changes),
       callback = std::move(callback)](
          Status status, ObjectIdentifier root_identifier) mutable {
        if (status != Status::OK) {
          callback(status, nullptr);
          return;
        }
        CreateCommitFromChanges(std::move(parents), std::move(root_identifier),
                                std::move(changes), std::move(callback));
      });
}

void JournalImpl::Put(convert::ExtendedStringView key,
                      ObjectIdentifier object_identifier,
                      KeyPriority priority) {
  FXL_DCHECK(!committed_);
  EntryChange change;
  change.entry = {key.ToString(), std::move(object_identifier), priority};
  change.deleted = false;
  journal_entries_[key.ToString()] = std::move(change);
}

void JournalImpl::Delete(convert::ExtendedStringView key) {
  FXL_DCHECK(!committed_);
  EntryChange change;
  change.entry = {key.ToString(), ObjectIdentifier(), KeyPriority::EAGER};
  change.deleted = true;
  journal_entries_[key.ToString()] = std::move(change);
}

void JournalImpl::Clear() {
  FXL_DCHECK(!committed_);
  cleared_ = JournalContainsClearOperation::YES;
  journal_entries_.clear();
}

void JournalImpl::CreateCommitFromChanges(
    std::vector<std::unique_ptr<const storage::Commit>> parents,
    ObjectIdentifier root_identifier, std::vector<EntryChange> changes,
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  btree::ApplyChanges(
      environment_->coroutine_service(), page_storage_,
      std::move(root_identifier), std::move(changes),
      [this, parents = std::move(parents), callback = std::move(callback)](
          Status status, ObjectIdentifier object_identifier,
          std::set<ObjectIdentifier> new_nodes) mutable {
        if (status != Status::OK) {
          callback(status, nullptr);
          return;
        }
        // If the commit is a no-op, return early, without creating a new
        // commit.
        if (parents.size() == 1 &&
            parents.front()->GetRootIdentifier() == object_identifier) {
          // |new_nodes| can be ignored here. If a clear operation has been
          // executed and the state has then been restored to the one before the
          // transaction, |ApplyChanges| might have re-created some nodes that
          // already exist. Because they already exist in a pre-existing commit,
          // there is no need to update their state.
          callback(Status::OK, nullptr);
          return;
        }
        std::unique_ptr<const storage::Commit> commit =
            CommitImpl::FromContentAndParents(environment_->clock(),
                                              page_storage_, object_identifier,
                                              std::move(parents));
        GetObjectsToSync(
            [this, new_nodes = std::move(new_nodes), commit = std::move(commit),
             callback = std::move(callback)](
                Status status,
                std::vector<ObjectIdentifier> objects_to_sync) mutable {
              if (status != Status::OK) {
                callback(status, nullptr);
                return;
              }

              objects_to_sync.reserve(objects_to_sync.size() +
                                      new_nodes.size());
              // TODO(qsr): When using C++17, move data out of the set using
              // extract.
              objects_to_sync.insert(objects_to_sync.end(), new_nodes.begin(),
                                     new_nodes.end());
              page_storage_->AddCommitFromLocal(
                  commit->Clone(), std::move(objects_to_sync),
                  [commit = std::move(commit),
                   callback = std::move(callback)](Status status) mutable {
                    if (status != Status::OK) {
                      callback(status, nullptr);
                      return;
                    }
                    callback(Status::OK, std::move(commit));
                  });
            });
      });
}

void JournalImpl::GetObjectsToSync(
    fit::function<void(Status status,
                       std::vector<ObjectIdentifier> objects_to_sync)>
        callback) {
  auto waiter = fxl::MakeRefCounted<callback::Waiter<Status, bool>>(Status::OK);
  std::vector<ObjectIdentifier> added_values;
  for (auto const& journal_entry : journal_entries_) {
    if (journal_entry.second.deleted) {
      continue;
    }
    added_values.push_back(journal_entry.second.entry.object_identifier);
    page_storage_->ObjectIsUntracked(added_values.back(),
                                     waiter->NewCallback());
  }
  waiter->Finalize(
      [added_values = std::move(added_values), callback = std::move(callback)](
          Status status, std::vector<bool> is_untracked) {
        if (status != Status::OK) {
          callback(status, {});
          return;
        }
        FXL_DCHECK(added_values.size() == is_untracked.size());

        // Only untracked objects should be synced.
        std::vector<ObjectIdentifier> objects_to_sync;
        for (size_t i = 0; i < is_untracked.size(); ++i) {
          if (is_untracked[i]) {
            objects_to_sync.push_back(std::move(added_values[i]));
          }
        }
        callback(Status::OK, std::move(objects_to_sync));
      });
}

}  // namespace storage
