blob: ddd21a2f6343188a7447885585ab0bdb17f3b746 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/app/page_delegate.h"
#include <lib/callback/scoped_callback.h>
#include <lib/callback/waiter.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <trace/event.h>
#include <string>
#include <utility>
#include <vector>
#include "peridot/lib/convert/convert.h"
#include "src/ledger/bin/app/constants.h"
#include "src/ledger/bin/app/page_manager.h"
#include "src/ledger/bin/app/page_snapshot_impl.h"
#include "src/ledger/bin/app/page_utils.h"
#include "src/ledger/bin/fidl/include/types.h"
#include "src/lib/fxl/memory/ref_ptr.h"
namespace ledger {
PageDelegate::PageDelegate(coroutine::CoroutineService* coroutine_service,
PageManager* manager, storage::PageStorage* storage,
MergeResolver* merge_resolver,
SyncWatcherSet* watchers,
std::unique_ptr<PageImpl> page_impl)
: manager_(manager),
storage_(storage),
merge_resolver_(merge_resolver),
branch_tracker_(coroutine_service, manager, storage),
watcher_set_(watchers),
page_impl_(std::move(page_impl)),
weak_factory_(this) {
page_impl_->set_on_binding_unbound([this] {
operation_serializer_.Serialize<storage::Status>(
[](storage::Status status) {},
[this](fit::function<void(storage::Status)> callback) {
branch_tracker_.StopTransaction(nullptr);
callback(storage::Status::OK);
});
});
branch_tracker_.set_on_empty([this] { CheckEmpty(); });
operation_serializer_.set_on_empty([this] { CheckEmpty(); });
}
PageDelegate::~PageDelegate() {}
void PageDelegate::Init(fit::function<void(storage::Status)> on_done) {
storage::Status status = branch_tracker_.Init();
if (status != storage::Status::OK) {
on_done(status);
return;
}
page_impl_->SetPageDelegate(this);
on_done(storage::Status::OK);
}
void PageDelegate::GetSnapshot(
fidl::InterfaceRequest<PageSnapshot> snapshot_request,
std::vector<uint8_t> key_prefix, fidl::InterfaceHandle<PageWatcher> watcher,
fit::function<void(Status)> callback) {
// TODO(qsr): Update this so that only |GetCurrentCommitId| is done in a the
// operation serializer.
operation_serializer_.Serialize<storage::Status>(
PageUtils::AdaptStatusCallback(std::move(callback)),
[this, snapshot_request = std::move(snapshot_request),
key_prefix = std::move(key_prefix), watcher = std::move(watcher)](
fit::function<void(storage::Status)> callback) mutable {
std::unique_ptr<const storage::Commit> commit =
branch_tracker_.GetBranchHead();
std::string prefix = convert::ToString(key_prefix);
if (watcher) {
PageWatcherPtr watcher_ptr = watcher.Bind();
branch_tracker_.RegisterPageWatcher(std::move(watcher_ptr),
commit->Clone(), prefix);
}
manager_->BindPageSnapshot(
std::move(commit), std::move(snapshot_request), std::move(prefix));
callback(storage::Status::OK);
});
}
void PageDelegate::Put(std::vector<uint8_t> key, std::vector<uint8_t> value,
fit::function<void(Status)> callback) {
PutWithPriority(std::move(key), std::move(value), Priority::EAGER,
std::move(callback));
}
void PageDelegate::PutWithPriority(std::vector<uint8_t> key,
std::vector<uint8_t> value,
Priority priority,
fit::function<void(Status)> callback) {
FXL_DCHECK(key.size() <= kMaxKeySize);
auto promise = fxl::MakeRefCounted<
callback::Promise<storage::Status, storage::ObjectIdentifier>>(
storage::Status::ILLEGAL_STATE);
storage_->AddObjectFromLocal(storage::ObjectType::BLOB,
storage::DataSource::Create(std::move(value)),
{}, promise->NewCallback());
operation_serializer_.Serialize<storage::Status>(
PageUtils::AdaptStatusCallback(std::move(callback)),
[this, promise = std::move(promise), key = std::move(key),
priority](fit::function<void(storage::Status)> callback) mutable {
promise->Finalize(callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, key = std::move(key), priority,
callback = std::move(callback)](
storage::Status status,
storage::ObjectIdentifier object_identifier) mutable {
if (status != storage::Status::OK) {
callback(status);
return;
}
PutInCommit(std::move(key), std::move(object_identifier),
priority == Priority::EAGER
? storage::KeyPriority::EAGER
: storage::KeyPriority::LAZY,
std::move(callback));
}));
});
}
void PageDelegate::PutReference(std::vector<uint8_t> key, Reference reference,
Priority priority,
fit::function<void(Status)> callback) {
FXL_DCHECK(key.size() <= kMaxKeySize);
// |ResolveReference| also makes sure that the reference was created for this
// page.
storage::ObjectIdentifier object_identifier;
storage::Status status =
manager_->ResolveReference(std::move(reference), &object_identifier);
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status));
return;
}
operation_serializer_.Serialize<storage::Status>(
PageUtils::AdaptStatusCallback(std::move(callback)),
[this, key = std::move(key),
object_identifier = std::move(object_identifier),
priority](fit::function<void(storage::Status)> callback) mutable {
PutInCommit(std::move(key), std::move(object_identifier),
priority == Priority::EAGER ? storage::KeyPriority::EAGER
: storage::KeyPriority::LAZY,
std::move(callback));
});
}
void PageDelegate::Delete(std::vector<uint8_t> key,
fit::function<void(Status)> callback) {
operation_serializer_.Serialize<storage::Status>(
PageUtils::AdaptStatusCallback(std::move(callback)),
[this, key = std::move(key)](
fit::function<void(storage::Status)> callback) mutable {
RunInTransaction(
[key = std::move(key)](storage::Journal* journal) {
journal->Delete(key);
},
std::move(callback));
});
}
void PageDelegate::Clear(fit::function<void(Status)> callback) {
operation_serializer_.Serialize<storage::Status>(
PageUtils::AdaptStatusCallback(std::move(callback)),
[this](fit::function<void(storage::Status)> callback) mutable {
RunInTransaction([](storage::Journal* journal) { journal->Clear(); },
std::move(callback));
});
}
void PageDelegate::CreateReference(
std::unique_ptr<storage::DataSource> data,
fit::function<void(Status, CreateReferenceStatus, ReferencePtr)> callback) {
storage_->AddObjectFromLocal(
storage::ObjectType::BLOB, std::move(data), {},
callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, callback = std::move(callback)](
storage::Status status,
storage::ObjectIdentifier object_identifier) {
if (status != storage::Status::OK &&
status != storage::Status::IO_ERROR) {
callback(PageUtils::ConvertStatus(status),
CreateReferenceStatus::OK, nullptr);
return;
}
// Convert IO_ERROR into INVALID_ARGUMENT.
// TODO(qsr): Refactor status handling so that io error due to
// storage and io error due to invalid argument can be
// distinguished.
// An INVALID_ARGUMENT should not cause the page to get
// disconnected, so use OK as status.
if (status == storage::Status::IO_ERROR) {
callback(Status::OK, CreateReferenceStatus::INVALID_ARGUMENT,
nullptr);
return;
}
callback(Status::OK, CreateReferenceStatus::OK,
fidl::MakeOptional(manager_->CreateReference(
std::move(object_identifier))));
}));
}
void PageDelegate::StartTransaction(fit::function<void(Status)> callback) {
operation_serializer_.Serialize<Status>(
std::move(callback),
[this](fit::function<void(ledger::Status)> callback) {
if (journal_) {
callback(Status::TRANSACTION_ALREADY_IN_PROGRESS);
return;
}
std::unique_ptr<const storage::Commit> commit =
branch_tracker_.GetBranchHead();
journal_ = storage_->StartCommit(std::move(commit));
branch_tracker_.StartTransaction(
[callback = std::move(callback)]() { callback(Status::OK); });
});
}
void PageDelegate::Commit(fit::function<void(Status)> callback) {
operation_serializer_.Serialize<Status>(
std::move(callback),
[this](fit::function<void(ledger::Status)> callback) {
if (!journal_) {
callback(Status::NO_TRANSACTION_IN_PROGRESS);
return;
}
CommitJournal(std::move(journal_),
callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, callback = std::move(callback)](
storage::Status status,
std::unique_ptr<const storage::Commit> commit) {
branch_tracker_.StopTransaction(std::move(commit));
callback(PageUtils::ConvertStatus(status));
}));
});
}
void PageDelegate::Rollback(fit::function<void(Status)> callback) {
operation_serializer_.Serialize<Status>(
std::move(callback),
[this](fit::function<void(ledger::Status)> callback) {
if (!journal_) {
callback(Status::NO_TRANSACTION_IN_PROGRESS);
return;
}
journal_.reset();
callback(Status::OK);
branch_tracker_.StopTransaction(nullptr);
});
}
void PageDelegate::SetSyncStateWatcher(
fidl::InterfaceHandle<SyncWatcher> watcher,
fit::function<void(Status)> callback) {
SyncWatcherPtr watcher_ptr = watcher.Bind();
watcher_set_->AddSyncWatcher(std::move(watcher_ptr));
callback(Status::OK);
}
void PageDelegate::WaitForConflictResolution(
fit::function<void(Status, ConflictResolutionWaitStatus)> callback) {
if (!merge_resolver_->HasUnfinishedMerges()) {
callback(Status::OK, ConflictResolutionWaitStatus::NO_CONFLICTS);
return;
}
merge_resolver_->RegisterNoConflictCallback(
[callback = std::move(callback)](ConflictResolutionWaitStatus status) {
callback(Status::OK, status);
});
}
void PageDelegate::PutInCommit(std::vector<uint8_t> key,
storage::ObjectIdentifier object_identifier,
storage::KeyPriority priority,
fit::function<void(storage::Status)> callback) {
RunInTransaction(
[key = std::move(key), object_identifier = std::move(object_identifier),
priority](storage::Journal* journal) mutable {
journal->Put(key, std::move(object_identifier), priority);
},
std::move(callback));
}
void PageDelegate::RunInTransaction(
fit::function<void(storage::Journal*)> runnable,
fit::function<void(storage::Status)> callback) {
if (journal_) {
// A transaction is in progress; add this change to it.
runnable(journal_.get());
callback(storage::Status::OK);
return;
}
// No transaction is in progress; create one just for this change.
// TODO(LE-690): Batch together operations outside transactions that have been
// accumulated while waiting for the previous one to be committed.
branch_tracker_.StartTransaction([] {});
std::unique_ptr<const storage::Commit> commit =
branch_tracker_.GetBranchHead();
std::unique_ptr<storage::Journal> journal =
storage_->StartCommit(std::move(commit));
runnable(journal.get());
CommitJournal(
std::move(journal),
callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, callback = std::move(callback)](
storage::Status status,
std::unique_ptr<const storage::Commit> commit) {
branch_tracker_.StopTransaction(
status == storage::Status::OK ? std::move(commit) : nullptr);
callback(status);
}));
}
void PageDelegate::CommitJournal(
std::unique_ptr<storage::Journal> journal,
fit::function<void(storage::Status, std::unique_ptr<const storage::Commit>)>
callback) {
storage_->CommitJournal(std::move(journal),
[callback = std::move(callback)](
storage::Status status,
std::unique_ptr<const storage::Commit> commit) {
callback(status, std::move(commit));
});
}
void PageDelegate::CheckEmpty() {
if (on_empty_callback_ && page_impl_->IsEmpty() &&
branch_tracker_.IsEmpty() && operation_serializer_.empty()) {
on_empty_callback_();
}
}
} // namespace ledger