blob: e38690d2c44030d36b5065df57ed5616642a7191 [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/app/merging/conflict_resolver_client.h"
#include <algorithm>
#include <memory>
#include <string>
#include <vector>
#include <lib/callback/scoped_callback.h>
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fxl/memory/ref_ptr.h>
#include <lib/fxl/memory/weak_ptr.h>
#include "peridot/bin/ledger/app/diff_utils.h"
#include "peridot/bin/ledger/app/fidl/serialization_size.h"
#include "peridot/bin/ledger/app/page_manager.h"
#include "peridot/bin/ledger/app/page_utils.h"
#include "peridot/lib/util/ptr.h"
namespace ledger {
// Creates a client that resolves the conflict between the |left| and |right|
// commits, with |ancestor| as the third commit handed to the resolver.
//
// |storage|, |page_manager| and |conflict_resolver| are stored as raw
// pointers and are not owned. |callback| is invoked with the final status
// from Finalize() and must be set. |left| must not be older than |right|
// (checked in debug builds).
ConflictResolverClient::ConflictResolverClient(
    storage::PageStorage* storage, PageManager* page_manager,
    ConflictResolver* conflict_resolver,
    std::unique_ptr<const storage::Commit> left,
    std::unique_ptr<const storage::Commit> right,
    std::unique_ptr<const storage::Commit> ancestor,
    fit::function<void(Status)> callback)
    : storage_(storage),
      manager_(page_manager),
      conflict_resolver_(conflict_resolver),
      left_(std::move(left)),
      right_(std::move(right)),
      ancestor_(std::move(ancestor)),
      callback_(std::move(callback)),
      merge_result_provider_binding_(this),
      weak_factory_(this) {
  // Sanity checks on the arguments, active in debug builds only.
  FXL_DCHECK(left_->GetTimestamp() >= right_->GetTimestamp());
  FXL_DCHECK(callback_);
}
// Rolls back the merge journal if one is still pending when the client is
// destroyed. The rollback status is ignored.
ConflictResolverClient::~ConflictResolverClient() {
  if (!journal_) {
    return;
  }
  storage_->RollbackJournal(std::move(journal_),
                            [](storage::Status /*status*/) {});
}
// Starts the resolution: asks storage for a merge-commit journal built on
// |left_| and |right_|, then hands snapshots of the left, right and ancestor
// commits to the conflict resolver together with a MergeResultProvider
// binding served by this object.
//
// If the resolution was cancelled while the journal was being prepared, or if
// the journal could not be created, the client is finalized with an error.
void ConflictResolverClient::Start() {
  // Prepare the journal for the merge commit.
  storage_->StartMergeCommit(
      left_->GetId(), right_->GetId(),
      callback::MakeScoped(
          weak_factory_.GetWeakPtr(),
          [this](storage::Status status,
                 std::unique_ptr<storage::Journal> journal) {
            // Take ownership of the journal before any early exit: Finalize()
            // rolls back |journal_|, so a journal received after cancellation
            // is rolled back like on every other path instead of being
            // dropped.
            journal_ = std::move(journal);
            if (cancelled_) {
              Finalize(Status::INTERNAL_ERROR);
              return;
            }
            if (status != storage::Status::OK) {
              FXL_LOG(ERROR) << "Unable to start merge commit: "
                             << fidl::ToUnderlying(status);
              Finalize(PageUtils::ConvertStatus(status));
              return;
            }

            // Expose the three commits to the resolver as page snapshots.
            PageSnapshotPtr page_snapshot_ancestor;
            manager_->BindPageSnapshot(ancestor_->Clone(),
                                       page_snapshot_ancestor.NewRequest(), "");
            PageSnapshotPtr page_snapshot_left;
            manager_->BindPageSnapshot(left_->Clone(),
                                       page_snapshot_left.NewRequest(), "");
            PageSnapshotPtr page_snapshot_right;
            manager_->BindPageSnapshot(right_->Clone(),
                                       page_snapshot_right.NewRequest(), "");

            // From here on Cancel() finalizes immediately, since the
            // resolution is in the resolver's hands.
            in_client_request_ = true;
            conflict_resolver_->Resolve(
                std::move(page_snapshot_left), std::move(page_snapshot_right),
                std::move(page_snapshot_ancestor),
                merge_result_provider_binding_.NewBinding());
          }));
}
// Marks the resolution as cancelled. When the conflict has already been
// handed to the resolver (|in_client_request_|), the client is finalized
// right away with INTERNAL_ERROR. Otherwise only the flag is set here, and
// the storage callbacks that check |cancelled_| do the finalizing.
void ConflictResolverClient::Cancel() {
  cancelled_ = true;
  if (!in_client_request_) {
    return;
  }
  Finalize(Status::INTERNAL_ERROR);
}
// Processes one |merged_value| supplied by the conflict resolver and reports
// the outcome through a callback obtained from |waiter|.
//
// - RIGHT:  looks the key up in |right_| and reports the entry's object
//           identifier.
// - NEW:    stores the inline bytes as a new local object, or resolves the
//           supplied reference, and reports the resulting identifier.
// - DELETE: deletes the key from |journal_| and reports a default-constructed
//           identifier.
//
// MergeNew() matches the i-th result of |waiter| with the i-th merged value,
// so every handled value must register exactly one callback on |waiter|.
void ConflictResolverClient::OnNextMergeResult(
    const MergedValue& merged_value,
    const fxl::RefPtr<
        callback::Waiter<storage::Status, storage::ObjectIdentifier>>& waiter) {
  switch (merged_value.source) {
    case ValueSource::RIGHT: {
      std::string key = convert::ToString(merged_value.key);
      storage_->GetEntryFromCommit(
          *right_, key,
          [key, callback = waiter->NewCallback()](storage::Status status,
                                                  storage::Entry entry) {
            if (status != storage::Status::OK) {
              if (status == storage::Status::NOT_FOUND) {
                FXL_LOG(ERROR)
                    << "Key " << key
                    << " is not present in the right change. Unable to proceed";
              }
              callback(status, {});
              return;
            }
            callback(storage::Status::OK, entry.object_identifier);
          });
      break;
    }
    case ValueSource::NEW: {
      // |new_value| is reached through a pointer and is filled in by the
      // resolver. Fail this merge instead of dereferencing it when it is
      // missing.
      if (!merged_value.new_value) {
        FXL_LOG(ERROR) << "Key " << convert::ToString(merged_value.key)
                       << " has source NEW but no new value. Unable to proceed";
        // Reported with the same status as an unresolvable reference below.
        waiter->NewCallback()(storage::Status::NOT_FOUND, {});
        break;
      }
      if (merged_value.new_value->is_bytes()) {
        storage_->AddObjectFromLocal(storage::ObjectType::BLOB,
                                     storage::DataSource::Create(std::move(
                                         merged_value.new_value->bytes())),
                                     waiter->NewCallback());
      } else {
        storage::ObjectIdentifier object_identifier;
        Status status = manager_->ResolveReference(
            std::move(merged_value.new_value->reference()), &object_identifier);
        if (status != Status::OK) {
          waiter->NewCallback()(storage::Status::NOT_FOUND, {});
          return;
        }
        waiter->NewCallback()(storage::Status::OK,
                              std::move(object_identifier));
      }
      break;
    }
    case ValueSource::DELETE: {
      // The default-constructed identifier reported here is what MergeNew()
      // uses to recognize deletions and skip the Put.
      journal_->Delete(merged_value.key,
                       [callback = waiter->NewCallback()](
                           storage::Status status) { callback(status, {}); });
      break;
    }
  }
}
// Ends the resolution with |status|: rolls back the journal if one is still
// pending, closes the MergeResultProvider binding with |status|, and invokes
// the completion callback. Must be called at most once.
void ConflictResolverClient::Finalize(Status status) {
  FXL_DCHECK(callback_) << "Finalize must only be called once.";

  if (journal_) {
    storage_->RollbackJournal(std::move(journal_),
                              [](storage::Status /*rollback_status*/) {});
    journal_.reset();
  }

  merge_result_provider_binding_.Close(status);

  // Detach the completion callback from the member before running it, so
  // that |callback_| is already cleared when it executes.
  auto on_finalized = std::move(callback_);
  callback_ = nullptr;
  on_finalized(status);
}
void ConflictResolverClient::GetFullDiffNew(
std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus, std::vector<DiffEntry>,
std::unique_ptr<Token>)>
callback) {
GetDiff(diff_utils::DiffType::FULL, std::move(token), std::move(callback));
}
void ConflictResolverClient::GetFullDiff(
std::unique_ptr<Token> token,
fit::function<void(Status, Status, std::vector<DiffEntry>,
std::unique_ptr<Token>)>
callback) {
GetFullDiffNew(
std::move(token),
[callback = std::move(callback)](
Status status, IterationStatus diff_status,
std::vector<DiffEntry> entries, std::unique_ptr<Token> token) {
if (status != Status::OK && status != Status::PARTIAL_RESULT) {
callback(status, status, std::move(entries), std::move(token));
return;
}
callback(Status::OK,
diff_status == IterationStatus::OK ? Status::OK
: Status::PARTIAL_RESULT,
std::move(entries), std::move(token));
});
}
void ConflictResolverClient::GetConflictingDiffNew(
std::unique_ptr<Token> token,
fit::function<void(Status, IterationStatus, std::vector<DiffEntry>,
std::unique_ptr<Token>)>
callback) {
GetDiff(diff_utils::DiffType::CONFLICTING, std::move(token),
std::move(callback));
}
void ConflictResolverClient::GetConflictingDiff(
std::unique_ptr<Token> token,
fit::function<void(Status, Status, std::vector<DiffEntry>,
std::unique_ptr<Token>)>
callback) {
GetConflictingDiffNew(
std::move(token),
[callback = std::move(callback)](
Status status, IterationStatus diff_status,
std::vector<DiffEntry> entries, std::unique_ptr<Token> token) {
if (status != Status::OK && status != Status::PARTIAL_RESULT) {
callback(status, status, std::move(entries), std::move(token));
return;
}
callback(Status::OK,
diff_status == IterationStatus::OK ? Status::OK
: Status::PARTIAL_RESULT,
std::move(entries), std::move(token));
});
}
// Computes one page of the three-way diff of kind |type| between
// |ancestor_|, |left_| and |right_|, resuming from |token| when it is set.
//
// On success |callback| receives the entries, plus a continuation token and
// IterationStatus::PARTIAL_RESULT when more entries remain. If the
// resolution was cancelled or the diff failed, |callback| receives the error
// and the client is finalized with that same error.
void ConflictResolverClient::GetDiff(
    diff_utils::DiffType type, std::unique_ptr<Token> token,
    fit::function<void(Status, IterationStatus, std::vector<DiffEntry>,
                       std::unique_ptr<Token>)>
        callback) {
  auto on_diff_computed = callback::MakeScoped(
      weak_factory_.GetWeakPtr(),
      [this, callback = std::move(callback)](
          Status status,
          std::pair<std::vector<DiffEntry>, std::string> page_change) {
        if (cancelled_) {
          callback(Status::INTERNAL_ERROR, IterationStatus::OK, {}, nullptr);
          Finalize(Status::INTERNAL_ERROR);
          return;
        }
        if (status != Status::OK) {
          FXL_LOG(ERROR) << "Unable to compute diff due to error "
                         << fidl::ToUnderlying(status) << ", aborting.";
          callback(status, IterationStatus::OK, {}, nullptr);
          Finalize(status);
          return;
        }

        // A non-empty continuation string means the diff is not complete:
        // wrap it in a Token for the next call.
        std::unique_ptr<Token> continuation;
        IterationStatus iteration_status = IterationStatus::OK;
        if (!page_change.second.empty()) {
          iteration_status = IterationStatus::PARTIAL_RESULT;
          continuation = std::make_unique<Token>();
          continuation->opaque_id = convert::ToArray(page_change.second);
        }
        callback(Status::OK, iteration_status, std::move(page_change.first),
                 std::move(continuation));
      });

  diff_utils::ComputeThreeWayDiff(
      storage_, *ancestor_, *left_, *right_, "",
      token ? convert::ToString(token->opaque_id) : "", type,
      std::move(on_diff_computed));
}
// Applies |merged_values| to the merge journal and reports the result to
// |callback|.
//
// The work is queued on |operation_serializer_| and runs in two phases:
//  1. every merged value is handed to OnNextMergeResult(), which reports an
//     object identifier (or an error) through a shared waiter;
//  2. once all values are processed, each identifier with a valid digest is
//     written to |journal_| under the corresponding key and priority.
// If this object is gone, the resolution was cancelled, or phase 1 failed,
// IsInValidStateAndNotify() reports the error to |callback| and the merge
// stops.
void ConflictResolverClient::MergeNew(
std::vector<MergedValue> merged_values,
fit::function<void(Status)> callback) {
// Recorded immediately (not inside the serialized operation); read by
// MergeNonConflictingEntriesNew().
has_merged_values_ = true;
operation_serializer_.Serialize<Status>(
std::move(callback), [this, weak_this = weak_factory_.GetWeakPtr(),
merged_values = std::move(merged_values)](
fit::function<void(Status)> callback) mutable {
if (!IsInValidStateAndNotify(weak_this, callback)) {
return;
}
// Phase 1: collect one object identifier per merged value.
auto waiter = fxl::MakeRefCounted<
callback::Waiter<storage::Status, storage::ObjectIdentifier>>(
storage::Status::OK);
for (const MergedValue& merged_value : merged_values) {
OnNextMergeResult(merged_value, waiter);
}
// |merged_values| is moved into the continuation only after the loop above
// has finished reading it.
waiter->Finalize([this, weak_this,
merged_values = std::move(merged_values),
callback = std::move(callback)](
storage::Status status,
std::vector<storage::ObjectIdentifier>
object_identifiers) mutable {
if (!IsInValidStateAndNotify(weak_this, callback, status)) {
return;
}
// Phase 2: write the collected identifiers to the journal.
auto waiter =
fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>(
storage::Status::OK);
// |object_identifiers[i]| is paired with |merged_values[i]|.
for (size_t i = 0; i < object_identifiers.size(); ++i) {
// DELETE is encoded with an invalid digest in OnNextMergeResult.
if (!object_identifiers[i].object_digest().IsValid()) {
continue;
}
journal_->Put(merged_values.at(i).key, object_identifiers[i],
merged_values.at(i).priority == Priority::EAGER
? storage::KeyPriority::EAGER
: storage::KeyPriority::LAZY,
waiter->NewCallback());
}
// Report the combined status of all Put operations.
waiter->Finalize(
[callback = std::move(callback)](storage::Status status) {
callback(PageUtils::ConvertStatus(status));
});
});
});
}
// Variant of MergeNew() whose callback takes two Status values; both receive
// the single status produced by MergeNew().
void ConflictResolverClient::Merge(
    std::vector<MergedValue> merged_values,
    fit::function<void(Status, Status)> callback) {
  auto duplicate_status = [callback = std::move(callback)](Status status) {
    callback(status, status);
  };
  MergeNew(std::move(merged_values), std::move(duplicate_status));
}
// Writes to the merge journal the entries that changed on one side only,
// then reports the result to |callback|.
//
// The work is queued on |operation_serializer_|. It walks the three-way diff
// between |ancestor_|, |left_| and |right_|; journal writes are tracked by a
// status waiter, and |callback| receives the converted status once the walk
// and all writes are finished.
void ConflictResolverClient::MergeNonConflictingEntriesNew(
fit::function<void(Status)> callback) {
operation_serializer_.Serialize<Status>(
std::move(callback), [this, weak_this = weak_factory_.GetWeakPtr()](
fit::function<void(Status)> callback) mutable {
if (!IsInValidStateAndNotify(weak_this, callback)) {
return;
}
auto waiter =
fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>(
storage::Status::OK);
// Called for each three-way change. Returns false if this object has been
// destroyed, true otherwise.
auto on_next = [this, weak_this,
waiter](storage::ThreeWayChange change) {
if (!weak_this) {
return false;
}
// When |MergeNonConflictingEntries| is called first, we know that the
// base state of |journal_| is equal to the left version. In that
// case, we only want to merge diffs where the change is only on the
// right side: no change means no diff, 3 different versions means
// conflict (so we skip), and left-only changes are already taken into
// account.
if (util::EqualPtr(change.base, change.left)) {
// Right-only change: take the right entry, or delete the key if the right
// side has no entry.
if (change.right) {
journal_->Put(change.right->key, change.right->object_identifier,
change.right->priority, waiter->NewCallback());
} else {
journal_->Delete(change.base->key, waiter->NewCallback());
}
} else if (util::EqualPtr(change.base, change.right) &&
has_merged_values_) {
// Left-only change, applied only when MergeNew() has been called before.
// NOTE(review): presumably this restores the left version over whatever
// earlier merges wrote for this key; confirm.
if (change.left) {
journal_->Put(change.left->key, change.left->object_identifier,
change.left->priority, waiter->NewCallback());
} else {
journal_->Delete(change.base->key, waiter->NewCallback());
}
}
return true;
};
// Called with the status of the diff walk. On error, reports it right away;
// otherwise waits for the pending journal writes and reports their status.
auto on_done = [waiter, callback = std::move(callback)](
storage::Status status) mutable {
if (status != storage::Status::OK) {
callback(PageUtils::ConvertStatus(status));
return;
}
waiter->Finalize(
[callback = std::move(callback)](storage::Status status) {
callback(PageUtils::ConvertStatus(status));
});
};
storage_->GetThreeWayContentsDiff(*ancestor_, *left_, *right_, "",
std::move(on_next),
std::move(on_done));
});
}
// Variant of MergeNonConflictingEntriesNew() whose callback takes two Status
// values; both receive the single status produced by the new-style call.
void ConflictResolverClient::MergeNonConflictingEntries(
    fit::function<void(Status, Status)> callback) {
  auto duplicate_status = [callback = std::move(callback)](Status status) {
    callback(status, status);
  };
  MergeNonConflictingEntriesNew(std::move(duplicate_status));
}
// Commits the merge journal and ends the resolution.
//
// The work is queued on |operation_serializer_|. On a successful commit,
// |callback| receives OK and the client is finalized with OK. If the
// resolution was cancelled or the commit failed,
// IsInValidStateAndNotify() reports the error and finalizes the client.
void ConflictResolverClient::DoneNew(fit::function<void(Status)> callback) {
  operation_serializer_.Serialize<Status>(
      std::move(callback),
      [this, weak_this = weak_factory_.GetWeakPtr()](
          fit::function<void(Status)> callback) mutable {
        if (!IsInValidStateAndNotify(weak_this, callback)) {
          return;
        }

        // The resolver has handed control back, so a later Cancel() must
        // no longer finalize on its own.
        in_client_request_ = false;
        FXL_DCHECK(!cancelled_);
        FXL_DCHECK(journal_);

        auto on_committed = callback::MakeScoped(
            weak_factory_.GetWeakPtr(),
            [this, weak_this, callback = std::move(callback)](
                storage::Status status,
                std::unique_ptr<const storage::Commit> /*commit*/) {
              const bool still_valid =
                  IsInValidStateAndNotify(weak_this, callback, status);
              if (still_valid) {
                callback(Status::OK);
                Finalize(Status::OK);
              }
            });
        storage_->CommitJournal(std::move(journal_), std::move(on_committed));
      });
}
// Variant of DoneNew() whose callback takes two Status values; both receive
// the single status produced by DoneNew().
void ConflictResolverClient::Done(
    fit::function<void(Status, Status)> callback) {
  auto duplicate_status = [callback = std::move(callback)](Status status) {
    callback(status, status);
  };
  DoneNew(std::move(duplicate_status));
}
// Returns true when the operation in progress may continue: |weak_this| is
// still alive, the resolution has not been cancelled, and |status| is OK.
//
// Otherwise reports an error to |callback| and returns false. If the object
// is still alive, it is also finalized with that error.
bool ConflictResolverClient::IsInValidStateAndNotify(
    const fxl::WeakPtr<ConflictResolverClient>& weak_this,
    const fit::function<void(Status)>& callback, storage::Status status) {
  if (!weak_this) {
    callback(Status::INTERNAL_ERROR);
    return false;
  }

  const bool was_cancelled = weak_this->cancelled_;
  if (!was_cancelled && status == storage::Status::OK) {
    return true;
  }

  // Cancellation takes precedence over whatever storage reported.
  Status ledger_status = Status::INTERNAL_ERROR;
  if (!was_cancelled) {
    // The only not found error that can occur is a key not found when
    // processing a MergedValue with ValueSource::RIGHT.
    ledger_status = PageUtils::ConvertStatus(status, Status::KEY_NOT_FOUND);
  }

  // Any error was already logged where it occurred; no need to log again.
  callback(ledger_status);
  // Finalize destroys this object, so it has to come after the callback.
  weak_this->Finalize(ledger_status);
  return false;
}
} // namespace ledger