blob: 46b048a28e8649b9f184a3fba4df382aea5f0fba [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/impl/leveldb_factory.h"
#include <lib/async/cpp/task.h>
#include <lib/async_promise/executor.h>
#include <lib/callback/auto_cleanable.h>
#include <lib/callback/scoped_callback.h>
#include <lib/callback/trace_callback.h>
#include <lib/fit/bridge.h>
#include <lib/fit/promise.h>
#include <lib/fit/result.h>
#include <lib/fit/scope.h>
#include <trace/event.h>
#include <mutex>
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/lib/files/directory.h"
#include "src/lib/files/unique_fd.h"
#include "src/lib/fxl/logging.h"
#include "src/lib/fxl/memory/ref_counted.h"
#include "src/lib/fxl/memory/ref_ptr.h"
#include "src/lib/fxl/strings/string_view.h"
#include "src/lib/fxl/synchronization/thread_annotations.h"
// LevelDbFactory tries to keep an empty, initialized instance of LevelDb always
// available. It stores this cached instance under cached_db/.
//
// On requests for new LevelDb instances (see |GetOrCreateDb|), if the cached
// instance is ready, it is moved to the requested destination and then a new
// LevelDb is prepared to be cached. If the cached instance is not yet
// available, the request is queued, and will be handled when the cached db is
// ready.
//
// Note that if multiple requests are received while waiting for the LevelDb
// initialization, only the first one is queued up. The rest directly request a
// new LevelDb instance at the final destination.
namespace storage {
namespace {
// Number of random bytes drawn to name each temporary directory created under
// the staging path (the directory name is their hex encoding).
constexpr size_t kRandomBytesCount = 16;
// Subdirectory of the cache path where new databases are initialized before
// being moved to their final location.
// TODO(LE-635): We need to clean the staging path, so that we don't leave
// unreachable storage on disk.
constexpr fxl::StringView kStagingPath = "staging";
// Subdirectory of the cache path holding the pre-initialized cached database.
constexpr fxl::StringView kCachedDbPath = "cached_db";
// Returns whether the parent directory of |path| exists. If it is not possible
// to access the parent directory, returns whether the given |path| exists.
bool ParentDirectoryExists(ledger::DetachedPath path) {
size_t last_slash = path.path().find_last_of('/');
return files::IsDirectoryAt(path.root_fd(),
last_slash == std::string::npos
? path.path()
: path.path().substr(0, last_slash));
}
// Whether a new database must be created through the staging path. The
// underlying type is bool and the enumerators map explicitly to false/true, so
// a bool can be converted with static_cast (true => YES).
enum class CreateInStagingPath : bool {
  NO = false,
  YES = true,
};
// LockingWrapper allows blocking the execution of wrapped promises: every
// promise wrapped by |wrap| holds |mutex_| for the duration of each of its
// invocations, so holding the lock returned by |lock| prevents any wrapped
// promise from running.
class LockingWrapper {
 public:
  LockingWrapper() = default;

  // Wrapper implementation, as expected by fit::promise::wrap_with.
  template <class Promise>
  decltype(auto) wrap(Promise promise) {
    FXL_DCHECK(promise);
    return fit::make_promise_with_continuation(
        LockingWrappedContinuation<Promise>(this, std::move(promise)));
  }

  // Acquires a lock on promise execution, effectively blocking any wrapped
  // promise for as long as the returned lock is held. Must not be called from
  // inside a wrapped promise: |mutex_| is a non-recursive std::mutex that is
  // already held there, so this would deadlock.
  std::unique_lock<std::mutex> lock() {
    return std::unique_lock<std::mutex>(mutex_);
  }

 private:
  // Promise continuation that acquires the LockingWrapper's mutex before
  // executing the wrapped promise, and releases it when the promise returns
  // (whether the result is ready or still pending).
  template <class Promise>
  class LockingWrappedContinuation {
   public:
    explicit LockingWrappedContinuation(LockingWrapper* wrapper,
                                        Promise promise)
        : wrapper_(wrapper), promise_(std::move(promise)) {}

    // Executes the wrapped promise while holding the wrapper's mutex.
    typename Promise::result_type operator()(fit::context& context) {
      std::lock_guard<std::mutex> lg(wrapper_->mutex_);
      return promise_(context);
    }

   private:
    LockingWrapper* const wrapper_;
    Promise promise_;
  };

  std::mutex mutex_;
};
// ScopedAsyncExecutor is a proxy for async::Executor that ensures that all
// tasks scheduled on it can be stopped from another thread.
//
// NOTE(review): |schedule_task| reads |stopped_| and |scope_| without taking
// |wrapper_|'s lock (taking it there would deadlock when called from inside a
// running wrapped task, which already holds the non-recursive mutex). Callers
// must therefore not race |schedule_task| against |Stop| from different
// threads.
class ScopedAsyncExecutor {
 public:
  // Creates a ScopedAsyncExecutor using the provided async loop dispatcher.
  explicit ScopedAsyncExecutor(async_dispatcher_t* dispatcher)
      : executor_(dispatcher) {
    scope_.emplace();
  }

  // |Stop| must have been called before destruction.
  ~ScopedAsyncExecutor() { FXL_DCHECK(stopped_); }

  // fit::executor-style interface: schedules |task| on the underlying
  // executor. Tasks scheduled after |Stop| are silently dropped.
  void schedule_task(fit::promise<> task) {
    if (stopped_) {
      return;
    }
    // The wrapping order is important: by putting the locking wrapper after the
    // scope wrapper, we ensure that tasks are first locked before we check for
    // the scope's destruction. Thus, each promise is wrapped twice:
    //   LockingWrapper[fit::scope[promise]]
    // This way, when we want to stop the executor and acquire the lock, we are
    // sure that the scoped promises are not executing because they are still at
    // the locking step. Once the scope is deleted and the lock released, the
    // executor will try to execute the scoped promises, and exit early.
    executor_.schedule_task(task.wrap_with(*scope_).wrap_with(wrapper_));
  }

  // Stops the executor. Once this method returns, it is guaranteed that no code
  // provided to |schedule_task| will be executed. It is however unsafe to
  // delete the class object at this point if |Stop| has been called on a
  // different thread than the one used by this executor, as management code may
  // still be running.
  void Stop() {
    // Blocks until no wrapped task is running, then destroys the scope so that
    // every remaining scoped task exits early when it is next resumed.
    auto lock = wrapper_.lock();
    scope_.reset();
    stopped_ = true;
  }

 private:
  bool stopped_ = false;
  async::Executor executor_;
  std::optional<fit::scope> scope_;
  LockingWrapper wrapper_;
};
} // namespace
// IOLevelDbFactory holds all operations happening on the IO thread. It is
// owned by LevelDbFactory and is destroyed on the IO thread through
// |SelfDestruct|.
class LevelDbFactory::IOLevelDbFactory {
public:
// |environment| is stored as a raw pointer and dereferenced by later calls, so
// it must remain valid for the lifetime of this object. |cache_path| is the
// common parent of the staging path and of the cached db path.
IOLevelDbFactory(ledger::Environment* environment,
ledger::DetachedPath cache_path)
: environment_(environment),
staging_path_(cache_path.SubPath(kStagingPath)),
cached_db_path_(cache_path.SubPath(kCachedDbPath)),
io_executor_(environment_->io_dispatcher()) {}
// Initializes the IO factory: schedules, on the IO thread, the preparation of
// the cached db (reusing an instance already present in the cached db path, if
// any).
void Init();
// Returns through the completer a LevelDB database, initialized on the IO
// thread. May be called from a thread other than the IO thread; the work is
// scheduled on |io_executor_|.
void GetOrCreateDb(ledger::DetachedPath db_path,
DbFactory::OnDbNotFound on_db_not_found,
fit::completer<std::unique_ptr<Db>, Status> completer);
// Self-destructs this class on the IO thread.
// |io_executor_| can't be destroyed when a task is in progress, and
// |io_executor_| tasks use member variables to operate. Thus, by scheduling
// the deletion of this class on the same dispatcher as the |io_executor_|, we
// ensure that |io_executor_| is destroyed when no task is running, that
// no task will access member variables after their destruction, and that we
// are not blocking the main thread while doing this.
// |self| must own this very object.
void SelfDestruct(std::unique_ptr<LevelDbFactory::IOLevelDbFactory> self);
private:
// Gets or creates a new LevelDb instance in the given |db_path|,
// initializes and then returns it through the completer. Callers should
// execute the returned promise, as it would contain any deferred
// computation, if deferred computation is needed. This method should be
// called on the I/O thread.
fit::promise<> GetOrCreateDbOnIOThread(
ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found,
fit::completer<std::unique_ptr<Db>, Status> completer);
// Gets or creates a new LevelDb instance: opens the existing instance at
// |db_path| when |create_in_staging_path| is NO, or creates one through the
// staging path when it is YES.
// This method should be called on the I/O thread.
fit::result<std::unique_ptr<Db>, Status> GetOrCreateDbAtPathOnIOThread(
ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path);
// Synchronously creates and initializes a new LevelDb instance in a two-step
// process: the new instance is created in a temporary directory under the
// staging path and, if successful, it is then moved to the given |db_path|.
// This way, if initialization is interrupted, the potentially corrupted
// database will be in the staging area. On success, |*db| receives the
// instance.
// This method should be called on the I/O thread.
Status CreateDbThroughStagingPathOnIOThread(ledger::DetachedPath db_path,
std::unique_ptr<LevelDb>* db);
// Synchronously creates (or, when |create_in_staging_path| is NO, opens) the
// cached DB in the cached db path.
// This method should be called on the I/O thread.
fit::result<std::unique_ptr<Db>, Status> PrepareCachedDbOnIOThread(
CreateInStagingPath create_in_staging_path);
// Synchronously prepares a precached DB for normal use: moves it to |db_path|
// and schedules the preparation of the next cached DB. Errors in |result| are
// returned unchanged.
// This method should be called on the I/O thread.
fit::result<std::unique_ptr<Db>, Status> ReturnPrecachedDbOnIOThread(
ledger::DetachedPath db_path,
fit::result<std::unique_ptr<Db>, Status> result);
// We hold a cached database to speed up initialization. |cached_db_| is only
// manipulated on the I/O thread.
fit::future<std::unique_ptr<Db>, Status> cached_db_;
ledger::Environment* const environment_;
// The path where new LevelDb instances are created, before they are moved to
// their final destination, or the cached db path.
const ledger::DetachedPath staging_path_;
// The path that keeps the initialized cached instance of LevelDb.
const ledger::DetachedPath cached_db_path_;
// NOTE: member order matters. |io_executor_| is initialized from
// |environment_| in the constructor, so it must stay declared after it; being
// declared last, it is also the first member destroyed.
ScopedAsyncExecutor io_executor_;
};
void LevelDbFactory::IOLevelDbFactory::Init() {
// If there is already a LevelDb instance in the cache directory, initialize
// that one, instead of creating a new one.
io_executor_.schedule_task(fit::make_promise([this](fit::context& context) {
fit::bridge<std::unique_ptr<Db>, Status> bridge;
cached_db_ = bridge.consumer.promise();
CreateInStagingPath create_in_staging_path =
static_cast<CreateInStagingPath>(!files::IsDirectoryAt(
cached_db_path_.root_fd(), cached_db_path_.path()));
auto cache_db_result = PrepareCachedDbOnIOThread(create_in_staging_path);
bridge.completer.complete_or_abandon(std::move(cache_db_result));
}));
}
void LevelDbFactory::IOLevelDbFactory::GetOrCreateDb(
    ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found,
    fit::completer<std::unique_ptr<Db>, Status> completer) {
  // Hops to the IO thread. The handler runs at most once (it already moves
  // |completer| out), so |db_path| is moved in and out as well instead of
  // being copied twice.
  io_executor_.schedule_task(fit::make_promise(
      [this, db_path = std::move(db_path), on_db_not_found,
       completer = std::move(completer)](fit::context& context) mutable {
        return GetOrCreateDbOnIOThread(std::move(db_path), on_db_not_found,
                                       std::move(completer));
      }));
}
void LevelDbFactory::IOLevelDbFactory::SelfDestruct(
    std::unique_ptr<LevelDbFactory::IOLevelDbFactory> self) {
  // |self| must be the owning pointer to this very object.
  FXL_DCHECK(this == self.get());
  // Once Stop() returns, no task body scheduled on |io_executor_| will run.
  io_executor_.Stop();
  // Hand ownership to a no-op closure posted on the IO dispatcher: this object
  // (and |io_executor_|) is released together with that closure.
  async::PostTask(environment_->io_dispatcher(),
                  [owned_self = std::move(self)]() {});
}
fit::promise<> LevelDbFactory::IOLevelDbFactory::GetOrCreateDbOnIOThread(
    ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found,
    fit::completer<std::unique_ptr<Db>, Status> completer) {
  if (files::IsDirectoryAt(db_path.root_fd(), db_path.path())) {
    // If the path exists, there is a LevelDb instance already there. Open and
    // return it.
    auto result = GetOrCreateDbAtPathOnIOThread(std::move(db_path),
                                                CreateInStagingPath::NO);
    completer.complete_or_abandon(std::move(result));
    return fit::promise<>();
  }
  if (on_db_not_found == DbFactory::OnDbNotFound::RETURN) {
    completer.complete_or_abandon(fit::error(Status::PAGE_NOT_FOUND));
    return fit::promise<>();
  }
  switch (cached_db_.state()) {
    case fit::future_state::ok:
      // The cached db is ready: move it to its final destination right away.
      completer.complete_or_abandon(ReturnPrecachedDbOnIOThread(
          std::move(db_path), cached_db_.take_result()));
      return fit::promise<>();
    case fit::future_state::pending:
      // The cached db is still being prepared: queue this request behind it.
      // Taking the promise leaves |cached_db_| empty, so later requests fall
      // back to the direct creation below instead of waiting.
      return cached_db_.take_promise().then(
          [this, db_path = std::move(db_path),
           completer = std::move(completer)](
              fit::result<std::unique_ptr<Db>, Status>& cache_result) mutable
          -> void {
            completer.complete_or_abandon(ReturnPrecachedDbOnIOThread(
                std::move(db_path), std::move(cache_result)));
          });
    case fit::future_state::empty:
      // A previous request already took the cached db (and is waiting for it,
      // or it failed to be created).
    case fit::future_state::error:
      // Creating the pre-cached db failed: it would likely fail again, so don't
      // retry caching. In both cases, request a new LevelDb directly at the
      // final destination.
      completer.complete_or_abandon(GetOrCreateDbAtPathOnIOThread(
          std::move(db_path), CreateInStagingPath::YES));
      return fit::promise<>();
  }
  // Every fit::future_state is handled above; this guards against falling off
  // the end of a non-void function (undefined behavior) should a new state
  // ever appear.
  FXL_NOTREACHED();
  return fit::promise<>();
}
fit::result<std::unique_ptr<Db>, Status>
LevelDbFactory::IOLevelDbFactory::GetOrCreateDbAtPathOnIOThread(
    ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path) {
  // Either opens the instance that already lives at |db_path|, or builds a new
  // one through the staging area and moves it there.
  std::unique_ptr<LevelDb> db;
  Status status;
  if (create_in_staging_path == CreateInStagingPath::NO) {
    FXL_DCHECK(files::IsDirectoryAt(db_path.root_fd(), db_path.path()));
    db = std::make_unique<LevelDb>(environment_->dispatcher(),
                                   std::move(db_path));
    status = db->Init();
  } else {
    status = CreateDbThroughStagingPathOnIOThread(std::move(db_path), &db);
  }
  if (status == Status::OK) {
    return fit::ok(std::move(db));
  }
  return fit::error(status);
}
Status LevelDbFactory::IOLevelDbFactory::CreateDbThroughStagingPathOnIOThread(
    ledger::DetachedPath db_path, std::unique_ptr<LevelDb>* db) {
  // Name the temporary directory with the hex encoding of fresh random bytes.
  char name[kRandomBytesCount];
  environment_->random()->Draw(name, kRandomBytesCount);
  ledger::DetachedPath tmp_destination = staging_path_.SubPath(
      convert::ToHex(fxl::StringView(name, kRandomBytesCount)));
  // Create a LevelDb instance in a temporary path.
  auto result =
      std::make_unique<LevelDb>(environment_->dispatcher(), tmp_destination);
  Status status = result->Init();
  if (status != Status::OK) {
    return status;
  }
  // If the parent directory doesn't exist, renameat will fail.
  // Note that |cached_db_path_| will also be created through the staging path
  // and thus, this code path will be reached. Its parent directory is lazily
  // created when result->Init() (see code above) is called:
  // - |staging_path_| and |cached_db_path_| share the same parent (the
  //   |cache_path| given on the constructor), and
  // - in LevelDb initialization, the directories up to the db path are created.
  FXL_DCHECK(ParentDirectoryExists(db_path))
      << "Parent directory does not exist for path: " << db_path.path();
  // Move it to the final destination.
  if (renameat(tmp_destination.root_fd(), tmp_destination.path().c_str(),
               db_path.root_fd(), db_path.path().c_str()) != 0) {
    // Capture errno immediately: the logging machinery below may overwrite it
    // before strerror() would otherwise be evaluated.
    const int rename_errno = errno;
    FXL_LOG(ERROR) << "Unable to move LevelDb from staging path to final "
                      "destination: "
                   << db_path.path() << ". Error: " << strerror(rename_errno);
    return Status::IO_ERROR;
  }
  *db = std::move(result);
  return Status::OK;
}
fit::result<std::unique_ptr<Db>, Status>
LevelDbFactory::IOLevelDbFactory::PrepareCachedDbOnIOThread(
    CreateInStagingPath create_in_staging_path) {
  // The trace event spans the whole synchronous creation (or opening) of the
  // cached db.
  TRACE_DURATION("ledger", "prepare_cached_db");
  auto cached =
      GetOrCreateDbAtPathOnIOThread(cached_db_path_, create_in_staging_path);
  return cached;
}
// Moves the successfully prepared cached db held in |result| to |db_path| and
// schedules the preparation of the next cached db. Errors in |result| are
// returned unchanged.
fit::result<std::unique_ptr<Db>, Status>
LevelDbFactory::IOLevelDbFactory::ReturnPrecachedDbOnIOThread(
    ledger::DetachedPath db_path,
    fit::result<std::unique_ptr<Db>, Status> result) {
  if (result.is_error()) {
    // If we failed to create a cached db instance, any future attempts will
    // likely fail as well: just return the error, and subsequent attempts will
    // not attempt to use a cached DB.
    return result;
  }
  // Move the cached db to the final destination.
  if (renameat(cached_db_path_.root_fd(), cached_db_path_.path().c_str(),
               db_path.root_fd(), db_path.path().c_str()) != 0) {
    // Capture errno immediately: the logging machinery below may overwrite it
    // before strerror() would otherwise be evaluated.
    const int rename_errno = errno;
    FXL_LOG(ERROR) << "Unable to move LevelDb from: " << cached_db_path_.path()
                   << " to final destination: " << db_path.path()
                   << ". Error: " << strerror(rename_errno);
    // Moving to the final destination failed, but the cached db was created
    // successfully: we fail, and we'll retry the cached db next time.
    fit::bridge<std::unique_ptr<Db>, Status> bridge;
    cached_db_ = bridge.consumer.promise();
    bridge.completer.complete_or_abandon(std::move(result));
    return fit::error(Status::IO_ERROR);
  }
  // Asynchronously start preparing the next cached db.
  fit::bridge<std::unique_ptr<Db>, Status> bridge;
  cached_db_ = bridge.consumer.promise();
  io_executor_.schedule_task(
      fit::make_promise([this, completer = std::move(bridge.completer)](
                            fit::context& context) mutable {
        auto cache_db_result =
            PrepareCachedDbOnIOThread(CreateInStagingPath::YES);
        completer.complete_or_abandon(std::move(cache_db_result));
      }));
  return result;
}
LevelDbFactory::LevelDbFactory(ledger::Environment* environment,
                               ledger::DetachedPath cache_path)
    : main_executor_(environment->dispatcher()) {
  // |cache_path| is taken by value and not used afterwards: move it into the
  // IO-side factory instead of copying it.
  io_level_db_factory_ =
      std::make_unique<IOLevelDbFactory>(environment, std::move(cache_path));
}
LevelDbFactory::~LevelDbFactory() {
  // Read the raw pointer before moving ownership out of
  // |io_level_db_factory_|, so that the call does not rely on the object
  // expression being evaluated before the argument (only guaranteed since
  // C++17). SelfDestruct then releases the IO factory on the IO thread.
  IOLevelDbFactory* const io_factory = io_level_db_factory_.get();
  io_factory->SelfDestruct(std::move(io_level_db_factory_));
}
void LevelDbFactory::Init() {
  // All the work (preparing the cached db) is delegated to the IO-side factory.
  io_level_db_factory_->Init();
}
void LevelDbFactory::GetOrCreateDb(
    ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found,
    fit::function<void(Status, std::unique_ptr<Db>)> callback) {
  // The IO-side factory completes |bridge.completer| from its IO executor; the
  // consumer side delivers the result to |callback| on |main_executor_|.
  fit::bridge<std::unique_ptr<Db>, Status> bridge;
  io_level_db_factory_->GetOrCreateDb(std::move(db_path), on_db_not_found,
                                      std::move(bridge.completer));
  main_executor_.schedule_task(
      // If the completer is abandoned without a result, report ILLEGAL_STATE.
      bridge.consumer.promise_or(fit::error(Status::ILLEGAL_STATE))
          .then([callback = std::move(callback)](
                    fit::result<std::unique_ptr<Db>, Status>& result) {
            switch (result.state()) {
              case fit::result_state::error:
                callback(result.take_error(), nullptr);
                return;
              case fit::result_state::ok:
                callback(Status::OK, result.take_value());
                return;
              case fit::result_state::pending:
                // A .then() handler only receives completed results.
                FXL_NOTREACHED();
                return;
            }
          }));
}
} // namespace storage