| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/app/merging/merge_resolver.h" |
| |
| #include <lib/callback/scoped_callback.h> |
| #include <lib/callback/trace_callback.h> |
| #include <lib/callback/waiter.h> |
| #include <lib/fit/defer.h> |
| #include <lib/fit/function.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <queue> |
| #include <set> |
| #include <utility> |
| |
| #include "src/ledger/bin/app/merging/common_ancestor.h" |
| #include "src/ledger/bin/app/merging/ledger_merge_manager.h" |
| #include "src/ledger/bin/app/merging/merge_strategy.h" |
| #include "src/ledger/bin/app/page_manager.h" |
| #include "src/ledger/bin/app/page_utils.h" |
| #include "src/ledger/bin/cobalt/cobalt.h" |
| #include "src/ledger/lib/coroutine/coroutine.h" |
| #include "src/ledger/lib/coroutine/coroutine_waiter.h" |
| #include "src/lib/fxl/memory/ref_ptr.h" |
| #include "src/lib/fxl/memory/weak_ptr.h" |
| |
| namespace ledger { |
| |
| // Enumerates merge candidates' indexes among current head commits. |
| class MergeResolver::MergeCandidates { |
| public: |
| MergeCandidates(); |
| |
| // Resets the MergeCandidates and sets the total number of head commits to |
| // |head_count|. |
| void ResetCandidates(size_t head_count); |
| |
| // Returns whether MergeCandidates should be reset. A reset is necessary |
| // when the head commits have changed, i.e. when there is a successful merge |
| // or on a new commit. |
| bool NeedsReset() { return needs_reset_; } |
| |
| // Returns the current pair on indexes of head commits to be merged. |
| std::pair<size_t, size_t> GetCurrentPair(); |
| |
| // Returns whether there is a merge candidate pair available. |
| bool HasCandidate(); |
| |
| // Returns true if there was a network error in one of the previous merge |
| // attempts. This does not include merges before |ResetCandidates| was |
| // called. |
| bool HadNetworkErrors() { return had_network_errors_; } |
| |
| // Should be called after a successful merge. |
| void OnMergeSuccess(); |
| |
| // Should be called after an unsuccessful merge. |
| void OnMergeError(storage::Status status); |
| |
| // Should be called when new commits are available. |
| void OnNewCommits(); |
| |
| // Returns the number of head commits. |
| size_t head_count() { return head_count_; } |
| |
| private: |
| // Advances to the next available pair of merge candidates. |
| void PrepareNext(); |
| |
| size_t head_count_; |
| std::pair<size_t, size_t> current_pair_; |
| bool needs_reset_ = true; |
| bool had_network_errors_ = false; |
| }; |
| |
| MergeResolver::MergeCandidates::MergeCandidates() {} |
| |
| void MergeResolver::MergeCandidates::ResetCandidates(size_t head_count) { |
| head_count_ = head_count; |
| current_pair_ = {0, 1}; |
| needs_reset_ = false; |
| had_network_errors_ = false; |
| } |
| |
| bool MergeResolver::MergeCandidates::HasCandidate() { |
| return current_pair_.first != head_count_ - 1; |
| } |
| |
| std::pair<size_t, size_t> MergeResolver::MergeCandidates::GetCurrentPair() { |
| return current_pair_; |
| } |
| |
| void MergeResolver::MergeCandidates::OnMergeSuccess() { needs_reset_ = true; } |
| |
| void MergeResolver::MergeCandidates::OnMergeError(storage::Status status) { |
| if (status == storage::Status::NETWORK_ERROR) { |
| // The contents of the common ancestor are unavailable locally and it wasn't |
| // possible to retrieve them through the network: Ignore this pair of heads |
| // for now. |
| had_network_errors_ = true; |
| PrepareNext(); |
| } else { |
| FXL_LOG(WARNING) << "Merging failed. Will try again later."; |
| } |
| } |
| |
| void MergeResolver::MergeCandidates::OnNewCommits() { needs_reset_ = true; } |
| |
| void MergeResolver::MergeCandidates::PrepareNext() { |
| ++current_pair_.second; |
| if (current_pair_.second == head_count_) { |
| ++current_pair_.first; |
| current_pair_.second = current_pair_.first + 1; |
| } |
| } |
| |
| MergeResolver::MergeResolver(fit::closure on_destroyed, |
| Environment* environment, |
| storage::PageStorage* storage, |
| std::unique_ptr<backoff::Backoff> backoff) |
| : coroutine_service_(environment->coroutine_service()), |
| storage_(storage), |
| backoff_(std::move(backoff)), |
| merge_candidates_(std::make_unique<MergeCandidates>()), |
| on_destroyed_(std::move(on_destroyed)), |
| task_runner_(environment->dispatcher()) { |
| storage_->AddCommitWatcher(this); |
| PostCheckConflicts(DelayedStatus::DONT_DELAY); |
| } |
| |
| MergeResolver::~MergeResolver() { |
| storage_->RemoveCommitWatcher(this); |
| on_destroyed_(); |
| } |
| |
| void MergeResolver::set_on_empty(fit::closure on_empty_callback) { |
| on_empty_callback_ = std::move(on_empty_callback); |
| } |
| |
| bool MergeResolver::IsEmpty() { return !merge_in_progress_; } |
| |
| bool MergeResolver::HasUnfinishedMerges() { |
| return merge_in_progress_ || check_conflicts_in_progress_ || |
| check_conflicts_task_count_ != 0 || in_delay_ || |
| merge_candidates_->HadNetworkErrors(); |
| } |
| |
| void MergeResolver::SetMergeStrategy(std::unique_ptr<MergeStrategy> strategy) { |
| if (merge_in_progress_) { |
| FXL_DCHECK(strategy_); |
| // The new strategy can be the empty strategy (nullptr), so we need a |
| // separate boolean to know if we have a pending strategy change to make. |
| has_next_strategy_ = true; |
| next_strategy_ = std::move(strategy); |
| strategy_->Cancel(); |
| return; |
| } |
| strategy_.swap(strategy); |
| if (strategy_) { |
| PostCheckConflicts(DelayedStatus::DONT_DELAY); |
| } |
| } |
| |
| void MergeResolver::SetPageManager(PageManager* page_manager) { |
| FXL_DCHECK(page_manager_ == nullptr); |
| page_manager_ = page_manager; |
| } |
| |
| void MergeResolver::RegisterNoConflictCallback( |
| fit::function<void(ConflictResolutionWaitStatus)> callback) { |
| no_conflict_callbacks_.push_back(std::move(callback)); |
| } |
| |
| void MergeResolver::OnNewCommits( |
| const std::vector<std::unique_ptr<const storage::Commit>>& /*commits*/, |
| storage::ChangeSource source) { |
| merge_candidates_->OnNewCommits(); |
| PostCheckConflicts(source == storage::ChangeSource::LOCAL |
| ? DelayedStatus::DONT_DELAY |
| // We delay remote commits. |
| : DelayedStatus::MAY_DELAY); |
| } |
| |
| void MergeResolver::PostCheckConflicts(DelayedStatus delayed_status) { |
| check_conflicts_task_count_++; |
| task_runner_.PostTask([this, delayed_status] { |
| check_conflicts_task_count_--; |
| CheckConflicts(delayed_status); |
| }); |
| } |
| |
| void MergeResolver::CheckConflicts(DelayedStatus delayed_status) { |
| if (!strategy_ || merge_in_progress_ || check_conflicts_in_progress_ || |
| in_delay_) { |
| // No strategy is set, or a merge is already in progress, or we are already |
| // checking for conflicts, or we are delaying merges. Let's bail out early. |
| return; |
| } |
| check_conflicts_in_progress_ = true; |
| std::vector<std::unique_ptr<const storage::Commit>> heads; |
| storage::Status s = storage_->GetHeadCommits(&heads); |
| check_conflicts_in_progress_ = false; |
| |
| if (merge_candidates_->NeedsReset()) { |
| merge_candidates_->ResetCandidates(heads.size()); |
| } |
| FXL_DCHECK(merge_candidates_->head_count() == heads.size()) |
| << merge_candidates_->head_count() << " != " << heads.size(); |
| |
| if (s != storage::Status::OK || heads.size() == 1 || |
| !(merge_candidates_->HasCandidate())) { |
| // An error occurred, or there is no conflict we can resolve. In |
| // either case, return early. |
| if (s != storage::Status::OK) { |
| FXL_LOG(ERROR) << "Failed to get head commits with status " << s; |
| } else if (heads.size() == 1) { |
| for (auto& callback : no_conflict_callbacks_) { |
| callback(has_merged_ ? ConflictResolutionWaitStatus::CONFLICTS_RESOLVED |
| : ConflictResolutionWaitStatus::NO_CONFLICTS); |
| } |
| no_conflict_callbacks_.clear(); |
| has_merged_ = false; |
| } |
| if (on_empty_callback_) { |
| on_empty_callback_(); |
| } |
| return; |
| } |
| if (!strategy_) { |
| if (on_empty_callback_) { |
| on_empty_callback_(); |
| } |
| return; |
| } |
| merge_in_progress_ = true; |
| std::pair<size_t, size_t> head_indexes = merge_candidates_->GetCurrentPair(); |
| ResolveConflicts(delayed_status, std::move(heads[head_indexes.first]), |
| std::move(heads[head_indexes.second])); |
| } |
| |
| void MergeResolver::ResolveConflicts( |
| DelayedStatus delayed_status, std::unique_ptr<const storage::Commit> head1, |
| std::unique_ptr<const storage::Commit> head2) { |
| auto cleanup = fit::defer(task_runner_.MakeScoped([this, delayed_status] { |
| // |merge_in_progress_| must be reset before calling |
| // |on_empty_callback_|. |
| merge_in_progress_ = false; |
| |
| if (has_next_strategy_) { |
| strategy_ = std::move(next_strategy_); |
| next_strategy_.reset(); |
| has_next_strategy_ = false; |
| } |
| PostCheckConflicts(delayed_status); |
| // Call on_empty_callback_ at the very end as it might delete the |
| // resolver. |
| if (on_empty_callback_) { |
| on_empty_callback_(); |
| } |
| })); |
| uint64_t id = TRACE_NONCE(); |
| TRACE_ASYNC_BEGIN("ledger", "merge", id); |
| auto tracing = fit::defer([id] { TRACE_ASYNC_END("ledger", "merge", id); }); |
| |
| FXL_DCHECK(storage::Commit::TimestampOrdered(head1, head2)); |
| |
| if (head1->GetParentIds().size() == 2 && head2->GetParentIds().size() == 2) { |
| if (delayed_status == DelayedStatus::MAY_DELAY) { |
| // If trying to merge 2 merge commits, add some delay with |
| // exponential backoff. |
| auto delay_callback = [this] { |
| in_delay_ = false; |
| CheckConflicts(DelayedStatus::DONT_DELAY); |
| }; |
| in_delay_ = true; |
| task_runner_.PostDelayedTask( |
| TRACE_CALLBACK(std::move(delay_callback), "ledger", "merge_delay"), |
| backoff_->GetNext()); |
| cleanup.cancel(); |
| merge_in_progress_ = false; |
| // We don't want to continue merging if nobody is interested |
| // (all clients disconnected). |
| if (on_empty_callback_) { |
| on_empty_callback_(); |
| } |
| return; |
| } |
| // If delayed_status is not initial, report the merge. |
| ReportEvent(CobaltEvent::MERGED_COMMITS_MERGED); |
| } else { |
| // No longer merging 2 merge commits, reinitialize the exponential |
| // backoff. |
| backoff_->Reset(); |
| } |
| |
| // Merge the first two commits using the most recent one as the |
| // base. |
| RecursiveMergeOneStep( |
| std::move(head1), std::move(head2), |
| [cleanup = std::move(cleanup), tracing = std::move(tracing)] { |
| ReportEvent(CobaltEvent::COMMITS_MERGED); |
| }); |
| } |
| |
| void MergeResolver::RecursiveMergeOneStep( |
| std::unique_ptr<const storage::Commit> left, |
| std::unique_ptr<const storage::Commit> right, |
| fit::closure on_successful_merge) { |
| coroutine_service_->StartCoroutine( |
| [this, left = std::move(left), right = std::move(right), |
| on_successful_merge = std::move(on_successful_merge)]( |
| coroutine::CoroutineHandler* handler) mutable { |
| TRACE_DURATION("ledger", "recursive_merge"); |
| storage::Status status = |
| RecursiveMergeSync(handler, std::move(left), std::move(right)); |
| if (status == storage::Status::INTERRUPTED) { |
| return; |
| } |
| if (status != storage::Status::OK) { |
| FXL_LOG(ERROR) << "Recursive merge failed"; |
| return; |
| } |
| on_successful_merge(); |
| }); |
| } |
| |
| storage::Status MergeResolver::MergeCommitsToContentOfLeftSync( |
| coroutine::CoroutineHandler* handler, |
| std::unique_ptr<const storage::Commit> left, |
| std::unique_ptr<const storage::Commit> right) { |
| std::unique_ptr<storage::Journal> journal = |
| storage_->StartMergeCommit(std::move(left), std::move(right)); |
| has_merged_ = true; |
| |
| storage::Status status; |
| std::unique_ptr<const storage::Commit> commit; |
| auto sync_call_status = coroutine::SyncCall( |
| handler, |
| [this, journal = std::move(journal)]( |
| fit::function<void(storage::Status status, |
| std::unique_ptr<const storage::Commit>)> |
| callback) mutable { |
| storage_->CommitJournal(std::move(journal), std::move(callback)); |
| }, |
| &status, &commit); |
| if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) { |
| return storage::Status::INTERRUPTED; |
| } |
| return status; |
| } |
| |
| // Synchronously get the commit with id |commit_id|. Try |candidate| if it has |
| // the right id, otherwise fetch it from storage. |
| storage::Status MergeResolver::GetCommitSync( |
| coroutine::CoroutineHandler* handler, storage::CommitIdView commit_id, |
| std::unique_ptr<const storage::Commit> candidate, |
| std::unique_ptr<const storage::Commit>* result) { |
| // Exit early if we already have the commit. |
| if (candidate->GetId() == commit_id) { |
| *result = std::move(candidate); |
| return storage::Status::OK; |
| } |
| |
| storage::Status status; |
| auto sync_call_status = coroutine::SyncCall( |
| handler, |
| [this, |
| commit_id](fit::function<void(storage::Status status, |
| std::unique_ptr<const storage::Commit>)> |
| callback) { |
| storage_->GetCommit(commit_id, std::move(callback)); |
| }, |
| &status, result); |
| if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) { |
| return storage::Status::INTERRUPTED; |
| } |
| // If the strategy has been changed, bail early. |
| if (has_next_strategy_) { |
| return storage::Status::INTERRUPTED; |
| } |
| return status; |
| } |
| |
| // Requests the merges of |right_commit| and any element of |left_commits|, and |
| // return them in |merges|. |
| storage::Status MergeResolver::FindMergesSync( |
| coroutine::CoroutineHandler* handler, |
| const std::vector<storage::CommitId>& left_commits, |
| storage::CommitId right_commit, std::vector<storage::CommitId>* merges) { |
| auto waiter = fxl::MakeRefCounted< |
| callback::Waiter<storage::Status, std::vector<storage::CommitId>>>( |
| storage::Status::OK); |
| for (const auto& left_commit : left_commits) { |
| storage_->GetMergeCommitIds(left_commit, right_commit, |
| waiter->NewCallback()); |
| } |
| storage::Status status; |
| std::vector<std::vector<storage::CommitId>> merge_lists; |
| if (coroutine::Wait(handler, std::move(waiter), &status, &merge_lists) == |
| coroutine::ContinuationStatus::INTERRUPTED) { |
| return storage::Status::INTERRUPTED; |
| } |
| // If the strategy has been changed, bail early. |
| if (has_next_strategy_) { |
| return storage::Status::INTERRUPTED; |
| } |
| if (status != storage::Status::OK) { |
| return status; |
| } |
| for (auto& merge_list : merge_lists) { |
| merges->insert(merges->end(), std::make_move_iterator(merge_list.begin()), |
| std::make_move_iterator(merge_list.end())); |
| } |
| return storage::Status::OK; |
| } |
| |
| // Try to build a merge of all commits in |ancestors|. Either the merge already |
| // exists and is returned in |final_merge| or one intermediate merge is |
| // constructed. |
| storage::Status MergeResolver::MergeSetSync( |
| coroutine::CoroutineHandler* handler, |
| std::vector<std::unique_ptr<const storage::Commit>> ancestors, |
| std::unique_ptr<const storage::Commit>* final_merge) { |
| FXL_DCHECK(!ancestors.empty()); |
| |
| // Sort ancestors by timestamp. This guarantees that, when we call the merge |
| // strategy, the right-hand side commit is always the most recent, and also |
| // matches (as much as possible) the order in which heads would be merged. |
| std::sort(ancestors.begin(), ancestors.end(), |
| storage::Commit::TimestampOrdered); |
| |
| // Build a merge of the first N ancestors. This holds the list of available |
| // merges of all the ancestors examined until now. Since merges have the |
| // maximum timestamp of their parents as timestamps, all commits in this list |
| // are older than the Nth ancestor, but they may have lower or higher commit |
| // ids. |
| std::vector<storage::CommitId> merges; |
| // The first ancestor is a merge of itself. |
| merges.push_back(ancestors[0]->GetId()); |
| |
| for (auto it = ancestors.begin() + 1, end = ancestors.end(); it < end; it++) { |
| // Request the merges of the ancestor |*it| and any element of |merges|. |
| std::vector<storage::CommitId> next_merges; |
| auto& next_ancestor = *it; |
| |
| storage::Status status = |
| FindMergesSync(handler, merges, next_ancestor->GetId(), &next_merges); |
| if (status != storage::Status::OK) { |
| return status; |
| } |
| // If |next_merges| is empty, the merges we need are not present yet. We |
| // call RecursiveMergeOneStep recursively. |
| if (next_merges.empty()) { |
| // Try to create the merge in a deterministic way: order merges by id. |
| std::sort(merges.begin(), merges.end()); |
| |
| // Get |merge[0]| from storage, or from |ancestors[0]| if they are the |
| // same commit. |
| std::unique_ptr<const storage::Commit> last_merge; |
| storage::Status status = GetCommitSync( |
| handler, merges[0], std::move(ancestors[0]), &last_merge); |
| if (status != storage::Status::OK) { |
| return status; |
| } |
| // We know that |last_merge->GetTimestamp() <= |
| // next_ancestor->GetTimestamp()| but the commit id of |last_merge| may be |
| // higher. In case of equality we need to reorder the calls. |
| if (!storage::Commit::TimestampOrdered(last_merge, next_ancestor)) { |
| FXL_DCHECK(last_merge->GetTimestamp() == next_ancestor->GetTimestamp()); |
| return RecursiveMergeSync(handler, std::move(next_ancestor), |
| std::move(last_merge)); |
| } |
| return RecursiveMergeSync(handler, std::move(last_merge), |
| std::move(next_ancestor)); |
| } |
| merges = std::move(next_merges); |
| } |
| |
| FXL_DCHECK(!merges.empty()); |
| |
| // Try to create the merge in a deterministic way: order by id. |
| std::sort(merges.begin(), merges.end()); |
| return GetCommitSync(handler, merges[0], std::move(ancestors[0]), |
| final_merge); |
| } |
| |
| // Does one step of recursive merging: tries to merge |left| and |right| and |
| // either produces a merge commit, or calls itself recursively to merge some |
| // common ancestors. Assumes that |left| is older than |right| according to |
| // |storage::Commit::TimestampOrdered|. |
| storage::Status MergeResolver::RecursiveMergeSync( |
| coroutine::CoroutineHandler* handler, |
| std::unique_ptr<const storage::Commit> left, |
| std::unique_ptr<const storage::Commit> right) { |
| FXL_DCHECK(storage::Commit::TimestampOrdered(left, right)); |
| |
| CommitComparison comparison; |
| std::vector<std::unique_ptr<const storage::Commit>> common_ancestors; |
| storage::Status status; |
| { |
| TRACE_DURATION("ledger", "merge_common_ancestor"); |
| status = |
| FindCommonAncestors(handler, storage_, left->Clone(), right->Clone(), |
| &comparison, &common_ancestors); |
| } |
| if (status != storage::Status::OK) { |
| return status; |
| } |
| // If the strategy has been changed, bail early. |
| if (has_next_strategy_) { |
| return storage::Status::INTERRUPTED; |
| } |
| |
| if (comparison == CommitComparison::LEFT_SUBSET_OF_RIGHT) { |
| return MergeCommitsToContentOfLeftSync(handler, std::move(right), |
| std::move(left)); |
| } else if (comparison == CommitComparison::RIGHT_SUBSET_OF_LEFT) { |
| return MergeCommitsToContentOfLeftSync(handler, std::move(left), |
| std::move(right)); |
| } else if (comparison == CommitComparison::EQUIVALENT) { |
| // The commits are equivalent so we can merge to the content |
| // of either of them. |
| return MergeCommitsToContentOfLeftSync(handler, std::move(left), |
| std::move(right)); |
| } |
| |
| FXL_DCHECK(!common_ancestors.empty()); |
| |
| // MergeSetSync has 3 possible results: |
| // - a non-OK storage::Status |
| // - a commit returned in merge_base |
| // - OK with an empty merge_base |
| std::unique_ptr<const storage::Commit> merge_base; |
| status = MergeSetSync(handler, std::move(common_ancestors), &merge_base); |
| if (status != storage::Status::OK) { |
| return status; |
| } |
| if (!merge_base) { |
| // A commit was made, resume when notified of it. |
| return storage::Status::OK; |
| } |
| |
| has_merged_ = true; |
| |
| storage::Status merge_status; |
| auto sync_call_status = coroutine::SyncCall( |
| handler, |
| [this, left = std::move(left), right = std::move(right), |
| merge_base = std::move(merge_base)]( |
| fit::function<void(storage::Status)> callback) mutable { |
| strategy_->Merge(storage_, page_manager_, std::move(left), |
| std::move(right), std::move(merge_base), |
| TRACE_CALLBACK(std::move(callback), "ledger", |
| "merge_strategy_merge")); |
| }, |
| &merge_status); |
| if (sync_call_status == coroutine::ContinuationStatus::INTERRUPTED) { |
| return storage::Status::INTERRUPTED; |
| } |
| if (merge_status != storage::Status::OK) { |
| merge_candidates_->OnMergeError(merge_status); |
| return storage::Status::ILLEGAL_STATE; |
| } |
| merge_candidates_->OnMergeSuccess(); |
| return storage::Status::OK; |
| } |
| |
| } // namespace ledger |