blob: 0f33a9e173795bb093412b947b7790da2f2418d1 [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/app/page_manager.h"
#include <lib/callback/trace_callback.h>
#include <lib/fit/function.h>
#include <zircon/syscalls.h>
#include <algorithm>
#include "src/ledger/bin/app/page_utils.h"
#include "src/ledger/bin/fidl/include/types.h"
#include "src/ledger/bin/storage/impl/data_serialization.h"
#include "src/lib/fxl/logging.h"
namespace ledger {
PageManager::PageManager(Environment* environment,
std::unique_ptr<storage::PageStorage> page_storage,
std::unique_ptr<sync_coordinator::PageSync> page_sync,
std::unique_ptr<MergeResolver> merge_resolver,
PageManager::PageStorageState state,
zx::duration sync_timeout)
: environment_(environment),
page_storage_(std::move(page_storage)),
page_sync_(std::move(page_sync)),
merge_resolver_(std::move(merge_resolver)),
sync_timeout_(sync_timeout),
task_runner_(environment->dispatcher()) {
page_delegates_.set_on_empty([this] { CheckEmpty(); });
snapshots_.set_on_empty([this] { CheckEmpty(); });
if (page_sync_) {
page_sync_->SetSyncWatcher(&watchers_);
page_sync_->SetOnIdle([this] { CheckEmpty(); });
page_sync_->SetOnBacklogDownloaded([this] { OnSyncBacklogDownloaded(); });
page_sync_->Start();
if (state == PageManager::PageStorageState::NEEDS_SYNC) {
// The page storage was created locally. We wait a bit in order to get the
// initial state from the network before accepting requests.
task_runner_.PostDelayedTask(
[this] {
if (!sync_backlog_downloaded_) {
FXL_LOG(INFO) << "Initial sync will continue in background, "
<< "in the meantime binding to local page data "
<< "(might be stale or empty).";
OnSyncBacklogDownloaded();
}
},
sync_timeout_);
} else {
sync_backlog_downloaded_ = true;
}
} else {
sync_backlog_downloaded_ = true;
}
merge_resolver_->set_on_empty([this] { CheckEmpty(); });
merge_resolver_->SetPageManager(this);
}
PageManager::~PageManager() {
for (const auto& [page_impl, on_done] : page_impls_) {
on_done(storage::Status::INTERNAL_ERROR);
}
page_impls_.clear();
}
void PageManager::AddPageImpl(std::unique_ptr<PageImpl> page_impl,
fit::function<void(storage::Status)> on_done) {
auto traced_on_done = TRACE_CALLBACK(std::move(on_done), "ledger",
"page_manager_add_page_impl");
if (!sync_backlog_downloaded_) {
page_impls_.emplace_back(std::move(page_impl), std::move(traced_on_done));
return;
}
page_delegates_
.emplace(environment_->coroutine_service(), this, page_storage_.get(),
merge_resolver_.get(), &watchers_, std::move(page_impl))
.Init(std::move(traced_on_done));
}
void PageManager::BindPageSnapshot(
std::unique_ptr<const storage::Commit> commit,
fidl::InterfaceRequest<PageSnapshot> snapshot_request,
std::string key_prefix) {
snapshots_.emplace(std::move(snapshot_request), page_storage_.get(),
std::move(commit), std::move(key_prefix));
}
Reference PageManager::CreateReference(
storage::ObjectIdentifier object_identifier) {
uint64_t index = environment_->random()->Draw<uint64_t>();
FXL_DCHECK(references_.find(index) == references_.end());
references_[index] = std::move(object_identifier);
Reference reference;
reference.opaque_id = convert::ToArray(storage::SerializeData(index));
return reference;
}
storage::Status PageManager::ResolveReference(
Reference reference, storage::ObjectIdentifier* object_identifier) {
if (reference.opaque_id.size() != sizeof(uint64_t)) {
return storage::Status::REFERENCE_NOT_FOUND;
}
uint64_t index = storage::DeserializeData<uint64_t>(
convert::ToStringView(reference.opaque_id));
auto iterator = references_.find(index);
if (iterator == references_.end()) {
return storage::Status::REFERENCE_NOT_FOUND;
}
*object_identifier = iterator->second;
return storage::Status::OK;
}
void PageManager::IsSynced(
fit::function<void(storage::Status, bool)> callback) {
page_storage_->IsSynced(
[callback = std::move(callback)](storage::Status status, bool is_synced) {
callback(status, is_synced);
});
}
void PageManager::IsOfflineAndEmpty(
fit::function<void(storage::Status, bool)> callback) {
if (page_storage_->IsOnline()) {
callback(storage::Status::OK, false);
return;
}
// The page is offline. Check and return if it's also empty.
page_storage_->IsEmpty(
[callback = std::move(callback)](storage::Status status, bool is_empty) {
callback(status, is_empty);
});
}
bool PageManager::IsEmpty() {
return page_delegates_.empty() && snapshots_.empty() && page_impls_.empty() &&
merge_resolver_->IsEmpty() && (!page_sync_ || page_sync_->IsIdle());
}
void PageManager::CheckEmpty() {
if (on_empty_callback_ && IsEmpty()) {
on_empty_callback_();
}
}
void PageManager::OnSyncBacklogDownloaded() {
if (sync_backlog_downloaded_) {
FXL_LOG(INFO) << "Initial sync in background finished. "
<< "Clients will receive a change notification.";
}
sync_backlog_downloaded_ = true;
for (auto& [page_impl, on_done] : page_impls_) {
AddPageImpl(std::move(page_impl), std::move(on_done));
}
page_impls_.clear();
}
} // namespace ledger