blob: 405624ab0519f146260e2d31ecf92ccd74aee44f [file] [log] [blame]
// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "peridot/bin/ledger/storage/public/data_source.h"
#include <lib/fit/function.h>
#include <lib/fsl/socket/socket_drainer.h>
#include <lib/zx/vmar.h>
#include "peridot/lib/convert/convert.h"
namespace storage {
namespace {
template <typename S>
class StringLikeDataChunk : public DataSource::DataChunk {
public:
explicit StringLikeDataChunk(S value) : value_(std::move(value)) {}
private:
fxl::StringView Get() override { return convert::ExtendedStringView(value_); }
S value_;
};
template <typename S>
class StringLikeDataSource : public DataSource {
public:
explicit StringLikeDataSource(S value)
: value_(std::move(value)), size_(value_.size()) {}
private:
uint64_t GetSize() override { return size_; }
void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback)
override {
#ifndef NDEBUG
FXL_DCHECK(!called_);
called_ = true;
#endif
callback(std::make_unique<StringLikeDataChunk<S>>(std::move(value_)),
Status::DONE);
}
S value_;
uint64_t size_;
#ifndef NDEBUG
bool called_ = false;
#endif
};
class VmoDataChunk : public DataSource::DataChunk {
public:
explicit VmoDataChunk(fsl::SizedVmo vmo) : vmo_(std::move(vmo)) {}
zx_status_t Init() {
uintptr_t allocate_address;
zx_status_t status = zx::vmar::root_self()->allocate(
0, ToFullPages(vmo_.size()), ZX_VM_CAN_MAP_READ, &vmar_,
&allocate_address);
if (status != ZX_OK) {
return status;
}
return vmar_.map(0, vmo_.vmo(), 0, vmo_.size(), ZX_VM_PERM_READ,
&mapped_address_);
}
private:
uint64_t ToFullPages(uint64_t value) {
return (value + PAGE_SIZE - 1) & (~(PAGE_SIZE - 1));
}
fxl::StringView Get() override {
return fxl::StringView(reinterpret_cast<char*>(mapped_address_),
vmo_.size());
}
fsl::SizedVmo vmo_;
zx::vmar vmar_;
uintptr_t mapped_address_;
};
class VmoDataSource : public DataSource {
public:
explicit VmoDataSource(fsl::SizedVmo vmo) : vmo_(std::move(vmo)) {
FXL_DCHECK(vmo_);
}
private:
uint64_t GetSize() override { return vmo_.size(); }
void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback)
override {
#ifndef NDEBUG
FXL_DCHECK(!called_);
called_ = true;
#endif
if (!vmo_) {
callback(nullptr, Status::ERROR);
return;
}
auto data = std::make_unique<VmoDataChunk>(std::move(vmo_));
if (data->Init() != ZX_OK) {
callback(nullptr, Status::ERROR);
return;
}
callback(std::move(data), Status::DONE);
}
fsl::SizedVmo vmo_;
#ifndef NDEBUG
bool called_ = false;
#endif
};
class SocketDataSource : public DataSource, public fsl::SocketDrainer::Client {
public:
SocketDataSource(zx::socket socket, uint64_t expected_size)
: socket_(std::move(socket)),
expected_size_(expected_size),
remaining_bytes_(expected_size) {
FXL_DCHECK(socket_);
}
private:
uint64_t GetSize() override { return expected_size_; }
void Get(fit::function<void(std::unique_ptr<DataChunk>, Status)> callback)
override {
FXL_DCHECK(socket_);
callback_ = std::move(callback);
socket_drainer_ = std::make_unique<fsl::SocketDrainer>(this);
socket_drainer_->Start(std::move(socket_));
socket_.reset();
}
void OnDataAvailable(const void* data, size_t num_bytes) override {
if (num_bytes > remaining_bytes_) {
FXL_LOG(ERROR) << "Received incorrect number of bytes. Expected: "
<< expected_size_ << ", but received at least "
<< (num_bytes - remaining_bytes_) << " more.";
socket_drainer_.reset();
callback_(nullptr, Status::ERROR);
return;
}
remaining_bytes_ -= num_bytes;
callback_(std::make_unique<StringLikeDataChunk<std::string>>(
std::string(reinterpret_cast<const char*>(data), num_bytes)),
Status::TO_BE_CONTINUED);
}
void OnDataComplete() override {
socket_drainer_.reset();
if (remaining_bytes_ != 0) {
FXL_LOG(ERROR) << "Received incorrect number of bytes. Expected: "
<< expected_size_ << ", but received "
<< (expected_size_ - remaining_bytes_);
callback_(nullptr, Status::ERROR);
return;
}
callback_(std::make_unique<StringLikeDataChunk<std::string>>(std::string()),
Status::DONE);
}
zx::socket socket_;
uint64_t expected_size_;
uint64_t remaining_bytes_;
std::unique_ptr<fsl::SocketDrainer> socket_drainer_;
fit::function<void(std::unique_ptr<DataChunk>, Status)> callback_;
};
class FlatBufferDataChunk : public DataSource::DataChunk {
public:
explicit FlatBufferDataChunk(
std::unique_ptr<flatbuffers::FlatBufferBuilder> value)
: value_(std::move(value)) {}
private:
fxl::StringView Get() override {
return fxl::StringView(reinterpret_cast<char*>(value_->GetBufferPointer()),
value_->GetSize());
}
std::unique_ptr<flatbuffers::FlatBufferBuilder> value_;
};
} // namespace
std::unique_ptr<DataSource::DataChunk> DataSource::DataChunk::Create(
std::string value) {
return std::make_unique<StringLikeDataChunk<std::string>>(std::move(value));
}
std::unique_ptr<DataSource::DataChunk> DataSource::DataChunk::Create(
std::unique_ptr<flatbuffers::FlatBufferBuilder> builder) {
return std::make_unique<FlatBufferDataChunk>(std::move(builder));
}
std::unique_ptr<DataSource> DataSource::Create(std::string value) {
return std::make_unique<StringLikeDataSource<std::string>>(std::move(value));
}
std::unique_ptr<DataSource> DataSource::Create(std::vector<uint8_t> value) {
return std::make_unique<StringLikeDataSource<std::vector<uint8_t>>>(
std::move(value));
}
std::unique_ptr<DataSource> DataSource::Create(fsl::SizedVmo vmo) {
return std::make_unique<VmoDataSource>(std::move(vmo));
}
std::unique_ptr<DataSource> DataSource::Create(zx::socket socket,
uint64_t size) {
return std::make_unique<SocketDataSource>(std::move(socket), size);
}
} // namespace storage