blob: 6eda97d7114cc2109cfec020350e93a9995216eb [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/lib/ledger_client/page_client.h"
#include <memory>
#include <utility>
#include "lib/fsl/vmo/strings.h"
#include "lib/fxl/functional/make_copyable.h"
#include "peridot/lib/fidl/array_to_string.h"
#include "peridot/lib/ledger_client/ledger_client.h"
namespace modular {
// Creates a client for the ledger page |page_id|.
//
// The page connection is obtained from |ledger_client| via GetPage(). The
// constructor then calls Page.GetSnapshot() with |prefix| and a new binding
// of this object, so that OnChange() notifications are delivered to it.
//
// |context| is only used to label log messages. |ledger_client| is stored as
// a raw pointer and is used again in the destructor, so it must outlive this
// instance.
PageClient::PageClient(std::string context,
                       LedgerClient* ledger_client,
                       LedgerPageId page_id,
                       std::string prefix)
    : binding_(this),
      context_(std::move(context)),
      ledger_client_(ledger_client),
      page_id_(std::move(page_id)),
      page_(ledger_client_->GetPage(this, context_, page_id_)),
      prefix_(std::move(prefix)) {
  // The status callback is handed to the page connection, which comes from
  // |ledger_client_| and may therefore still be alive after this PageClient is
  // destroyed. Capture a copy of the log context instead of |this| so that a
  // late reply never reads from a destroyed object.
  page_->GetSnapshot(NewRequest(), to_array(prefix_), binding_.NewBinding(),
                     [context = context_](ledger::Status status) {
                       if (status != ledger::Status::OK) {
                         FXL_LOG(ERROR)
                             << context << " Page.GetSnapshot() " << status;
                       }
                     });
}
// Tells the LedgerClient this instance was created with to drop it, by
// calling DropPageClient(this).
PageClient::~PageClient() {
// We assume ledger client always outlives page client. |ledger_client_| is a
// raw pointer, so this call is only valid while that assumption holds.
ledger_client_->DropPageClient(this);
}
// Replaces |page_snapshot_| with a fresh, unbound PageSnapshotPtr and returns
// the interface request for it. The caller is expected to hand the request to
// the ledger so that the new snapshot connection gets bound.
fidl::InterfaceRequest<ledger::PageSnapshot> PageClient::NewRequest() {
  page_snapshot_ = std::make_shared<ledger::PageSnapshotPtr>();
  auto request = page_snapshot_->NewRequest();

  // The snapshot connection is held through a shared_ptr, so other holders may
  // keep it alive after this PageClient is gone. The handler therefore
  // captures a copy of the log context rather than |this|.
  page_snapshot_->set_error_handler([context = context_] {
    FXL_LOG(ERROR) << context << ": "
                   << "PageSnapshot connection unexpectedly closed.";
  });
  return request;
}
// Decides whether a new page snapshot is requested in response to an
// OnChange() notification with the given |result_state|.
//
// Returns a null request while a partial notification sequence is still in
// progress (PARTIAL_STARTED, PARTIAL_CONTINUED), and a request created by
// NewRequest() once the change is complete (COMPLETED, PARTIAL_COMPLETED).
fidl::InterfaceRequest<ledger::PageSnapshot> PageClient::Update(
    const ledger::ResultState result_state) {
  // No default label on purpose: this lets the compiler warn when a new
  // enumerator is added and not handled here.
  switch (result_state) {
    case ledger::ResultState::PARTIAL_CONTINUED:
    case ledger::ResultState::PARTIAL_STARTED:
      return nullptr;
    case ledger::ResultState::COMPLETED:
    case ledger::ResultState::PARTIAL_COMPLETED:
      return NewRequest();
  }

  // Only reached when |result_state| holds a value outside the enumerators
  // handled above. Without this return, control would fall off the end of a
  // non-void function, which is undefined behavior. Keep the current snapshot
  // in that case.
  FXL_LOG(ERROR) << context_ << ": "
                 << "PageClient::Update() received an unexpected ResultState.";
  return nullptr;
}
// |PageWatcher|
//
// Dispatches one change notification: calls OnPageChange() for every changed
// entry whose value can be read as a string, then OnPageDelete() for every
// deleted key, and finally answers |callback| with the snapshot request chosen
// by Update(|result_state|).
void PageClient::OnChange(ledger::PageChange page,
                          ledger::ResultState result_state,
                          OnChangeCallback callback) {
  // According to their fidl spec, neither page nor page->changed_entries
  // should be null.
  FXL_DCHECK(page.changed_entries);

  for (auto& entry : *page.changed_entries) {
    // Remove prefix maybe?
    const std::string key = to_string(entry.key);

    // |entry.value| is dereferenced below, so an entry that arrives without a
    // value must be skipped instead of crashing the process.
    // NOTE(review): confirm against the Ledger FIDL definition of Entry under
    // which conditions the value may be absent.
    if (!entry.value) {
      FXL_LOG(ERROR) << "PageClient::OnChange() " << context_ << ": "
                     << "Entry has no value.";
      continue;
    }

    std::string value;
    if (!fsl::StringFromVmo(*entry.value, &value)) {
      FXL_LOG(ERROR) << "PageClient::OnChange() " << context_ << ": "
                     << "Unable to extract data.";
      continue;
    }

    OnPageChange(key, value);
  }

  for (auto& key : *page.deleted_keys) {
    OnPageDelete(to_string(key));
  }

  // Every time we receive a group of OnChange notifications, we update the root
  // page snapshot so we see the current state. Note that pending Operation
  // instances may hold on to the previous value until they finish. New
  // Operation instances created after the update receive the new snapshot.
  //
  // For continued updates, we only request the snapshot once, in the last
  // OnChange() notification.
  callback(Update(result_state));
}
// Called by OnChange() once for every changed entry whose value could be read
// as a string. This implementation ignores both arguments and does nothing.
// NOTE(review): presumably meant to be overridden by subclasses; the
// declaration is not visible from this file.
void PageClient::OnPageChange(const std::string& /*key*/,
const std::string& /*value*/) {}
// Called by OnChange() once for every deleted key. This implementation ignores
// the key and does nothing.
// NOTE(review): presumably meant to be overridden by subclasses; the
// declaration is not visible from this file.
void PageClient::OnPageDelete(const std::string& /*key*/) {}
// Logs a page conflict: the log context, the conflicting key in hex, and the
// left and right sides of |conflict|. This implementation does not modify
// |conflict|.
//
// |conflict| is dereferenced unconditionally and must not be null.
void PageClient::OnPageConflict(Conflict* const conflict) {
  FXL_LOG(INFO) << "PageClient::OnPageConflict() " << context_ << " "
                << to_hex_string(conflict->key) << " " << conflict->left << " "
                << conflict->right;
}
namespace {

// Reads entries from |snapshot| batch by batch and appends each batch to
// |entries|.
//
// Every reply is handled as follows:
//  - a status other than OK or PARTIAL_RESULT is passed to |callback| and the
//    entries of that reply are discarded;
//  - otherwise the received entries are moved to the back of |entries|;
//  - on OK, |callback| is invoked with OK;
//  - on PARTIAL_RESULT, another GetEntries() call is issued with the token
//    that came with the reply.
//
// |snapshot| and |entries| are captured as raw pointers by the reply handler,
// so both must stay valid until |callback| has been invoked.
void GetEntriesRecursive(ledger::PageSnapshot* const snapshot,
                         std::vector<ledger::Entry>* const entries,
                         fidl::VectorPtr<uint8_t> next_token,
                         std::function<void(ledger::Status)> callback) {
  auto on_batch = fxl::MakeCopyable(
      [snapshot, entries, callback = std::move(callback)](
          ledger::Status status, auto batch, auto continuation) mutable {
        const bool is_last_batch = (status == ledger::Status::OK);

        // Anything other than a complete or partial result is an error.
        if (!is_last_batch && status != ledger::Status::PARTIAL_RESULT) {
          callback(status);
          return;
        }

        for (auto& entry : *batch) {
          entries->push_back(std::move(entry));
        }

        if (is_last_batch) {
          callback(ledger::Status::OK);
          return;
        }

        // More entries are pending: continue from the returned token.
        GetEntriesRecursive(snapshot, entries, std::move(continuation),
                            std::move(callback));
      });

  snapshot->GetEntries(nullptr /* key_start */, std::move(next_token),
                       std::move(on_batch));
}

}  // namespace
// Retrieves the entries of |snapshot| into |entries| by delegating to
// GetEntriesRecursive() with a null continuation token. |callback| receives
// the resulting status.
//
// |snapshot| and |entries| are passed on as raw pointers, so both must stay
// valid until |callback| has been invoked.
void GetEntries(ledger::PageSnapshot* const snapshot,
                std::vector<ledger::Entry>* const entries,
                std::function<void(ledger::Status)> callback) {
  // No token yet: this is the first request of the sequence.
  fidl::VectorPtr<uint8_t> initial_token = nullptr;
  GetEntriesRecursive(snapshot, entries, std::move(initial_token),
                      std::move(callback));
}
} // namespace modular