// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "peridot/bin/ledger/storage/impl/journal_impl.h"

#include <functional>
#include <map>
#include <string>
#include <utility>

#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>

#include "peridot/bin/ledger/storage/impl/btree/builder.h"
#include "peridot/bin/ledger/storage/impl/btree/tree_node.h"
#include "peridot/bin/ledger/storage/impl/commit_impl.h"
#include "peridot/bin/ledger/storage/public/commit.h"

namespace storage {

// Builds a journal of the given |type| identified by |id| and based on the
// commit |base|. |environment| and |page_storage| are stored as raw pointers
// and are not owned by the journal. The journal starts out valid, with no
// failed operation recorded. The Token parameter is unnamed and unused by the
// body.
JournalImpl::JournalImpl(Token /* token */, JournalType type,
                         ledger::Environment* environment,
                         PageStorageImpl* page_storage, JournalId id,
                         CommitId base)
    : type_(type),
      environment_(environment),
      page_storage_(page_storage),
      id_(std::move(id)),
      base_(std::move(base)),
      valid_(true),
      failed_operation_(false) {}

// Warns when the journal is destroyed while still valid, i.e. without having
// gone through a commit attempt or a successful rollback.
JournalImpl::~JournalImpl() {
  if (!valid_) {
    // The journal was already committed or rolled back: nothing to report.
    return;
  }
  FXL_LOG(WARNING) << "Journal not committed or rolled back.";
}

// Factory for a journal of the given |type| with a single parent commit,
// |base|. No second parent is recorded.
std::unique_ptr<Journal> JournalImpl::Simple(JournalType type,
                                             ledger::Environment* environment,
                                             PageStorageImpl* page_storage,
                                             const JournalId& id,
                                             const CommitId& base) {
  std::unique_ptr<Journal> simple_journal = std::make_unique<JournalImpl>(
      Token(), type, environment, page_storage, id, base);
  return simple_journal;
}

// Factory for a merge journal with two parent commits: |base| and |other|.
// Merge journals are always created with JournalType::EXPLICIT.
std::unique_ptr<Journal> JournalImpl::Merge(ledger::Environment* environment,
                                            PageStorageImpl* page_storage,
                                            const JournalId& id,
                                            const CommitId& base,
                                            const CommitId& other) {
  auto merge_journal = std::make_unique<JournalImpl>(
      Token(), JournalType::EXPLICIT, environment, page_storage, id, base);

  // Remember the second parent; its presence is what makes GetParents() fetch
  // two commits instead of one.
  merge_journal->other_ = std::make_unique<CommitId>(other);

  return merge_journal;
}

// Returns the identifier this journal was created with.
const JournalId& JournalImpl::GetId() const { return id_; }

// Turns the operations recorded in this journal into a commit. The work is
// submitted through |serializer_|.
//
// |callback| receives:
//  - ILLEGAL_STATE and nullptr if the journal no longer accepts operations;
//  - the failing status and nullptr if fetching the parents, reading the
//    journal entries or creating the empty tree node fails;
//  - otherwise, whatever CreateCommitFromChanges() reports.
void JournalImpl::Commit(
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  using CommitCallback =
      fit::function<void(Status, std::unique_ptr<const storage::Commit>)>;
  using ParentList = std::vector<std::unique_ptr<const storage::Commit>>;
  using ChangeIterator = std::unique_ptr<Iterator<const EntryChange>>;

  serializer_.Serialize<Status, std::unique_ptr<const storage::Commit>>(
      std::move(callback), [this](CommitCallback done) {
        if (!StateAllowsMutation()) {
          done(Status::ILLEGAL_STATE, nullptr);
          return;
        }

        GetParents([this, done = std::move(done)](
                       Status parents_status, ParentList parents) mutable {
          if (parents_status != Status::OK) {
            done(parents_status, nullptr);
            return;
          }

          page_storage_->GetJournalEntries(
              id_,
              [this, parents = std::move(parents), done = std::move(done)](
                  Status entries_status, ChangeIterator changes,
                  JournalContainsClearOperation
                      contains_clear_operation) mutable {
                if (entries_status != Status::OK) {
                  done(entries_status, nullptr);
                  return;
                }

                if (contains_clear_operation !=
                    JournalContainsClearOperation::NO) {
                  // A clear operation was recorded: the journal changes must
                  // be replayed on top of an empty page rather than on the
                  // parent's content.
                  btree::TreeNode::Empty(
                      page_storage_,
                      [this, parents = std::move(parents),
                       changes = std::move(changes), done = std::move(done)](
                          Status empty_status,
                          ObjectIdentifier empty_root) mutable {
                        if (empty_status != Status::OK) {
                          done(empty_status, nullptr);
                          return;
                        }
                        CreateCommitFromChanges(
                            std::move(parents), std::move(empty_root),
                            std::move(changes), std::move(done));
                      });
                  return;
                }

                // No clear operation: replay the journal changes on top of
                // the content of the first parent. The root identifier is
                // read before |parents| is moved away.
                ObjectIdentifier base_root =
                    parents.front()->GetRootIdentifier();
                CreateCommitFromChanges(std::move(parents),
                                        std::move(base_root),
                                        std::move(changes), std::move(done));
              });
        });
      });
}

// Public rollback entry point: submits RollbackInternal() through
// |serializer_|, and reports its status to |callback|.
void JournalImpl::Rollback(fit::function<void(Status)> callback) {
  serializer_.Serialize<Status>(
      std::move(callback), [this](fit::function<void(Status)> done) {
        RollbackInternal(std::move(done));
      });
}

// Records in the journal that |key| maps to |object_identifier| with the given
// |priority|. The operation is submitted through |serializer_|.
//
// |callback| receives ILLEGAL_STATE if the journal no longer accepts
// operations, and otherwise the status reported by the page storage. A
// non-OK storage status is remembered in |failed_operation_|.
void JournalImpl::Put(convert::ExtendedStringView key,
                      ObjectIdentifier object_identifier, KeyPriority priority,
                      fit::function<void(Status)> callback) {
  serializer_.Serialize<Status>(
      std::move(callback),
      // The key is converted with ToString() so the closure holds its own
      // copy instead of the view.
      [this, owned_key = key.ToString(),
       identifier = std::move(object_identifier),
       priority](fit::function<void(Status)> done) mutable {
        if (!StateAllowsMutation()) {
          done(Status::ILLEGAL_STATE);
          return;
        }

        page_storage_->AddJournalEntry(
            id_, owned_key, std::move(identifier), priority,
            [this, done = std::move(done)](Status status) {
              if (status != Status::OK) {
                failed_operation_ = true;
              }
              done(status);
            });
      });
}

// Records in the journal the removal of |key|. The operation is submitted
// through |serializer_|.
//
// |callback| receives ILLEGAL_STATE if the journal no longer accepts
// operations, and otherwise the status reported by the page storage. A
// non-OK storage status is remembered in |failed_operation_|.
void JournalImpl::Delete(convert::ExtendedStringView key,
                         fit::function<void(Status)> callback) {
  serializer_.Serialize<Status>(
      std::move(callback),
      // The key is converted with ToString() so the closure holds its own
      // copy instead of the view.
      [this, owned_key = key.ToString()](fit::function<void(Status)> done) {
        if (!StateAllowsMutation()) {
          done(Status::ILLEGAL_STATE);
          return;
        }

        page_storage_->RemoveJournalEntry(
            id_, owned_key, [this, done = std::move(done)](Status status) {
              if (status != Status::OK) {
                failed_operation_ = true;
              }
              done(status);
            });
      });
}

// Empties the journal and marks it as containing a clear operation. The
// operation is submitted through |serializer_|.
//
// |callback| receives ILLEGAL_STATE if the journal no longer accepts
// operations, and otherwise the status reported by the page storage. A
// non-OK storage status is remembered in |failed_operation_|.
void JournalImpl::Clear(fit::function<void(Status)> callback) {
  serializer_.Serialize<Status>(
      std::move(callback), [this](fit::function<void(Status)> done) {
        if (!StateAllowsMutation()) {
          done(Status::ILLEGAL_STATE);
          return;
        }

        page_storage_->EmptyJournalAndMarkContainsClearOperation(
            id_, [this, done = std::move(done)](Status status) {
              if (status != Status::OK) {
                failed_operation_ = true;
              }
              done(status);
            });
      });
}

// Fetches the parent commit(s) of this journal from the page storage: |base_|
// first, followed by |*other_| when a second parent was recorded. The results
// are gathered by a waiter and handed to |callback| once it is finalized.
void JournalImpl::GetParents(
    fit::function<void(Status,
                       std::vector<std::unique_ptr<const storage::Commit>>)>
        callback) {
  using CommitWaiter =
      callback::Waiter<Status, std::unique_ptr<const storage::Commit>>;

  auto parents_waiter = fxl::MakeRefCounted<CommitWaiter>(Status::OK);

  // The base commit is always requested first.
  page_storage_->GetCommit(base_, parents_waiter->NewCallback());

  // Only journals with a second parent set have |other_|.
  if (other_) {
    page_storage_->GetCommit(*other_, parents_waiter->NewCallback());
  }

  parents_waiter->Finalize(std::move(callback));
}

// Applies |changes| on top of the tree rooted at |root_identifier| and, unless
// the result is a no-op, creates a commit with the given |parents| and adds it
// to the page storage.
//
// |callback| receives:
//  - the failing status and nullptr if applying the changes, computing the
//    objects to sync or adding the commit fails;
//  - for a no-op commit (single parent whose root is unchanged), the status of
//    the journal rollback together with that parent;
//  - otherwise Status::OK and the newly created commit. A failure to delete
//    the journal after the commit was added is only logged.
//
// |valid_| is cleared as soon as AddCommitFromLocal() reports back, whether or
// not it succeeded.
void JournalImpl::CreateCommitFromChanges(
    std::vector<std::unique_ptr<const storage::Commit>> parents,
    ObjectIdentifier root_identifier,
    std::unique_ptr<Iterator<const EntryChange>> changes,
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  btree::ApplyChanges(
      environment_->coroutine_service(), page_storage_,
      std::move(root_identifier), std::move(changes),
      [this, parents = std::move(parents), callback = std::move(callback)](
          Status status, ObjectIdentifier object_identifier,
          std::set<ObjectIdentifier> new_nodes) mutable {
        if (status != Status::OK) {
          callback(status, nullptr);
          return;
        }
        // If the commit is a no-op, return early.
        if (parents.size() == 1 &&
            parents.front()->GetRootIdentifier() == object_identifier) {
          // |new_nodes| can be ignored here. If a clear operation has been
          // executed and the state has then been restored to the one before the
          // transaction, |ApplyChanges| might have re-created some nodes that
          // already exist. Because they already exist in a pre-existing commit,
          // there is no need to update their state.

          // We are in an operation from the serializer: make sure not to send
          // the rollback operation through the serializer as well, or a
          // deadlock will be created.
          RollbackInternal(
              [parent = std::move(parents.front()),
               callback = std::move(callback)](Status status) mutable {
                callback(status, std::move(parent));
              });
          return;
        }
        std::unique_ptr<const storage::Commit> commit =
            CommitImpl::FromContentAndParents(environment_->clock(),
                                              page_storage_, object_identifier,
                                              std::move(parents));
        GetObjectsToSync([this, new_nodes = std::move(new_nodes),
                          commit = std::move(commit),
                          callback = std::move(callback)](
                             Status status, std::vector<ObjectIdentifier>
                                                objects_to_sync) mutable {
          if (status != Status::OK) {
            callback(status, nullptr);
            return;
          }

          objects_to_sync.reserve(objects_to_sync.size() + new_nodes.size());
          // TODO(qsr): When using C++17, move data out of the set using
          // extract.
          objects_to_sync.insert(objects_to_sync.end(), new_nodes.begin(),
                                 new_nodes.end());

          // Clone the commit *before* building the call below. The completion
          // closure moves |commit| into itself, and the evaluation order of
          // function arguments is unspecified: cloning inside the argument
          // list could dereference an already moved-from pointer.
          auto commit_clone = commit->Clone();
          page_storage_->AddCommitFromLocal(
              std::move(commit_clone), std::move(objects_to_sync),
              [this, commit = std::move(commit),
               callback = std::move(callback)](Status status) mutable {
                valid_ = false;
                if (status != Status::OK) {
                  callback(status, nullptr);
                  return;
                }
                page_storage_->RemoveJournal(
                    id_,
                    [commit = std::move(commit),
                     callback = std::move(callback)](Status status) mutable {
                      if (status != Status::OK) {
                        FXL_LOG(INFO)
                            << "Commit created, but failed to delete journal.";
                      }
                      callback(Status::OK, std::move(commit));
                    });
              });
        });
      });
}

// Computes the objects referenced by this journal that need to be synced.
//
// The journal entries are replayed in order to find the key-value pairs that
// remain once deletions are taken into account. Each remaining value is then
// checked with PageStorageImpl::ObjectIsUntracked(), and only the untracked
// ones are reported. The resulting identifiers are deduplicated and sorted.
//
// |callback| receives the first non-OK status together with an empty vector
// on failure, or Status::OK and the identifiers to sync on success.
void JournalImpl::GetObjectsToSync(
    fit::function<void(Status status,
                       std::vector<ObjectIdentifier> objects_to_sync)>
        callback) {
  page_storage_->GetJournalEntries(
      id_, [this, callback = std::move(callback)](
               Status s, std::unique_ptr<Iterator<const EntryChange>> entries,
               JournalContainsClearOperation
               /* contains_clear_operation */) mutable {
        if (s != Status::OK) {
          callback(s, {});
          return;
        }
        // Compute the key-value pairs added in this journal.
        std::map<std::string, ObjectIdentifier> key_values;
        while (entries->Valid()) {
          const Entry& entry = (*entries)->entry;
          if ((*entries)->deleted) {
            key_values.erase(entry.key);
          } else {
            key_values[entry.key] = entry.object_identifier;
          }
          entries->Next();
        }
        // One ObjectIsUntracked() query per remaining pair, issued in the
        // iteration order of |key_values|. The loop in the Finalize callback
        // below relies on the results coming back in that same order.
        auto waiter =
            fxl::MakeRefCounted<callback::Waiter<Status, bool>>(Status::OK);
        for (const auto& key_value : key_values) {
          page_storage_->ObjectIsUntracked(key_value.second,
                                           waiter->NewCallback());
        }
        waiter->Finalize([key_values = std::move(key_values),
                          callback = std::move(callback)](
                             Status s, std::vector<bool> is_untracked) {
          if (s != Status::OK) {
            callback(s, {});
            return;
          }
          // Compute the set of values. Using a set removes duplicates when
          // several keys point to the same object.
          std::set<ObjectIdentifier> result_set;
          size_t i = 0;
          for (const auto& key_value : key_values) {
            // Only untracked objects should be synced.
            if (is_untracked[i++]) {
              result_set.insert(key_value.second);
            }
          }
          // Build the result directly from the set's range: the vector is
          // sized once instead of growing element by element.
          std::vector<ObjectIdentifier> objects_to_sync(result_set.begin(),
                                                        result_set.end());
          callback(Status::OK, std::move(objects_to_sync));
        });
      });
}

// Removes this journal from the page storage without going through
// |serializer_|. |callback| receives ILLEGAL_STATE if the journal is no longer
// valid, and otherwise the status of the removal. The journal is marked
// invalid only when the removal succeeds.
void JournalImpl::RollbackInternal(fit::function<void(Status)> callback) {
  if (valid_) {
    page_storage_->RemoveJournal(
        id_, [this, done = std::move(callback)](Status removal_status) {
          if (removal_status == Status::OK) {
            valid_ = false;
          }
          done(removal_status);
        });
    return;
  }

  // Already committed or rolled back.
  callback(Status::ILLEGAL_STATE);
}

// Tells whether the journal still accepts operations: it must be valid, and
// an earlier failed operation is only tolerated for IMPLICIT journals.
bool JournalImpl::StateAllowsMutation() {
  if (!valid_) {
    return false;
  }
  if (type_ == JournalType::IMPLICIT) {
    return true;
  }
  return !failed_operation_;
}

}  // namespace storage
