blob: ff4d4a9e2a1e7cd249e971d49f23cb8173acef96 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors
//
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT
#include "object/stream_dispatcher.h"
#include <lib/counters.h>
#include <zircon/errors.h>
#include <zircon/rights.h>
#include <zircon/types.h>
#include <fbl/alloc_checker.h>
#include <ktl/algorithm.h>
#include <ktl/atomic.h>
#include <object/vm_object_dispatcher.h>
#include <ktl/enforce.h>
KCOUNTER(dispatcher_stream_create_count, "dispatcher.stream.create")
KCOUNTER(dispatcher_stream_destroy_count, "dispatcher.stream.destroy")
// static
zx_status_t StreamDispatcher::parse_create_syscall_flags(uint32_t flags, uint32_t* out_flags,
zx_rights_t* out_required_vmo_rights) {
uint32_t res = 0;
zx_rights_t required_vmo_rights = ZX_RIGHT_NONE;
if (flags & ZX_STREAM_MODE_READ) {
res |= kModeRead;
required_vmo_rights |= ZX_RIGHT_READ;
flags &= ~ZX_STREAM_MODE_READ;
}
if (flags & ZX_STREAM_MODE_WRITE) {
res |= kModeWrite;
required_vmo_rights |= ZX_RIGHT_WRITE;
flags &= ~ZX_STREAM_MODE_WRITE;
}
if (flags & ZX_STREAM_MODE_APPEND) {
res |= kModeAppend;
flags &= ~ZX_STREAM_MODE_APPEND;
}
if (flags) {
return ZX_ERR_INVALID_ARGS;
}
*out_flags = res;
*out_required_vmo_rights = required_vmo_rights;
return ZX_OK;
}
// static
zx_status_t StreamDispatcher::Create(uint32_t options, fbl::RefPtr<VmObjectDispatcher> vmo,
zx_off_t seek, KernelHandle<StreamDispatcher>* handle,
zx_rights_t* rights) {
fbl::AllocChecker ac;
KernelHandle new_handle(fbl::AdoptRef(new (&ac) StreamDispatcher(options, ktl::move(vmo), seek)));
if (!ac.check()) {
return ZX_ERR_NO_MEMORY;
}
zx_rights_t new_rights = default_rights();
if (options & kModeRead) {
new_rights |= ZX_RIGHT_READ;
}
if (options & kModeWrite) {
new_rights |= ZX_RIGHT_WRITE;
}
*rights = new_rights;
*handle = ktl::move(new_handle);
return ZX_OK;
}
StreamDispatcher::StreamDispatcher(uint32_t options, fbl::RefPtr<VmObjectDispatcher> vmo,
zx_off_t seek)
: options_(options), vmo_(ktl::move(vmo)), seek_(seek) {
kcounter_add(dispatcher_stream_create_count, 1);
(void)options_;
}
StreamDispatcher::~StreamDispatcher() { kcounter_add(dispatcher_stream_destroy_count, 1); }
zx_status_t StreamDispatcher::ReadVector(user_out_iovec_t user_data, size_t* out_actual) {
canary_.Assert();
DEBUG_ASSERT(out_actual);
DEBUG_ASSERT(*out_actual == 0);
size_t total_capacity;
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return status;
}
if (total_capacity == 0) {
return ZX_OK;
}
size_t length = 0u;
uint64_t offset = 0u;
ContentSizeManager::Operation op;
Guard<Mutex> seek_guard{&seek_lock_};
{
Guard<Mutex> content_size_guard{vmo_->content_size_manager()->lock()};
uint64_t size_limit = 0u;
vmo_->content_size_manager()->BeginReadLocked(seek_ + total_capacity, &size_limit, &op);
if (size_limit <= seek_) {
// Return |ZX_OK| since there is nothing to be read.
op.AssertParentLockHeld();
op.CancelLocked();
return ZX_OK;
}
offset = seek_;
length = size_limit - offset;
}
status = vmo_->ReadVector(user_data, length, offset, out_actual);
seek_ += *out_actual;
// Reacquire the lock to commit the operation.
Guard<Mutex> content_size_guard{op.parent()->lock()};
op.CommitLocked();
return *out_actual > 0 ? ZX_OK : status;
}
zx_status_t StreamDispatcher::ReadVectorAt(user_out_iovec_t user_data, zx_off_t offset,
size_t* out_actual) {
canary_.Assert();
DEBUG_ASSERT(out_actual);
DEBUG_ASSERT(*out_actual == 0);
size_t total_capacity;
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return status;
}
if (total_capacity == 0) {
return ZX_OK;
}
size_t length = 0u;
ContentSizeManager::Operation op;
{
Guard<Mutex> content_size_guard{vmo_->content_size_manager()->lock()};
uint64_t size_limit = 0u;
vmo_->content_size_manager()->BeginReadLocked(offset + total_capacity, &size_limit, &op);
if (size_limit <= offset) {
// Return |ZX_OK| since there is nothing to be read.
op.AssertParentLockHeld();
op.CancelLocked();
return ZX_OK;
}
length = size_limit - offset;
}
status = vmo_->ReadVector(user_data, length, offset, out_actual);
// Reacquire the lock to commit the operation.
Guard<Mutex> content_size_guard{op.parent()->lock()};
op.CommitLocked();
return *out_actual > 0 ? ZX_OK : status;
}
zx_status_t StreamDispatcher::WriteVector(user_in_iovec_t user_data, size_t* out_actual) {
canary_.Assert();
DEBUG_ASSERT(out_actual);
DEBUG_ASSERT(*out_actual == 0);
if (IsInAppendMode()) {
return AppendVector(user_data, out_actual);
}
size_t total_capacity;
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return status;
}
// Return early if writing zero bytes since there's nothing to do.
if (total_capacity == 0) {
return ZX_OK;
}
size_t length = 0u;
ContentSizeManager::Operation op;
ktl::optional<uint64_t> prev_content_size;
Guard<Mutex> seek_guard{&seek_lock_};
status = CreateWriteOpAndExpandVmo(total_capacity, seek_, &length, &prev_content_size, &op);
if (status != ZX_OK) {
return status;
}
if (prev_content_size) {
status =
vmo_->WriteVector(user_data, length, seek_, out_actual,
[&prev_content_size, &op](const uint64_t write_offset, const size_t len) {
if (write_offset + len > *prev_content_size) {
op.UpdateContentSizeFromProgress(write_offset + len);
}
});
} else {
status = vmo_->WriteVector(user_data, length, seek_, out_actual);
}
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> content_size_guard{op.parent()->lock()};
// Update the content size operation if operation was partially successful.
if (*out_actual < length) {
DEBUG_ASSERT(status != ZX_OK);
if (*out_actual == 0u) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return status;
} else {
op.ShrinkSizeLocked(seek_ + *out_actual);
}
}
seek_ += *out_actual;
op.CommitLocked();
return *out_actual > 0 ? ZX_OK : status;
}
zx_status_t StreamDispatcher::WriteVectorAt(user_in_iovec_t user_data, zx_off_t offset,
size_t* out_actual) {
canary_.Assert();
DEBUG_ASSERT(out_actual);
DEBUG_ASSERT(*out_actual == 0);
size_t total_capacity;
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return status;
}
// Return early if writing zero bytes
if (total_capacity == 0) {
return ZX_OK;
}
size_t length = 0u;
ContentSizeManager::Operation op;
ktl::optional<uint64_t> prev_content_size;
status = CreateWriteOpAndExpandVmo(total_capacity, offset, &length, &prev_content_size, &op);
if (status != ZX_OK) {
return status;
}
if (prev_content_size) {
status =
vmo_->WriteVector(user_data, length, offset, out_actual,
[&prev_content_size, &op](const uint64_t write_offset, const size_t len) {
if (write_offset + len > *prev_content_size) {
op.UpdateContentSizeFromProgress(write_offset + len);
}
});
} else {
status = vmo_->WriteVector(user_data, length, offset, out_actual);
}
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> content_size_guard{op.parent()->lock()};
// Update the content size operation if operation was partially successful.
if (*out_actual < length) {
DEBUG_ASSERT(status != ZX_OK);
if (*out_actual == 0u) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return status;
} else {
op.ShrinkSizeLocked(offset + *out_actual);
}
}
op.CommitLocked();
return *out_actual > 0 ? ZX_OK : status;
}
zx_status_t StreamDispatcher::AppendVector(user_in_iovec_t user_data, size_t* out_actual) {
canary_.Assert();
DEBUG_ASSERT(out_actual);
DEBUG_ASSERT(*out_actual == 0);
size_t total_capacity;
zx_status_t status = user_data.GetTotalCapacity(&total_capacity);
if (status != ZX_OK) {
return status;
}
// Return early if writing zero bytes since there's nothing to do.
if (total_capacity == 0) {
return ZX_OK;
}
const bool can_resize_vmo = CanResizeVmo();
size_t length = 0u;
uint64_t offset = 0u;
ContentSizeManager::Operation op;
Guard<Mutex> seek_guard{&seek_lock_};
// This section expands the VMO if necessary and bumps the |seek_| pointer if successful.
{
Guard<Mutex> content_size_guard{vmo_->content_size_manager()->lock()};
status =
vmo_->content_size_manager()->BeginAppendLocked(total_capacity, &content_size_guard, &op);
if (status != ZX_OK) {
return status;
}
op.AssertParentLockHeld();
uint64_t new_content_size = op.GetSizeLocked();
offset = new_content_size - total_capacity;
uint64_t vmo_size = 0u;
status = vmo_->ExpandIfNecessary(new_content_size, can_resize_vmo, &vmo_size);
if (status != ZX_OK) {
if (vmo_size <= offset) {
// Unable to expand to requested size and cannot even perform partial write.
op.CancelLocked();
// Return `ZX_ERR_OUT_OF_RANGE` for range errors. Otherwise, clients expect all other errors
// related to resize failure to be `ZX_ERR_NO_SPACE`.
return status == ZX_ERR_OUT_OF_RANGE ? status : ZX_ERR_NO_SPACE;
}
}
DEBUG_ASSERT(vmo_size > offset);
if (vmo_size < new_content_size) {
// Unable to expand to requested size but able to perform a partial write.
op.ShrinkSizeLocked(vmo_size);
}
length = ktl::min(vmo_size, new_content_size) - offset;
}
status = vmo_->WriteVector(user_data, length, offset, out_actual,
[&op](const uint64_t write_offset, const size_t len) {
op.UpdateContentSizeFromProgress(write_offset + len);
});
seek_ = offset + *out_actual;
// Reacquire the lock to potentially shrink and commit the operation.
Guard<Mutex> content_size_guard{vmo_->content_size_manager()->lock()};
op.AssertParentLockHeld();
// Update the content size operation if operation was partially successful.
if (*out_actual < length) {
DEBUG_ASSERT(status != ZX_OK);
if (*out_actual == 0) {
// Do not commit the operation if nothing was written.
op.CancelLocked();
return status;
} else {
op.ShrinkSizeLocked(offset + *out_actual);
}
}
op.CommitLocked();
return *out_actual > 0 ? ZX_OK : status;
}
zx_status_t StreamDispatcher::Seek(zx_stream_seek_origin_t whence, int64_t offset,
zx_off_t* out_seek) {
canary_.Assert();
Guard<Mutex> seek_guard{&seek_lock_};
zx_off_t target;
switch (whence) {
case ZX_STREAM_SEEK_ORIGIN_START: {
if (offset < 0) {
return ZX_ERR_INVALID_ARGS;
}
target = static_cast<zx_off_t>(offset);
break;
}
case ZX_STREAM_SEEK_ORIGIN_CURRENT: {
if (add_overflow(seek_, offset, &target)) {
return ZX_ERR_INVALID_ARGS;
}
break;
}
case ZX_STREAM_SEEK_ORIGIN_END: {
uint64_t content_size = vmo_->content_size_manager()->GetContentSize();
if (add_overflow(content_size, offset, &target)) {
return ZX_ERR_INVALID_ARGS;
}
break;
}
default: {
return ZX_ERR_INVALID_ARGS;
}
}
seek_ = target;
*out_seek = seek_;
return ZX_OK;
}
zx_status_t StreamDispatcher::SetAppendMode(bool value) {
Guard<CriticalMutex> guard{get_lock()};
options_ = (options_ & ~kModeAppend) | (value ? kModeAppend : 0);
return ZX_OK;
}
bool StreamDispatcher::IsInAppendMode() const {
Guard<CriticalMutex> guard{get_lock()};
return options_ & kModeAppend;
}
void StreamDispatcher::GetInfo(zx_info_stream_t* info) const {
canary_.Assert();
Guard<CriticalMutex> options_guard{get_lock()};
Guard<Mutex> seek_guard{&seek_lock_};
info->options = 0;
if (options_ & kModeRead) {
info->options |= ZX_STREAM_MODE_READ;
}
if (options_ & kModeWrite) {
info->options |= ZX_STREAM_MODE_WRITE;
}
if (options_ & kModeAppend) {
info->options |= ZX_STREAM_MODE_APPEND;
}
info->seek = seek_;
info->content_size = vmo_->content_size_manager()->GetContentSize();
}
bool StreamDispatcher::CanResizeVmo() const {
Guard<CriticalMutex> guard{get_lock()};
return options_ & kCanResizeVmo;
}
zx_status_t StreamDispatcher::CreateWriteOpAndExpandVmo(
size_t total_capacity, zx_off_t offset, uint64_t* out_length,
ktl::optional<uint64_t>* out_prev_content_size, ContentSizeManager::Operation* out_op) {
DEBUG_ASSERT(out_op);
DEBUG_ASSERT(out_length);
zx_status_t status = ZX_OK;
const bool can_resize_vmo = CanResizeVmo();
{
Guard<Mutex> content_size_guard{vmo_->content_size_manager()->lock()};
size_t requested_content_size;
if (add_overflow(offset, total_capacity, &requested_content_size)) {
return ZX_ERR_FILE_BIG;
}
vmo_->content_size_manager()->BeginWriteLocked(requested_content_size, &content_size_guard,
out_prev_content_size, out_op);
uint64_t vmo_size = 0u;
status = vmo_->ExpandIfNecessary(requested_content_size, can_resize_vmo, &vmo_size);
if (status != ZX_OK) {
if (vmo_size <= offset) {
// Unable to expand to requested size and cannot even perform partial write.
out_op->AssertParentLockHeld();
out_op->CancelLocked();
// Return `ZX_ERR_OUT_OF_RANGE` for range errors. Otherwise, clients expect all other errors
// related to resize failure to be `ZX_ERR_NO_SPACE`.
return status == ZX_ERR_OUT_OF_RANGE ? status : ZX_ERR_NO_SPACE;
}
}
DEBUG_ASSERT(vmo_size > offset);
// Allow writing up to the minimum of the VMO size and requested content size, since we want to
// write at most the requested size but don't want to write beyond the VMO size.
const uint64_t target_content_size = ktl::min(vmo_size, requested_content_size);
*out_length = target_content_size - offset;
if (target_content_size != requested_content_size) {
out_op->AssertParentLockHeld();
out_op->ShrinkSizeLocked(target_content_size);
}
}
// Zero content between the previous content size and the start of the write.
if (out_prev_content_size->has_value() && out_prev_content_size->value() < offset) {
status = vmo_->vmo()->ZeroRange(out_prev_content_size->value(),
offset - out_prev_content_size->value());
if (status != ZX_OK) {
Guard<Mutex> content_size_guard{out_op->parent()->lock()};
out_op->CancelLocked();
return status;
}
}
return ZX_OK;
}