blob: 589a9f242b25431dc935d246cc0c8dbb530da9ea [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/testing/cloud_provider/fake_page_cloud.h"
#include <functional>
#include <lib/fidl/cpp/clone.h>
#include <lib/fidl/cpp/optional.h>
#include <lib/fit/function.h>
#include <lib/fsl/socket/strings.h>
#include <lib/fsl/vmo/strings.h>
#include "peridot/lib/convert/convert.h"
#include "third_party/murmurhash/murmurhash.h"
namespace ledger {
namespace {
// Number of errors to inject before allowing a request to succeed when
// configured to inject network errors.
constexpr size_t kInitialRemainingErrorsToInject = 2;
// Seed for the murmur hash algorithm to ensure different request signatures.
constexpr uint32_t kAddCommitsSeed = 1u;
constexpr uint32_t kGetCommitsSeed = 2u;
constexpr uint32_t kAddObjectSeed = 3u;
constexpr uint32_t kGetObjectSeed = 4u;
cloud_provider::Token PositionToToken(size_t position) {
std::string bytes(
std::string(reinterpret_cast<char*>(&position), sizeof(position)));
cloud_provider::Token result;
result.opaque_id = convert::ToArray(bytes);
return result;
}
bool TokenToPosition(const std::unique_ptr<cloud_provider::Token>& token,
size_t* result) {
if (!token) {
*result = 0u;
return true;
}
if (token->opaque_id.size() != sizeof(*result)) {
return false;
}
memcpy(result, token->opaque_id.data(), sizeof(*result));
return true;
}
uint64_t GetVectorSignature(const std::vector<uint8_t>& vector,
uint32_t seed) {
return murmurhash(reinterpret_cast<const char*>(vector.data()),
vector.size(), seed);
}
uint64_t GetCommitsSignature(
const std::vector<cloud_provider::CommitPackEntry>& commits) {
uint64_t result = 0;
for (const auto& commit : commits) {
result = result ^
GetVectorSignature(convert::ToArray(commit.id), kAddCommitsSeed);
}
return result;
}
} // namespace
class FakePageCloud::WatcherContainer {
public:
WatcherContainer(cloud_provider::PageCloudWatcherPtr watcher,
size_t next_commit_index);
void SendCommits(std::vector<cloud_provider::CommitPackEntry> commits,
size_t next_commit_index, fit::closure on_ack);
size_t NextCommitIndex() { return next_commit_index_; }
bool WaitingForWatcherAck() { return waiting_for_watcher_ack_; }
void set_on_empty(fit::closure on_empty) {
watcher_.set_error_handler(
[on_empty = std::move(on_empty)](zx_status_t status) { on_empty(); });
}
private:
cloud_provider::PageCloudWatcherPtr watcher_;
// Whether we're still waiting for the watcher to ack the previous commit
// notification.
bool waiting_for_watcher_ack_ = false;
// Index of the first commit to be sent to the watcher.
size_t next_commit_index_ = 0;
FXL_DISALLOW_COPY_AND_ASSIGN(WatcherContainer);
};
FakePageCloud::WatcherContainer::WatcherContainer(
cloud_provider::PageCloudWatcherPtr watcher, size_t next_commit_index)
: watcher_(std::move(watcher)), next_commit_index_(next_commit_index) {}
void FakePageCloud::WatcherContainer::SendCommits(
std::vector<cloud_provider::CommitPackEntry> commits,
size_t next_commit_index, fit::closure on_ack) {
FXL_DCHECK(watcher_.is_bound());
FXL_DCHECK(!waiting_for_watcher_ack_);
FXL_DCHECK(!commits.empty());
waiting_for_watcher_ack_ = true;
next_commit_index_ = next_commit_index;
cloud_provider::CommitPack commit_pack;
if (!cloud_provider::EncodeCommitPack(commits, &commit_pack)) {
watcher_->OnError(cloud_provider::Status::INTERNAL_ERROR);
return;
}
watcher_->OnNewCommits(std::move(commit_pack),
PositionToToken(next_commit_index),
[this, on_ack = std::move(on_ack)] {
waiting_for_watcher_ack_ = false;
on_ack();
});
}
FakePageCloud::FakePageCloud(InjectNetworkError inject_network_error)
: inject_network_error_(inject_network_error) {
bindings_.set_empty_set_handler([this] {
if (on_empty_) {
on_empty_();
}
});
}
FakePageCloud::~FakePageCloud() {}
void FakePageCloud::Bind(
fidl::InterfaceRequest<cloud_provider::PageCloud> request) {
bindings_.AddBinding(this, std::move(request));
}
void FakePageCloud::SendPendingCommits() {
for (auto& container : containers_) {
if (container.WaitingForWatcherAck() ||
container.NextCommitIndex() >= commits_.size()) {
continue;
}
std::vector<cloud_provider::CommitPackEntry> commits;
for (size_t i = container.NextCommitIndex(); i < commits_.size(); i++) {
commits.push_back(commits_[i]);
}
container.SendCommits(std::move(commits), commits_.size(),
[this] { SendPendingCommits(); });
}
}
bool FakePageCloud::MustReturnError(uint64_t request_signature) {
switch (inject_network_error_) {
case InjectNetworkError::NO:
return false;
case InjectNetworkError::YES:
auto it = remaining_errors_to_inject_.find(request_signature);
if (it == remaining_errors_to_inject_.end()) {
remaining_errors_to_inject_[request_signature] =
kInitialRemainingErrorsToInject;
it = remaining_errors_to_inject_.find(request_signature);
}
if (it->second) {
it->second--;
return true;
}
remaining_errors_to_inject_.erase(it);
return false;
}
}
void FakePageCloud::AddCommits(cloud_provider::CommitPack commits,
AddCommitsCallback callback) {
std::vector<cloud_provider::CommitPackEntry> commit_entries;
if (!cloud_provider::DecodeCommitPack(commits, &commit_entries)) {
callback(cloud_provider::Status::INTERNAL_ERROR);
return;
}
if (MustReturnError(GetCommitsSignature(commit_entries))) {
callback(cloud_provider::Status::NETWORK_ERROR);
return;
}
std::move(commit_entries.begin(), commit_entries.end(),
std::back_inserter(commits_));
SendPendingCommits();
callback(cloud_provider::Status::OK);
}
void FakePageCloud::GetCommits(
std::unique_ptr<cloud_provider::Token> min_position_token,
GetCommitsCallback callback) {
if (MustReturnError(GetVectorSignature(
min_position_token ? min_position_token->opaque_id : std::vector<uint8_t>(),
kGetCommitsSeed))) {
callback(cloud_provider::Status::NETWORK_ERROR, nullptr, nullptr);
return;
}
std::vector<cloud_provider::CommitPackEntry> result;
size_t start = 0u;
if (!TokenToPosition(min_position_token, &start)) {
callback(cloud_provider::Status::ARGUMENT_ERROR, nullptr, nullptr);
return;
}
for (size_t i = start; i < commits_.size(); i++) {
result.push_back(commits_[i]);
}
std::unique_ptr<cloud_provider::Token> token;
if (!result.empty()) {
// This will cause the last commit to be delivered again when the token is
// used for the next GetCommits() call. This is allowed by the FIDL contract
// and should be handled correctly by the client.
token = fidl::MakeOptional(PositionToToken(commits_.size() - 1));
}
cloud_provider::CommitPack commit_pack;
if (!cloud_provider::EncodeCommitPack(result, &commit_pack)) {
callback(cloud_provider::Status::INTERNAL_ERROR, nullptr, nullptr);
return;
}
callback(cloud_provider::Status::OK,
fidl::MakeOptional(std::move(commit_pack)), std::move(token));
}
void FakePageCloud::AddObject(std::vector<uint8_t> id,
fuchsia::mem::Buffer data,
AddObjectCallback callback) {
if (MustReturnError(GetVectorSignature(id, kAddObjectSeed))) {
callback(cloud_provider::Status::NETWORK_ERROR);
return;
}
std::string bytes;
if (!fsl::StringFromVmo(data, &bytes)) {
callback(cloud_provider::Status::INTERNAL_ERROR);
return;
}
objects_[convert::ToString(id)] = bytes;
callback(cloud_provider::Status::OK);
}
void FakePageCloud::GetObject(std::vector<uint8_t> id,
GetObjectCallback callback) {
if (MustReturnError(GetVectorSignature(id, kGetObjectSeed))) {
callback(cloud_provider::Status::NETWORK_ERROR, nullptr);
return;
}
std::string id_str = convert::ToString(id);
if (!objects_.count(id_str)) {
callback(cloud_provider::Status::NOT_FOUND, nullptr);
return;
}
::fuchsia::mem::Buffer buffer;
if (!fsl::VmoFromString(objects_[id_str], &buffer)) {
callback(cloud_provider::Status::INTERNAL_ERROR, nullptr);
return;
}
callback(cloud_provider::Status::OK, fidl::MakeOptional(std::move(buffer)));
}
void FakePageCloud::SetWatcher(
std::unique_ptr<cloud_provider::Token> min_position_token,
fidl::InterfaceHandle<cloud_provider::PageCloudWatcher> watcher,
SetWatcherCallback callback) {
// TODO(qsr): Inject errors here when LE-438 is fixed.
auto watcher_ptr = watcher.Bind();
size_t first_pending_commit_index;
if (!TokenToPosition(min_position_token, &first_pending_commit_index)) {
callback(cloud_provider::Status::ARGUMENT_ERROR);
return;
}
containers_.emplace(std::move(watcher_ptr), first_pending_commit_index);
SendPendingCommits();
callback(cloud_provider::Status::OK);
}
} // namespace ledger