| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/app/page_delegate.h" |
| |
| #include <lib/callback/scoped_callback.h> |
| #include <lib/callback/waiter.h> |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| #include <lib/fsl/socket/strings.h> |
| #include <trace/event.h> |
| |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "peridot/lib/convert/convert.h" |
| #include "src/ledger/bin/app/constants.h" |
| #include "src/ledger/bin/app/page_manager.h" |
| #include "src/ledger/bin/app/page_snapshot_impl.h" |
| #include "src/ledger/bin/app/page_utils.h" |
| #include "src/ledger/bin/fidl/include/types.h" |
| #include "src/lib/fxl/memory/ref_ptr.h" |
| |
| namespace ledger { |
| |
| PageDelegate::PageDelegate(coroutine::CoroutineService* coroutine_service, |
| PageManager* manager, storage::PageStorage* storage, |
| MergeResolver* merge_resolver, |
| SyncWatcherSet* watchers, |
| std::unique_ptr<PageImpl> page_impl) |
| : manager_(manager), |
| storage_(storage), |
| merge_resolver_(merge_resolver), |
| branch_tracker_(coroutine_service, manager, storage), |
| watcher_set_(watchers), |
| page_impl_(std::move(page_impl)), |
| weak_factory_(this) { |
| page_impl_->set_on_binding_unbound([this] { |
| operation_serializer_.Serialize<storage::Status>( |
| [](storage::Status status) {}, |
| [this](fit::function<void(storage::Status)> callback) { |
| branch_tracker_.StopTransaction(nullptr); |
| callback(storage::Status::OK); |
| }); |
| }); |
| branch_tracker_.set_on_empty([this] { CheckEmpty(); }); |
| operation_serializer_.set_on_empty([this] { CheckEmpty(); }); |
| } |
| |
| PageDelegate::~PageDelegate() {} |
| |
| void PageDelegate::Init(fit::function<void(storage::Status)> on_done) { |
| storage::Status status = branch_tracker_.Init(); |
| if (status != storage::Status::OK) { |
| on_done(status); |
| return; |
| } |
| page_impl_->SetPageDelegate(this); |
| on_done(storage::Status::OK); |
| } |
| |
| void PageDelegate::GetSnapshot( |
| fidl::InterfaceRequest<PageSnapshot> snapshot_request, |
| std::vector<uint8_t> key_prefix, fidl::InterfaceHandle<PageWatcher> watcher, |
| fit::function<void(Status)> callback) { |
| // TODO(qsr): Update this so that only |GetCurrentCommitId| is done in a the |
| // operation serializer. |
| operation_serializer_.Serialize<storage::Status>( |
| PageUtils::AdaptStatusCallback(std::move(callback)), |
| [this, snapshot_request = std::move(snapshot_request), |
| key_prefix = std::move(key_prefix), watcher = std::move(watcher)]( |
| fit::function<void(storage::Status)> callback) mutable { |
| std::unique_ptr<const storage::Commit> commit = |
| branch_tracker_.GetBranchHead(); |
| std::string prefix = convert::ToString(key_prefix); |
| if (watcher) { |
| PageWatcherPtr watcher_ptr = watcher.Bind(); |
| branch_tracker_.RegisterPageWatcher(std::move(watcher_ptr), |
| commit->Clone(), prefix); |
| } |
| manager_->BindPageSnapshot( |
| std::move(commit), std::move(snapshot_request), std::move(prefix)); |
| callback(storage::Status::OK); |
| }); |
| } |
| |
| void PageDelegate::Put(std::vector<uint8_t> key, std::vector<uint8_t> value, |
| fit::function<void(Status)> callback) { |
| PutWithPriority(std::move(key), std::move(value), Priority::EAGER, |
| std::move(callback)); |
| } |
| |
| void PageDelegate::PutWithPriority(std::vector<uint8_t> key, |
| std::vector<uint8_t> value, |
| Priority priority, |
| fit::function<void(Status)> callback) { |
| FXL_DCHECK(key.size() <= kMaxKeySize); |
| auto promise = fxl::MakeRefCounted< |
| callback::Promise<storage::Status, storage::ObjectIdentifier>>( |
| storage::Status::ILLEGAL_STATE); |
| storage_->AddObjectFromLocal(storage::ObjectType::BLOB, |
| storage::DataSource::Create(std::move(value)), |
| {}, promise->NewCallback()); |
| |
| operation_serializer_.Serialize<storage::Status>( |
| PageUtils::AdaptStatusCallback(std::move(callback)), |
| [this, promise = std::move(promise), key = std::move(key), |
| priority](fit::function<void(storage::Status)> callback) mutable { |
| promise->Finalize(callback::MakeScoped( |
| weak_factory_.GetWeakPtr(), |
| [this, key = std::move(key), priority, |
| callback = std::move(callback)]( |
| storage::Status status, |
| storage::ObjectIdentifier object_identifier) mutable { |
| if (status != storage::Status::OK) { |
| callback(status); |
| return; |
| } |
| |
| PutInCommit(std::move(key), std::move(object_identifier), |
| priority == Priority::EAGER |
| ? storage::KeyPriority::EAGER |
| : storage::KeyPriority::LAZY, |
| std::move(callback)); |
| })); |
| }); |
| } |
| |
| void PageDelegate::PutReference(std::vector<uint8_t> key, Reference reference, |
| Priority priority, |
| fit::function<void(Status)> callback) { |
| FXL_DCHECK(key.size() <= kMaxKeySize); |
| // |ResolveReference| also makes sure that the reference was created for this |
| // page. |
| storage::ObjectIdentifier object_identifier; |
| storage::Status status = |
| manager_->ResolveReference(std::move(reference), &object_identifier); |
| if (status != storage::Status::OK) { |
| callback(PageUtils::ConvertStatus(status)); |
| return; |
| } |
| |
| operation_serializer_.Serialize<storage::Status>( |
| PageUtils::AdaptStatusCallback(std::move(callback)), |
| [this, key = std::move(key), |
| object_identifier = std::move(object_identifier), |
| priority](fit::function<void(storage::Status)> callback) mutable { |
| PutInCommit(std::move(key), std::move(object_identifier), |
| priority == Priority::EAGER ? storage::KeyPriority::EAGER |
| : storage::KeyPriority::LAZY, |
| std::move(callback)); |
| }); |
| } |
| |
| void PageDelegate::Delete(std::vector<uint8_t> key, |
| fit::function<void(Status)> callback) { |
| operation_serializer_.Serialize<storage::Status>( |
| PageUtils::AdaptStatusCallback(std::move(callback)), |
| [this, key = std::move(key)]( |
| fit::function<void(storage::Status)> callback) mutable { |
| RunInTransaction( |
| [key = std::move(key)](storage::Journal* journal) { |
| journal->Delete(key); |
| }, |
| std::move(callback)); |
| }); |
| } |
| |
| void PageDelegate::Clear(fit::function<void(Status)> callback) { |
| operation_serializer_.Serialize<storage::Status>( |
| PageUtils::AdaptStatusCallback(std::move(callback)), |
| [this](fit::function<void(storage::Status)> callback) mutable { |
| RunInTransaction([](storage::Journal* journal) { journal->Clear(); }, |
| std::move(callback)); |
| }); |
| } |
| |
| void PageDelegate::CreateReference( |
| std::unique_ptr<storage::DataSource> data, |
| fit::function<void(Status, CreateReferenceStatus, ReferencePtr)> callback) { |
| storage_->AddObjectFromLocal( |
| storage::ObjectType::BLOB, std::move(data), {}, |
| callback::MakeScoped( |
| weak_factory_.GetWeakPtr(), |
| [this, callback = std::move(callback)]( |
| storage::Status status, |
| storage::ObjectIdentifier object_identifier) { |
| if (status != storage::Status::OK && |
| status != storage::Status::IO_ERROR) { |
| callback(PageUtils::ConvertStatus(status), |
| CreateReferenceStatus::OK, nullptr); |
| return; |
| } |
| |
| // Convert IO_ERROR into INVALID_ARGUMENT. |
| // TODO(qsr): Refactor status handling so that io error due to |
| // storage and io error due to invalid argument can be |
| // distinguished. |
| // An INVALID_ARGUMENT should not cause the page to get |
| // disconnected, so use OK as status. |
| if (status == storage::Status::IO_ERROR) { |
| callback(Status::OK, CreateReferenceStatus::INVALID_ARGUMENT, |
| nullptr); |
| return; |
| } |
| |
| callback(Status::OK, CreateReferenceStatus::OK, |
| fidl::MakeOptional(manager_->CreateReference( |
| std::move(object_identifier)))); |
| })); |
| } |
| |
| void PageDelegate::StartTransaction(fit::function<void(Status)> callback) { |
| operation_serializer_.Serialize<Status>( |
| std::move(callback), |
| [this](fit::function<void(ledger::Status)> callback) { |
| if (journal_) { |
| callback(Status::TRANSACTION_ALREADY_IN_PROGRESS); |
| return; |
| } |
| std::unique_ptr<const storage::Commit> commit = |
| branch_tracker_.GetBranchHead(); |
| journal_ = storage_->StartCommit(std::move(commit)); |
| |
| branch_tracker_.StartTransaction( |
| [callback = std::move(callback)]() { callback(Status::OK); }); |
| }); |
| } |
| |
| void PageDelegate::Commit(fit::function<void(Status)> callback) { |
| operation_serializer_.Serialize<Status>( |
| std::move(callback), |
| [this](fit::function<void(ledger::Status)> callback) { |
| if (!journal_) { |
| callback(Status::NO_TRANSACTION_IN_PROGRESS); |
| return; |
| } |
| CommitJournal(std::move(journal_), |
| callback::MakeScoped( |
| weak_factory_.GetWeakPtr(), |
| [this, callback = std::move(callback)]( |
| storage::Status status, |
| std::unique_ptr<const storage::Commit> commit) { |
| branch_tracker_.StopTransaction(std::move(commit)); |
| callback(PageUtils::ConvertStatus(status)); |
| })); |
| }); |
| } |
| |
| void PageDelegate::Rollback(fit::function<void(Status)> callback) { |
| operation_serializer_.Serialize<Status>( |
| std::move(callback), |
| [this](fit::function<void(ledger::Status)> callback) { |
| if (!journal_) { |
| callback(Status::NO_TRANSACTION_IN_PROGRESS); |
| return; |
| } |
| journal_.reset(); |
| callback(Status::OK); |
| branch_tracker_.StopTransaction(nullptr); |
| }); |
| } |
| |
| void PageDelegate::SetSyncStateWatcher( |
| fidl::InterfaceHandle<SyncWatcher> watcher, |
| fit::function<void(Status)> callback) { |
| SyncWatcherPtr watcher_ptr = watcher.Bind(); |
| watcher_set_->AddSyncWatcher(std::move(watcher_ptr)); |
| callback(Status::OK); |
| } |
| |
| void PageDelegate::WaitForConflictResolution( |
| fit::function<void(Status, ConflictResolutionWaitStatus)> callback) { |
| if (!merge_resolver_->HasUnfinishedMerges()) { |
| callback(Status::OK, ConflictResolutionWaitStatus::NO_CONFLICTS); |
| return; |
| } |
| merge_resolver_->RegisterNoConflictCallback( |
| [callback = std::move(callback)](ConflictResolutionWaitStatus status) { |
| callback(Status::OK, status); |
| }); |
| } |
| |
| void PageDelegate::PutInCommit(std::vector<uint8_t> key, |
| storage::ObjectIdentifier object_identifier, |
| storage::KeyPriority priority, |
| fit::function<void(storage::Status)> callback) { |
| RunInTransaction( |
| [key = std::move(key), object_identifier = std::move(object_identifier), |
| priority](storage::Journal* journal) mutable { |
| journal->Put(key, std::move(object_identifier), priority); |
| }, |
| std::move(callback)); |
| } |
| |
| void PageDelegate::RunInTransaction( |
| fit::function<void(storage::Journal*)> runnable, |
| fit::function<void(storage::Status)> callback) { |
| if (journal_) { |
| // A transaction is in progress; add this change to it. |
| runnable(journal_.get()); |
| callback(storage::Status::OK); |
| return; |
| } |
| // No transaction is in progress; create one just for this change. |
| // TODO(LE-690): Batch together operations outside transactions that have been |
| // accumulated while waiting for the previous one to be committed. |
| branch_tracker_.StartTransaction([] {}); |
| std::unique_ptr<const storage::Commit> commit = |
| branch_tracker_.GetBranchHead(); |
| std::unique_ptr<storage::Journal> journal = |
| storage_->StartCommit(std::move(commit)); |
| runnable(journal.get()); |
| |
| CommitJournal( |
| std::move(journal), |
| callback::MakeScoped( |
| weak_factory_.GetWeakPtr(), |
| [this, callback = std::move(callback)]( |
| storage::Status status, |
| std::unique_ptr<const storage::Commit> commit) { |
| branch_tracker_.StopTransaction( |
| status == storage::Status::OK ? std::move(commit) : nullptr); |
| callback(status); |
| })); |
| } |
| |
| void PageDelegate::CommitJournal( |
| std::unique_ptr<storage::Journal> journal, |
| fit::function<void(storage::Status, std::unique_ptr<const storage::Commit>)> |
| callback) { |
| storage_->CommitJournal(std::move(journal), |
| [callback = std::move(callback)]( |
| storage::Status status, |
| std::unique_ptr<const storage::Commit> commit) { |
| callback(status, std::move(commit)); |
| }); |
| } |
| |
| void PageDelegate::CheckEmpty() { |
| if (on_empty_callback_ && page_impl_->IsEmpty() && |
| branch_tracker_.IsEmpty() && operation_serializer_.empty()) { |
| on_empty_callback_(); |
| } |
| } |
| |
| } // namespace ledger |