blob: ca3dbdfa9f7630efa1b5abc1d16392efcf9898fd [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/ledger/bin/storage/impl/journal_impl.h"
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include "src/ledger/bin/storage/impl/btree/builder.h"
#include "src/ledger/bin/storage/impl/btree/tree_node.h"
#include "src/ledger/bin/storage/impl/commit_impl.h"
#include "src/ledger/bin/storage/public/commit.h"
#include "src/lib/fxl/memory/ref_ptr.h"
namespace storage {
JournalImpl::JournalImpl(Token /* token */, ledger::Environment* environment,
PageStorageImpl* page_storage,
std::unique_ptr<const storage::Commit> base)
: environment_(environment),
page_storage_(page_storage),
base_(std::move(base)),
committed_(false) {}
// Nothing to release by hand: members clean up after themselves.
JournalImpl::~JournalImpl() = default;
// Creates a journal with a single parent commit, |base|, which must be
// non-null (checked in debug builds).
std::unique_ptr<Journal> JournalImpl::Simple(
    ledger::Environment* environment, PageStorageImpl* page_storage,
    std::unique_ptr<const storage::Commit> base) {
  FXL_DCHECK(base);
  auto simple_journal = std::make_unique<JournalImpl>(
      Token(), environment, page_storage, std::move(base));
  return simple_journal;
}
// Creates a journal for a merge commit with two parents. |base| becomes the
// first parent and |other| the second; both must be non-null (checked in
// debug builds).
std::unique_ptr<Journal> JournalImpl::Merge(
    ledger::Environment* environment, PageStorageImpl* page_storage,
    std::unique_ptr<const storage::Commit> base,
    std::unique_ptr<const storage::Commit> other) {
  FXL_DCHECK(base);
  FXL_DCHECK(other);
  // Build the journal exactly like a simple one, then attach the second
  // parent.
  auto merge_journal = std::make_unique<JournalImpl>(
      Token(), environment, page_storage, std::move(base));
  merge_journal->other_ = std::move(other);
  return merge_journal;
}
// Commits the journal. Must be called at most once (checked in debug
// builds).
//
// The parent list is |base_|, followed by |other_| when this is a merge
// journal. The recorded entry changes are applied either on top of the first
// parent's root (no clear operation recorded) or on top of a freshly created
// empty tree node (a clear operation was recorded).
//
// |callback| receives the resulting status and commit; on failure the commit
// is null.
//
// NOTE(review): the asynchronous continuation captures |this|, so the journal
// is assumed to outlive it -- confirm with the owner of this object.
void JournalImpl::Commit(
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  FXL_DCHECK(!committed_);
  committed_ = true;

  std::vector<std::unique_ptr<const storage::Commit>> parents;
  parents.reserve(other_ ? 2 : 1);
  parents.push_back(std::move(base_));
  if (other_) {
    parents.push_back(std::move(other_));
  }

  // The changes are copied, not moved, out of |journal_entries_|:
  // GetObjectsToSync() still reads the entries once the tree has been built.
  // Iterating by const reference avoids also copying each key.
  std::vector<storage::EntryChange> changes;
  changes.reserve(journal_entries_.size());
  for (const auto& journal_entry : journal_entries_) {
    changes.push_back(journal_entry.second);
  }

  if (cleared_ == JournalContainsClearOperation::NO) {
    // The journal doesn't contain the clear operation. The changes
    // recorded on the journal need to be executed over the content of
    // the first parent.
    ObjectIdentifier root_identifier = parents[0]->GetRootIdentifier();
    CreateCommitFromChanges(std::move(parents), std::move(root_identifier),
                            std::move(changes), std::move(callback));
    return;
  }

  // The journal contains the clear operation. The changes recorded on the
  // journal need to be executed over an empty page.
  btree::TreeNode::Empty(
      page_storage_,
      [this, parents = std::move(parents), changes = std::move(changes),
       callback = std::move(callback)](
          Status status, ObjectIdentifier root_identifier) mutable {
        if (status != Status::OK) {
          callback(status, nullptr);
          return;
        }
        CreateCommitFromChanges(std::move(parents), std::move(root_identifier),
                                std::move(changes), std::move(callback));
      });
}
// Records that |key| maps to |object_identifier| with the given |priority|.
// Any change previously recorded for the same key is replaced. Must not be
// called after Commit() (checked in debug builds).
void JournalImpl::Put(convert::ExtendedStringView key,
                      ObjectIdentifier object_identifier,
                      KeyPriority priority) {
  FXL_DCHECK(!committed_);
  std::string entry_key = key.ToString();
  EntryChange put_change;
  put_change.deleted = false;
  put_change.entry = {entry_key, std::move(object_identifier), priority};
  journal_entries_[std::move(entry_key)] = std::move(put_change);
}
// Records the deletion of |key|. Any change previously recorded for the same
// key is replaced. The stored entry carries a default-constructed object
// identifier and EAGER priority alongside the deleted flag. Must not be
// called after Commit() (checked in debug builds).
void JournalImpl::Delete(convert::ExtendedStringView key) {
  FXL_DCHECK(!committed_);
  std::string entry_key = key.ToString();
  EntryChange delete_change;
  delete_change.deleted = true;
  delete_change.entry = {entry_key, ObjectIdentifier(), KeyPriority::EAGER};
  journal_entries_[std::move(entry_key)] = std::move(delete_change);
}
// Drops every change recorded so far and remembers that a clear operation
// happened, so that Commit() applies later changes over an empty page.
// Must not be called after Commit() (checked in debug builds).
void JournalImpl::Clear() {
  FXL_DCHECK(!committed_);
  journal_entries_.clear();
  cleared_ = JournalContainsClearOperation::YES;
}
// Applies |changes| on top of the tree rooted at |root_identifier| and, unless
// the result is a no-op, creates a commit with the given |parents| and adds it
// to |page_storage_|.
//
// |callback| is invoked with:
//   - a non-OK status and a null commit if any step fails;
//   - Status::OK and a null commit if there is a single parent and the
//     resulting root is identical to that parent's root (no-op);
//   - Status::OK and the new commit otherwise.
//
// NOTE(review): the asynchronous continuations capture |this|, so the journal
// is assumed to outlive them -- confirm with the owner of this object.
void JournalImpl::CreateCommitFromChanges(
    std::vector<std::unique_ptr<const storage::Commit>> parents,
    ObjectIdentifier root_identifier, std::vector<EntryChange> changes,
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  btree::ApplyChanges(
      environment_->coroutine_service(), page_storage_,
      std::move(root_identifier), std::move(changes),
      [this, parents = std::move(parents), callback = std::move(callback)](
          Status status, ObjectIdentifier object_identifier,
          std::set<ObjectIdentifier> new_nodes) mutable {
        if (status != Status::OK) {
          callback(status, nullptr);
          return;
        }
        // If the commit is a no-op, return early, without creating a new
        // commit.
        if (parents.size() == 1 &&
            parents.front()->GetRootIdentifier() == object_identifier) {
          // |new_nodes| can be ignored here. If a clear operation has been
          // executed and the state has then been restored to the one before
          // the transaction, |ApplyChanges| might have re-created some nodes
          // that already exist. Because they already exist in a pre-existing
          // commit, there is no need to update their state.
          callback(Status::OK, nullptr);
          return;
        }
        std::unique_ptr<const storage::Commit> commit =
            CommitImpl::FromContentAndParents(environment_->clock(),
                                              page_storage_, object_identifier,
                                              std::move(parents));
        GetObjectsToSync(
            [this, new_nodes = std::move(new_nodes), commit = std::move(commit),
             callback = std::move(callback)](
                Status status,
                std::vector<ObjectIdentifier> objects_to_sync) mutable {
              if (status != Status::OK) {
                callback(status, nullptr);
                return;
              }
              // Append the newly created tree nodes to the values that need
              // syncing. Set elements are const in place, so each node is
              // extracted to allow moving the identifier out rather than
              // copying it. Extracting from the front keeps the set's
              // iteration order.
              objects_to_sync.reserve(objects_to_sync.size() +
                                      new_nodes.size());
              while (!new_nodes.empty()) {
                objects_to_sync.push_back(
                    std::move(new_nodes.extract(new_nodes.begin()).value()));
              }
              // A clone is handed to the storage; the original commit is kept
              // to be returned to the caller once the storage reports success.
              page_storage_->AddCommitFromLocal(
                  commit->Clone(), std::move(objects_to_sync),
                  [commit = std::move(commit),
                   callback = std::move(callback)](Status status) mutable {
                    if (status != Status::OK) {
                      callback(status, nullptr);
                      return;
                    }
                    callback(Status::OK, std::move(commit));
                  });
            });
      });
}
// Computes which of the values added by this journal need to be synced.
//
// Every non-deleted journal entry's object identifier is checked with
// |page_storage_->ObjectIsUntracked|; once all checks have completed,
// |callback| receives the identifiers reported as untracked, in journal
// iteration order. If the combined status is not OK, |callback| receives that
// status and an empty vector.
void JournalImpl::GetObjectsToSync(
    fit::function<void(Status status,
                       std::vector<ObjectIdentifier> objects_to_sync)>
        callback) {
  auto waiter = fxl::MakeRefCounted<callback::Waiter<Status, bool>>(Status::OK);
  std::vector<ObjectIdentifier> added_values;
  for (const auto& [key, entry_change] : journal_entries_) {
    if (entry_change.deleted) {
      continue;
    }
    added_values.push_back(entry_change.entry.object_identifier);
    page_storage_->ObjectIsUntracked(added_values.back(),
                                     waiter->NewCallback());
  }
  // The lambda is |mutable| so that the captured |added_values| is not const:
  // without it, the std::move below would silently degrade to a copy.
  waiter->Finalize(
      [added_values = std::move(added_values), callback = std::move(callback)](
          Status status, std::vector<bool> is_untracked) mutable {
        if (status != Status::OK) {
          callback(status, {});
          return;
        }
        // One result is expected per queried value, in the same order.
        FXL_DCHECK(added_values.size() == is_untracked.size());
        // Only untracked objects should be synced.
        std::vector<ObjectIdentifier> objects_to_sync;
        for (size_t i = 0; i < is_untracked.size(); ++i) {
          if (is_untracked[i]) {
            objects_to_sync.push_back(std::move(added_values[i]));
          }
        }
        callback(Status::OK, std::move(objects_to_sync));
      });
}
} // namespace storage