blob: e9cb94233d3102f84d5ccd80d241bb011a72967d [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/app/page_manager.h"
#include <algorithm>
#include <lib/callback/trace_callback.h>
#include <lib/fit/function.h>
#include <lib/fxl/logging.h>
#include <zircon/syscalls.h>
#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/bin/ledger/fidl/include/types.h"
#include "peridot/bin/ledger/storage/impl/data_serialization.h"
namespace ledger {
// Takes ownership of the storage, sync and merge-resolver objects and hooks
// every owned collection so that CheckEmpty() runs whenever one of them
// drains.
//
// When |page_sync| is non-null, synchronization is started immediately. If in
// addition |state| is NEEDS_SYNC, a task is scheduled after |sync_timeout|
// that releases the pending pages if the sync backlog has not been downloaded
// by then. In every other case the backlog is considered downloaded right
// away.
PageManager::PageManager(Environment* environment,
                         std::unique_ptr<storage::PageStorage> page_storage,
                         std::unique_ptr<sync_coordinator::PageSync> page_sync,
                         std::unique_ptr<MergeResolver> merge_resolver,
                         PageManager::PageStorageState state,
                         zx::duration sync_timeout)
    : environment_(environment),
      page_storage_(std::move(page_storage)),
      page_sync_(std::move(page_sync)),
      merge_resolver_(std::move(merge_resolver)),
      sync_timeout_(sync_timeout),
      task_runner_(environment->dispatcher()) {
  // Any of these becoming empty may make the whole manager empty.
  pages_.set_on_empty([this] { CheckEmpty(); });
  snapshots_.set_on_empty([this] { CheckEmpty(); });
  page_debug_bindings_.set_empty_set_handler([this] { CheckEmpty(); });

  if (!page_sync_) {
    // Without a sync object there is no backlog to wait for.
    sync_backlog_downloaded_ = true;
  } else {
    // All handlers are installed before Start() is called.
    page_sync_->SetSyncWatcher(&watchers_);
    page_sync_->SetOnIdle([this] { CheckEmpty(); });
    page_sync_->SetOnBacklogDownloaded([this] { OnSyncBacklogDownloaded(); });
    page_sync_->Start();

    if (state != PageManager::PageStorageState::NEEDS_SYNC) {
      sync_backlog_downloaded_ = true;
    } else {
      // The page storage was created locally. We wait a bit in order to get
      // the initial state from the network before accepting requests.
      task_runner_.PostDelayedTask(
          [this] {
            if (sync_backlog_downloaded_) {
              // The backlog arrived before the timeout: nothing left to do.
              return;
            }
            FXL_LOG(INFO) << "Initial sync will continue in background, "
                          << "in the meantime binding to local page data "
                          << "(might be stale or empty).";
            OnSyncBacklogDownloaded();
          },
          sync_timeout_);
    }
  }

  merge_resolver_->set_on_empty([this] { CheckEmpty(); });
  merge_resolver_->SetPageManager(this);
}
// Reports Status::INTERNAL_ERROR to the completion callback of every page
// that is still queued in |delaying_facades_|, then drops the queue.
PageManager::~PageManager() {
  for (auto pending = delaying_facades_.cbegin();
       pending != delaying_facades_.cend(); ++pending) {
    pending->second(Status::INTERNAL_ERROR);
  }
  delaying_facades_.clear();
}
// Registers |delaying_facade| with this manager. |on_done| is wrapped in a
// trace callback before use.
//
// Once the sync backlog is marked as downloaded, a new entry is created in
// |pages_| and initialized with the traced callback. Before that, the facade
// and its callback are queued in |delaying_facades_|; the queue is flushed by
// OnSyncBacklogDownloaded().
void PageManager::AddPageDelayingFacade(
    std::unique_ptr<PageDelayingFacade> delaying_facade,
    fit::function<void(Status)> on_done) {
  auto traced_on_done = TRACE_CALLBACK(std::move(on_done), "ledger",
                                       "page_manager_add_page_delaying_facade");

  if (sync_backlog_downloaded_) {
    auto&& new_page = pages_.emplace(
        environment_->coroutine_service(), this, page_storage_.get(),
        merge_resolver_.get(), &watchers_, std::move(delaying_facade));
    new_page.Init(std::move(traced_on_done));
    return;
  }

  // Not ready yet: keep the request until the backlog is available.
  delaying_facades_.emplace_back(std::move(delaying_facade),
                                 std::move(traced_on_done));
}
// Adds a binding between |page_debug| and this object to
// |page_debug_bindings_|, then calls |callback| with Status::OK.
void PageManager::BindPageDebug(
    fidl::InterfaceRequest<PageDebug> page_debug,
    fit::function<void(Status)> callback) {
  page_debug_bindings_.AddBinding(this, std::move(page_debug));

  callback(Status::OK);
}
// Creates a new entry in |snapshots_| from |snapshot_request|, the page
// storage owned by this manager, |commit| and |key_prefix|.
void PageManager::BindPageSnapshot(
    std::unique_ptr<const storage::Commit> commit,
    fidl::InterfaceRequest<PageSnapshot> snapshot_request,
    std::string key_prefix) {
  snapshots_.emplace(std::move(snapshot_request),
                     page_storage_.get(),
                     std::move(commit),
                     std::move(key_prefix));
}
// Stores |object_identifier| in |references_| under a randomly drawn 64-bit
// key and returns a Reference whose |opaque_id| is the serialized key.
Reference PageManager::CreateReference(
    storage::ObjectIdentifier object_identifier) {
  uint64_t reference_key = environment_->random()->Draw<uint64_t>();
  // The key is expected to be unused; this is only verified by the DCHECK.
  FXL_DCHECK(references_.find(reference_key) == references_.end());
  references_[reference_key] = std::move(object_identifier);

  Reference result;
  result.opaque_id = convert::ToArray(storage::SerializeData(reference_key));
  return result;
}
// Looks up the object identifier previously registered for |reference|.
//
// Returns Status::OK and writes the identifier to |*object_identifier| when
// the key decoded from |reference.opaque_id| is present in |references_|.
// Returns Status::REFERENCE_NOT_FOUND, leaving |*object_identifier| untouched,
// when |opaque_id| does not have the size of a uint64_t or the key is unknown.
Status PageManager::ResolveReference(
    Reference reference, storage::ObjectIdentifier* object_identifier) {
  if (reference.opaque_id.size() == sizeof(uint64_t)) {
    uint64_t reference_key = storage::DeserializeData<uint64_t>(
        convert::ToStringView(reference.opaque_id));
    auto found = references_.find(reference_key);
    if (found != references_.end()) {
      *object_identifier = found->second;
      return Status::OK;
    }
  }
  return Status::REFERENCE_NOT_FOUND;
}
// Asks the page storage whether it is synced and forwards the answer to
// |callback|, converting the storage status with PageUtils::ConvertStatus.
void PageManager::IsSynced(fit::function<void(Status, bool)> callback) {
  auto on_storage_reply = [callback = std::move(callback)](
                              storage::Status storage_status, bool synced) {
    callback(PageUtils::ConvertStatus(storage_status), synced);
  };
  page_storage_->IsSynced(std::move(on_storage_reply));
}
// Reports through |callback| whether the page is both offline and empty.
//
// If the page storage says it is online, |callback| is called immediately
// with (Status::OK, false). Otherwise the storage is asked whether it is
// empty and its answer is forwarded, with the status converted by
// PageUtils::ConvertStatus.
void PageManager::IsOfflineAndEmpty(
    fit::function<void(Status, bool)> callback) {
  if (!page_storage_->IsOnline()) {
    // The page is offline. Check and return if it's also empty.
    auto on_storage_reply = [callback = std::move(callback)](
                                storage::Status storage_status, bool empty) {
      callback(PageUtils::ConvertStatus(storage_status), empty);
    };
    page_storage_->IsEmpty(std::move(on_storage_reply));
    return;
  }

  callback(Status::OK, false);
}
// Returns true when nothing keeps this manager busy: no pages, no snapshots,
// no queued facades, an empty merge resolver, an idle (or absent) page sync
// and no page debug bindings.
bool PageManager::IsEmpty() {
  if (!pages_.empty() || !snapshots_.empty() || !delaying_facades_.empty()) {
    return false;
  }
  if (!merge_resolver_->IsEmpty()) {
    return false;
  }
  if (page_sync_ && !page_sync_->IsIdle()) {
    return false;
  }
  return page_debug_bindings_.size() == 0;
}
// Invokes |on_empty_callback_| if one is set and IsEmpty() returns true.
void PageManager::CheckEmpty() {
  if (!on_empty_callback_) {
    return;
  }
  if (!IsEmpty()) {
    return;
  }
  on_empty_callback_();
}
void PageManager::OnSyncBacklogDownloaded() {
if (sync_backlog_downloaded_) {
FXL_LOG(INFO) << "Initial sync in background finished. "
<< "Clients will receive a change notification.";
}
sync_backlog_downloaded_ = true;
for (auto& delaying_facade : delaying_facades_) {
AddPageDelayingFacade(std::move(delaying_facade.first),
std::move(delaying_facade.second));
}
delaying_facades_.clear();
}
// Fetches the head commit ids from the page storage and passes them to
// |callback| as a vector of ledger_internal::CommitId, together with the
// storage status converted by PageUtils::ConvertStatus (called with
// Status::INVALID_ARGUMENT as second argument).
void PageManager::GetHeadCommitsIds(GetHeadCommitsIdsCallback callback) {
  auto on_heads = [callback = std::move(callback)](
                      storage::Status storage_status,
                      std::vector<storage::CommitId> head_ids) {
    fidl::VectorPtr<ledger_internal::CommitId> converted_ids;
    // Resized even when there are no heads, before any element is appended.
    converted_ids.resize(0);
    for (auto head = head_ids.cbegin(); head != head_ids.cend(); ++head) {
      ledger_internal::CommitId entry;
      entry.id = convert::ToArray(*head);
      converted_ids.push_back(std::move(entry));
    }
    callback(
        PageUtils::ConvertStatus(storage_status, Status::INVALID_ARGUMENT),
        std::move(converted_ids));
  };
  page_storage_->GetHeadCommitIds(std::move(on_heads));
}
// Retrieves the commit identified by |commit_id| from the page storage. On
// success, |snapshot_request| is bound to that commit through
// BindPageSnapshot() with an empty key prefix. In all cases |callback| then
// receives the storage status converted by PageUtils::ConvertStatus (called
// with Status::INVALID_ARGUMENT as second argument).
void PageManager::GetSnapshot(
    ledger_internal::CommitId commit_id,
    fidl::InterfaceRequest<PageSnapshot> snapshot_request,
    GetSnapshotCallback callback) {
  auto on_commit =
      [this, snapshot_request = std::move(snapshot_request),
       callback = std::move(callback)](
          storage::Status storage_status,
          std::unique_ptr<const storage::Commit> found_commit) mutable {
        const bool commit_available = storage_status == storage::Status::OK;
        if (commit_available) {
          BindPageSnapshot(std::move(found_commit),
                           std::move(snapshot_request), "");
        }
        callback(PageUtils::ConvertStatus(storage_status,
                                          Status::INVALID_ARGUMENT));
      };
  page_storage_->GetCommit(convert::ToStringView(commit_id.id),
                           std::move(on_commit));
}
// Retrieves the commit identified by |commit_id| from the page storage and
// passes it to |callback| as a ledger_internal::Commit carrying the commit
// id, the parent ids, the timestamp and the generation.
//
// When the storage lookup does not return storage::Status::OK, |callback|
// receives a null commit. The status passed to |callback| is the storage
// status converted by PageUtils::ConvertStatus (called with
// Status::INVALID_ARGUMENT as second argument).
void PageManager::GetCommit(ledger_internal::CommitId commit_id,
                            GetCommitCallback callback) {
  auto on_commit =
      [callback = std::move(callback)](
          storage::Status storage_status,
          std::unique_ptr<const storage::Commit> found_commit) mutable {
        ledger_internal::CommitPtr reply = nullptr;
        if (storage_status == storage::Status::OK) {
          reply = ledger_internal::Commit::New();
          reply->commit_id.id = convert::ToArray(found_commit->GetId());

          // Resized even when there are no parents, before any element is
          // appended.
          reply->parents_ids.resize(0);
          const auto& parent_ids = found_commit->GetParentIds();
          for (const storage::CommitIdView& parent_id : parent_ids) {
            ledger_internal::CommitId entry;
            entry.id = convert::ToArray(parent_id);
            reply->parents_ids.push_back(std::move(entry));
          }

          reply->timestamp = found_commit->GetTimestamp().get();
          reply->generation = found_commit->GetGeneration();
        }
        callback(
            PageUtils::ConvertStatus(storage_status, Status::INVALID_ARGUMENT),
            std::move(reply));
      };
  page_storage_->GetCommit(convert::ToStringView(commit_id.id),
                           std::move(on_commit));
}
} // namespace ledger