| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/ledger/bin/storage/fake/fake_page_storage.h" |
| |
| #include <lib/async/cpp/task.h> |
| #include <lib/async/default.h> |
| #include <lib/fit/function.h> |
| #include <lib/fsl/socket/strings.h> |
| #include <lib/fsl/vmo/strings.h> |
| #include <lib/zx/time.h> |
| |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "src/ledger/bin/encryption/primitives/hash.h" |
| #include "src/ledger/bin/storage/fake/fake_commit.h" |
| #include "src/ledger/bin/storage/fake/fake_journal.h" |
| #include "src/ledger/bin/storage/fake/fake_object.h" |
| #include "src/ledger/bin/storage/public/constants.h" |
| #include "src/ledger/bin/storage/public/types.h" |
| #include "src/ledger/bin/storage/testing/commit_empty_impl.h" |
| #include "src/lib/fxl/logging.h" |
| #include "src/lib/fxl/strings/concatenate.h" |
| |
| namespace storage { |
| namespace fake { |
| |
| namespace { |
| |
| Status ToBuffer(convert::ExtendedStringView value, int64_t offset, |
| int64_t max_size, fsl::SizedVmo* buffer) { |
| size_t start = value.size(); |
| // Valid indices are between -N and N-1. |
| if (offset >= -static_cast<int64_t>(value.size()) && |
| offset < static_cast<int64_t>(value.size())) { |
| start = offset < 0 ? value.size() + offset : offset; |
| } |
| size_t length = max_size < 0 ? value.size() : max_size; |
| bool result = fsl::VmoFromString(value.substr(start, length), buffer); |
| return result ? Status::OK : Status::INTERNAL_ERROR; |
| } |
| |
| class FakeRootCommit : public CommitEmptyImpl { |
| public: |
| FakeRootCommit() : id_(storage::kFirstPageCommitId.ToString()) {} |
| |
| std::unique_ptr<const Commit> Clone() const override { |
| return std::make_unique<const FakeRootCommit>(); |
| } |
| |
| const CommitId& GetId() const override { return id_; } |
| |
| std::vector<CommitIdView> GetParentIds() const override { |
| return std::vector<CommitIdView>(); |
| } |
| |
| zx::time_utc GetTimestamp() const override { return zx::time_utc(); } |
| |
| uint64_t GetGeneration() const override { return 0; } |
| |
| private: |
| const CommitId id_; |
| }; |
| |
| } // namespace |
| |
| FakePageStorage::FakePageStorage(ledger::Environment* environment, |
| PageId page_id) |
| : page_id_(std::move(page_id)), |
| environment_(environment), |
| encryption_service_(environment_->dispatcher()) {} |
| |
| FakePageStorage::~FakePageStorage() {} |
| |
| PageId FakePageStorage::GetId() { return page_id_; } |
| |
| Status FakePageStorage::GetHeadCommits( |
| std::vector<std::unique_ptr<const Commit>>* head_commits) { |
| std::vector<std::pair<CommitId, zx::time_utc>> heads(heads_.begin(), |
| heads_.end()); |
| std::sort(heads.begin(), heads.end(), [](const auto& p1, const auto& p2) { |
| return std::tie(p1.second, p1.first) < std::tie(p2.second, p2.first); |
| }); |
| std::vector<std::unique_ptr<const Commit>> commits; |
| commits.reserve(heads.size()); |
| for (const auto& [commit_id, _] : heads) { |
| commits.push_back(std::make_unique<FakeCommit>(journals_[commit_id].get())); |
| } |
| if (commits.empty()) { |
| commits.push_back(std::make_unique<const FakeRootCommit>()); |
| } |
| head_commits->swap(commits); |
| return Status::OK; |
| } |
| |
| void FakePageStorage::GetMergeCommitIds( |
| CommitIdView parent1_id, CommitIdView parent2_id, |
| fit::function<void(Status, std::vector<CommitId>)> callback) { |
| auto [parent_min_id, parent_max_id] = std::minmax(parent1_id, parent2_id); |
| auto it = merges_.find( |
| std::make_pair(parent_min_id.ToString(), parent_max_id.ToString())); |
| auto parents = it != merges_.end() ? it->second : std::vector<CommitId>{}; |
| callback(Status::OK, std::move(parents)); |
| } |
| |
| void FakePageStorage::GetCommit( |
| CommitIdView commit_id, |
| fit::function<void(Status, std::unique_ptr<const Commit>)> callback) { |
| if (commit_id == storage::kFirstPageCommitId) { |
| callback(Status::OK, std::make_unique<const FakeRootCommit>()); |
| return; |
| } |
| auto it = journals_.find(commit_id.ToString()); |
| if (it == journals_.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, nullptr); |
| return; |
| } |
| |
| async::PostTask( |
| environment_->dispatcher(), |
| [this, commit_id = commit_id.ToString(), callback = std::move(callback)] { |
| callback(Status::OK, |
| std::make_unique<FakeCommit>(journals_[commit_id].get())); |
| }); |
| } |
| |
| std::unique_ptr<Journal> FakePageStorage::StartCommit( |
| std::unique_ptr<const Commit> commit) { |
| CommitId commit_id = commit->GetId(); |
| uint64_t next_generation = 0; |
| FakeJournalDelegate::Data data; |
| if (journals_.find(commit_id) != journals_.end()) { |
| next_generation = journals_[commit_id].get()->GetGeneration() + 1; |
| data = journals_[commit_id].get()->GetData(); |
| } |
| auto delegate = std::make_unique<FakeJournalDelegate>( |
| environment_->random(), std::move(data), commit_id, autocommit_, |
| next_generation); |
| auto journal = std::make_unique<FakeJournal>(delegate.get()); |
| journals_[delegate->GetId()] = std::move(delegate); |
| return journal; |
| } |
| |
| std::unique_ptr<Journal> FakePageStorage::StartMergeCommit( |
| std::unique_ptr<const Commit> left, std::unique_ptr<const Commit> right) { |
| const CommitId& left_id = left->GetId(); |
| const CommitId& right_id = right->GetId(); |
| |
| auto delegate = std::make_unique<FakeJournalDelegate>( |
| environment_->random(), journals_[left_id].get()->GetData(), left_id, |
| right_id, autocommit_, |
| 1 + std::max(journals_[left_id].get()->GetGeneration(), |
| journals_[right_id].get()->GetGeneration())); |
| auto journal = std::make_unique<FakeJournal>(delegate.get()); |
| journals_[delegate->GetId()] = std::move(delegate); |
| return journal; |
| } |
| |
| void FakePageStorage::CommitJournal( |
| std::unique_ptr<Journal> journal, |
| fit::function<void(Status, std::unique_ptr<const storage::Commit>)> |
| callback) { |
| static_cast<FakeJournal*>(journal.get()) |
| ->Commit([this, callback = std::move(callback)]( |
| Status status, |
| std::unique_ptr<const storage::Commit> commit) { |
| std::vector<storage::CommitIdView> parent_ids = commit->GetParentIds(); |
| if (parent_ids.size() == 2) { |
| merges_[std::minmax(parent_ids[0].ToString(), |
| parent_ids[1].ToString())] |
| .push_back(commit->GetId()); |
| } |
| for (const storage::CommitIdView& parent_id : parent_ids) { |
| auto it = heads_.find(parent_id.ToString()); |
| if (it != heads_.end()) { |
| heads_.erase(it); |
| } |
| } |
| heads_.emplace(commit->GetId(), commit->GetTimestamp()); |
| if (!drop_commit_notifications_) { |
| for (CommitWatcher* watcher : watchers_) { |
| async::PostTask( |
| environment_->dispatcher(), |
| [this, watcher, commit = commit->Clone()]() mutable { |
| // Check that watcher was not unregistered. |
| if (watchers_.find(watcher) == watchers_.end()) { |
| return; |
| } |
| std::vector<std::unique_ptr<const Commit>> commits; |
| commits.push_back(std::move(commit)); |
| watcher->OnNewCommits(commits, ChangeSource::LOCAL); |
| }); |
| } |
| } |
| callback(status, std::move(commit)); |
| }); |
| } |
| |
| void FakePageStorage::AddCommitWatcher(CommitWatcher* watcher) { |
| watchers_.emplace(watcher); |
| } |
| |
| void FakePageStorage::RemoveCommitWatcher(CommitWatcher* watcher) { |
| auto it = watchers_.find(watcher); |
| if (it != watchers_.end()) { |
| watchers_.erase(it); |
| } |
| } |
| |
| void FakePageStorage::IsSynced(fit::function<void(Status, bool)> callback) { |
| callback(Status::OK, is_synced_); |
| } |
| |
| void FakePageStorage::AddObjectFromLocal( |
| ObjectType /*object_type*/, std::unique_ptr<DataSource> data_source, |
| ObjectReferencesAndPriority tree_references, |
| fit::function<void(Status, ObjectIdentifier)> callback) { |
| auto value = std::make_unique<std::string>(); |
| auto data_source_ptr = data_source.get(); |
| data_source_ptr->Get([this, data_source = std::move(data_source), |
| value = std::move(value), |
| tree_references = std::move(tree_references), |
| callback = std::move(callback)]( |
| std::unique_ptr<DataSource::DataChunk> chunk, |
| DataSource::Status status) mutable { |
| if (status == DataSource::Status::ERROR) { |
| callback(Status::IO_ERROR, {}); |
| return; |
| } |
| auto view = chunk->Get(); |
| value->append(view.data(), view.size()); |
| if (status == DataSource::Status::DONE) { |
| ObjectIdentifier object_identifier = |
| encryption_service_.MakeObjectIdentifier(FakeDigest(*value)); |
| objects_[object_identifier] = std::move(*value); |
| references_[object_identifier.object_digest()] = |
| std::move(tree_references); |
| callback(Status::OK, std::move(object_identifier)); |
| } |
| }); |
| } |
| |
| void FakePageStorage::GetObject( |
| ObjectIdentifier object_identifier, Location /*location*/, |
| fit::function<void(Status, std::unique_ptr<const Object>)> callback) { |
| GetPiece(object_identifier, |
| [callback = std::move(callback)]( |
| Status status, std::unique_ptr<const Piece> piece) { |
| return callback( |
| status, piece == nullptr |
| ? nullptr |
| : std::make_unique<FakeObject>(std::move(piece))); |
| }); |
| } |
| |
| void FakePageStorage::GetObjectPart( |
| ObjectIdentifier object_identifier, int64_t offset, int64_t max_size, |
| Location location, fit::function<void(Status, fsl::SizedVmo)> callback) { |
| GetPiece(object_identifier, |
| [offset, max_size, callback = std::move(callback)]( |
| Status status, std::unique_ptr<const Piece> piece) { |
| if (status != Status::OK) { |
| callback(status, nullptr); |
| return; |
| } |
| fxl::StringView data = piece->GetData(); |
| fsl::SizedVmo buffer; |
| Status buffer_status = ToBuffer(data, offset, max_size, &buffer); |
| if (buffer_status != Status::OK) { |
| callback(buffer_status, nullptr); |
| return; |
| } |
| callback(Status::OK, std::move(buffer)); |
| }); |
| } |
| |
| void FakePageStorage::GetPiece( |
| ObjectIdentifier object_identifier, |
| fit::function<void(Status, std::unique_ptr<const Piece>)> callback) { |
| object_requests_.emplace_back( |
| [this, object_identifier = std::move(object_identifier), |
| callback = std::move(callback)] { |
| auto it = objects_.find(object_identifier); |
| if (it == objects_.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, nullptr); |
| return; |
| } |
| |
| callback(Status::OK, |
| std::make_unique<FakePiece>(object_identifier, it->second)); |
| }); |
| async::PostDelayedTask( |
| environment_->dispatcher(), [this] { SendNextObject(); }, |
| kFakePageStorageDelay); |
| } |
| |
| void FakePageStorage::GetCommitContents(const Commit& commit, |
| std::string min_key, |
| fit::function<bool(Entry)> on_next, |
| fit::function<void(Status)> on_done) { |
| FakeJournalDelegate* journal = journals_[commit.GetId()].get(); |
| if (!journal) { |
| on_done(Status::INTERNAL_NOT_FOUND); |
| return; |
| } |
| |
| for (auto it = journal->GetData().lower_bound(min_key); |
| it != journal->GetData().end(); ++it) { |
| if (!on_next(it->second)) { |
| break; |
| } |
| } |
| on_done(Status::OK); |
| } |
| |
| void FakePageStorage::GetEntryFromCommit( |
| const Commit& commit, std::string key, |
| fit::function<void(Status, Entry)> callback) { |
| FakeJournalDelegate* journal = journals_[commit.GetId()].get(); |
| if (!journal) { |
| callback(Status::INTERNAL_NOT_FOUND, Entry()); |
| return; |
| } |
| const fake::FakeJournalDelegate::Data& data = journal->GetData(); |
| auto it = data.find(key); |
| if (it == data.end()) { |
| callback(Status::INTERNAL_NOT_FOUND, Entry()); |
| return; |
| } |
| callback(Status::OK, it->second); |
| } |
| |
| const std::map<std::string, std::unique_ptr<FakeJournalDelegate>>& |
| FakePageStorage::GetJournals() const { |
| return journals_; |
| } |
| |
| const std::map<ObjectIdentifier, std::string>& FakePageStorage::GetObjects() |
| const { |
| return objects_; |
| } |
| |
| const std::map<ObjectDigest, ObjectReferencesAndPriority>& |
| FakePageStorage::GetReferences() const { |
| return references_; |
| } |
| |
| ObjectDigest FakePageStorage::FakeDigest(fxl::StringView value) const { |
| // Builds a fake ObjectDigest by computing the hash of |value|, and prefixes |
| // it with 0xFACEFEED to intentionally make it longer than real object |
| // digests, start with a 1 bit, and easy to spot in logs. This is incompatible |
| // with real object digests, but is enough for a fake because all clients of |
| // the fake should treat object digests as opaque blobs. |
| return ObjectDigest(fxl::Concatenate( |
| {"\xFA\xCE\xFE\xED", encryption::SHA256WithLengthHash(value)})); |
| } |
| |
| void FakePageStorage::SendNextObject() { |
| auto rng = environment_->random()->NewBitGenerator<uint64_t>(); |
| std::uniform_int_distribution<size_t> distribution( |
| 0u, object_requests_.size() - 1); |
| auto it = object_requests_.begin() + distribution(rng); |
| auto closure = std::move(*it); |
| object_requests_.erase(it); |
| closure(); |
| } |
| |
| void FakePageStorage::DeleteObjectFromLocal( |
| const ObjectIdentifier& object_identifier) { |
| objects_.erase(object_identifier); |
| } |
| |
| void FakePageStorage::SetDropCommitNotifications(bool drop) { |
| drop_commit_notifications_ = drop; |
| } |
| |
| } // namespace fake |
| } // namespace storage |