blob: 14459f0abc9f9dc352359fb16ae07bcba3d48b35 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LtICENSE file.
#include "peridot/lib/ledger_client/ledger_client.h"
#include <algorithm>
#include "lib/fsl/vmo/strings.h"
#include "lib/fxl/functional/make_copyable.h"
#include "peridot/lib/fidl/array_to_string.h"
#include "peridot/lib/fidl/clone.h"
#include "peridot/lib/ledger_client/page_client.h"
#include "peridot/lib/ledger_client/status.h"
#include "peridot/lib/ledger_client/types.h"
namespace modular {
struct LedgerClient::PageEntry {
LedgerPageId page_id;
ledger::PagePtr page;
std::vector<PageClient*> clients;
};
namespace {
PageClient::Conflict ToConflict(const ledger::DiffEntry* entry) {
PageClient::Conflict conflict;
conflict.key = entry->key.Clone();
if (entry->left) {
conflict.has_left = true;
std::string value;
if (!fsl::StringFromVmo(*entry->left->value, &value)) {
FXL_LOG(ERROR) << "Unable to read vmo for left entry of " << to_hex_string(conflict.key)
<< ".";
return conflict;
}
conflict.left = std::move(value);
} else {
conflict.left_is_deleted = true;
}
if (entry->right) {
conflict.has_right = true;
std::string value;
if (!fsl::StringFromVmo(*entry->right->value, &value)) {
FXL_LOG(ERROR) << "Unable to read vmo for right entry of " << to_hex_string(conflict.key)
<< ".";
return conflict;
}
conflict.right = std::move(value);
} else {
conflict.right_is_deleted = true;
}
return conflict;
}
void GetDiffRecursive(ledger::MergeResultProvider* const result,
std::map<std::string, PageClient::Conflict>* conflicts,
LedgerToken token,
std::function<void(ledger::Status)> callback) {
auto cont = fxl::MakeCopyable(
[result, conflicts, callback = std::move(callback)](
ledger::Status status, fidl::VectorPtr<ledger::DiffEntry> change_delta,
LedgerToken token) mutable {
if (status != ledger::Status::OK &&
status != ledger::Status::PARTIAL_RESULT) {
callback(status);
return;
}
for (auto& diff_entry : *change_delta) {
(*conflicts)[to_string(diff_entry.key)] =
ToConflict(&diff_entry);
}
if (status == ledger::Status::OK) {
callback(ledger::Status::OK);
return;
}
GetDiffRecursive(result, conflicts, std::move(token),
std::move(callback));
});
result->GetConflictingDiff(std::move(token), cont);
}
void GetDiff(ledger::MergeResultProvider* const result,
std::map<std::string, PageClient::Conflict>* conflicts,
std::function<void(ledger::Status)> callback) {
GetDiffRecursive(result, conflicts, nullptr /* token */, std::move(callback));
}
bool HasPrefix(const std::string& value, const std::string& prefix) {
if (value.size() < prefix.size()) {
return false;
}
for (size_t i = 0; i < prefix.size(); ++i) {
if (value[i] != prefix[i]) {
return false;
}
}
return true;
}
} // namespace
class LedgerClient::ConflictResolverImpl::ResolveCall : Operation<> {
public:
ResolveCall(OperationContainer* const container,
ConflictResolverImpl* const impl,
ledger::MergeResultProviderPtr result_provider,
ledger::PageSnapshotPtr left_version,
ledger::PageSnapshotPtr right_version,
ledger::PageSnapshotPtr common_version)
: Operation("LedgerClient::ResolveCall", container, [] {}),
impl_(impl),
result_provider_(std::move(result_provider)),
left_version_(std::move(left_version)),
right_version_(std::move(right_version)),
common_version_(std::move(common_version)) {
Ready();
}
private:
void Run() override {
FlowToken flow{this};
GetDiff(result_provider_.get(), &conflicts_,
[this, flow](ledger::Status status) {
has_diff_ = true;
Cont(flow);
});
result_provider_->MergeNonConflictingEntries(
[this, flow](ledger::Status status) {
merged_non_conflict_ = true;
Cont(flow);
});
GetEntries(
left_version_.get(), &left_entries_,
[this, flow](ledger::Status) { LogEntries("left", left_entries_); });
GetEntries(
right_version_.get(), &right_entries_,
[this, flow](ledger::Status) { LogEntries("right", right_entries_); });
GetEntries(common_version_.get(), &common_entries_,
[this, flow](ledger::Status) {
LogEntries("common", common_entries_);
});
}
void Cont(FlowToken flow) {
if (!has_diff_ || !merged_non_conflict_) {
return;
}
std::vector<PageClient*> page_clients;
impl_->GetPageClients(&page_clients);
for (auto& pair : conflicts_) {
const PageClient::Conflict& conflict = pair.second;
fidl::VectorPtr<ledger::MergedValue> merge_changes;
if (conflict.has_left && conflict.has_right) {
for (PageClient* const page_client : page_clients) {
if (HasPrefix(pair.first, page_client->prefix())) {
page_client->OnPageConflict(&pair.second);
if (pair.second.resolution != PageClient::LEFT) {
ledger::MergedValue merged_value;
merged_value.key = conflict.key.Clone();
if (pair.second.resolution == PageClient::RIGHT) {
merged_value.source = ledger::ValueSource::RIGHT;
} else {
if (pair.second.merged_is_deleted) {
merged_value.source = ledger::ValueSource::DELETE;
} else {
merged_value.source = ledger::ValueSource::NEW;
merged_value.new_value = ledger::BytesOrReference::New();
merged_value.new_value->set_bytes(
to_array(pair.second.merged));
}
}
merge_changes.push_back(std::move(merged_value));
}
// First client handles the conflict.
//
// TODO(mesch): We should order clients reverse-lexicographically by
// prefix, so that longer prefixes are checked first.
//
// TODO(mesch): Default resolution could then be PASS, which would
// pass to the next matching client. Too easy to abuse though.
//
// TODO(mesch): Best would be if overlapping prefixes are
// prohibited.
break;
}
}
}
if (!merge_changes->empty()) {
merge_count_++;
result_provider_->Merge(
std::move(merge_changes), [this, flow](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "ResultProvider.Merge() " << status;
}
MergeDone(flow);
});
}
}
MergeDone(flow);
}
void MergeDone(FlowToken flow) {
if (--merge_count_ == 0) {
result_provider_->Done([this, flow](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "ResultProvider.Done() " << status;
}
impl_->NotifyWatchers();
});
}
}
void LogEntries(const std::string& headline,
const std::vector<ledger::Entry>& entries) {
FXL_VLOG(4) << "Entries " << headline;
for (const ledger::Entry& entry : entries) {
FXL_VLOG(4) << " - " << to_string(entry.key);
}
}
ConflictResolverImpl* const impl_;
ledger::MergeResultProviderPtr result_provider_;
ledger::PageSnapshotPtr left_version_;
ledger::PageSnapshotPtr right_version_;
ledger::PageSnapshotPtr common_version_;
std::vector<ledger::Entry> left_entries_;
std::vector<ledger::Entry> right_entries_;
std::vector<ledger::Entry> common_entries_;
bool has_diff_{};
bool merged_non_conflict_{};
std::map<std::string, PageClient::Conflict> conflicts_;
int merge_count_{1}; // One extra for the call at the end.
FXL_DISALLOW_COPY_AND_ASSIGN(ResolveCall);
};
LedgerClient::LedgerClient(ledger::LedgerPtr ledger)
: ledger_(std::move(ledger)) {
ledger_->SetConflictResolverFactory(
bindings_.AddBinding(this), [](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "Ledger.SetConflictResolverFactory() failed: "
<< LedgerStatusToString(status);
}
});
}
LedgerClient::LedgerClient(ledger_internal::LedgerRepository* const ledger_repository,
const std::string& name,
std::function<void()> error)
: ledger_name_(name) {
ledger_repository->Duplicate(
ledger_repository_.NewRequest(), [error](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "LedgerRepository::Duplicate() failed: "
<< LedgerStatusToString(status);
error();
}
});
// Open Ledger.
ledger_repository->GetLedger(
to_array(name), ledger_.NewRequest(), [error](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "LedgerRepository.GetLedger() failed: "
<< LedgerStatusToString(status);
error();
}
});
// This must be the first call after GetLedger, otherwise the Ledger
// starts with one reconciliation strategy, then switches to another.
ledger_->SetConflictResolverFactory(
bindings_.AddBinding(this), [error](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "Ledger.SetConflictResolverFactory() failed: "
<< LedgerStatusToString(status);
error();
}
});
}
LedgerClient::LedgerClient(ledger_internal::LedgerRepository* const ledger_repository,
const std::string& name)
: ledger_name_(name) {
ledger_repository->Duplicate(
ledger_repository_.NewRequest(), [](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "LedgerRepository.Duplicate() failed: "
<< LedgerStatusToString(status);
// No further error reporting, as this is used only in tests.
}
});
// Open Ledger.
ledger_repository->GetLedger(
to_array(name), ledger_.NewRequest(), [](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "LedgerRepository.GetLedger() failed: "
<< LedgerStatusToString(status);
// No further error reporting, as this is used only in tests.
}
});
// Not registering a conflict resolver here.
}
LedgerClient::~LedgerClient() = default;
std::unique_ptr<LedgerClient> LedgerClient::GetLedgerClientPeer() {
// NOTE(mesch): std::make_unique() requires the constructor to be public.
std::unique_ptr<LedgerClient> ret;
ret.reset(new LedgerClient(ledger_repository_.get(), ledger_name_));
return ret;
}
ledger::Page* LedgerClient::GetPage(PageClient* const page_client,
const std::string& context,
const ledger::PageId& page_id) {
auto i = std::find_if(pages_.begin(), pages_.end(), [&page_id](auto& entry) {
return PageIdsEqual(entry->page_id, page_id);
});
if (i != pages_.end()) {
(*i)->clients.push_back(page_client);
return (*i)->page.get();
}
ledger::PagePtr page;
ledger::PageIdPtr page_id_copy = ledger::PageId::New();;
page_id_copy->id = page_id.id;
ledger_->GetPage(
std::move(page_id_copy), page.NewRequest(), [context](ledger::Status status) {
if (status != ledger::Status::OK) {
FXL_LOG(ERROR) << "Ledger.GetPage() " << context << " " << status;
}
});
auto entry = std::make_unique<PageEntry>();
entry->page_id = CloneStruct(page_id);
entry->clients.push_back(page_client);
entry->page = std::move(page);
entry->page.set_error_handler([context] {
// TODO(mesch): If this happens, larger things are wrong. This should
// probably be signalled up, or at least must be signalled to the page
// client.
FXL_LOG(ERROR) << context << ": "
<< "Page connection unexpectedly closed.";
});
pages_.emplace_back(std::move(entry));
return pages_.back()->page.get();
}
void LedgerClient::DropPageClient(PageClient* const page_client) {
for (auto i = pages_.begin(); i < pages_.end(); ++i) {
std::vector<PageClient*>* const page_clients = &(*i)->clients;
auto ii = std::remove_if(
page_clients->begin(), page_clients->end(),
[page_client](PageClient* item) { return item == page_client; });
if (ii != page_clients->end()) {
page_clients->erase(ii, page_clients->end());
if (page_clients->empty()) {
ClearConflictResolver((*i)->page_id);
pages_.erase(i);
break; // Loop variable now invalid; must not continue loop.
}
}
}
}
void LedgerClient::GetPolicy(LedgerPageId page_id,
GetPolicyCallback callback) {
auto i = std::find_if(pages_.begin(), pages_.end(), [&page_id](auto& entry) {
return PageIdsEqual(entry->page_id, page_id);
});
// This is wrong if GetPolicy() is called for a page before its page client
// has registered. Therefore, if an app keeps multiple connections to a page,
// the ones kept by page clients must be created first.
//
// TODO(mesch): Maybe AUTOMATIC_WITH_FALLBACK should always be used anyway,
// and the resolver should deal with conflicts on pages that don't have a page
// client.
if (i != pages_.end()) {
callback(ledger::MergePolicy::AUTOMATIC_WITH_FALLBACK);
return;
}
callback(ledger::MergePolicy::LAST_ONE_WINS);
}
void LedgerClient::NewConflictResolver(
LedgerPageId page_id,
fidl::InterfaceRequest<ledger::ConflictResolver> request) {
for (auto& i : resolvers_) {
if (PageIdsEqual(i->page_id(), page_id)) {
i->Connect(std::move(request));
return;
}
}
resolvers_.emplace_back(
std::make_unique<ConflictResolverImpl>(this, std::move(page_id)));
resolvers_.back()->Connect(std::move(request));
}
void LedgerClient::ClearConflictResolver(const LedgerPageId& page_id) {
auto i = std::remove_if(
resolvers_.begin(), resolvers_.end(),
[&page_id](auto& item) { return PageIdsEqual(item->page_id(), page_id); });
if (i != resolvers_.end()) {
resolvers_.erase(i, resolvers_.end());
}
}
LedgerClient::ConflictResolverImpl::ConflictResolverImpl(
LedgerClient* const ledger_client,
const LedgerPageId& page_id)
: ledger_client_(ledger_client), page_id_(CloneStruct(page_id)) {}
LedgerClient::ConflictResolverImpl::~ConflictResolverImpl() = default;
void LedgerClient::ConflictResolverImpl::Connect(
fidl::InterfaceRequest<ledger::ConflictResolver> request) {
bindings_.AddBinding(this, std::move(request));
}
void LedgerClient::ConflictResolverImpl::Resolve(
fidl::InterfaceHandle<ledger::PageSnapshot> left_version,
fidl::InterfaceHandle<ledger::PageSnapshot> right_version,
fidl::InterfaceHandle<ledger::PageSnapshot> common_version,
fidl::InterfaceHandle<ledger::MergeResultProvider> result_provider) {
new ResolveCall(&operation_queue_, this, result_provider.Bind(),
left_version.Bind(), right_version.Bind(),
common_version.Bind());
}
void LedgerClient::ConflictResolverImpl::GetPageClients(
std::vector<PageClient*>* page_clients) {
auto i = std::find_if(
ledger_client_->pages_.begin(), ledger_client_->pages_.end(),
[this](auto& item) { return PageIdsEqual(item->page_id, page_id_); });
FXL_CHECK(i != ledger_client_->pages_.end());
*page_clients = (*i)->clients;
}
void LedgerClient::ConflictResolverImpl::NotifyWatchers() const {
for (auto& watcher : ledger_client_->watchers_) {
watcher();
}
}
} // namespace modular