| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/storage/impl/journal_impl.h" |
| |
| #include <lib/fit/function.h> |
| |
| #include <functional> |
| #include <map> |
| #include <string> |
| #include <utility> |
| |
| #include "src/ledger/bin/storage/impl/btree/builder.h" |
| #include "src/ledger/bin/storage/impl/btree/encoding.h" |
| #include "src/ledger/bin/storage/impl/btree/tree_node.h" |
| #include "src/ledger/bin/storage/impl/commit_factory.h" |
| #include "src/ledger/bin/storage/impl/data_serialization.h" |
| #include "src/ledger/bin/storage/impl/object_identifier_encoding.h" |
| #include "src/ledger/bin/storage/public/commit.h" |
| #include "src/ledger/lib/callback/waiter.h" |
| #include "src/ledger/lib/convert/convert.h" |
| #include "src/ledger/lib/logging/logging.h" |
| #include "src/ledger/lib/memory/ref_ptr.h" |
| |
| namespace storage { |
| |
| JournalImpl::JournalImpl(Token /* token */, ledger::Environment* environment, |
| PageStorageImpl* page_storage, std::unique_ptr<const storage::Commit> base) |
| : environment_(environment), |
| page_storage_(page_storage), |
| base_(std::move(base)), |
| committed_(false) {} |
| |
| JournalImpl::~JournalImpl() = default; |
| |
| std::unique_ptr<Journal> JournalImpl::Simple(ledger::Environment* environment, |
| PageStorageImpl* page_storage, |
| std::unique_ptr<const storage::Commit> base) { |
| LEDGER_DCHECK(base); |
| |
| return std::make_unique<JournalImpl>(Token(), environment, page_storage, std::move(base)); |
| } |
| |
| std::unique_ptr<Journal> JournalImpl::Merge(ledger::Environment* environment, |
| PageStorageImpl* page_storage, |
| std::unique_ptr<const storage::Commit> base, |
| std::unique_ptr<const storage::Commit> other) { |
| LEDGER_DCHECK(base); |
| LEDGER_DCHECK(other); |
| auto journal = std::make_unique<JournalImpl>(Token(), environment, page_storage, std::move(base)); |
| journal->other_ = std::move(other); |
| return journal; |
| } |
| |
| Status JournalImpl::Commit(coroutine::CoroutineHandler* handler, |
| std::unique_ptr<const storage::Commit>* commit, |
| std::vector<ObjectIdentifier>* objects_to_sync) { |
| LEDGER_DCHECK(!committed_); |
| committed_ = true; |
| objects_to_sync->clear(); |
| |
| std::vector<storage::EntryChange> changes; |
| for (auto [key, entry_change] : journal_entries_) { |
| changes.push_back(std::move(entry_change)); |
| } |
| SetEntryIds(&changes); |
| |
| // |base_| and |other_| must be cloned, because they keep the potential bases for diffs alive, and |
| // |parents| will be discarded before this JournalImpl is destructed. |
| std::vector<std::unique_ptr<const storage::Commit>> parents; |
| if (other_) { |
| parents.reserve(2); |
| parents.push_back(base_->Clone()); |
| parents.push_back(other_->Clone()); |
| } else { |
| parents.reserve(1); |
| parents.push_back(base_->Clone()); |
| } |
| |
| if (cleared_ == JournalContainsClearOperation::NO) { |
| // The journal doesn't contain the clear operation. The changes |
| // recorded on the journal need to be executed over the content of |
| // the first parent. |
| ObjectIdentifier root_identifier = parents[0]->GetRootIdentifier(); |
| std::string parent_id = parents[0]->GetId(); |
| return CreateCommitFromChanges( |
| handler, std::move(parents), |
| {std::move(root_identifier), PageStorage::Location::TreeNodeFromNetwork(parent_id)}, |
| std::move(changes), commit, objects_to_sync); |
| } |
| |
| // The journal contains the clear operation. The changes recorded on the |
| // journal need to be executed over an empty page. |
| Status status; |
| ObjectIdentifier root_identifier; |
| if (coroutine::SyncCall( |
| handler, |
| [this](fit::function<void(Status status, ObjectIdentifier root_identifier)> callback) { |
| btree::TreeNode::Empty(page_storage_, std::move(callback)); |
| }, |
| &status, &root_identifier) == coroutine::ContinuationStatus::INTERRUPTED) { |
| return Status::INTERRUPTED; |
| } |
| RETURN_ON_ERROR(status); |
| return CreateCommitFromChanges(handler, std::move(parents), |
| {std::move(root_identifier), PageStorage::Location::Local()}, |
| std::move(changes), commit, objects_to_sync); |
| } |
| |
| void JournalImpl::Put(convert::ExtendedStringView key, ObjectIdentifier object_identifier, |
| KeyPriority priority) { |
| LEDGER_DCHECK(!committed_); |
| EntryChange change; |
| change.entry = {convert::ToString(key), std::move(object_identifier), priority, EntryId()}; |
| change.deleted = false; |
| journal_entries_[convert::ToString(key)] = std::move(change); |
| } |
| |
| void JournalImpl::Delete(convert::ExtendedStringView key) { |
| LEDGER_DCHECK(!committed_); |
| EntryChange change; |
| change.entry = {convert::ToString(key), ObjectIdentifier(), KeyPriority::EAGER, EntryId()}; |
| change.deleted = true; |
| journal_entries_[convert::ToString(key)] = std::move(change); |
| } |
| |
| void JournalImpl::Clear() { |
| LEDGER_DCHECK(!committed_); |
| cleared_ = JournalContainsClearOperation::YES; |
| journal_entries_.clear(); |
| } |
| |
| Status JournalImpl::CreateCommitFromChanges( |
| coroutine::CoroutineHandler* handler, |
| std::vector<std::unique_ptr<const storage::Commit>> parents, |
| btree::LocatedObjectIdentifier root_identifier, std::vector<EntryChange> changes, |
| std::unique_ptr<const storage::Commit>* commit, |
| std::vector<ObjectIdentifier>* objects_to_sync) { |
| ObjectIdentifier object_identifier; |
| std::set<ObjectIdentifier> new_nodes; |
| RETURN_ON_ERROR(btree::ApplyChanges(handler, page_storage_, std::move(root_identifier), |
| std::move(changes), &object_identifier, &new_nodes)); |
| // If the commit is a no-op, return early, without creating a new |
| // commit. |
| if (parents.size() == 1 && parents.front()->GetRootIdentifier() == object_identifier) { |
| // |new_nodes| can be ignored here. If a clear operation has been |
| // executed and the state has then been restored to the one before the |
| // transaction, |ApplyChanges| might have re-created some nodes that |
| // already exist. Because they already exist in a pre-existing commit, |
| // there is no need to update their state. |
| *commit = nullptr; |
| return Status::OK; |
| } |
| |
| std::unique_ptr<const storage::Commit> new_commit = |
| page_storage_->GetCommitFactory()->FromContentAndParents( |
| environment_->clock(), environment_->random(), object_identifier, std::move(parents)); |
| |
| Status status; |
| if (coroutine::SyncCall( |
| handler, |
| [this](fit::function<void(Status, std::vector<ObjectIdentifier>)> callback) { |
| GetObjectsToSync(std::move(callback)); |
| }, |
| &status, objects_to_sync) == coroutine::ContinuationStatus::INTERRUPTED) { |
| return Status::INTERRUPTED; |
| } |
| RETURN_ON_ERROR(status); |
| |
| // TODO(12356): remove compatibility flag. |
| if (environment_->diff_compatibility_policy() == |
| DiffCompatibilityPolicy::USE_DIFFS_AND_TREE_NODES) { |
| objects_to_sync->reserve(objects_to_sync->size() + new_nodes.size()); |
| // TODO(qsr): When using C++17, move data out of the set using |
| // extract. |
| objects_to_sync->insert(objects_to_sync->end(), new_nodes.begin(), new_nodes.end()); |
| } |
| |
| *commit = std::move(new_commit); |
| return Status::OK; |
| } |
| |
| void JournalImpl::GetObjectsToSync( |
| fit::function<void(Status status, std::vector<ObjectIdentifier> objects_to_sync)> callback) { |
| auto waiter = ledger::MakeRefCounted<ledger::Waiter<Status, bool>>(Status::OK); |
| std::vector<ObjectIdentifier> added_values; |
| for (auto const& journal_entry : journal_entries_) { |
| if (journal_entry.second.deleted) { |
| continue; |
| } |
| added_values.push_back(journal_entry.second.entry.object_identifier); |
| page_storage_->ObjectIsUntracked(added_values.back(), waiter->NewCallback()); |
| } |
| waiter->Finalize([added_values = std::move(added_values), callback = std::move(callback)]( |
| Status status, std::vector<bool> is_untracked) { |
| if (status != Status::OK) { |
| callback(status, {}); |
| return; |
| } |
| LEDGER_DCHECK(added_values.size() == is_untracked.size()); |
| |
| // Only untracked objects should be synced. |
| std::vector<ObjectIdentifier> objects_to_sync; |
| for (size_t i = 0; i < is_untracked.size(); ++i) { |
| if (is_untracked[i]) { |
| objects_to_sync.push_back(std::move(added_values[i])); |
| } |
| } |
| callback(Status::OK, std::move(objects_to_sync)); |
| }); |
| } |
| |
| void JournalImpl::SetEntryIds(std::vector<EntryChange>* changes) { |
| if (other_) { |
| SetEntryIdsMergeCommit(changes); |
| } else { |
| SetEntryIdsSimpleCommit(changes); |
| } |
| } |
| |
| void JournalImpl::SetEntryIdsSimpleCommit(std::vector<EntryChange>* changes) { |
| LEDGER_DCHECK(!other_); |
| |
| for (EntryChange& change : *changes) { |
| if (!change.deleted) { |
| change.entry.entry_id = page_storage_->GetEntryId(); |
| } |
| } |
| } |
| |
| void JournalImpl::SetEntryIdsMergeCommit(std::vector<EntryChange>* changes) { |
| LEDGER_DCHECK(other_); |
| |
| // Serialize the list of changes. |
| std::string operation_list; |
| if (cleared_ == JournalContainsClearOperation::YES) { |
| operation_list = "cleared"; |
| } |
| for (const EntryChange& change : *changes) { |
| const Entry& entry = change.entry; |
| std::string entry_content; |
| if (!change.deleted) { |
| entry_content = SafeConcatenation({entry.priority == KeyPriority::EAGER ? "E" : "L", |
| EncodeObjectIdentifier(entry.object_identifier)}); |
| } |
| operation_list.append( |
| SafeConcatenation({entry.key, change.deleted ? "D" : "U", entry_content})); |
| } |
| |
| for (EntryChange& change : *changes) { |
| if (!change.deleted) { |
| change.entry.entry_id = page_storage_->GetEntryIdForMerge(change.entry.key, base_->GetId(), |
| other_->GetId(), operation_list); |
| } |
| } |
| } |
| |
| } // namespace storage |