blob: f9f3ae5c9556b096d391d53f20be8ecb3677ebd5 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/impl/commit_pruner.h"
#include <lib/async/cpp/task.h>
#include <utility>
#include <variant>
#include "src/ledger/bin/storage/public/types.h"
#include "src/ledger/lib/callback/waiter.h"
#include "src/ledger/lib/coroutine/coroutine_waiter.h"
#include "src/ledger/lib/logging/logging.h"
#include "src/ledger/lib/memory/ref_ptr.h"
namespace storage {
namespace {
// Extracts the parents of commits with the highest generation in |frontier|, removing those commits
// from |frontier| and returning their parents in |parents|.
Status ExploreGeneration(
coroutine::CoroutineHandler* handler, CommitPruner::CommitPrunerDelegate* delegate,
std::set<std::unique_ptr<const Commit>, storage::GenerationComparator>* frontier,
std::vector<std::unique_ptr<const Commit>>* parents) {
uint64_t expected_generation = (*frontier->begin())->GetGeneration();
auto waiter =
ledger::MakeRefCounted<ledger::Waiter<Status, std::unique_ptr<const Commit>>>(Status::OK);
while (!frontier->empty() && expected_generation == (*frontier->begin())->GetGeneration()) {
// Pop the newest commit.
const std::unique_ptr<const Commit>& commit = *frontier->begin();
// Request its parents.
for (const auto& parent_id : commit->GetParentIds()) {
delegate->GetCommit(parent_id, waiter->NewCallback());
}
frontier->erase(frontier->begin());
}
Status status;
if (coroutine::Wait(handler, std::move(waiter), &status, parents) ==
coroutine::ContinuationStatus::INTERRUPTED) {
return Status::INTERRUPTED;
}
return status;
}
} // namespace
CommitPruner::CommitPruner(ledger::Environment* environment, CommitPrunerDelegate* delegate,
LiveCommitTracker* commit_tracker, CommitPruningPolicy policy)
: environment_(environment),
delegate_(delegate),
commit_tracker_(commit_tracker),
policy_(policy),
coroutine_manager_(environment_->coroutine_service()) {}
CommitPruner::~CommitPruner() = default;
void CommitPruner::SchedulePruning() {
if (state_ == PruningState::IDLE) {
Prune();
} else if (state_ == PruningState::PRUNING) {
state_ = PruningState::PRUNING_AND_SCHEDULED;
} else {
LEDGER_DCHECK(state_ == PruningState::PRUNING_AND_SCHEDULED);
}
}
void CommitPruner::Prune() {
LEDGER_DCHECK(state_ == PruningState::IDLE);
state_ = PruningState::PRUNING;
coroutine_manager_.StartCoroutine([this](coroutine::CoroutineHandler* handler) {
// Yield to be resumed as a task.
if (coroutine::SyncCall(handler, [this](fit::closure on_done) {
async::PostTask(environment_->dispatcher(), std::move(on_done));
}) == coroutine::ContinuationStatus::INTERRUPTED) {
return;
}
Status s = SynchronousPrune(handler);
if (s != Status::OK) {
if (s != Status::INTERRUPTED) {
LEDGER_LOG(ERROR) << "Commit pruning failed with status " << s;
}
state_ = PruningState::IDLE;
return;
}
if (state_ == PruningState::PRUNING_AND_SCHEDULED) {
state_ = PruningState::IDLE;
Prune();
} else {
LEDGER_DCHECK(state_ == PruningState::PRUNING);
state_ = PruningState::IDLE;
}
});
}
void CommitPruner::CommitPruner::LoadClock(clocks::DeviceId self_id, Clock clock) {
self_id_ = std::move(self_id);
clock_ = std::move(clock);
}
Status CommitPruner::SynchronousPrune(coroutine::CoroutineHandler* handler) {
if (policy_ == CommitPruningPolicy::NEVER) {
return Status::OK;
}
LEDGER_DCHECK(policy_ == CommitPruningPolicy::LOCAL_IMMEDIATE);
std::unique_ptr<const storage::Commit> luca;
RETURN_ON_ERROR(FindLatestUniqueCommonAncestorSync(handler, &luca));
auto it = clock_.find(self_id_);
if (it == clock_.end()) {
clock_[self_id_] = DeviceEntry{};
} else if (std::holds_alternative<ClockTombstone>(it->second)) {
it->second = DeviceEntry{};
}
std::get<DeviceEntry>(clock_[self_id_]).head = {luca->GetId(), luca->GetGeneration()};
RETURN_ON_ERROR(delegate_->SetClock(handler, clock_));
std::vector<std::unique_ptr<const Commit>> commits;
RETURN_ON_ERROR(GetAllAncestors(handler, std::move(luca), &commits));
if (commits.empty()) {
return Status::OK;
}
return delegate_->DeleteCommits(handler, std::move(commits));
}
// The algorithm goes as follows: we keep a set of "active" commits, ordered
// by generation order. Until this set has only one element, we take the
// commit with the greater generation (the one deepest in the commit graph)
// and replace it by its parent. If we seed the initial set with two commits,
// we get their unique lowest common ancestor.
// At each step of the iteration we request the parent commits of all commits
// with the same generation.
Status CommitPruner::FindLatestUniqueCommonAncestorSync(
coroutine::CoroutineHandler* handler, std::unique_ptr<const storage::Commit>* result) {
auto live_commits = commit_tracker_->GetLiveCommits();
std::set<std::unique_ptr<const storage::Commit>, storage::GenerationComparator> commits(
std::move_iterator(live_commits.begin()), std::move_iterator(live_commits.end()));
while (commits.size() > 1) {
// Pop the newest commits and retrieve their parents.
std::vector<std::unique_ptr<const Commit>> parents;
RETURN_ON_ERROR(ExploreGeneration(handler, delegate_, &commits, &parents));
// Once the parents have been retrieved, add these in the set.
for (auto& parent : parents) {
commits.insert(std::move(parent));
}
}
LEDGER_DCHECK(commits.size() == 1);
*result = std::move(commits.extract(commits.begin()).value());
return Status::OK;
}
Status CommitPruner::GetAllAncestors(coroutine::CoroutineHandler* handler,
std::unique_ptr<const Commit> base,
std::vector<std::unique_ptr<const Commit>>* result) {
std::set<std::unique_ptr<const Commit>, storage::GenerationComparator> frontier;
frontier.insert(std::move(base));
std::set<std::unique_ptr<const Commit>, storage::GenerationComparator> ancestor_set;
while (!frontier.empty()) {
std::vector<std::unique_ptr<const Commit>> parents;
Status status = ExploreGeneration(handler, delegate_, &frontier, &parents);
if (status == Status::INTERNAL_NOT_FOUND) {
// We are at the end of the commits to be pruned (the other commits are already pruned), exit
// the loop.
break;
}
RETURN_ON_ERROR(status);
for (auto& parent : parents) {
ancestor_set.insert(parent->Clone());
frontier.insert(std::move(parent));
}
}
result->clear();
result->reserve(ancestor_set.size());
for (auto it = ancestor_set.begin(); it != ancestor_set.end();) {
result->push_back(std::move(ancestor_set.extract(it++).value()));
}
return Status::OK;
}
} // namespace storage