blob: 2ba2cc972634faf5f77b72882557335c58604fac [file] [log] [blame]
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/storage/blobfs/blob_creator.h"
#include <fidl/fuchsia.fxfs/cpp/wire.h>
#include <lib/fidl/cpp/wire/channel.h>
#include <lib/fidl/cpp/wire/status.h>
#include <lib/fzl/vmo-mapper.h>
#include <lib/zx/channel.h>
#include <lib/zx/result.h>
#include <lib/zx/vmo.h>
#include <zircon/assert.h>
#include <zircon/errors.h>
#include <zircon/rights.h>
#include <zircon/status.h>
#include <zircon/types.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <fbl/ref_ptr.h>
#include "src/storage/blobfs/blob.h"
#include "src/storage/blobfs/blobfs.h"
#include "src/storage/blobfs/delivery_blob_private.h"
#include "src/storage/blobfs/format.h"
#include "src/storage/lib/vfs/cpp/service.h"
namespace blobfs {
namespace {
// Size, in bytes, of the VMO handed out by BlobWriter::GetVmo (256 KiB). The writer treats the
// VMO as a ring buffer: offsets into it are computed modulo this value.
constexpr uint64_t kRingBufferSize = uint64_t{256} * 1024;
}  // namespace
// Serves one fuchsia.fxfs/BlobWriter connection on behalf of a single |Blob|.
//
// The client first calls GetVmo() to obtain a ring-buffer VMO of |kRingBufferSize| bytes, then
// repeatedly fills it and calls BytesReady(); every report is copied out of the ring buffer and
// into the blob via Blob::Write(). While the writer holds |blob_| it is registered on the blob as
// its writer handler, so that other code can locate it through the blob and ask whether the client
// is still connected (see ChannelActive()).
class BlobWriter final : public fidl::WireServer<fuchsia_fxfs::BlobWriter> {
 public:
  // Takes a reference to |blob| and registers this object as the blob's writer handler.
  // |server_channel| is a borrowed, non-owning handle to the server end of the connection.
  explicit BlobWriter(fbl::RefPtr<Blob> blob, zx::unowned_channel server_channel)
      : blob_(std::move(blob)), server_channel_(std::move(server_channel)) {
    blob_->SetBlobWriterHandler(this);
  }

  ~BlobWriter() {
    // |blob_| is null if ChannelActive() already released it; in that case the handler was
    // unregistered there.
    if (blob_) {
      blob_->SetBlobWriterHandler(nullptr);
    }
  }

  // FIDL handler: sizes the blob to |request->size| bytes and replies with a duplicate handle to
  // the ring-buffer VMO. If the blob has already been dropped the connection is closed with
  // ZX_ERR_BAD_STATE; other failures are reported to the client without closing the connection.
  void GetVmo(fuchsia_fxfs::wire::BlobWriterGetVmoRequest* request,
              GetVmoCompleter::Sync& completer) final {
    if (!blob_) {
      completer.ReplyError(ZX_ERR_BAD_STATE);
      completer.Close(ZX_ERR_BAD_STATE);
      return;
    }
    auto result = GetVmoImpl(request->size);
    if (result.is_error()) {
      completer.ReplyError(result.status_value());
    } else {
      completer.ReplySuccess(std::move(result).value());
    }
  }

  // FIDL handler: consumes |request->bytes_written| bytes that the client has placed in the ring
  // buffer. Any failure is reported to the client and then closes the connection with the same
  // status.
  void BytesReady(fuchsia_fxfs::wire::BlobWriterBytesReadyRequest* request,
                  BytesReadyCompleter::Sync& completer) final {
    if (!blob_) {
      completer.ReplyError(ZX_ERR_BAD_STATE);
      completer.Close(ZX_ERR_BAD_STATE);
    } else if (auto result = BytesReadyImpl(request->bytes_written); result.is_error()) {
      completer.ReplyError(result.status_value());
      completer.Close(result.status_value());
    } else {
      completer.ReplySuccess();
    }
  }

  // Checks if the server channel is still active. If the client side has closed the associated
  // blob will be dropped synchronously, failing any future requests.
  //
  // Returns true only while the writer still holds its blob and signalling the peer succeeds.
  bool ChannelActive() {
    if (!blob_) {
      return false;
    }
    // Signalling the peer with empty masks changes nothing; only the returned status is of
    // interest, as a probe for whether the client end still exists.
    zx_status_t status = server_channel_->signal_peer(0, 0);
    FX_LOGS(INFO) << "Checking BlobWriter channel active: " << zx_status_get_string(status);
    if (status != ZX_OK) {
      // Unregister before letting go of the blob. The destructor only unregisters while |blob_|
      // is set, so without this a blob that is kept alive by another reference would retain a
      // pointer to this writer after the writer has been destroyed.
      blob_->SetBlobWriterHandler(nullptr);
      blob_.reset();
      return false;
    }
    return true;
  }

 private:
  // Prepares the blob to receive |size| bytes and creates and maps the ring-buffer VMO.
  //
  // Returns a duplicate of the VMO for the client, or:
  //   * ZX_ERR_BAD_STATE if GetVmo was already called, or if |size| is smaller than the delivery
  //     blob header length;
  //   * the status of Blob::Truncate, VMO creation, handle duplication or mapping on failure.
  zx::result<zx::vmo> GetVmoImpl(uint64_t size) {
    if (expected_size_.has_value() || vmo_.is_valid()) {
      // GetVmo was already called.
      return zx::error(ZX_ERR_BAD_STATE);
    }
    if (size < MetadataType1::kHeader.header_length) {
      // The number of bytes being transferred is less than the size of the delivery blob header.
      return zx::error(ZX_ERR_BAD_STATE);
    }
    // Recorded before any fallible step, so a failure below leaves the writer in the
    // "GetVmo already called" state and later GetVmo calls are rejected.
    expected_size_ = size;
    if (zx_status_t status = blob_->Truncate(size); status != ZX_OK) {
      return zx::error(status);
    }
    zx::vmo vmo;
    if (zx_status_t status = zx::vmo::create(kRingBufferSize, 0, &vmo); status != ZX_OK) {
      return zx::error(status);
    }
    zx::vmo vmo_dup;
    if (zx_status_t status = vmo.duplicate(ZX_RIGHT_SAME_RIGHTS, &vmo_dup); status != ZX_OK) {
      return zx::error(status);
    }
    if (zx_status_t status = vmo_mapper_.Map(vmo); status != ZX_OK) {
      return zx::error(status);
    }
    buffer_ = static_cast<uint8_t*>(vmo_mapper_.start());
    vmo_ = std::move(vmo);
    return zx::ok(std::move(vmo_dup));
  }

  // Writes |len| bytes starting at |buffer_offset| in the mapped ring buffer to the blob at offset
  // |total_written_|, then advances |total_written_| by |len|. The caller must ensure the range
  // lies within the ring buffer.
  zx::result<> WriteBufferToBlob(uint64_t buffer_offset, uint64_t len) {
    size_t actual = 0;
    if (zx_status_t status = blob_->Write(buffer_ + buffer_offset, len, total_written_, &actual);
        status != ZX_OK) {
      return zx::error(status);
    }
    // BlobWriter doesn't support partial writes.
    ZX_DEBUG_ASSERT(actual == len);
    total_written_ += len;
    return zx::ok();
  }

  // Drains |bytes_written| bytes from the ring buffer into the blob.
  //
  // Returns:
  //   * ZX_ERR_OUT_OF_RANGE if |bytes_written| exceeds the ring-buffer size;
  //   * ZX_ERR_BAD_STATE if GetVmo has not completed successfully;
  //   * ZX_ERR_BUFFER_TOO_SMALL if the write would exceed the size declared in GetVmo;
  //   * the status of Blob::Write on failure.
  zx::result<> BytesReadyImpl(uint64_t bytes_written) {
    if (bytes_written > kRingBufferSize) {
      return zx::error(ZX_ERR_OUT_OF_RANGE);
    }
    if (!vmo_.is_valid()) {
      return zx::error(ZX_ERR_BAD_STATE);
    }
    if (!expected_size_.has_value()) {
      return zx::error(ZX_ERR_BAD_STATE);
    }
    uint64_t expected_size = *expected_size_;
    if (total_written_ + bytes_written > expected_size) {
      return zx::error(ZX_ERR_BUFFER_TOO_SMALL);
    }
    // The read position in the ring buffer is the running byte count modulo the buffer size.
    uint64_t vmo_offset = total_written_ % kRingBufferSize;
    // |bytes_written| may wrap around the end of ring buffer.
    uint64_t to_write = std::min(bytes_written, kRingBufferSize - vmo_offset);
    if (zx::result result = WriteBufferToBlob(vmo_offset, to_write); result.is_error()) {
      return result.take_error();
    }
    if (to_write < bytes_written) {
      // Write the wrapped bytes.
      if (zx::result result = WriteBufferToBlob(0, bytes_written - to_write); result.is_error()) {
        return result.take_error();
      }
    }
    if (total_written_ == expected_size) {
      // The blob should now be fully written and in the readable state.
      ZX_DEBUG_ASSERT(blob_->IsReadable());
    }
    return zx::ok();
  }

  // Does nothing and always returns ZX_OK. NOTE(review): nothing in this class calls it; it looks
  // like a leftover placeholder -- confirm before removing.
  static zx_status_t CleanupInactiveBlobWritersInternal(Blob* blob) { return ZX_OK; }

  // The blob being written. Null once ChannelActive() has observed that the client went away.
  fbl::RefPtr<Blob> blob_;
  // Holds a borrow of the server_channel, it should be valid as long as the binding is. Used to
  // internally check if the client end is still open or not.
  zx::unowned_channel server_channel_;
  // Total number of bytes the client declared in GetVmo; unset until GetVmo is called.
  std::optional<uint64_t> expected_size_;
  // Number of bytes written to the blob so far.
  uint64_t total_written_ = 0;
  // Ring-buffer VMO shared with the client, its mapping, and the start address of that mapping.
  zx::vmo vmo_;
  fzl::VmoMapper vmo_mapper_;
  uint8_t* buffer_ = nullptr;
};  // class BlobWriter
// Constructs the fuchsia.fxfs/BlobCreator service node backed by |blobfs|.
//
// The connector passed to fs::Service binds each incoming server end to this object on
// |blobfs_|'s dispatcher and always reports success. It refers to |bindings_| and |blobfs_| only
// when invoked, not while the base class is being constructed.
BlobCreator::BlobCreator(Blobfs& blobfs)
    : fs::Service([this](fidl::ServerEnd<fuchsia_fxfs::BlobCreator> request) -> zx_status_t {
        // No per-connection teardown is needed, so binding closure is ignored.
        bindings_.AddBinding(blobfs_.dispatcher(), std::move(request), this,
                             fidl::kIgnoreBindingClosure);
        return ZX_OK;
      }),
      blobfs_(blobfs) {}
// FIDL handler for fuchsia.fxfs/BlobCreator.Create.
//
// Delegates to CreateImpl() and, on success, replies with the BlobWriter client end. On failure
// the status is collapsed to the protocol's error enum: ZX_ERR_ALREADY_EXISTS maps to
// kAlreadyExists and every other status maps to kInternal.
void BlobCreator::Create(fuchsia_fxfs::wire::BlobCreatorCreateRequest* request,
                         CreateCompleter::Sync& completer) {
  const Digest digest(request->hash.data_);
  zx::result writer = CreateImpl(digest, request->allow_existing);
  if (writer.is_ok()) {
    completer.ReplySuccess(std::move(writer).value());
    return;
  }
  const fuchsia_fxfs::wire::CreateBlobError error =
      writer.status_value() == ZX_ERR_ALREADY_EXISTS
          ? fuchsia_fxfs::wire::CreateBlobError::kAlreadyExists
          : fuchsia_fxfs::wire::CreateBlobError::kInternal;
  completer.ReplyError(error);
}
// Creates a new, writable blob for |digest| and returns the client end of a BlobWriter connection
// that feeds it.
//
// If a blob with this digest is already in the cache:
//   * with |allow_existing| set, the existing blob is scheduled to be overwritten by the new one,
//     provided it is readable, is not already being overwritten and has no deletion queued;
//   * without |allow_existing|, creation only proceeds if the existing blob is not readable and
//     its writer (if any) has lost its client.
// In every other case ZX_ERR_ALREADY_EXISTS is returned. Failures from the overwrite
// registration, the cache insertion or endpoint creation are returned as-is.
zx::result<fidl::ClientEnd<fuchsia_fxfs::BlobWriter>> BlobCreator::CreateImpl(const Digest& digest,
                                                                              bool allow_existing) {
  std::optional<fbl::RefPtr<Blob>> to_overwrite;
  fbl::RefPtr<CacheNode> found;
  if (zx_status_t status = blobfs_.GetCache().Lookup(digest, &found); status == ZX_OK) {
    auto blob = fbl::RefPtr<Blob>::Downcast(std::move(found));
    if (allow_existing) {
      Blob* overwriting_by = blob->GetOverwritingBy();
      if (overwriting_by) {
        if (BlobWriter* handler = overwriting_by->GetBlobWriterHandler(); handler) {
          // If there is an active writer, check if it is still open.
          if (handler->ChannelActive()) {
            return zx::error(ZX_ERR_ALREADY_EXISTS);
          }
        }
      }
      // Check the cache if a previous version is here. If so, save a ref to it to replace it. To
      // keep the number of states low, only allow this if the existing version is readable, and
      // there is not another overwrite already in-flight. Also blocks if the blob is already marked
      // deleted and has not yet been purged. It would be confusing to allow overwrite to start
      // after a purge has been queued, but will later purge both versions.
      //
      // GetOverwritingBy() is deliberately queried again rather than reusing |overwriting_by|: the
      // ChannelActive() call above drops an abandoned writer's blob reference, which may have
      // changed the answer.
      if (!blob->IsReadable() || blob->GetOverwritingBy() || blob->DeletionQueued()) {
        return zx::error(ZX_ERR_ALREADY_EXISTS);
      }
      to_overwrite = std::move(blob);
    } else if (blob->IsReadable()) {
      // The blob has finished writing and is persistent, but they didn't set allow_existing.
      return zx::error(ZX_ERR_ALREADY_EXISTS);
    } else if (BlobWriter* handler = blob->GetBlobWriterHandler(); handler) {
      // Drop the local reference, either we're exiting because a write is in-flight or we're going
      // let the blob get cleaned up by removing the binding.
      blob.reset();
      // If there is an active writer, check if it is still open.
      if (handler->ChannelActive()) {
        return zx::error(ZX_ERR_ALREADY_EXISTS);
      }
    }
  }
  fbl::RefPtr new_blob = fbl::AdoptRef(new Blob(blobfs_, digest, true));
  if (to_overwrite.has_value()) {
    // Don't put it in the cache if it is not the canonical version yet.
    if (zx_status_t status = to_overwrite.value()->SetOverwritingBy(new_blob.get());
        status != ZX_OK) {
      // This should never happen due to earlier checks.
      ZX_DEBUG_ASSERT(status == ZX_OK);
      return zx::error(status);
    }
    new_blob->SetBlobToOverwrite(std::move(to_overwrite.value()));
  } else if (zx_status_t status = blobfs_.GetCache().Add(new_blob); status != ZX_OK) {
    // This should never happen due to earlier checks.
    ZX_DEBUG_ASSERT(status == ZX_OK);
    return zx::error(status);
  }
  auto endpoints = fidl::CreateEndpoints<fuchsia_fxfs::BlobWriter>();
  if (endpoints.is_error()) {
    return endpoints.take_error();
  }
  // The writer only borrows the server channel; ownership of the channel moves into the binding
  // below.
  auto writer =
      std::make_unique<BlobWriter>(std::move(new_blob), endpoints->server.handle()->borrow());
  // Take the raw pointer in its own statement. The order in which function arguments are evaluated
  // is unspecified, so reading |writer| and moving from it within the same call could pass a null
  // implementation pointer to AddBinding.
  BlobWriter* writer_ptr = writer.get();
  writer_bindings_.AddBinding(blobfs_.dispatcher(), std::move(endpoints->server), writer_ptr,
                              [writer = std::move(writer)](fidl::UnbindInfo info) {
                                // The lambda owns the writer. When the binding is closed, the
                                // writer will be destroyed.
                              });
  return zx::ok(std::move(endpoints->client));
}
} // namespace blobfs