blob: 1bc4d8cee0f30c34468c1cc0640ef8cca68918f7 [file] [log] [blame] [edit]
// Copyright 2020 The Fuchsia Authors
//
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT
#include "object/stream_dispatcher.h"
#include <lib/counters.h>
#include <zircon/errors.h>
#include <zircon/rights.h>
#include <zircon/types.h>
#include <fbl/alloc_checker.h>
#include <ktl/algorithm.h>
#include <ktl/atomic.h>
#include <object/vm_object_dispatcher.h>
#include <ktl/enforce.h>
KCOUNTER(dispatcher_stream_create_count, "dispatcher.stream.create")
KCOUNTER(dispatcher_stream_destroy_count, "dispatcher.stream.destroy")
// static
zx_status_t StreamDispatcher::parse_create_syscall_flags(uint32_t flags, uint32_t* out_flags,
zx_rights_t* out_required_vmo_rights) {
uint32_t res = 0;
zx_rights_t required_vmo_rights = ZX_RIGHT_NONE;
if (flags & ZX_STREAM_MODE_READ) {
res |= kModeRead;
required_vmo_rights |= ZX_RIGHT_READ;
flags &= ~ZX_STREAM_MODE_READ;
}
if (flags & ZX_STREAM_MODE_WRITE) {
res |= kModeWrite;
required_vmo_rights |= ZX_RIGHT_WRITE;
flags &= ~ZX_STREAM_MODE_WRITE;
}
if (flags & ZX_STREAM_MODE_APPEND) {
res |= kModeAppend;
flags &= ~ZX_STREAM_MODE_APPEND;
}
if (flags) {
return ZX_ERR_INVALID_ARGS;
}
*out_flags = res;
*out_required_vmo_rights = required_vmo_rights;
return ZX_OK;
}
// static
zx_status_t StreamDispatcher::Create(uint32_t options, fbl::RefPtr<VmObjectPaged> vmo,
fbl::RefPtr<StreamSizeManager> ssm, zx_off_t seek,
KernelHandle<StreamDispatcher>* handle, zx_rights_t* rights) {
fbl::AllocChecker ac;
KernelHandle new_handle(
fbl::AdoptRef(new (&ac) StreamDispatcher(options, ktl::move(vmo), ktl::move(ssm), seek)));
if (!ac.check()) {
return ZX_ERR_NO_MEMORY;
}
zx_rights_t new_rights = default_rights();
if (options & kModeRead) {
new_rights |= ZX_RIGHT_READ;
}
if (options & kModeWrite) {
new_rights |= ZX_RIGHT_WRITE;
}
*rights = new_rights;
*handle = ktl::move(new_handle);
return ZX_OK;
}
StreamDispatcher::StreamDispatcher(uint32_t options, fbl::RefPtr<VmObjectPaged> vmo,
fbl::RefPtr<StreamSizeManager> stream_size_mgr, zx_off_t seek)
: options_(options),
vmo_(ktl::move(vmo)),
stream_size_mgr_(ktl::move(stream_size_mgr)),
seek_(seek) {
kcounter_add(dispatcher_stream_create_count, 1);
(void)options_;
}
StreamDispatcher::~StreamDispatcher() { kcounter_add(dispatcher_stream_destroy_count, 1); }
ktl::pair<zx_status_t, size_t> StreamDispatcher::ReadVector(user_out_iovec_t user_data) {
canary_.Assert();
size_t total_capacity = 0;
{
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return {status, 0};
}
if (total_capacity == 0) {
return {ZX_OK, 0};
}
}
size_t length = 0u;
uint64_t offset = 0u;
StreamSizeManager::Operation op(stream_size_mgr_.get());
Guard<Mutex> seek_guard{&seek_lock_};
{
Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()};
uint64_t size_limit = 0u;
stream_size_mgr_->BeginReadLocked(seek_ + total_capacity, &size_limit, &op);
if (size_limit <= seek_) {
// Return |ZX_OK| since there is nothing to be read.
op.CancelLocked();
return {ZX_OK, 0};
}
offset = seek_;
length = size_limit - offset;
}
auto [status, read_bytes] = vmo_->ReadUserVector(user_data, offset, length);
seek_ += read_bytes;
// Reacquire the lock to commit the operation.
Guard<Mutex> stream_size_guard{op.lock()};
op.CommitLocked();
return {read_bytes > 0 ? ZX_OK : status, read_bytes};
}
ktl::pair<zx_status_t, size_t> StreamDispatcher::ReadVectorAt(user_out_iovec_t user_data,
zx_off_t offset) {
canary_.Assert();
size_t total_capacity = 0;
{
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return {status, 0};
}
if (total_capacity == 0) {
return {ZX_OK, 0};
}
}
size_t length = 0u;
StreamSizeManager::Operation op(stream_size_mgr_.get());
{
Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()};
uint64_t size_limit = 0u;
stream_size_mgr_->BeginReadLocked(offset + total_capacity, &size_limit, &op);
if (size_limit <= offset) {
// Return |ZX_OK| since there is nothing to be read.
op.CancelLocked();
return {ZX_OK, 0};
}
length = size_limit - offset;
}
auto [status, read_bytes] = vmo_->ReadUserVector(user_data, offset, length);
// Reacquire the lock to commit the operation.
Guard<Mutex> stream_size_guard{op.lock()};
op.CommitLocked();
return {read_bytes > 0 ? ZX_OK : status, read_bytes};
}
ktl::pair<zx_status_t, size_t> StreamDispatcher::WriteVector(user_in_iovec_t user_data) {
canary_.Assert();
if (IsInAppendMode()) {
return AppendVector(user_data);
}
size_t total_capacity = 0;
{
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return {status, 0};
}
// Return early if writing zero bytes since there's nothing to do.
if (total_capacity == 0) {
return {ZX_OK, 0};
}
}
size_t length = 0u;
StreamSizeManager::Operation op(stream_size_mgr_.get());
ktl::optional<uint64_t> prev_stream_size;
Guard<Mutex> seek_guard{&seek_lock_};
{
zx_status_t status = CreateWriteOp(total_capacity, seek_, &length, &prev_stream_size, &op);
if (status != ZX_OK) {
return {status, 0};
}
}
auto [status, written] = vmo_->WriteUserVector(
user_data, seek_, length,
prev_stream_size ? [&prev_stream_size, &op](const uint64_t write_offset, const size_t len) {
if (write_offset + len > *prev_stream_size) {
op.UpdateStreamSizeFromProgress(write_offset + len);
}
} : VmObject::OnWriteBytesTransferredCallback());
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> stream_size_guard{op.lock()};
// Update the stream size operation if operation was partially successful.
if (written < length) {
DEBUG_ASSERT(status != ZX_OK);
if (written == 0u) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return {status, written};
} else {
op.ShrinkSizeLocked(seek_ + written);
}
}
seek_ += written;
op.CommitLocked();
return {written > 0 ? ZX_OK : status, written};
}
ktl::pair<zx_status_t, size_t> StreamDispatcher::WriteVectorAt(user_in_iovec_t user_data,
zx_off_t offset) {
canary_.Assert();
size_t total_capacity = 0;
{
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return {status, 0};
}
// Return early if writing zero bytes
if (total_capacity == 0) {
return {ZX_OK, 0};
}
}
size_t length = 0u;
StreamSizeManager::Operation op(stream_size_mgr_.get());
ktl::optional<uint64_t> prev_stream_size;
{
zx_status_t status = CreateWriteOp(total_capacity, offset, &length, &prev_stream_size, &op);
if (status != ZX_OK) {
return {status, 0};
}
}
auto [status, written] = vmo_->WriteUserVector(
user_data, offset, length,
prev_stream_size ? [&prev_stream_size, &op](const uint64_t write_offset, const size_t len) {
if (write_offset + len > *prev_stream_size) {
op.UpdateStreamSizeFromProgress(write_offset + len);
}
} : VmObject::OnWriteBytesTransferredCallback());
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> stream_size_guard{op.lock()};
// Update the stream size operation if operation was partially successful.
if (written < length) {
DEBUG_ASSERT(status != ZX_OK);
if (written == 0u) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return {status, written};
} else {
op.ShrinkSizeLocked(offset + written);
}
}
op.CommitLocked();
return {written > 0 ? ZX_OK : status, written};
}
ktl::pair<zx_status_t, size_t> StreamDispatcher::AppendVector(user_in_iovec_t user_data) {
canary_.Assert();
size_t total_capacity = 0;
{
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return {status, 0};
}
// Return early if writing zero bytes since there's nothing to do.
if (total_capacity == 0) {
return {ZX_OK, 0};
}
}
size_t length = 0u;
uint64_t offset = 0u;
StreamSizeManager::Operation op(stream_size_mgr_.get());
Guard<Mutex> seek_guard{&seek_lock_};
// This section expands the VMO if necessary and bumps the |seek_| pointer if successful.
{
Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()};
zx_status_t status =
stream_size_mgr_->BeginAppendLocked(total_capacity, &stream_size_guard, &op);
if (status != ZX_OK) {
return {status, 0};
}
uint64_t new_stream_size = op.GetSizeLocked();
offset = new_stream_size - total_capacity;
uint64_t vmo_size = vmo_->size();
if (vmo_size <= offset) {
// We can't even perform a partial write
op.CancelLocked();
return {ZX_ERR_OUT_OF_RANGE, 0};
}
if (vmo_size < new_stream_size) {
// Unable to expand to requested size but able to perform a partial write.
op.ShrinkSizeLocked(vmo_size);
}
length = ktl::min(vmo_size, new_stream_size) - offset;
}
auto [status, written] = vmo_->WriteUserVector(
user_data, offset, length, [&op](const uint64_t write_offset, const size_t len) {
op.UpdateStreamSizeFromProgress(write_offset + len);
});
seek_ = offset + written;
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()};
// Update the stream size operation if operation was partially successful.
if (written < length) {
DEBUG_ASSERT(status != ZX_OK);
if (written == 0) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return {status, written};
} else {
op.ShrinkSizeLocked(offset + written);
}
}
op.CommitLocked();
return {written > 0 ? ZX_OK : status, written};
}
zx_status_t StreamDispatcher::Seek(zx_stream_seek_origin_t whence, int64_t offset,
zx_off_t* out_seek) {
canary_.Assert();
Guard<Mutex> seek_guard{&seek_lock_};
zx_off_t target;
switch (whence) {
case ZX_STREAM_SEEK_ORIGIN_START: {
if (offset < 0) {
return ZX_ERR_INVALID_ARGS;
}
target = static_cast<zx_off_t>(offset);
break;
}
case ZX_STREAM_SEEK_ORIGIN_CURRENT: {
if (add_overflow(seek_, offset, &target)) {
return ZX_ERR_INVALID_ARGS;
}
break;
}
case ZX_STREAM_SEEK_ORIGIN_END: {
uint64_t stream_size = stream_size_mgr_->GetStreamSize();
if (add_overflow(stream_size, offset, &target)) {
return ZX_ERR_INVALID_ARGS;
}
break;
}
default: {
return ZX_ERR_INVALID_ARGS;
}
}
seek_ = target;
*out_seek = seek_;
return ZX_OK;
}
zx_status_t StreamDispatcher::SetAppendMode(bool value) {
Guard<CriticalMutex> guard{get_lock()};
options_ = (options_ & ~kModeAppend) | (value ? kModeAppend : 0);
return ZX_OK;
}
bool StreamDispatcher::IsInAppendMode() const {
Guard<CriticalMutex> guard{get_lock()};
return options_ & kModeAppend;
}
zx_info_stream_t StreamDispatcher::GetInfo() const {
canary_.Assert();
Guard<CriticalMutex> options_guard{get_lock()};
Guard<Mutex> seek_guard{&seek_lock_};
uint32_t options = 0;
if (options_ & kModeRead) {
options |= ZX_STREAM_MODE_READ;
}
if (options_ & kModeWrite) {
options |= ZX_STREAM_MODE_WRITE;
}
if (options_ & kModeAppend) {
options |= ZX_STREAM_MODE_APPEND;
}
return {
.options = options,
.seek = seek_,
// |content_size| is the legacy name for the stream size
.content_size = stream_size_mgr_->GetStreamSize(),
};
}
bool StreamDispatcher::CanResizeVmo() const {
Guard<CriticalMutex> guard{get_lock()};
return options_ & kCanResizeVmo;
}
zx_status_t StreamDispatcher::CreateWriteOp(size_t total_capacity, zx_off_t offset,
uint64_t* out_length,
ktl::optional<uint64_t>* out_prev_stream_size,
StreamSizeManager::Operation* out_op) {
DEBUG_ASSERT(out_op);
DEBUG_ASSERT(out_length);
zx_status_t status = ZX_OK;
{
Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), out_op->lock()};
size_t requested_stream_size;
if (add_overflow(offset, total_capacity, &requested_stream_size)) {
return ZX_ERR_FILE_BIG;
}
stream_size_mgr_->BeginWriteLocked(requested_stream_size, &stream_size_guard,
out_prev_stream_size, out_op);
uint64_t vmo_size = vmo_->size();
if (vmo_size <= offset) {
// We can't even perform a partial write
out_op->CancelLocked();
return ZX_ERR_OUT_OF_RANGE;
}
// Allow writing up to the minimum of the VMO size and requested stream size, since we want to
// write at most the requested size but don't want to write beyond the VMO size.
const uint64_t target_stream_size = ktl::min(vmo_size, requested_stream_size);
*out_length = target_stream_size - offset;
if (target_stream_size != requested_stream_size) {
out_op->ShrinkSizeLocked(target_stream_size);
}
}
// Zero content between the previous stream size and the start of the write.
if (out_prev_stream_size->has_value() && out_prev_stream_size->value() < offset) {
status = vmo_->ZeroRange(out_prev_stream_size->value(), offset - out_prev_stream_size->value());
if (status != ZX_OK) {
Guard<Mutex> stream_size_guard{out_op->lock()};
out_op->CancelLocked();
return status;
}
}
return ZX_OK;
}