blob: 071d5d2c48ce47776307728772d8b56c5fde135c [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/app/merging/auto_merge_strategy.h"
#include <memory>
#include <string>
#include <vector>
#include <lib/callback/scoped_callback.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>
#include <lib/fxl/memory/weak_ptr.h>
#include "peridot/bin/ledger/app/merging/conflict_resolver_client.h"
#include "peridot/bin/ledger/app/page_manager.h"
#include "peridot/bin/ledger/app/page_utils.h"
namespace ledger {
class AutoMergeStrategy::AutoMerger {
public:
AutoMerger(storage::PageStorage* storage, PageManager* page_manager,
ConflictResolver* conflict_resolver,
std::unique_ptr<const storage::Commit> left,
std::unique_ptr<const storage::Commit> right,
std::unique_ptr<const storage::Commit> ancestor,
fit::function<void(Status)> callback);
~AutoMerger();
void Start();
void Cancel();
void Done(Status status);
private:
void OnRightChangeReady(
storage::Status status,
std::unique_ptr<std::vector<storage::EntryChange>> right_change);
void OnComparisonDone(
storage::Status status,
std::unique_ptr<std::vector<storage::EntryChange>> right_changes,
bool distinct);
void ApplyDiffOnJournal(
std::unique_ptr<storage::Journal> journal,
std::unique_ptr<std::vector<storage::EntryChange>> diff);
storage::PageStorage* const storage_;
PageManager* const manager_;
ConflictResolver* const conflict_resolver_;
std::unique_ptr<const storage::Commit> left_;
std::unique_ptr<const storage::Commit> right_;
std::unique_ptr<const storage::Commit> ancestor_;
std::unique_ptr<ConflictResolverClient> delegated_merge_;
fit::function<void(Status)> callback_;
bool cancelled_ = false;
// This must be the last member of the class.
fxl::WeakPtrFactory<AutoMergeStrategy::AutoMerger> weak_factory_;
};
AutoMergeStrategy::AutoMerger::AutoMerger(
storage::PageStorage* storage, PageManager* page_manager,
ConflictResolver* conflict_resolver,
std::unique_ptr<const storage::Commit> left,
std::unique_ptr<const storage::Commit> right,
std::unique_ptr<const storage::Commit> ancestor,
fit::function<void(Status)> callback)
: storage_(storage),
manager_(page_manager),
conflict_resolver_(conflict_resolver),
left_(std::move(left)),
right_(std::move(right)),
ancestor_(std::move(ancestor)),
callback_(std::move(callback)),
weak_factory_(this) {
FXL_DCHECK(callback_);
}
AutoMergeStrategy::AutoMerger::~AutoMerger() {}
void AutoMergeStrategy::AutoMerger::Start() {
auto changes = std::make_unique<std::vector<storage::EntryChange>>();
auto on_next = [weak_this = weak_factory_.GetWeakPtr(),
changes = changes.get()](storage::EntryChange change) {
if (!weak_this) {
return false;
}
if (weak_this->cancelled_) {
return false;
}
changes->push_back(change);
return true;
};
auto callback = callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, changes = std::move(changes)](storage::Status status) mutable {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
OnRightChangeReady(status, std::move(changes));
});
storage_->GetCommitContentsDiff(*ancestor_, *right_, "", std::move(on_next),
std::move(callback));
}
void AutoMergeStrategy::AutoMerger::OnRightChangeReady(
storage::Status status,
std::unique_ptr<std::vector<storage::EntryChange>> right_change) {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Unable to compute right diff due to error " << status
<< ", aborting.";
Done(PageUtils::ConvertStatus(status));
return;
}
if (right_change->empty()) {
OnComparisonDone(storage::Status::OK, std::move(right_change), true);
return;
}
struct PageChangeIndex {
size_t entry_index = 0;
bool distinct = true;
};
auto index = std::make_unique<PageChangeIndex>();
auto on_next = [weak_this = weak_factory_.GetWeakPtr(), index = index.get(),
right_change =
right_change.get()](storage::EntryChange change) {
if (!weak_this || weak_this->cancelled_) {
return false;
}
while (change.entry.key > (*right_change)[index->entry_index].entry.key) {
index->entry_index++;
if (index->entry_index >= right_change->size()) {
return false;
}
}
if (change.entry.key == (*right_change)[index->entry_index].entry.key) {
if (change == (*right_change)[index->entry_index]) {
return true;
}
index->distinct = false;
return false;
}
return true;
};
// |callback| is called when the full diff is computed.
auto callback = callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, right_change = std::move(right_change),
index = std::move(index)](storage::Status status) mutable {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
OnComparisonDone(status, std::move(right_change), index->distinct);
});
storage_->GetCommitContentsDiff(*ancestor_, *left_, "", std::move(on_next),
std::move(callback));
}
void AutoMergeStrategy::AutoMerger::OnComparisonDone(
storage::Status status,
std::unique_ptr<std::vector<storage::EntryChange>> right_changes,
bool distinct) {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
if (status != storage::Status::OK) {
FXL_LOG(ERROR) << "Unable to compute left diff due to error " << status
<< ", aborting.";
Done(PageUtils::ConvertStatus(status));
return;
}
if (!distinct) {
// Some keys are overlapping, so we need to proceed like the CUSTOM
// strategy. We could be more efficient if we reused |right_changes| instead
// of re-computing the diff inside |ConflictResolverClient|.
delegated_merge_ = std::make_unique<ConflictResolverClient>(
storage_, manager_, conflict_resolver_, std::move(left_),
std::move(right_), std::move(ancestor_),
callback::MakeScoped(weak_factory_.GetWeakPtr(), [this](Status status) {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
Done(status);
}));
delegated_merge_->Start();
return;
}
// Here, we reuse the diff we computed before to create the merge commit. As
// StartMergeCommit uses the left commit (first parameter) as its base, we
// only have to apply the right diff to it and we are done.
storage_->StartMergeCommit(
left_->GetId(), right_->GetId(),
callback::MakeScoped(
weak_factory_.GetWeakPtr(),
[this, right_changes = std::move(right_changes)](
storage::Status s,
std::unique_ptr<storage::Journal> journal) mutable {
if (cancelled_) {
Done(Status::INTERNAL_ERROR);
return;
}
if (s != storage::Status::OK) {
FXL_LOG(ERROR) << "Unable to start merge commit: " << s;
Done(PageUtils::ConvertStatus(s));
return;
}
ApplyDiffOnJournal(std::move(journal), std::move(right_changes));
}));
}
void AutoMergeStrategy::AutoMerger::ApplyDiffOnJournal(
std::unique_ptr<storage::Journal> journal,
std::unique_ptr<std::vector<storage::EntryChange>> diff) {
auto waiter = fxl::MakeRefCounted<callback::StatusWaiter<storage::Status>>(
storage::Status::OK);
for (const storage::EntryChange& change : *diff) {
if (change.deleted) {
journal->Delete(change.entry.key, waiter->NewCallback());
} else {
journal->Put(change.entry.key, change.entry.object_identifier,
change.entry.priority, waiter->NewCallback());
}
}
waiter->Finalize([weak_this = weak_factory_.GetWeakPtr(),
journal = std::move(journal)](storage::Status s) mutable {
if (!weak_this) {
return;
}
if (weak_this->cancelled_) {
weak_this->Done(Status::INTERNAL_ERROR);
return;
}
if (s != storage::Status::OK) {
FXL_LOG(ERROR) << "Unable to commit merge journal: " << s;
weak_this->Done(PageUtils::ConvertStatus(s));
return;
}
weak_this->storage_->CommitJournal(
std::move(journal),
[weak_this = std::move(weak_this)](
storage::Status s,
std::unique_ptr<const storage::Commit> /*commit*/) {
if (s != storage::Status::OK) {
FXL_LOG(ERROR) << "Unable to commit merge journal: " << s;
}
if (weak_this) {
weak_this->Done(PageUtils::ConvertStatus(s));
}
});
});
}
void AutoMergeStrategy::AutoMerger::Cancel() {
cancelled_ = true;
if (delegated_merge_) {
delegated_merge_->Cancel();
}
}
void AutoMergeStrategy::AutoMerger::Done(Status status) {
delegated_merge_.reset();
auto callback = std::move(callback_);
callback_ = nullptr;
callback(status);
}
AutoMergeStrategy::AutoMergeStrategy(ConflictResolverPtr conflict_resolver)
: conflict_resolver_(std::move(conflict_resolver)) {
conflict_resolver_.set_error_handler([this](zx_status_t status) {
// If a merge is in progress, it must be terminated.
if (in_progress_merge_) {
// The actual cleanup of in_progress_merge_ will happen in its callback
// callback.
in_progress_merge_->Cancel();
}
if (on_error_) {
// It is safe to call |on_error_| because the error handler waits for the
// merges to finish before deleting this object.
on_error_();
}
});
}
AutoMergeStrategy::~AutoMergeStrategy() {}
void AutoMergeStrategy::SetOnError(fit::closure on_error) {
on_error_ = std::move(on_error);
}
void AutoMergeStrategy::Merge(storage::PageStorage* storage,
PageManager* page_manager,
std::unique_ptr<const storage::Commit> head_1,
std::unique_ptr<const storage::Commit> head_2,
std::unique_ptr<const storage::Commit> ancestor,
fit::function<void(Status)> callback) {
FXL_DCHECK(head_1->GetTimestamp() <= head_2->GetTimestamp());
FXL_DCHECK(!in_progress_merge_);
in_progress_merge_ = std::make_unique<AutoMergeStrategy::AutoMerger>(
storage, page_manager, conflict_resolver_.get(), std::move(head_2),
std::move(head_1), std::move(ancestor),
[this, callback = std::move(callback)](Status status) {
in_progress_merge_.reset();
callback(status);
});
in_progress_merge_->Start();
}
void AutoMergeStrategy::Cancel() {
if (in_progress_merge_) {
in_progress_merge_->Cancel();
}
}
} // namespace ledger