| // Copyright 2017 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "peridot/bin/ledger/testing/cloud_provider/fake_page_cloud.h" |
| |
| #include <functional> |
| |
| #include <lib/fidl/cpp/clone.h> |
| #include <lib/fidl/cpp/optional.h> |
| #include <lib/fit/function.h> |
| #include <lib/fsl/socket/strings.h> |
| #include <lib/fsl/vmo/strings.h> |
| |
| #include "peridot/lib/convert/convert.h" |
| #include "third_party/murmurhash/murmurhash.h" |
| |
| namespace ledger { |
| |
| namespace { |
| |
// When network error injection is enabled, this many errors are returned for
// a given request signature before that request is allowed to succeed.
constexpr size_t kInitialRemainingErrorsToInject{2};

// Murmur hash seeds, one per request type, so that requests of different
// types get different signatures.
constexpr uint32_t kAddCommitsSeed{1u};
constexpr uint32_t kGetCommitsSeed{2u};
constexpr uint32_t kAddObjectSeed{3u};
constexpr uint32_t kGetObjectSeed{4u};
| |
| cloud_provider::Token PositionToToken(size_t position) { |
| std::string bytes( |
| std::string(reinterpret_cast<char*>(&position), sizeof(position))); |
| cloud_provider::Token result; |
| result.opaque_id = convert::ToArray(bytes); |
| return result; |
| } |
| |
| bool TokenToPosition(const std::unique_ptr<cloud_provider::Token>& token, |
| size_t* result) { |
| if (!token) { |
| *result = 0u; |
| return true; |
| } |
| |
| if (token->opaque_id->size() != sizeof(*result)) { |
| return false; |
| } |
| |
| memcpy(result, token->opaque_id->data(), sizeof(*result)); |
| return true; |
| } |
| |
| uint64_t GetVectorSignature(const fidl::VectorPtr<uint8_t>& vector, |
| uint32_t seed) { |
| return murmurhash(reinterpret_cast<const char*>(vector.get().data()), |
| vector.get().size(), seed); |
| } |
| |
| uint64_t GetCommitsSignature( |
| const std::vector<cloud_provider::CommitPackEntry>& commits) { |
| uint64_t result = 0; |
| for (const auto& commit : commits) { |
| result = result ^ |
| GetVectorSignature(convert::ToArray(commit.id), kAddCommitsSeed); |
| } |
| return result; |
| } |
| |
| } // namespace |
| |
| class FakePageCloud::WatcherContainer { |
| public: |
| WatcherContainer(cloud_provider::PageCloudWatcherPtr watcher, |
| size_t next_commit_index); |
| |
| void SendCommits(std::vector<cloud_provider::CommitPackEntry> commits, |
| size_t next_commit_index, fit::closure on_ack); |
| |
| size_t NextCommitIndex() { return next_commit_index_; } |
| |
| bool WaitingForWatcherAck() { return waiting_for_watcher_ack_; } |
| |
| void set_on_empty(fit::closure on_empty) { |
| watcher_.set_error_handler( |
| [on_empty = std::move(on_empty)](zx_status_t status) { on_empty(); }); |
| } |
| |
| private: |
| cloud_provider::PageCloudWatcherPtr watcher_; |
| // Whether we're still waiting for the watcher to ack the previous commit |
| // notification. |
| bool waiting_for_watcher_ack_ = false; |
| |
| // Index of the first commit to be sent to the watcher. |
| size_t next_commit_index_ = 0; |
| |
| FXL_DISALLOW_COPY_AND_ASSIGN(WatcherContainer); |
| }; |
| |
| FakePageCloud::WatcherContainer::WatcherContainer( |
| cloud_provider::PageCloudWatcherPtr watcher, size_t next_commit_index) |
| : watcher_(std::move(watcher)), next_commit_index_(next_commit_index) {} |
| |
| void FakePageCloud::WatcherContainer::SendCommits( |
| std::vector<cloud_provider::CommitPackEntry> commits, |
| size_t next_commit_index, fit::closure on_ack) { |
| FXL_DCHECK(watcher_.is_bound()); |
| FXL_DCHECK(!waiting_for_watcher_ack_); |
| FXL_DCHECK(!commits.empty()); |
| |
| waiting_for_watcher_ack_ = true; |
| next_commit_index_ = next_commit_index; |
| cloud_provider::CommitPack commit_pack; |
| if (!cloud_provider::EncodeCommitPack(commits, &commit_pack)) { |
| watcher_->OnError(cloud_provider::Status::INTERNAL_ERROR); |
| return; |
| } |
| watcher_->OnNewCommits(std::move(commit_pack), |
| PositionToToken(next_commit_index), |
| [this, on_ack = std::move(on_ack)] { |
| waiting_for_watcher_ack_ = false; |
| on_ack(); |
| }); |
| } |
| |
| FakePageCloud::FakePageCloud(InjectNetworkError inject_network_error) |
| : inject_network_error_(inject_network_error) { |
| bindings_.set_empty_set_handler([this] { |
| if (on_empty_) { |
| on_empty_(); |
| } |
| }); |
| } |
| |
| FakePageCloud::~FakePageCloud() {} |
| |
// Adds |request| to the set of connections served by this fake page cloud.
void FakePageCloud::Bind(
    fidl::InterfaceRequest<cloud_provider::PageCloud> request) {
  bindings_.AddBinding(this, std::move(request));
}
| |
| void FakePageCloud::SendPendingCommits() { |
| for (auto& container : containers_) { |
| if (container.WaitingForWatcherAck() || |
| container.NextCommitIndex() >= commits_.size()) { |
| continue; |
| } |
| |
| std::vector<cloud_provider::CommitPackEntry> commits; |
| for (size_t i = container.NextCommitIndex(); i < commits_.size(); i++) { |
| commits.push_back(commits_[i]); |
| } |
| |
| container.SendCommits(std::move(commits), commits_.size(), |
| [this] { SendPendingCommits(); }); |
| } |
| } |
| |
| bool FakePageCloud::MustReturnError(uint64_t request_signature) { |
| switch (inject_network_error_) { |
| case InjectNetworkError::NO: |
| return false; |
| case InjectNetworkError::YES: |
| auto it = remaining_errors_to_inject_.find(request_signature); |
| if (it == remaining_errors_to_inject_.end()) { |
| remaining_errors_to_inject_[request_signature] = |
| kInitialRemainingErrorsToInject; |
| it = remaining_errors_to_inject_.find(request_signature); |
| } |
| if (it->second) { |
| it->second--; |
| return true; |
| } |
| remaining_errors_to_inject_.erase(it); |
| return false; |
| } |
| } |
| |
| void FakePageCloud::AddCommits(cloud_provider::CommitPack commits, |
| AddCommitsCallback callback) { |
| std::vector<cloud_provider::CommitPackEntry> commit_entries; |
| if (!cloud_provider::DecodeCommitPack(commits, &commit_entries)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR); |
| return; |
| } |
| |
| if (MustReturnError(GetCommitsSignature(commit_entries))) { |
| callback(cloud_provider::Status::NETWORK_ERROR); |
| return; |
| } |
| std::move(commit_entries.begin(), commit_entries.end(), |
| std::back_inserter(commits_)); |
| SendPendingCommits(); |
| callback(cloud_provider::Status::OK); |
| } |
| |
| void FakePageCloud::GetCommits( |
| std::unique_ptr<cloud_provider::Token> min_position_token, |
| GetCommitsCallback callback) { |
| if (MustReturnError(GetVectorSignature( |
| min_position_token ? min_position_token->opaque_id.Clone() : nullptr, |
| kGetCommitsSeed))) { |
| callback(cloud_provider::Status::NETWORK_ERROR, nullptr, nullptr); |
| return; |
| } |
| std::vector<cloud_provider::CommitPackEntry> result; |
| size_t start = 0u; |
| if (!TokenToPosition(min_position_token, &start)) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR, nullptr, nullptr); |
| return; |
| } |
| |
| for (size_t i = start; i < commits_.size(); i++) { |
| result.push_back(commits_[i]); |
| } |
| std::unique_ptr<cloud_provider::Token> token; |
| if (!result.empty()) { |
| // This will cause the last commit to be delivered again when the token is |
| // used for the next GetCommits() call. This is allowed by the FIDL contract |
| // and should be handled correctly by the client. |
| token = fidl::MakeOptional(PositionToToken(commits_.size() - 1)); |
| } |
| cloud_provider::CommitPack commit_pack; |
| if (!cloud_provider::EncodeCommitPack(result, &commit_pack)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR, nullptr, nullptr); |
| return; |
| } |
| callback(cloud_provider::Status::OK, |
| fidl::MakeOptional(std::move(commit_pack)), std::move(token)); |
| } |
| |
| void FakePageCloud::AddObject(fidl::VectorPtr<uint8_t> id, |
| fuchsia::mem::Buffer data, |
| AddObjectCallback callback) { |
| if (MustReturnError(GetVectorSignature(id, kAddObjectSeed))) { |
| callback(cloud_provider::Status::NETWORK_ERROR); |
| return; |
| } |
| std::string bytes; |
| if (!fsl::StringFromVmo(data, &bytes)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR); |
| return; |
| } |
| |
| objects_[convert::ToString(id)] = bytes; |
| callback(cloud_provider::Status::OK); |
| } |
| |
| void FakePageCloud::GetObject(fidl::VectorPtr<uint8_t> id, |
| GetObjectCallback callback) { |
| if (MustReturnError(GetVectorSignature(id, kGetObjectSeed))) { |
| callback(cloud_provider::Status::NETWORK_ERROR, nullptr); |
| return; |
| } |
| std::string id_str = convert::ToString(id); |
| if (!objects_.count(id_str)) { |
| callback(cloud_provider::Status::NOT_FOUND, nullptr); |
| return; |
| } |
| ::fuchsia::mem::Buffer buffer; |
| if (!fsl::VmoFromString(objects_[id_str], &buffer)) { |
| callback(cloud_provider::Status::INTERNAL_ERROR, nullptr); |
| return; |
| } |
| callback(cloud_provider::Status::OK, fidl::MakeOptional(std::move(buffer))); |
| } |
| |
| void FakePageCloud::SetWatcher( |
| std::unique_ptr<cloud_provider::Token> min_position_token, |
| fidl::InterfaceHandle<cloud_provider::PageCloudWatcher> watcher, |
| SetWatcherCallback callback) { |
| // TODO(qsr): Inject errors here when LE-438 is fixed. |
| auto watcher_ptr = watcher.Bind(); |
| |
| size_t first_pending_commit_index; |
| if (!TokenToPosition(min_position_token, &first_pending_commit_index)) { |
| callback(cloud_provider::Status::ARGUMENT_ERROR); |
| return; |
| } |
| containers_.emplace(std::move(watcher_ptr), first_pending_commit_index); |
| SendPendingCommits(); |
| callback(cloud_provider::Status::OK); |
| } |
| |
| } // namespace ledger |