blob: 3fe3451f3436fefa3e9f016ac9ffdea50c8444fc [file] [log] [blame]
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/impl/journal_impl.h"
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <lib/callback/waiter.h>
#include <lib/fit/function.h>
#include <lib/fxl/memory/ref_ptr.h>
#include "peridot/bin/ledger/storage/impl/btree/builder.h"
#include "peridot/bin/ledger/storage/impl/btree/tree_node.h"
#include "peridot/bin/ledger/storage/impl/commit_impl.h"
#include "peridot/bin/ledger/storage/public/commit.h"
namespace storage {
// Constructs a journal of the given |type| operating on |page_storage|.
//
// |id| and |base| are taken by value and moved into |id_| and |base_|; |base_|
// is later used by GetParents() to fetch the first parent commit. The Token
// argument is unused in the body. Callers in this file create instances
// through Simple() and Merge().
JournalImpl::JournalImpl(Token /* token */, JournalType type,
ledger::Environment* environment,
PageStorageImpl* page_storage, JournalId id,
CommitId base)
: type_(type),
environment_(environment),
page_storage_(page_storage),
id_(std::move(id)),
base_(std::move(base)),
// The journal starts out valid; |valid_| is cleared once the journal has been
// committed or successfully rolled back.
valid_(true),
// Set to true when a Put/Delete/Clear storage operation reports a failure.
failed_operation_(false) {}
// Destructor. Emits a warning when the journal is destroyed while still valid,
// i.e. without having been committed or rolled back.
JournalImpl::~JournalImpl() {
  if (!valid_) {
    // Already committed or rolled back: nothing to report.
    return;
  }
  FXL_LOG(WARNING) << "Journal not committed or rolled back.";
}
// Factory for a journal with a single parent commit.
//
// Creates a JournalImpl of the requested |type| identified by |id| and based
// on the commit |base|, and returns it through the Journal interface.
std::unique_ptr<Journal> JournalImpl::Simple(JournalType type,
                                             ledger::Environment* environment,
                                             PageStorageImpl* page_storage,
                                             const JournalId& id,
                                             const CommitId& base) {
  auto simple_journal = std::make_unique<JournalImpl>(
      Token(), type, environment, page_storage, id, base);
  return simple_journal;
}
// Factory for a merge journal with two parent commits.
//
// The journal is always created with type EXPLICIT. |base| becomes the base
// commit id and a copy of |other| is stored in |other_|, which GetParents()
// uses to fetch the second parent.
std::unique_ptr<Journal> JournalImpl::Merge(ledger::Environment* environment,
                                            PageStorageImpl* page_storage,
                                            const JournalId& id,
                                            const CommitId& base,
                                            const CommitId& other) {
  std::unique_ptr<JournalImpl> merge_journal = std::make_unique<JournalImpl>(
      Token(), JournalType::EXPLICIT, environment, page_storage, id, base);
  // Record the second parent.
  merge_journal->other_ = std::make_unique<CommitId>(other);
  return merge_journal;
}
// Returns a reference to the id this journal was created with.
const JournalId& JournalImpl::GetId() const {
  return id_;
}
// Turns the content of the journal into a commit, reported through |callback|.
//
// The work is queued on |serializer_|. When it runs:
//   1. If the journal no longer accepts mutations, |callback| receives
//      ILLEGAL_STATE and a null commit.
//   2. The parent commits are fetched, then the journal entries.
//   3. The entries are applied on top of the first parent's tree or, when the
//      journal recorded a clear operation, on top of a freshly created empty
//      tree. CreateCommitFromChanges() finishes the job.
// A non-OK status from any intermediate step is forwarded to |callback|
// together with a null commit.
void JournalImpl::Commit(
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  using CommitCallback =
      fit::function<void(Status, std::unique_ptr<const storage::Commit>)>;
  using ParentList = std::vector<std::unique_ptr<const storage::Commit>>;
  using ChangeIterator = std::unique_ptr<Iterator<const EntryChange>>;

  serializer_.Serialize<Status, std::unique_ptr<const storage::Commit>>(
      std::move(callback), [this](CommitCallback on_done) {
        if (!StateAllowsMutation()) {
          on_done(Status::ILLEGAL_STATE, nullptr);
          return;
        }
        GetParents([this, on_done = std::move(on_done)](
                       Status parents_status, ParentList parents) mutable {
          if (parents_status != Status::OK) {
            on_done(parents_status, nullptr);
            return;
          }
          page_storage_->GetJournalEntries(
              id_,
              [this, parents = std::move(parents),
               on_done = std::move(on_done)](
                  Status entries_status, ChangeIterator changes,
                  JournalContainsClearOperation has_clear) mutable {
                if (entries_status != Status::OK) {
                  on_done(entries_status, nullptr);
                  return;
                }
                if (has_clear != JournalContainsClearOperation::NO) {
                  // A clear operation was recorded: the changes must be
                  // replayed over an empty page rather than over a parent.
                  btree::TreeNode::Empty(
                      page_storage_,
                      [this, parents = std::move(parents),
                       changes = std::move(changes),
                       on_done = std::move(on_done)](
                          Status empty_status,
                          ObjectIdentifier empty_root) mutable {
                        if (empty_status != Status::OK) {
                          on_done(empty_status, nullptr);
                          return;
                        }
                        CreateCommitFromChanges(
                            std::move(parents), std::move(empty_root),
                            std::move(changes), std::move(on_done));
                      });
                  return;
                }
                // No clear operation: replay the changes over the content of
                // the first parent. The root must be read before |parents| is
                // moved away.
                ObjectIdentifier first_parent_root =
                    parents.front()->GetRootIdentifier();
                CreateCommitFromChanges(
                    std::move(parents), std::move(first_parent_root),
                    std::move(changes), std::move(on_done));
              });
        });
      });
}
// Rolls the journal back. The request is queued on |serializer_|; the actual
// work, including the validity check, is done by RollbackInternal(), whose
// status is delivered to |callback|.
void JournalImpl::Rollback(fit::function<void(Status)> callback) {
  auto do_rollback = [this](fit::function<void(Status)> on_done) {
    RollbackInternal(std::move(on_done));
  };
  serializer_.Serialize<Status>(std::move(callback), std::move(do_rollback));
}
// Records in the journal that |key| maps to |object_identifier| with the given
// |priority|.
//
// The key is copied into an owned string immediately, since the operation is
// queued on |serializer_| and runs later. When it runs, ILLEGAL_STATE is
// reported if the journal no longer accepts mutations; otherwise the entry is
// added through |page_storage_| and the resulting status is passed to
// |callback|. A non-OK status is also remembered in |failed_operation_|.
void JournalImpl::Put(convert::ExtendedStringView key,
                      ObjectIdentifier object_identifier, KeyPriority priority,
                      fit::function<void(Status)> callback) {
  auto add_entry = [this, owned_key = key.ToString(),
                    object_identifier = std::move(object_identifier),
                    priority](fit::function<void(Status)> on_done) mutable {
    if (!StateAllowsMutation()) {
      on_done(Status::ILLEGAL_STATE);
      return;
    }
    page_storage_->AddJournalEntry(
        id_, owned_key, std::move(object_identifier), priority,
        [this, on_done = std::move(on_done)](Status status) {
          if (status != Status::OK) {
            failed_operation_ = true;
          }
          on_done(status);
        });
  };
  serializer_.Serialize<Status>(std::move(callback), std::move(add_entry));
}
// Records in the journal that |key| is removed.
//
// The key is copied into an owned string immediately, since the operation is
// queued on |serializer_| and runs later. When it runs, ILLEGAL_STATE is
// reported if the journal no longer accepts mutations; otherwise the removal
// is forwarded to |page_storage_| and the resulting status is passed to
// |callback|. A non-OK status is also remembered in |failed_operation_|.
void JournalImpl::Delete(convert::ExtendedStringView key,
                         fit::function<void(Status)> callback) {
  auto remove_entry = [this, owned_key = key.ToString()](
                          fit::function<void(Status)> on_done) {
    if (!StateAllowsMutation()) {
      on_done(Status::ILLEGAL_STATE);
      return;
    }
    page_storage_->RemoveJournalEntry(
        id_, owned_key, [this, on_done = std::move(on_done)](Status status) {
          if (status != Status::OK) {
            failed_operation_ = true;
          }
          on_done(status);
        });
  };
  serializer_.Serialize<Status>(std::move(callback), std::move(remove_entry));
}
// Records a clear operation in the journal.
//
// The operation is queued on |serializer_|. When it runs, ILLEGAL_STATE is
// reported if the journal no longer accepts mutations; otherwise
// |page_storage_| is asked to empty the journal and mark it as containing a
// clear operation. The resulting status is passed to |callback|, and a non-OK
// status is also remembered in |failed_operation_|.
void JournalImpl::Clear(fit::function<void(Status)> callback) {
  auto clear_journal = [this](fit::function<void(Status)> on_done) {
    if (!StateAllowsMutation()) {
      on_done(Status::ILLEGAL_STATE);
      return;
    }
    page_storage_->EmptyJournalAndMarkContainsClearOperation(
        id_, [this, on_done = std::move(on_done)](Status status) {
          if (status != Status::OK) {
            failed_operation_ = true;
          }
          on_done(status);
        });
  };
  serializer_.Serialize<Status>(std::move(callback), std::move(clear_journal));
}
// Fetches the parent commit(s) of this journal.
//
// The commit for |base_| is always requested; the commit for |*other_| is
// requested as well when |other_| is set (merge journals). The results are
// gathered by a waiter and delivered to |callback| when it finalizes.
// NOTE(review): Commit() treats element 0 of the result as the first parent,
// so the waiter presumably preserves request order - confirm against
// callback::Waiter.
void JournalImpl::GetParents(
    fit::function<void(Status,
                       std::vector<std::unique_ptr<const storage::Commit>>)>
        callback) {
  using CommitWaiter =
      callback::Waiter<Status, std::unique_ptr<const storage::Commit>>;

  auto commit_waiter = fxl::MakeRefCounted<CommitWaiter>(Status::OK);
  page_storage_->GetCommit(base_, commit_waiter->NewCallback());
  if (other_ != nullptr) {
    page_storage_->GetCommit(*other_, commit_waiter->NewCallback());
  }
  commit_waiter->Finalize(std::move(callback));
}
// Applies |changes| on top of the tree rooted at |root_identifier| and turns
// the result into a commit whose parents are |parents|.
//
// Outcomes reported through |callback|:
//   - A non-OK status from applying the changes, from collecting the objects
//     to sync, or from adding the commit is forwarded with a null commit.
//   - If there is exactly one parent and the resulting root equals that
//     parent's root, the commit is a no-op: the journal is rolled back and
//     |callback| receives the rollback status together with the parent.
//   - Otherwise the new commit is added to |page_storage_|, the journal is
//     removed, and |callback| receives OK with the new commit. A failure to
//     remove the journal at that point is only logged.
void JournalImpl::CreateCommitFromChanges(
    std::vector<std::unique_ptr<const storage::Commit>> parents,
    ObjectIdentifier root_identifier,
    std::unique_ptr<Iterator<const EntryChange>> changes,
    fit::function<void(Status, std::unique_ptr<const storage::Commit>)>
        callback) {
  btree::ApplyChanges(
      environment_->coroutine_service(), page_storage_,
      std::move(root_identifier), std::move(changes),
      [this, parents = std::move(parents), callback = std::move(callback)](
          Status apply_status, ObjectIdentifier new_root,
          std::set<ObjectIdentifier> new_nodes) mutable {
        if (apply_status != Status::OK) {
          callback(apply_status, nullptr);
          return;
        }

        const bool is_noop =
            parents.size() == 1 &&
            parents.front()->GetRootIdentifier() == new_root;
        if (is_noop) {
          // |new_nodes| can be ignored here. If a clear operation has been
          // executed and the state has then been restored to the one before
          // the transaction, |ApplyChanges| might have re-created some nodes
          // that already exist. Because they already exist in a pre-existing
          // commit, there is no need to update their state.
          // We are in an operation from the serializer: make sure not to send
          // the rollback operation through the serializer as well, or a
          // deadlock will be created.
          RollbackInternal([parent = std::move(parents.front()),
                            callback = std::move(callback)](
                               Status rollback_status) mutable {
            callback(rollback_status, std::move(parent));
          });
          return;
        }

        std::unique_ptr<const storage::Commit> commit =
            CommitImpl::FromContentAndParents(environment_->clock(),
                                              page_storage_, new_root,
                                              std::move(parents));
        GetObjectsToSync([this, new_nodes = std::move(new_nodes),
                          commit = std::move(commit),
                          callback = std::move(callback)](
                             Status sync_status,
                             std::vector<ObjectIdentifier>
                                 objects_to_sync) mutable {
          if (sync_status != Status::OK) {
            callback(sync_status, nullptr);
            return;
          }
          // The newly created tree nodes are synced along with the values.
          objects_to_sync.reserve(objects_to_sync.size() + new_nodes.size());
          // TODO(qsr): When using C++17, move data out of the set using
          // extract.
          objects_to_sync.insert(objects_to_sync.end(), new_nodes.begin(),
                                 new_nodes.end());
          page_storage_->AddCommitFromLocal(
              commit->Clone(), std::move(objects_to_sync),
              [this, commit = std::move(commit),
               callback = std::move(callback)](Status add_status) mutable {
                // The journal is marked invalid whether or not adding the
                // commit succeeded.
                valid_ = false;
                if (add_status != Status::OK) {
                  callback(add_status, nullptr);
                  return;
                }
                page_storage_->RemoveJournal(
                    id_, [commit = std::move(commit),
                          callback = std::move(callback)](
                             Status remove_status) mutable {
                      if (remove_status != Status::OK) {
                        FXL_LOG(INFO)
                            << "Commit created, but failed to delete journal.";
                      }
                      // The commit exists at this point, so OK is reported
                      // even if the journal could not be deleted.
                      callback(Status::OK, std::move(commit));
                    });
              });
        });
      });
}
// Computes the value objects referenced by this journal that need syncing.
//
// The journal entries are replayed into a key -> object identifier map (a
// deletion erases the key, anything else overwrites it), so only the final
// value of each key is considered. Each remaining object is then checked with
// |page_storage_->ObjectIsUntracked|, and the de-duplicated set of untracked
// identifiers is handed to |callback| with Status::OK. A non-OK status from
// reading the entries or from the untracked checks is forwarded with an empty
// list.
void JournalImpl::GetObjectsToSync(
    fit::function<void(Status status,
                       std::vector<ObjectIdentifier> objects_to_sync)>
        callback) {
  page_storage_->GetJournalEntries(
      id_,
      [this, callback = std::move(callback)](
          Status entries_status,
          std::unique_ptr<Iterator<const EntryChange>> entries,
          JournalContainsClearOperation
          /* contains_clear_operation */) mutable {
        if (entries_status != Status::OK) {
          callback(entries_status, {});
          return;
        }

        // Final key-value pairs added by this journal.
        std::map<std::string, ObjectIdentifier> live_values;
        for (; entries->Valid(); entries->Next()) {
          const Entry& entry = (*entries)->entry;
          if (!(*entries)->deleted) {
            live_values[entry.key] = entry.object_identifier;
          } else {
            live_values.erase(entry.key);
          }
        }

        // One untracked check per remaining value, in map iteration order.
        auto untracked_waiter =
            fxl::MakeRefCounted<callback::Waiter<Status, bool>>(Status::OK);
        for (const auto& key_value : live_values) {
          page_storage_->ObjectIsUntracked(key_value.second,
                                           untracked_waiter->NewCallback());
        }

        untracked_waiter->Finalize(
            [live_values = std::move(live_values),
             callback = std::move(callback)](Status untracked_status,
                                             std::vector<bool> is_untracked) {
              if (untracked_status != Status::OK) {
                callback(untracked_status, {});
                return;
              }
              // Only untracked objects should be synced. |is_untracked| is
              // indexed in the same order in which the checks were issued;
              // a set removes identifiers shared by several keys.
              std::set<ObjectIdentifier> untracked_objects;
              size_t index = 0;
              for (const auto& key_value : live_values) {
                const bool untracked = is_untracked[index];
                ++index;
                if (untracked) {
                  untracked_objects.insert(key_value.second);
                }
              }
              std::vector<ObjectIdentifier> objects_to_sync(
                  untracked_objects.begin(), untracked_objects.end());
              callback(Status::OK, std::move(objects_to_sync));
            });
      });
}
// Rolls the journal back without going through |serializer_|, so it can be
// called from within an operation that is already running on the serializer.
//
// Reports ILLEGAL_STATE if the journal is no longer valid. Otherwise the
// journal is removed from |page_storage_|; on success the journal is marked
// invalid. The removal status is passed to |callback| in either case.
void JournalImpl::RollbackInternal(fit::function<void(Status)> callback) {
  if (!valid_) {
    callback(Status::ILLEGAL_STATE);
    return;
  }
  page_storage_->RemoveJournal(
      id_, [this, callback = std::move(callback)](Status removal_status) {
        if (removal_status != Status::OK) {
          // Removal failed: the journal stays valid.
          callback(removal_status);
          return;
        }
        valid_ = false;
        callback(removal_status);
      });
}
// Tells whether the journal may still be modified or committed.
//
// An invalid journal never allows mutation. A valid IMPLICIT journal always
// does; a valid journal of any other type does only while no operation has
// failed.
bool JournalImpl::StateAllowsMutation() {
  if (!valid_) {
    return false;
  }
  if (type_ == JournalType::IMPLICIT) {
    return true;
  }
  return !failed_operation_;
}
} // namespace storage