| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/storage/impl/leveldb_factory.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async_promise/executor.h> |
| #include <lib/callback/auto_cleanable.h> |
| #include <lib/callback/scoped_callback.h> |
| #include <lib/callback/trace_callback.h> |
| #include <lib/fit/bridge.h> |
| #include <lib/fit/promise.h> |
| #include <lib/fit/result.h> |
| #include <lib/fit/scope.h> |
| #include <trace/event.h> |
| |
| #include <mutex> |
| |
| #include "peridot/lib/convert/convert.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/lib/files/directory.h" |
| #include "src/lib/files/unique_fd.h" |
| #include "src/lib/fxl/logging.h" |
| #include "src/lib/fxl/memory/ref_counted.h" |
| #include "src/lib/fxl/memory/ref_ptr.h" |
| #include "src/lib/fxl/strings/string_view.h" |
| #include "src/lib/fxl/synchronization/thread_annotations.h" |
| |
| // LevelDbFactory tries to keep an empty, initialized instance of LevelDb always |
| // available. It stores this cached instance under cached_db/. |
| // |
| // On requests for new LevelDb instances (see |GetOrCreateDb|), if the cached |
| // instance is ready, it is moved to the requested destination and then a new |
| // LevelDb is prepared to be cached. If the cached instance is not yet |
| // available, the request is queued, and will be handled when the cached db is |
| // ready. |
| // |
| // Note that if multiple requests are received while waiting for the LevelDb |
| // initialization, only the first one is queued up. The rest directly request a |
| // new LevelDb instance at the final destination. |
| |
| namespace storage { |
| namespace { |
| |
| // TODO(LE-635): We need to clean the staging path, so that we don't leave |
| // unreachable storage on disk. |
| constexpr fxl::StringView kStagingPath = "staging"; |
| constexpr fxl::StringView kCachedDbPath = "cached_db"; |
| |
| constexpr size_t kRandomBytesCount = 16; |
| |
| // Returns whether the parent directory of |path| exists. If it is not possible |
| // to access the parent directory, returns whether the given |path| exists. |
| bool ParentDirectoryExists(ledger::DetachedPath path) { |
| size_t last_slash = path.path().find_last_of('/'); |
| return files::IsDirectoryAt(path.root_fd(), |
| last_slash == std::string::npos |
| ? path.path() |
| : path.path().substr(0, last_slash)); |
| } |
| |
// Selects whether a LevelDb instance is first created under the staging path
// and then moved to its destination (YES), or opened directly at its
// destination (NO). The underlying type is bool, so values can be converted
// to and from a boolean condition.
enum class CreateInStagingPath : bool {
  NO = false,
  YES = true,
};
| |
| // LockingWrapper that allows to block the execution of wrapped promises. |
| class LockingWrapper { |
| public: |
| LockingWrapper() = default; |
| |
| // Wrapper implementation, as expected by fit::promise: |
| template <class Promise> |
| decltype(auto) wrap(Promise promise) { |
| assert(promise); |
| return fit::make_promise_with_continuation( |
| LockingWrappedContinuation<Promise>(this, std::move(promise))); |
| } |
| |
| // Acquire a lock on the promise execution, effectively blocking any wrapped |
| // promise once acquired. |
| std::unique_lock<std::mutex> lock() { |
| return std::unique_lock<std::mutex>(mutex_); |
| } |
| |
| private: |
| // Promise continuation that acquires the shared lock from LockingWrapper |
| // before executing the promise. |
| template <class Promise> |
| class LockingWrappedContinuation { |
| public: |
| explicit LockingWrappedContinuation(LockingWrapper* wrapper, |
| Promise promise) |
| : wrapper_(wrapper), promise_(std::move(promise)) {} |
| |
| // Executes the wrapped promise. |
| typename Promise::result_type operator()(fit::context& context) { |
| std::lock_guard<std::mutex> lg(wrapper_->mutex_); |
| return promise_(context); |
| } |
| |
| private: |
| LockingWrapper* const wrapper_; |
| Promise promise_; |
| }; |
| |
| std::mutex mutex_; |
| }; |
| |
| // ScopedExecutor is a proxy for async::Executor that ensures that all tasks |
| // scheduled on it can be stopped from another thread. |
| class ScopedAsyncExecutor { |
| public: |
| // Creates a ScopedAsyncExecutor using the provided async loop dispatcher. |
| ScopedAsyncExecutor(async_dispatcher_t* dispatcher) : executor_(dispatcher) { |
| scope_.emplace(); |
| } |
| |
| ~ScopedAsyncExecutor() { FXL_DCHECK(stopped_); } |
| |
| // fit::executor: |
| void schedule_task(fit::promise<> task) { |
| if (stopped_) { |
| return; |
| } |
| // The wrapping order is important: by putting the locking wrapper after the |
| // scope wrapper, we ensure that tasks are first locked before we check for |
| // the scope's destruction. Thus, each promise is wrapped twice: |
| // LockingWrapper[fit::scope[promise]] |
| // This way, when we want to stop the executor and acquire the lock, we are |
| // sure that the scoped promises are not executing because they are still at |
| // the locking step. Once the scope is deleted and the lock released, the |
| // executor will try to execute the scoped promises, and exit early. |
| executor_.schedule_task(task.wrap_with(*scope_).wrap_with(wrapper_)); |
| } |
| |
| // Stop the executor. Once this method returns, it is guaranteed that no code |
| // provided to |schedule_task| will be executed. It is however unsafe to |
| // delete the class object at this point if |Stop| has been called on a |
| // different thread than the one used by this executor, as management code may |
| // still be running. |
| void Stop() { |
| auto lock = wrapper_.lock(); |
| scope_.reset(); |
| stopped_ = true; |
| } |
| |
| private: |
| bool stopped_ = false; |
| async::Executor executor_; |
| std::optional<fit::scope> scope_; |
| LockingWrapper wrapper_; |
| }; |
| } // namespace |
| |
| // IOLevelDbFactory holds all operations happening on the IO thread. |
| class LevelDbFactory::IOLevelDbFactory { |
| public: |
| IOLevelDbFactory(ledger::Environment* environment, |
| ledger::DetachedPath cache_path) |
| : environment_(environment), |
| staging_path_(cache_path.SubPath(kStagingPath)), |
| cached_db_path_(cache_path.SubPath(kCachedDbPath)), |
| io_executor_(environment_->io_dispatcher()) {} |
| |
| // Initialize the IO factory. |
| void Init(); |
| |
| // Returns through the completer a LevelDB database, initialized on the IO |
| // thread. |
| void GetOrCreateDb(ledger::DetachedPath db_path, |
| DbFactory::OnDbNotFound on_db_not_found, |
| fit::completer<std::unique_ptr<Db>, Status> completer); |
| |
| // Self-destructs this class on the IO thread. |
| // |io_executor_| can't be destroyed when a task is in progress, and |
| // |io_executor_| tasks use member variables to operate. Thus, by scheduling |
| // the deletion of this class on the same dispatcher as the |io_executor_|, we |
| // ensure that |io_executor_| is destroyed when no task is running, that |
| // no task will access member variables after their destruction, and that we |
| // are not blocking the main thread while doing this. |
| void SelfDestruct(std::unique_ptr<LevelDbFactory::IOLevelDbFactory> self); |
| |
| private: |
| // Gets or creates a new LevelDb instance in the given |db_path|, |
| // initializes and then returns it through the completer. Callers should |
| // execute the returned promise, as it would contain any deferred |
| // computation, if deferred computation is needed. This method should be |
| // called on the I/O thread. |
| fit::promise<> GetOrCreateDbOnIOThread( |
| ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found, |
| fit::completer<std::unique_ptr<Db>, Status> completer); |
| |
| // Gets or creates a new LevelDb instance. |
| // This method should be called on the I/O thread. |
| fit::result<std::unique_ptr<Db>, Status> GetOrCreateDbAtPathOnIOThread( |
| ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path); |
| |
| // Synchronously creates and initializes a new LevelDb instance in a two-step |
| // process: the new instance is created in a temporary directory under the |
| // staging path and, if successful, it is then moved to the given |db_path|. |
| // This way, if initialization is interrupted, the potentially corrupted |
| // database will be in the staging area. |
| // This method should be called on the I/O thread. |
| Status CreateDbThroughStagingPathOnIOThread(ledger::DetachedPath db_path, |
| std::unique_ptr<LevelDb>* db); |
| |
| // Synchronously creates a new cached DB in the cached db path. |
| // This method should be called on the I/O thread. |
| fit::result<std::unique_ptr<Db>, Status> PrepareCachedDbOnIOThread( |
| CreateInStagingPath create_in_staging_path); |
| |
| // Sychronously prepares a precached DB for normal use. |
| // This method should be called on the I/O thread. |
| fit::result<std::unique_ptr<Db>, Status> ReturnPrecachedDbOnIOThread( |
| ledger::DetachedPath db_path, |
| fit::result<std::unique_ptr<Db>, Status> result); |
| |
| // We hold a cached database to speed up initialization. |cached_db_| is only |
| // manipulated on the I/O thread. |
| fit::future<std::unique_ptr<Db>, Status> cached_db_; |
| |
| ledger::Environment* const environment_; |
| // The path where new LevelDb instances are created, before they are moved to |
| // their final destination, or the cached db path. |
| const ledger::DetachedPath staging_path_; |
| // The path that keeps the initialized cached instance of LevelDb. |
| const ledger::DetachedPath cached_db_path_; |
| ScopedAsyncExecutor io_executor_; |
| }; |
| |
| void LevelDbFactory::IOLevelDbFactory::Init() { |
| // If there is already a LevelDb instance in the cache directory, initialize |
| // that one, instead of creating a new one. |
| io_executor_.schedule_task(fit::make_promise([this](fit::context& context) { |
| fit::bridge<std::unique_ptr<Db>, Status> bridge; |
| cached_db_ = bridge.consumer.promise(); |
| CreateInStagingPath create_in_staging_path = |
| static_cast<CreateInStagingPath>(!files::IsDirectoryAt( |
| cached_db_path_.root_fd(), cached_db_path_.path())); |
| auto cache_db_result = PrepareCachedDbOnIOThread(create_in_staging_path); |
| bridge.completer.complete_or_abandon(std::move(cache_db_result)); |
| })); |
| } |
| |
| void LevelDbFactory::IOLevelDbFactory::GetOrCreateDb( |
| ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found, |
| fit::completer<std::unique_ptr<Db>, Status> completer) { |
| io_executor_.schedule_task(fit::make_promise( |
| [this, db_path, on_db_not_found, |
| completer = std::move(completer)](fit::context& context) mutable { |
| return GetOrCreateDbOnIOThread(db_path, on_db_not_found, |
| std::move(completer)); |
| })); |
| } |
| |
| void LevelDbFactory::IOLevelDbFactory::SelfDestruct( |
| std::unique_ptr<LevelDbFactory::IOLevelDbFactory> self) { |
| FXL_DCHECK(self.get() == this); |
| io_executor_.Stop(); |
| async::PostTask(environment_->io_dispatcher(), [self = std::move(self)]() {}); |
| } |
| |
| fit::promise<> LevelDbFactory::IOLevelDbFactory::GetOrCreateDbOnIOThread( |
| ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found, |
| fit::completer<std::unique_ptr<Db>, Status> completer) { |
| if (files::IsDirectoryAt(db_path.root_fd(), db_path.path())) { |
| // If the path exists, there is a LevelDb instance already there. Open and |
| // return it. |
| auto result = GetOrCreateDbAtPathOnIOThread(std::move(db_path), |
| CreateInStagingPath::NO); |
| completer.complete_or_abandon(std::move(result)); |
| return fit::promise<>(); |
| } |
| |
| if (on_db_not_found == DbFactory::OnDbNotFound::RETURN) { |
| completer.complete_or_abandon(fit::error(Status::PAGE_NOT_FOUND)); |
| return fit::promise<>(); |
| } |
| |
| switch (cached_db_.state()) { |
| case fit::future_state::ok: |
| completer.complete_or_abandon(ReturnPrecachedDbOnIOThread( |
| std::move(db_path), cached_db_.take_result())); |
| return fit::promise<>(); |
| case fit::future_state::pending: |
| return cached_db_.take_promise().then( |
| [this, db_path, completer = std::move(completer)]( |
| fit::result<std::unique_ptr<Db>, Status>& cache_result) mutable |
| -> void { |
| completer.complete_or_abandon(ReturnPrecachedDbOnIOThread( |
| std::move(db_path), std::move(cache_result))); |
| }); |
| case fit::future_state::empty: |
| // If creating the pre-cached db failed at some point it will likely fail |
| // again. Don't retry caching anymore. |
| case fit::future_state::error: { |
| // Either creation of a cached db has failed or a previous request is |
| // already waiting for the cached instance. Request a new LevelDb at the |
| // final destination. |
| completer.complete_or_abandon(GetOrCreateDbAtPathOnIOThread( |
| std::move(db_path), CreateInStagingPath::YES)); |
| return fit::promise<>(); |
| } |
| } |
| } |
| |
| fit::result<std::unique_ptr<Db>, Status> |
| LevelDbFactory::IOLevelDbFactory::GetOrCreateDbAtPathOnIOThread( |
| ledger::DetachedPath db_path, CreateInStagingPath create_in_staging_path) { |
| std::unique_ptr<LevelDb> leveldb; |
| Status status; |
| if (create_in_staging_path == CreateInStagingPath::YES) { |
| status = CreateDbThroughStagingPathOnIOThread(std::move(db_path), &leveldb); |
| } else { |
| FXL_DCHECK(files::IsDirectoryAt(db_path.root_fd(), db_path.path())); |
| leveldb = std::make_unique<LevelDb>(environment_->dispatcher(), |
| std::move(db_path)); |
| status = leveldb->Init(); |
| } |
| if (status != Status::OK) { |
| return fit::error(status); |
| } |
| return fit::ok(std::move(leveldb)); |
| } |
| |
| Status LevelDbFactory::IOLevelDbFactory::CreateDbThroughStagingPathOnIOThread( |
| ledger::DetachedPath db_path, std::unique_ptr<LevelDb>* db) { |
| char name[kRandomBytesCount]; |
| environment_->random()->Draw(name, kRandomBytesCount); |
| ledger::DetachedPath tmp_destination = staging_path_.SubPath( |
| convert::ToHex(fxl::StringView(name, kRandomBytesCount))); |
| // Create a LevelDb instance in a temporary path. |
| auto result = |
| std::make_unique<LevelDb>(environment_->dispatcher(), tmp_destination); |
| Status status = result->Init(); |
| if (status != Status::OK) { |
| return status; |
| } |
| // If the parent directory doesn't exist, renameat will fail. |
| // Note that |cached_db_path_| will also be created throught the staging path |
| // and thus, this code path will be reached. Its parent directory is lazily |
| // created when result->Init() (see code above) is called: |
| // - |staging_path_| and |cached_db_path_| share the same parent (the |
| // |cache_path| given on the constructor), and |
| // - in LevelDb initialization, the directories up to the db path are created. |
| FXL_DCHECK(ParentDirectoryExists(db_path)) |
| << "Parent directory does not exit for path: " << db_path.path(); |
| // Move it to the final destination. |
| if (renameat(tmp_destination.root_fd(), tmp_destination.path().c_str(), |
| db_path.root_fd(), db_path.path().c_str()) != 0) { |
| FXL_LOG(ERROR) << "Unable to move LevelDb from staging path to final " |
| "destination: " |
| << db_path.path() << ". Error: " << strerror(errno); |
| return Status::IO_ERROR; |
| } |
| *db = std::move(result); |
| return Status::OK; |
| } |
| |
| fit::result<std::unique_ptr<Db>, Status> |
| LevelDbFactory::IOLevelDbFactory::PrepareCachedDbOnIOThread( |
| CreateInStagingPath create_in_staging_path) { |
| TRACE_DURATION("ledger", "prepare_cached_db"); |
| return GetOrCreateDbAtPathOnIOThread(cached_db_path_, create_in_staging_path); |
| } |
| |
| fit::result<std::unique_ptr<Db>, Status> |
| LevelDbFactory::IOLevelDbFactory::ReturnPrecachedDbOnIOThread( |
| ledger::DetachedPath db_path, |
| fit::result<std::unique_ptr<Db>, Status> result) { |
| if (result.is_error()) { |
| // If we failed to create a cached db instance, any future attempts will |
| // likely fail as well: just return the error, and subsequent attempts will |
| // not attempt to use a cached DB. |
| return result; |
| } |
| |
| // Move the cached db to the final destination. |
| if (renameat(cached_db_path_.root_fd(), cached_db_path_.path().c_str(), |
| db_path.root_fd(), db_path.path().c_str()) != 0) { |
| FXL_LOG(ERROR) << "Unable to move LevelDb from: " << cached_db_path_.path() |
| << " to final destination: " << db_path.path() |
| << ". Error: " << strerror(errno); |
| // Moving to the final destination failed, but the cached db was created |
| // succesfully: we fail, and we'll retry the cached db next time. |
| fit::bridge<std::unique_ptr<Db>, Status> bridge; |
| cached_db_ = bridge.consumer.promise(); |
| bridge.completer.complete_or_abandon(std::move(result)); |
| return fit::error(Status::IO_ERROR); |
| } |
| |
| // Asynchronously start preparing the next cached db. |
| fit::bridge<std::unique_ptr<Db>, Status> bridge; |
| cached_db_ = bridge.consumer.promise(); |
| io_executor_.schedule_task( |
| fit::make_promise([this, completer = std::move(bridge.completer)]( |
| fit::context& context) mutable { |
| auto cache_db_result = |
| PrepareCachedDbOnIOThread(CreateInStagingPath::YES); |
| completer.complete_or_abandon(std::move(cache_db_result)); |
| })); |
| return result; |
| } |
| |
| LevelDbFactory::LevelDbFactory(ledger::Environment* environment, |
| ledger::DetachedPath cache_path) |
| : main_executor_(environment->dispatcher()) { |
| io_level_db_factory_ = |
| std::make_unique<IOLevelDbFactory>(environment, cache_path); |
| } |
| |
| LevelDbFactory::~LevelDbFactory() { |
| io_level_db_factory_->SelfDestruct(std::move(io_level_db_factory_)); |
| } |
| |
| void LevelDbFactory::Init() { io_level_db_factory_->Init(); } |
| void LevelDbFactory::GetOrCreateDb( |
| ledger::DetachedPath db_path, DbFactory::OnDbNotFound on_db_not_found, |
| fit::function<void(Status, std::unique_ptr<Db>)> callback) { |
| fit::bridge<std::unique_ptr<Db>, Status> bridge; |
| io_level_db_factory_->GetOrCreateDb(db_path, on_db_not_found, |
| std::move(bridge.completer)); |
| |
| main_executor_.schedule_task( |
| bridge.consumer.promise_or(fit::error(Status::ILLEGAL_STATE)) |
| .then([callback = std::move(callback)]( |
| fit::result<std::unique_ptr<Db>, Status>& result) { |
| switch (result.state()) { |
| case fit::result_state::error: |
| callback(result.take_error(), nullptr); |
| return; |
| case fit::result_state::ok: |
| callback(Status::OK, result.take_value()); |
| return; |
| case fit::result_state::pending: |
| FXL_NOTREACHED(); |
| return; |
| } |
| })); |
| } |
| |
| } // namespace storage |