| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/app/page_manager.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/fidl/cpp/interface_request.h> |
| #include <lib/fit/defer.h> |
| #include <lib/fit/function.h> |
| #include <lib/trace/event.h> |
| |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "src/ledger/bin/app/active_page_manager.h" |
| #include "src/ledger/bin/app/active_page_manager_container.h" |
| #include "src/ledger/bin/app/ledger_impl.h" |
| #include "src/ledger/bin/app/page_utils.h" |
| #include "src/ledger/bin/app/types.h" |
| #include "src/ledger/bin/fidl/include/types.h" |
| #include "src/ledger/bin/p2p_sync/public/page_communicator.h" |
| #include "src/ledger/bin/storage/public/page_storage.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/ledger/lib/callback/ensure_called.h" |
| #include "src/ledger/lib/callback/scoped_callback.h" |
| #include "src/ledger/lib/convert/convert.h" |
| #include "src/ledger/lib/logging/logging.h" |
| #include "src/ledger/lib/memory/weak_ptr.h" |
| |
| namespace ledger { |
| |
| PageManager::PageManager(Environment* environment, std::string ledger_name, storage::PageId page_id, |
| std::vector<PageUsageListener*> page_usage_listeners, |
| storage::LedgerStorage* ledger_storage, |
| sync_coordinator::LedgerSync* ledger_sync, |
| LedgerMergeManager* ledger_merge_manager) |
| : environment_(environment), |
| ledger_name_(std::move(ledger_name)), |
| page_id_(std::move(page_id)), |
| page_usage_listeners_(std::move(page_usage_listeners)), |
| ledger_storage_(ledger_storage), |
| ledger_sync_(ledger_sync), |
| ledger_merge_manager_(ledger_merge_manager), |
| weak_factory_(this) { |
| page_availability_manager_.SetOnDiscardable([this] { CheckDiscardable(); }); |
| } |
| |
| PageManager::~PageManager() = default; |
| |
| fit::closure PageManager::CreateDetacher() { |
| outstanding_detachers_++; |
| return [this]() { |
| outstanding_detachers_--; |
| LEDGER_DCHECK(outstanding_detachers_ >= 0); |
| CheckDiscardable(); |
| }; |
| } |
| |
| void PageManager::PageIsClosedAndSynced( |
| fit::function<void(storage::Status, PagePredicateResult)> callback) { |
| auto is_synced = [](ActivePageManager* active_page_manager, |
| fit::function<void(storage::Status, bool)> on_done) { |
| active_page_manager->IsSynced(std::move(on_done)); |
| }; |
| PageIsClosedAndSatisfiesPredicate(std::move(is_synced), std::move(callback)); |
| } |
| |
| void PageManager::PageIsClosedOfflineAndEmpty( |
| fit::function<void(storage::Status, PagePredicateResult)> callback) { |
| auto is_offline_and_empty = [](ActivePageManager* active_page_manager, |
| fit::function<void(storage::Status, bool)> on_done) { |
| active_page_manager->IsOfflineAndEmpty(std::move(on_done)); |
| }; |
| PageIsClosedAndSatisfiesPredicate(std::move(is_offline_and_empty), std::move(callback)); |
| } |
| |
| void PageManager::DeletePageStorage(fit::function<void(storage::Status)> callback) { |
| if (active_page_manager_container_) { |
| callback(storage::Status::ILLEGAL_STATE); |
| return; |
| } |
| // Block all page requests until MarkAvailable is called. |
| page_availability_manager_.MarkPageBusy(); |
| ledger_storage_->DeletePageStorage( |
| page_id_, MakeScoped(weak_factory_.GetWeakPtr(), |
| [this, callback = std::move(callback)](storage::Status status) { |
| // This may destruct this |
| // |PageManager|. |
| page_availability_manager_.MarkPageAvailable(); |
| callback(status); |
| })); |
| } |
| |
| void PageManager::GetPage(LedgerImpl::Delegate::PageState page_state, |
| fidl::InterfaceRequest<Page> page_request, |
| fit::function<void(storage::Status)> callback) { |
| MaybeMarkPageOpened(); |
| |
| // If we have the page manager ready, just bind the request and return. |
| if (active_page_manager_container_) { |
| active_page_manager_container_.value().BindPage(std::move(page_request), std::move(callback)); |
| return; |
| } |
| |
| ActivePageManagerContainer* container = CreateActivePageManagerContainer(); |
| // TODO(LE-631): We will need to remove empty pages that are unknown to the |
| // user or the page usage database. |
| container->BindPage(std::move(page_request), std::move(callback)); |
| |
| InitActivePageManagerContainer(container, |
| [this, container, page_state](storage::Status status) mutable { |
| // Create the page if it wasn't found. |
| if (status == storage::Status::PAGE_NOT_FOUND) { |
| CreatePageStorage(page_state, container); |
| } |
| }); |
| } |
| |
| void PageManager::SetOnDiscardable(fit::closure on_discardable) { |
| on_discardable_ = std::move(on_discardable); |
| } |
| |
| bool PageManager::IsDiscardable() const { |
| return (!active_page_manager_container_ || active_page_manager_container_->IsDiscardable()) && |
| outstanding_operations_ == 0 && page_availability_manager_.IsDiscardable() && |
| outstanding_detachers_ == 0; |
| } |
| |
| void PageManager::StartPageSync() { |
| // If the active page manager container is open, just return as the sync should have been already |
| // triggered. |
| if (active_page_manager_container_) { |
| return; |
| } |
| |
| // Create the container and set up an active page manager to start sync with the cloud. |
| ActivePageManagerContainer* container = CreateActivePageManagerContainer(); |
| InitActivePageManagerContainer(container, [container](storage::Status status) { |
| // InitActivePageManager does not handle the case of PAGE_NOT_FOUND errors to allow creation of |
| // a new page, but no creation is required here, so status is merely propagated. |
| if (status == storage::Status::PAGE_NOT_FOUND) { |
| container->SetActivePageManager(status, nullptr); |
| } |
| }); |
| } |
| |
| void PageManager::InitActivePageManagerContainer(ActivePageManagerContainer* container, |
| fit::function<void(storage::Status)> callback) { |
| page_availability_manager_.OnPageAvailable([this, container, |
| callback = std::move(callback)]() mutable { |
| ledger_storage_->GetPageStorage( |
| page_id_, |
| [this, container, callback = std::move(callback)]( |
| storage::Status status, std::unique_ptr<storage::PageStorage> page_storage) mutable { |
| if (status != storage::Status::OK && status != storage::Status::PAGE_NOT_FOUND) { |
| container->SetActivePageManager(status, nullptr); |
| callback(status); |
| return; |
| } |
| if (status == storage::Status::PAGE_NOT_FOUND) { |
| callback(status); |
| return; |
| } |
| |
| // If the page was found locally, just use it and return. |
| LEDGER_DCHECK(page_storage); |
| NewActivePageManager( |
| std::move(page_storage), ActivePageManager::PageStorageState::AVAILABLE, |
| [container, callback = std::move(callback)]( |
| storage::Status status, std::unique_ptr<ActivePageManager> active_page_manager) { |
| container->SetActivePageManager(status, std::move(active_page_manager)); |
| callback(status); |
| }); |
| }); |
| }); |
| } |
| |
| void PageManager::CreatePageStorage(LedgerImpl::Delegate::PageState page_state, |
| ActivePageManagerContainer* container) { |
| page_availability_manager_.OnPageAvailable([this, page_state, container]() mutable { |
| ledger_storage_->CreatePageStorage( |
| page_id_, [this, page_state, container]( |
| storage::Status status, std::unique_ptr<storage::PageStorage> page_storage) { |
| if (status != storage::Status::OK) { |
| container->SetActivePageManager(status, nullptr); |
| return; |
| } |
| |
| NewActivePageManager(std::move(page_storage), |
| page_state == LedgerImpl::Delegate::PageState::NEW |
| ? ActivePageManager::PageStorageState::AVAILABLE |
| : ActivePageManager::PageStorageState::NEEDS_SYNC, |
| [container](storage::Status status, |
| std::unique_ptr<ActivePageManager> active_page_manager) { |
| container->SetActivePageManager(status, |
| std::move(active_page_manager)); |
| }); |
| }); |
| }); |
| } |
| |
| ActivePageManagerContainer* PageManager::CreateActivePageManagerContainer() { |
| LEDGER_DCHECK(!active_page_manager_container_); |
| auto& active_page_manager_container = active_page_manager_container_.emplace( |
| environment_, ledger_name_, page_id_, page_usage_listeners_); |
| active_page_manager_container_->SetOnDiscardable([this]() { |
| active_page_manager_container_.reset(); |
| CheckDiscardable(); |
| }); |
| return &active_page_manager_container; |
| } |
| |
| void PageManager::NewActivePageManager( |
| std::unique_ptr<storage::PageStorage> page_storage, ActivePageManager::PageStorageState state, |
| fit::function<void(storage::Status, std::unique_ptr<ActivePageManager>)> callback) { |
| if (!ledger_sync_) { |
| auto result = std::make_unique<ActivePageManager>( |
| environment_, std::move(page_storage), nullptr, |
| ledger_merge_manager_->GetMergeResolver(page_storage.get()), state); |
| callback(storage::Status::OK, std::move(result)); |
| return; |
| } |
| |
| ledger_sync_->CreatePageSync( |
| page_storage.get(), page_storage.get(), |
| [this, page_storage = std::move(page_storage), state, callback = std::move(callback)]( |
| storage::Status status, std::unique_ptr<sync_coordinator::PageSync> page_sync) mutable { |
| if (status != storage::Status::OK) { |
| callback(status, nullptr); |
| return; |
| } |
| auto result = std::make_unique<ActivePageManager>( |
| environment_, std::move(page_storage), std::move(page_sync), |
| ledger_merge_manager_->GetMergeResolver(page_storage.get()), state); |
| callback(storage::Status::OK, std::move(result)); |
| }); |
| } |
| |
| void PageManager::PageIsClosedAndSatisfiesPredicate( |
| fit::function<void(ActivePageManager*, fit::function<void(storage::Status, bool)>)> predicate, |
| fit::function<void(storage::Status, PagePredicateResult)> callback) { |
| // Start logging whether the page has been opened during the execution of |
| // this method. |
| auto tracker = NewPageTracker(); |
| |
| ActivePageManagerContainer* container; |
| |
| if (active_page_manager_container_) { |
| // The page manager is open, check if there are any open connections. |
| container = &active_page_manager_container_.value(); |
| if (active_page_manager_container_->PageConnectionIsOpen()) { |
| callback(storage::Status::OK, PagePredicateResult::PAGE_OPENED); |
| return; |
| } |
| } else { |
| // Create the container and get the PageStorage. |
| container = CreateActivePageManagerContainer(); |
| InitActivePageManagerContainer(container, [container](storage::Status status) { |
| if (status == storage::Status::PAGE_NOT_FOUND) { |
| container->SetActivePageManager(status, nullptr); |
| } |
| }); |
| } |
| |
| container->NewInternalRequest([this, tracker = std::move(tracker), |
| predicate = std::move(predicate), callback = std::move(callback)]( |
| storage::Status status, ExpiringToken token, |
| ActivePageManager* active_page_manager) mutable { |
| if (status != storage::Status::OK) { |
| callback(status, PagePredicateResult::PAGE_OPENED); |
| return; |
| } |
| LEDGER_DCHECK(active_page_manager); |
| predicate( |
| active_page_manager, |
| MakeScoped( |
| weak_factory_.GetWeakPtr(), |
| [this, tracker = std::move(tracker), callback = std::move(callback), |
| token = std::move(token)](storage::Status status, bool condition) mutable { |
| if (status != storage::Status::OK) { |
| callback(status, PagePredicateResult::PAGE_OPENED); |
| } |
| // |token| is expected to go out of scope. The |
| // PageManager is kept non-empty by |tracker|. |
| async::PostTask( |
| environment_->dispatcher(), |
| MakeScoped(weak_factory_.GetWeakPtr(), [condition, callback = std::move(callback), |
| tracker = std::move(tracker)]() mutable { |
| if (!tracker()) { |
| // If |RemoveTrackedPage| returns false, this |
| // means that the page was opened during this |
| // operation and |PAGE_OPENED| must be returned. |
| callback(storage::Status::OK, PagePredicateResult::PAGE_OPENED); |
| return; |
| } |
| callback(storage::Status::OK, |
| condition ? PagePredicateResult::YES : PagePredicateResult::NO); |
| })); |
| })); |
| }); |
| } |
| |
| fit::function<bool()> PageManager::NewPageTracker() { |
| outstanding_operations_++; |
| uint64_t operation_id = was_opened_id_++; |
| was_opened_.push_back(operation_id); |
| |
| WeakPtr<PageManager> weak_this = weak_factory_.GetWeakPtr(); |
| |
| auto stop_tracking = [this, weak_this, operation_id]() { |
| if (!weak_this) { |
| return false; |
| } |
| outstanding_operations_--; |
| auto check_empty_on_return = fit::defer([this] { CheckDiscardable(); }); |
| |
| // Erase operation_id, if found, from the vector - operation_id may not be present in the vector |
| // if the vector was cleared (as happens during a call to GetPage) between when the operation |
| // started and now. |
| auto it = std::find(was_opened_.begin(), was_opened_.end(), operation_id); |
| if (it != was_opened_.end()) { |
| was_opened_.erase(it); |
| return true; |
| } |
| return false; |
| }; |
| return EnsureCalled(std::move(stop_tracking)); |
| } |
| |
| void PageManager::MaybeMarkPageOpened() { was_opened_.clear(); } |
| |
| void PageManager::CheckDiscardable() { |
| if (on_discardable_ && IsDiscardable()) { |
| on_discardable_(); |
| } |
| } |
| |
| } // namespace ledger |