// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/ledger/bin/storage/impl/leveldb.h"

#include <lib/async/cpp/task.h>
#include <lib/fit/function.h>
#include <trace/event.h>

#include <utility>

#include "src/ledger/bin/cobalt/cobalt.h"
#include "src/ledger/bin/storage/impl/object_impl.h"
#include "src/ledger/lib/coroutine/coroutine.h"
#include "src/lib/files/directory.h"
#include "src/lib/files/path.h"
#include "src/lib/fxl/logging.h"
#include "util/env_fuchsia.h"

namespace storage {

using coroutine::CoroutineHandler;

namespace {

Status MakeEmptySyncCallAndCheck(async_dispatcher_t* dispatcher,
                                 coroutine::CoroutineHandler* handler) {
  if (coroutine::SyncCall(handler, [&dispatcher](fit::closure on_done) {
        async::PostTask(dispatcher, std::move(on_done));
      }) == coroutine::ContinuationStatus::INTERRUPTED) {
    return Status::INTERRUPTED;
  }
  return Status::OK;
}

Status ConvertStatus(leveldb::Status s) {
  if (s.IsNotFound()) {
    return Status::INTERNAL_NOT_FOUND;
  }
  if (!s.ok()) {
    FXL_LOG(ERROR) << "LevelDB error: " << s.ToString();
    return Status::INTERNAL_ERROR;
  }
  return Status::OK;
}

class BatchImpl : public Db::Batch {
 public:
  // Creates a new Batch based on a leveldb batch. Once |Execute| is called,
  // |callback| will be called with the same batch, ready to be written in
  // leveldb. If the destructor is called without a previous execution of the
  // batch, |callback| will be called with a |nullptr|.
  BatchImpl(
      async_dispatcher_t* dispatcher,
      std::unique_ptr<leveldb::WriteBatch> batch,
      fit::function<Status(std::unique_ptr<leveldb::WriteBatch>)> callback)
      : dispatcher_(dispatcher),
        batch_(std::move(batch)),
        callback_(std::move(callback)) {}

  ~BatchImpl() override {
    if (batch_)
      callback_(nullptr);
  }

  Status Put(CoroutineHandler* handler, convert::ExtendedStringView key,
             fxl::StringView value) override {
    FXL_DCHECK(batch_);
    if (MakeEmptySyncCallAndCheck(dispatcher_, handler) ==
        Status::INTERRUPTED) {
      return Status::INTERRUPTED;
    }
    batch_->Put(key, convert::ToSlice(value));
    return Status::OK;
  }

  Status Delete(CoroutineHandler* handler,
                convert::ExtendedStringView key) override {
    FXL_DCHECK(batch_);
    batch_->Delete(key);
    return MakeEmptySyncCallAndCheck(dispatcher_, handler);
  }

  Status Execute(CoroutineHandler* handler) override {
    FXL_DCHECK(batch_);
    if (MakeEmptySyncCallAndCheck(dispatcher_, handler) ==
        Status::INTERRUPTED) {
      return Status::INTERRUPTED;
    }
    return callback_(std::move(batch_));
  }

 private:
  async_dispatcher_t* const dispatcher_;
  std::unique_ptr<leveldb::WriteBatch> batch_;

  fit::function<Status(std::unique_ptr<leveldb::WriteBatch>)> callback_;
};

class RowIterator
    : public Iterator<const std::pair<convert::ExtendedStringView,
                                      convert::ExtendedStringView>> {
 public:
  RowIterator(std::unique_ptr<leveldb::Iterator> it, std::string prefix)
      : it_(std::move(it)), prefix_(std::move(prefix)) {
    PrepareEntry();
  }

  ~RowIterator() override {}

  Iterator<const std::pair<convert::ExtendedStringView,
                           convert::ExtendedStringView>>&
  Next() override {
    it_->Next();
    PrepareEntry();
    return *this;
  }

  bool Valid() const final {
    return it_->Valid() && it_->key().starts_with(prefix_);
  }

  Status GetStatus() const override {
    return it_->status().ok() ? Status::OK : Status::INTERNAL_ERROR;
  }

  const std::pair<convert::ExtendedStringView, convert::ExtendedStringView>&
  operator*() const override {
    return *(row_.get());
  }

  const std::pair<convert::ExtendedStringView, convert::ExtendedStringView>*
  operator->() const override {
    return row_.get();
  }

 private:
  void PrepareEntry() {
    if (!Valid()) {
      row_.reset(nullptr);
      return;
    }
    row_ = std::make_unique<
        std::pair<convert::ExtendedStringView, convert::ExtendedStringView>>(
        it_->key(), it_->value());
  }

  std::unique_ptr<leveldb::Iterator> it_;
  const std::string prefix_;

  std::unique_ptr<
      std::pair<convert::ExtendedStringView, convert::ExtendedStringView>>
      row_;
};

}  // namespace

LevelDb::LevelDb(async_dispatcher_t* dispatcher, ledger::DetachedPath db_path)
    : dispatcher_(dispatcher), db_path_(std::move(db_path)) {}

LevelDb::~LevelDb() {
  FXL_DCHECK(!active_batches_count_)
      << "Not all LevelDb batches have been executed or rolled back.";
}

Status LevelDb::Init() {
  TRACE_DURATION("ledger", "leveldb_init");
  if (!files::CreateDirectoryAt(db_path_.root_fd(), db_path_.path())) {
    FXL_LOG(ERROR) << "Failed to create directory under " << db_path_.path();
    return Status::INTERNAL_ERROR;
  }
  fxl::UniqueFD unique_fd;
  ledger::DetachedPath db_path = db_path_;
  if (db_path_.path() != ".") {
    // Open a UniqueFD at the db path.
    unique_fd = db_path_.OpenFD(&db_path);
    if (!unique_fd.is_valid()) {
      FXL_LOG(ERROR) << "Unable to open directory at " << db_path_.path()
                     << ". errno: " << errno;
      return Status::INTERNAL_ERROR;
    }
  }
  env_ = leveldb::MakeFuchsiaEnv(db_path.root_fd());
  leveldb::Options options;
  options.env = env_.get();
  options.create_if_missing = true;
  leveldb::DB* db = nullptr;
  leveldb::Status status = leveldb::DB::Open(options, db_path.path(), &db);
  if (status.IsCorruption()) {
    FXL_LOG(ERROR) << "Ledger state corrupted at " << db_path_.path()
                   << " with leveldb status: " << status.ToString();
    FXL_LOG(WARNING) << "Trying to recover by erasing the local state.";
    FXL_LOG(WARNING)
        << "***** ALL LOCAL CHANGES IN THIS PAGE WILL BE LOST *****";
    ledger::ReportEvent(ledger::CobaltEvent::LEDGER_LEVELDB_STATE_CORRUPTED);

    if (!files::DeletePathAt(db_path_.root_fd(), db_path_.path(), true)) {
      FXL_LOG(ERROR) << "Failed to delete corrupted ledger at "
                     << db_path_.path();
      return Status::INTERNAL_ERROR;
    }
    leveldb::Status status = leveldb::DB::Open(options, db_path.path(), &db);
    if (!status.ok()) {
      FXL_LOG(ERROR) << "Failed to create a new LevelDB at " << db_path_.path()
                     << " with leveldb status: " << status.ToString();
      return Status::INTERNAL_ERROR;
    }
  } else if (!status.ok()) {
    FXL_LOG(ERROR) << "Failed to open ledger at " << db_path_.path()
                   << " with leveldb status: " << status.ToString();
    return Status::INTERNAL_ERROR;
  }
  db_.reset(db);
  return Status::OK;
}

Status LevelDb::StartBatch(CoroutineHandler* handler,
                           std::unique_ptr<Db::Batch>* batch) {
  auto db_batch = std::make_unique<leveldb::WriteBatch>();
  active_batches_count_++;
  *batch = std::make_unique<BatchImpl>(
      dispatcher_, std::move(db_batch),
      [this](std::unique_ptr<leveldb::WriteBatch> db_batch) {
        active_batches_count_--;
        if (db_batch) {
          leveldb::Status status = db_->Write(write_options_, db_batch.get());
          if (!status.ok()) {
            FXL_LOG(ERROR) << "Failed to execute batch with status: "
                           << status.ToString();
            return Status::INTERNAL_ERROR;
          }
        }
        return Status::OK;
      });
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

Status LevelDb::Get(CoroutineHandler* handler, convert::ExtendedStringView key,
                    std::string* value) {
  if (MakeEmptySyncCallAndCheck(dispatcher_, handler) == Status::INTERRUPTED) {
    return Status::INTERRUPTED;
  }
  return ConvertStatus(db_->Get(read_options_, key, value));
}

Status LevelDb::HasKey(CoroutineHandler* handler,
                       convert::ExtendedStringView key) {
  std::unique_ptr<leveldb::Iterator> iterator(db_->NewIterator(read_options_));
  iterator->Seek(key);

  if (!iterator->Valid() || iterator->key() != key) {
    return Status::INTERNAL_NOT_FOUND;
  }
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

Status LevelDb::GetObject(CoroutineHandler* handler,
                          convert::ExtendedStringView key,
                          ObjectIdentifier object_identifier,
                          std::unique_ptr<const Piece>* piece) {
  FXL_DCHECK(piece);
  std::unique_ptr<leveldb::Iterator> iterator(db_->NewIterator(read_options_));
  iterator->Seek(key);

  if (!iterator->Valid() || iterator->key() != key) {
    return Status::INTERNAL_NOT_FOUND;
  }

  *piece = std::make_unique<LevelDBPiece>(std::move(object_identifier),
                                          std::move(iterator));
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

Status LevelDb::GetByPrefix(CoroutineHandler* handler,
                            convert::ExtendedStringView prefix,
                            std::vector<std::string>* key_suffixes) {
  std::vector<std::string> result;
  std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(read_options_));
  for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
       it->Next()) {
    leveldb::Slice key = it->key();
    key.remove_prefix(prefix.size());
    result.push_back(key.ToString());
  }
  if (!it->status().ok()) {
    return ConvertStatus(it->status());
  }
  key_suffixes->swap(result);
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

Status LevelDb::GetEntriesByPrefix(
    CoroutineHandler* handler, convert::ExtendedStringView prefix,
    std::vector<std::pair<std::string, std::string>>* entries) {
  std::vector<std::pair<std::string, std::string>> result;
  std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(read_options_));
  for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix);
       it->Next()) {
    leveldb::Slice key = it->key();
    key.remove_prefix(prefix.size());
    result.emplace_back(key.ToString(), it->value().ToString());
  }
  if (!it->status().ok()) {
    return ConvertStatus(it->status());
  }
  entries->swap(result);
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

Status LevelDb::GetIteratorAtPrefix(
    CoroutineHandler* handler, convert::ExtendedStringView prefix,
    std::unique_ptr<Iterator<const std::pair<
        convert::ExtendedStringView, convert::ExtendedStringView>>>* iterator) {
  std::unique_ptr<leveldb::Iterator> local_iterator(
      db_->NewIterator(read_options_));
  local_iterator->Seek(prefix);

  if (iterator) {
    std::unique_ptr<Iterator<const std::pair<convert::ExtendedStringView,
                                             convert::ExtendedStringView>>>
        row_iterator = std::make_unique<RowIterator>(std::move(local_iterator),
                                                     prefix.ToString());
    iterator->swap(row_iterator);
  }
  return MakeEmptySyncCallAndCheck(dispatcher_, handler);
}

}  // namespace storage
