blob: 4434077208c427eae24b06551e1012b42de287eb [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/impl/leveldb_factory.h"
#include <mutex>
#include <lib/async/cpp/task.h>
#include <lib/callback/scoped_callback.h>
#include <lib/callback/trace_callback.h>
#include <lib/fxl/files/directory.h>
#include <lib/fxl/files/scoped_temp_dir.h>
#include <lib/fxl/memory/ref_counted.h>
#include <lib/fxl/memory/ref_ptr.h>
#include <lib/fxl/strings/string_view.h>
#include <lib/fxl/synchronization/thread_annotations.h>
#include "peridot/lib/convert/convert.h"
// LevelDbFactory tries to keep an empty, initialized instance of LevelDb always
// available. It stores this cached instance under cached_db/.
//
// On requests for new LevelDb instances (see |GetOrCreateDb|), if the cached
// instance is ready, it is moved to the requested destination and then a new
// LevelDb is prepared to be cached. If the cached instance is not yet
// available, the request is queued, and will be handled when the cached db is
// ready.
//
// Note that if multiple requests are received while waiting for the LevelDb
// initialization, only the first one is queued up. The rest directly request a
// new LevelDb instance at the final destination.
namespace storage {
namespace {
// TODO(LE-635): We need to clean the staging path, so that we don't leave
// unreachable storage on disk.
constexpr fxl::StringView kStagingPath = "staging";
constexpr fxl::StringView kCachedDbPath = "cached_db";
constexpr size_t kRandomBytesCount = 16;
// Returns whether the parent directory of |path| exists. If it is not possible
// to access the parent directory, returns whether the given |path| exists.
bool ParentDirectoryExists(ledger::DetachedPath path) {
size_t last_slash = path.path().find_last_of('/');
return files::IsDirectoryAt(path.root_fd(),
last_slash == std::string::npos
? path.path()
: path.path().substr(0, last_slash));
}
} // namespace
enum class LevelDbFactory::CreateInStagingPath : bool {
NO,
YES,
};
// Holds the LevelDb object together with the information on its initialization
// state, allowing the coordination between the main and the I/O thread for
// the creation of new LevelDb objects.
struct LevelDbFactory::DbWithInitializationState
: public fxl::RefCountedThreadSafe<
LevelDbFactory::DbWithInitializationState> {
public:
// The mutex used to avoid concurrency issues during initialization.
std::mutex mutex;
// Whether the initialization has been cancelled. This information is known on
// the main thread, which is the only one that should update this field if
// needed. The I/O thread should read |cancelled| to know whether to proceed
// with completing the requested initialization.
bool cancelled FXL_GUARDED_BY(mutex) = false;
// The LevelDb object itself should only be initialized while holding the
// mutex, to prevent a race condition when cancelling from the main thread.
std::unique_ptr<LevelDb> db FXL_GUARDED_BY(mutex) = nullptr;
private:
FRIEND_REF_COUNTED_THREAD_SAFE(DbWithInitializationState);
FRIEND_MAKE_REF_COUNTED(DbWithInitializationState);
DbWithInitializationState() {}
~DbWithInitializationState() {}
};
LevelDbFactory::LevelDbFactory(ledger::Environment* environment,
ledger::DetachedPath cache_path)
: environment_(environment),
staging_path_(cache_path.SubPath(kStagingPath)),
cached_db_path_(cache_path.SubPath(kCachedDbPath)),
coroutine_manager_(environment->coroutine_service()),
weak_factory_(this) {}
void LevelDbFactory::Init() {
// If there is already a LevelDb instance in the cache directory, initialize
// that one, instead of creating a new one.
CreateInStagingPath create_in_staging_path = static_cast<CreateInStagingPath>(
!files::IsDirectoryAt(cached_db_path_.root_fd(), cached_db_path_.path()));
PrepareCachedDb(create_in_staging_path);
}
void LevelDbFactory::GetOrCreateDb(
ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found,
fit::function<void(Status, std::unique_ptr<Db>)> callback) {
if (files::IsDirectoryAt(db_path.root_fd(), db_path.path())) {
// If the path exists, there is a LevelDb instance already there. Open and
// return it.
GetOrCreateDbAtPath(std::move(db_path), CreateInStagingPath::NO,
std::move(callback));
return;
}
if (on_db_not_found == DbFactory::OnDbNotFound::RETURN) {
callback(Status::NOT_FOUND, nullptr);
return;
}
// If creating the pre-cached db failed at some point it will likely fail
// again. Don't retry caching anymore.
if (cached_db_status_ == Status::OK) {
if (cached_db_is_ready_) {
// A cached instance is available. Use that one for the given callback.
ReturnPrecachedDb(std::move(db_path), std::move(callback));
return;
}
if (!pending_request_) {
// The cached instance is not ready yet, and there are no other pending
// requests. Store this one as pending until the cached db is ready.
pending_request_path_ = std::move(db_path);
pending_request_ = std::move(callback);
return;
}
}
// Either creation of a cached db has failed or a previous request is already
// waiting for the cached instance. Request a new LevelDb at the final
// destination.
GetOrCreateDbAtPath(std::move(db_path), CreateInStagingPath::YES,
std::move(callback));
}
void LevelDbFactory::GetOrCreateDbAtPath(
ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path,
fit::function<void(Status, std::unique_ptr<Db>)> callback) {
coroutine_manager_.StartCoroutine(
std::move(callback),
[this, db_path = std::move(db_path), create_in_staging_path](
coroutine::CoroutineHandler* handler,
fit::function<void(Status, std::unique_ptr<Db>)> callback) {
auto db_with_initialization_state =
fxl::MakeRefCounted<DbWithInitializationState>();
Status status;
if (coroutine::SyncCall(
handler,
[&](fit::function<void(Status)> callback) {
async::PostTask(
environment_->io_dispatcher(),
[this, db_path = std::move(db_path),
create_in_staging_path, db_with_initialization_state,
callback = std::move(callback)]() mutable {
GetOrCreateDbAtPathOnIOThread(
std::move(db_path), create_in_staging_path,
std::move(db_with_initialization_state),
std::move(callback));
});
},
&status) == coroutine::ContinuationStatus::OK) {
// The coroutine returned normally, the initialization was done
// completely on the I/O thread, return normally.
std::lock_guard<std::mutex> guard(
db_with_initialization_state->mutex);
callback(status, std::move(db_with_initialization_state->db));
return;
}
// The coroutine is interrupted, but the initialization has been posted
// on the I/O thread. The lock must be acquired and |cancelled| must be
// set to |true|.
//
// There are 3 cases to consider:
// 1. The lock is acquired before |GetOrCreateDbAtPathOnIOThread| is
// called. |cancelled| will be set to |true| and when
// |GetOrCreateDbAtPathOnIOThread| is executed, it will return early.
// 2. The lock is acquired after |GetOrCreateDbAtPathOnIOThread| is
// executed. |GetOrCreateDbAtPathOnIOThread| will not be called
// again, and there is no concurrency issue anymore.
// 3. The lock is tentatively acquired while
// |GetOrCreateDbAtPathOnIOThread| is run. Because
// |GetOrCreateDbAtPathOnIOThread| is guarded by the same mutex, this
// will block until |GetOrCreateDbAtPathOnIOThread| is executed, and
// the case is the same as 2.
std::lock_guard<std::mutex> guard(db_with_initialization_state->mutex);
db_with_initialization_state->cancelled = true;
db_with_initialization_state->db.reset();
callback(Status::INTERRUPTED, nullptr);
});
}
void LevelDbFactory::GetOrCreateDbAtPathOnIOThread(
ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path,
fxl::RefPtr<DbWithInitializationState> db_with_initialization_state,
fit::function<void(Status)> callback) {
std::lock_guard<std::mutex> guard(db_with_initialization_state->mutex);
if (db_with_initialization_state->cancelled) {
return;
}
Status status;
if (create_in_staging_path == CreateInStagingPath::YES) {
status = CreateDbThroughStagingPathOnIOThread(
std::move(db_path), &db_with_initialization_state->db);
} else {
FXL_DCHECK(files::IsDirectoryAt(db_path.root_fd(), db_path.path()));
db_with_initialization_state->db = std::make_unique<LevelDb>(
environment_->dispatcher(), std::move(db_path));
status = db_with_initialization_state->db->Init();
}
if (status != Status::OK) {
// Don't return the created db instance if initialization failed.
db_with_initialization_state->db.reset();
}
async::PostTask(
environment_->dispatcher(),
[status, callback = std::move(callback)]() mutable { callback(status); });
}
Status LevelDbFactory::CreateDbThroughStagingPathOnIOThread(
ledger::DetachedPath db_path, std::unique_ptr<LevelDb>* db) {
char name[kRandomBytesCount];
environment_->random()->Draw(name, kRandomBytesCount);
ledger::DetachedPath tmp_destination = staging_path_.SubPath(
convert::ToHex(fxl::StringView(name, kRandomBytesCount)));
// Create a LevelDb instance in a temporary path.
auto result =
std::make_unique<LevelDb>(environment_->dispatcher(), tmp_destination);
Status status = result->Init();
if (status != Status::OK) {
return status;
}
// If the parent directory doesn't exist, renameat will fail.
// Note that |cached_db_path_| will also be created throught the staging path
// and thus, this code path will be reached. Its parent directory is lazily
// created when result->Init() (see code above) is called:
// - |staging_path_| and |cached_db_path_| share the same parent (the
// |cache_path| given on the constructor), and
// - in LevelDb initialization, the directories up to the db path are created.
FXL_DCHECK(ParentDirectoryExists(db_path))
<< "Parent directory does not exit for path: " << db_path.path();
// Move it to the final destination.
if (renameat(tmp_destination.root_fd(), tmp_destination.path().c_str(),
db_path.root_fd(), db_path.path().c_str()) != 0) {
FXL_LOG(ERROR) << "Unable to move LevelDb from staging path to final "
"destination: "
<< db_path.path() << ". Error: " << strerror(errno);
return Status::IO_ERROR;
}
*db = std::move(result);
return Status::OK;
}
void LevelDbFactory::PrepareCachedDb(
CreateInStagingPath create_in_staging_path) {
TRACE_ASYNC_BEGIN("ledger", "prepare_cached_db", 0);
FXL_DCHECK(!cached_db_is_ready_);
FXL_DCHECK(cached_db_ == nullptr);
GetOrCreateDbAtPath(
cached_db_path_, create_in_staging_path,
callback::MakeScoped(weak_factory_.GetWeakPtr(),
[this](Status status, std::unique_ptr<Db> db) {
TRACE_ASYNC_END("ledger", "prepare_cached_db", 0);
cached_db_status_ = status;
cached_db_ = std::move(db);
cached_db_is_ready_ = true;
if (pending_request_) {
auto path = std::move(pending_request_path_);
auto callback = std::move(pending_request_);
pending_request_ = nullptr;
ReturnPrecachedDb(std::move(path),
std::move(callback));
}
}));
}
void LevelDbFactory::ReturnPrecachedDb(
ledger::DetachedPath db_path,
fit::function<void(Status, std::unique_ptr<Db>)> callback) {
FXL_DCHECK(cached_db_is_ready_);
if (cached_db_status_ != Status::OK) {
// If we failed to create a cached db instance, any future attempts will
// likely fail as well: just return the status and don't update
// |cached_db_is_ready_| or call |PrepareCachedDb|.
callback(cached_db_status_, nullptr);
return;
}
// Move the cached db to the final destination.
if (renameat(cached_db_path_.root_fd(), cached_db_path_.path().c_str(),
db_path.root_fd(), db_path.path().c_str()) != 0) {
FXL_LOG(ERROR) << "Unable to move LevelDb from: " << cached_db_path_.path()
<< " to final destination: " << db_path.path()
<< ". Error: " << strerror(errno);
// Moving to the final destination failed, but the cached db was created
// succesfully: no need to update |cached_db_is_ready_|, |cached_db_status_|
// or |cached_db_|.
callback(Status::IO_ERROR, nullptr);
return;
}
// We know the |cached_db_status_| is |OK| and the db is already in the final
// destination. Asynchronously start preparing the next cached db and then
// call the callback.
auto cached_db = std::move(cached_db_);
cached_db_is_ready_ = false;
PrepareCachedDb(CreateInStagingPath::YES);
callback(Status::OK, std::move(cached_db));
}
} // namespace storage