blob: 39f60d5f8b77914df6da5967b57fb3f3c90320fa [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/app/page_eviction_manager_impl.h"
#include <lib/async/cpp/task.h>
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/zx/time.h>
#include <algorithm>
#include "src/ledger/bin/app/constants.h"
#include "src/ledger/bin/app/ledger_repository_impl.h"
#include "src/ledger/bin/app/page_usage_db.h"
#include "src/ledger/bin/app/types.h"
#include "src/ledger/bin/storage/public/constants.h"
#include "src/ledger/bin/storage/public/page_storage.h"
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/coroutine/coroutine.h"
#include "src/ledger/lib/coroutine/coroutine_waiter.h"
#include "src/lib/files/directory.h"
#include "src/lib/fxl/strings/concatenate.h"
namespace ledger {
namespace {
// Logs an error message if the given |status| is not |OK| or |INTERRUPTED|.
void LogOnPageUpdateError(fxl::StringView operation_description,
storage::Status status, fxl::StringView ledger_name,
storage::PageIdView page_id) {
// Don't print an error on |INTERRUPED|: it means that the operation was
// interrupted, because PageEvictionManagerImpl was destroyed before being
// empty.
if (status != storage::Status::OK &&
status != storage::Status::INTERRUPTED) {
FXL_LOG(ERROR) << "Failed to " << operation_description
<< " in PageUsage DB. storage::Status: "
<< fidl::ToUnderlying(status)
<< ". Ledger name: " << ledger_name
<< ". Page ID: " << convert::ToHex(page_id);
}
}
// If the given |status| is not |OK| or |INTERRUPTED|, logs an error message on
// failure to initialize. Returns true in case of error; false otherwise.
bool LogOnInitializationError(fxl::StringView operation_description,
storage::Status status) {
if (status != storage::Status::OK) {
if (status != storage::Status::INTERRUPTED) {
FXL_LOG(ERROR) << operation_description
<< " failed because of initialization error: "
<< fidl::ToUnderlying(status);
}
return true;
}
return false;
}
} // namespace
PageEvictionManagerImpl::Completer::Completer() {}
PageEvictionManagerImpl::Completer::~Completer() {
// We should not call the callbacks: they are SyncCall callbacks, so when we
// drop them the caller will receive |INTERRUPTED|.
}
void PageEvictionManagerImpl::Completer::Complete(storage::Status status) {
FXL_DCHECK(!completed_);
// If we get |INTERRUPTED| here, it means the caller did not return as soon as
// it received |INTERRUPTED|.
FXL_DCHECK(status != storage::Status::INTERRUPTED);
CallCallbacks(status);
}
storage::Status PageEvictionManagerImpl::Completer::WaitUntilDone(
coroutine::CoroutineHandler* handler) {
if (completed_) {
return status_;
}
auto sync_call_status =
coroutine::SyncCall(handler, [this](fit::closure callback) {
// SyncCall finishes its execution when the given |callback| is called.
// To block the termination of |SyncCall| (and of |WaitUntilDone|), here
// we push this |callback| in the vector of |callbacks_|. Once
// |Complete| is called, we will call all of these callbacks, which will
// eventually unblock all pending |WaitUntilDone| calls.
callbacks_.push_back(std::move(callback));
});
if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) {
return storage::Status::INTERRUPTED;
}
return status_;
}
void PageEvictionManagerImpl::Completer::CallCallbacks(storage::Status status) {
if (completed_) {
return;
}
completed_ = true;
status_ = status;
// We need to move the callbacks in the stack since calling any of the
// them might lead to the deletion of this object, invalidating callbacks_.
std::vector<fit::closure> callbacks = std::move(callbacks_);
callbacks_.clear();
for (const auto& callback : callbacks) {
callback();
}
}
void PageEvictionManagerImpl::Completer::Cancel() {
FXL_DCHECK(!completed_);
completed_ = true;
status_ = storage::Status::INTERRUPTED;
callbacks_.clear();
}
PageEvictionManagerImpl::PageEvictionManagerImpl(Environment* environment,
storage::DbFactory* db_factory,
DetachedPath db_path)
: environment_(environment),
db_factory_(db_factory),
db_path_(db_path.SubPath(kPageUsageDbSerializationVersion)),
coroutine_manager_(environment_->coroutine_service()),
weak_factory_(this) {}
PageEvictionManagerImpl::~PageEvictionManagerImpl() {}
void PageEvictionManagerImpl::Init() {
// Initializing the DB and marking pages as closed are slow operations and we
// shouldn't wait for them to finish, before returning from initialization:
// Start these operations and finalize the initialization completer when done.
coroutine_manager_.StartCoroutine([this](
coroutine::CoroutineHandler* handler) {
ExpiringToken token = NewExpiringToken();
if (!files::CreateDirectoryAt(db_path_.root_fd(), db_path_.path())) {
initialization_completer_.Complete(storage::Status::IO_ERROR);
return;
}
storage::Status status;
std::unique_ptr<storage::Db> db_instance;
if (coroutine::SyncCall(
handler,
[this](fit::function<void(storage::Status,
std::unique_ptr<storage::Db>)>
callback) {
db_factory_->GetOrCreateDb(
std::move(db_path_), storage::DbFactory::OnDbNotFound::CREATE,
std::move(callback));
},
&status,
&db_instance) == coroutine::ContinuationStatus::INTERRUPTED) {
initialization_completer_.Cancel();
return;
}
if (status != storage::Status::OK) {
initialization_completer_.Complete(status);
return;
}
db_ = std::make_unique<PageUsageDb>(environment_->clock(),
std::move(db_instance));
status = db_->MarkAllPagesClosed(handler);
if (status == storage::Status::INTERRUPTED) {
initialization_completer_.Cancel();
return;
}
initialization_completer_.Complete(status);
});
}
void PageEvictionManagerImpl::SetDelegate(
PageEvictionManager::Delegate* delegate) {
FXL_DCHECK(delegate);
FXL_DCHECK(!delegate_);
delegate_ = delegate;
}
void PageEvictionManagerImpl::set_on_empty(fit::closure on_empty_callback) {
on_empty_callback_ = std::move(on_empty_callback);
}
bool PageEvictionManagerImpl::IsEmpty() { return pending_operations_ == 0; }
void PageEvictionManagerImpl::TryEvictPages(
PageEvictionPolicy* policy, fit::function<void(storage::Status)> callback) {
coroutine_manager_.StartCoroutine(
std::move(callback),
[this, policy](coroutine::CoroutineHandler* handler,
fit::function<void(storage::Status)> callback) mutable {
ExpiringToken token = NewExpiringToken();
storage::Status status =
initialization_completer_.WaitUntilDone(handler);
if (LogOnInitializationError("TryEvictPages", status)) {
callback(status);
return;
}
std::unique_ptr<storage::Iterator<const PageInfo>> pages_it;
status = db_->GetPages(handler, &pages_it);
if (status != storage::Status::OK) {
callback(status);
return;
}
policy->SelectAndEvict(std::move(pages_it), std::move(callback));
});
}
void PageEvictionManagerImpl::MarkPageOpened(fxl::StringView ledger_name,
storage::PageIdView page_id) {
coroutine_manager_.StartCoroutine([this, ledger_name = ledger_name.ToString(),
page_id = page_id.ToString()](
coroutine::CoroutineHandler* handler) {
ExpiringToken token = NewExpiringToken();
storage::Status status = initialization_completer_.WaitUntilDone(handler);
if (LogOnInitializationError("MarkPageOpened", status)) {
return;
}
status = db_->MarkPageOpened(handler, ledger_name, page_id);
LogOnPageUpdateError("mark page as opened", status, ledger_name, page_id);
});
}
void PageEvictionManagerImpl::MarkPageClosed(fxl::StringView ledger_name,
storage::PageIdView page_id) {
coroutine_manager_.StartCoroutine([this, ledger_name = ledger_name.ToString(),
page_id = page_id.ToString()](
coroutine::CoroutineHandler* handler) {
ExpiringToken token = NewExpiringToken();
storage::Status status = initialization_completer_.WaitUntilDone(handler);
if (LogOnInitializationError("MarkPageClosed", status)) {
return;
}
status = db_->MarkPageClosed(handler, ledger_name, page_id);
LogOnPageUpdateError("mark page as closed", status, ledger_name, page_id);
});
}
void PageEvictionManagerImpl::TryEvictPage(
fxl::StringView ledger_name, storage::PageIdView page_id,
PageEvictionCondition condition,
fit::function<void(storage::Status, PageWasEvicted)> callback) {
coroutine_manager_.StartCoroutine(
std::move(callback),
[this, ledger_name = ledger_name.ToString(), page_id = page_id.ToString(),
condition](coroutine::CoroutineHandler* handler,
fit::function<void(storage::Status, PageWasEvicted)>
callback) mutable {
ExpiringToken token = NewExpiringToken();
storage::Status status =
initialization_completer_.WaitUntilDone(handler);
if (LogOnInitializationError("TryEvictPage", status)) {
callback(status, PageWasEvicted(false));
return;
}
PageWasEvicted was_evicted;
status = SynchronousTryEvictPage(handler, ledger_name, page_id,
condition, &was_evicted);
callback(status, was_evicted);
});
}
void PageEvictionManagerImpl::EvictPage(
fxl::StringView ledger_name, storage::PageIdView page_id,
fit::function<void(storage::Status)> callback) {
FXL_DCHECK(delegate_);
// We cannot delete the page storage and mark the deletion atomically. We thus
// delete the page first, and then mark it as evicted in Page Usage DB.
delegate_->DeletePageStorage(
ledger_name, page_id,
[this, ledger_name = ledger_name.ToString(), page_id = page_id.ToString(),
callback = std::move(callback)](storage::Status status) mutable {
// |PAGE_NOT_FOUND| is not an error, but it must have been handled
// before we try to evict the page.
FXL_DCHECK(status != storage::Status::PAGE_NOT_FOUND);
if (status == storage::Status::OK) {
MarkPageEvicted(std::move(ledger_name), std::move(page_id));
}
callback(status);
});
}
storage::Status PageEvictionManagerImpl::CanEvictPage(
coroutine::CoroutineHandler* handler, fxl::StringView ledger_name,
storage::PageIdView page_id, bool* can_evict) {
FXL_DCHECK(delegate_);
auto waiter = fxl::MakeRefCounted<
callback::Waiter<storage::Status, PagePredicateResult>>(
storage::Status::OK);
delegate_->PageIsClosedAndSynced(ledger_name, page_id, waiter->NewCallback());
delegate_->PageIsClosedOfflineAndEmpty(ledger_name, page_id,
waiter->NewCallback());
storage::Status status;
std::vector<PagePredicateResult> can_evict_states;
auto sync_call_status =
coroutine::Wait(handler, std::move(waiter), &status, &can_evict_states);
if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) {
return storage::Status::INTERRUPTED;
}
if (status != storage::Status::OK) {
return status;
}
FXL_DCHECK(can_evict_states.size() == 2);
// Receiving status |PAGE_OPENED| means that the page was opened during the
// query. If either result is |PAGE_OPENED| the page cannot be evicted, as the
// result of the other might be invalid at this point.
*can_evict = std::any_of(can_evict_states.begin(), can_evict_states.end(),
[](PagePredicateResult result) {
return result == PagePredicateResult::YES;
}) &&
std::none_of(can_evict_states.begin(), can_evict_states.end(),
[](PagePredicateResult result) {
return result == PagePredicateResult::PAGE_OPENED;
});
return storage::Status::OK;
}
storage::Status PageEvictionManagerImpl::CanEvictEmptyPage(
coroutine::CoroutineHandler* handler, fxl::StringView ledger_name,
storage::PageIdView page_id, bool* can_evict) {
FXL_DCHECK(delegate_);
storage::Status status;
PagePredicateResult empty_state;
auto sync_call_status = coroutine::SyncCall(
handler,
[this, ledger_name = ledger_name.ToString(),
page_id = page_id.ToString()](auto callback) {
delegate_->PageIsClosedOfflineAndEmpty(ledger_name, page_id,
std::move(callback));
},
&status, &empty_state);
if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) {
return storage::Status::INTERRUPTED;
}
*can_evict = (empty_state == PagePredicateResult::YES);
return status;
}
void PageEvictionManagerImpl::MarkPageEvicted(std::string ledger_name,
storage::PageId page_id) {
coroutine_manager_.StartCoroutine([this, ledger_name = std::move(ledger_name),
page_id = std::move(page_id)](
coroutine::CoroutineHandler* handler) {
storage::Status status =
db_->MarkPageEvicted(handler, ledger_name, page_id);
LogOnPageUpdateError("mark page as evicted", status, ledger_name, page_id);
});
}
storage::Status PageEvictionManagerImpl::SynchronousTryEvictPage(
coroutine::CoroutineHandler* handler, std::string ledger_name,
storage::PageId page_id, PageEvictionCondition condition,
PageWasEvicted* was_evicted) {
bool can_evict;
storage::Status status;
switch (condition) {
case IF_EMPTY:
status = CanEvictEmptyPage(handler, ledger_name, page_id, &can_evict);
break;
case IF_POSSIBLE:
status = CanEvictPage(handler, ledger_name, page_id, &can_evict);
}
if (status == storage::Status::PAGE_NOT_FOUND) {
// |PAGE_NOT_FOUND| is not an error: It is possible that the page was
// removed in a previous run, but for some reason marking failed (e.g.
// Ledger was shut down before the operation finished). Mark the page as
// evicted in Page Usage DB, and set |was_evicted| to false, since the page
// was not actually evicted here.
MarkPageEvicted(ledger_name, page_id);
*was_evicted = PageWasEvicted(false);
return storage::Status::OK;
}
if (status != storage::Status::OK || !can_evict) {
*was_evicted = PageWasEvicted(false);
return status;
}
// At this point, the requirements for calling |EvictPage| are met: the page
// exists and can be evicted.
auto sync_call_status = coroutine::SyncCall(
handler,
[this, ledger_name = std::move(ledger_name),
page_id = std::move(page_id)](auto callback) {
EvictPage(ledger_name, page_id, std::move(callback));
},
&status);
if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) {
return storage::Status::INTERRUPTED;
}
*was_evicted = PageWasEvicted(status == storage::Status::OK);
return status;
}
ExpiringToken PageEvictionManagerImpl::NewExpiringToken() {
++pending_operations_;
return ExpiringToken(callback::MakeScoped(weak_factory_.GetWeakPtr(), [this] {
--pending_operations_;
// We need to post a task here: Tokens expire while a coroutine is being
// executed, and if |on_empty_callback_| is executed directly, it might end
// up deleting the PageEvictionManagerImpl object, which will delete the
// |coroutine_manager_|.
async::PostTask(environment_->dispatcher(),
callback::MakeScoped(weak_factory_.GetWeakPtr(), [this] {
if (on_empty_callback_ && pending_operations_ == 0) {
on_empty_callback_();
}
}));
}));
}
} // namespace ledger