| // Copyright 2020 The Fuchsia Authors |
| // |
| // Use of this source code is governed by a MIT-style |
| // license that can be found in the LICENSE file or at |
| // https://opensource.org/licenses/MIT |
| |
| #include "object/stream_dispatcher.h" |
| |
| #include <lib/counters.h> |
| #include <zircon/errors.h> |
| #include <zircon/rights.h> |
| #include <zircon/types.h> |
| |
| #include <fbl/alloc_checker.h> |
| #include <ktl/algorithm.h> |
| #include <ktl/atomic.h> |
| #include <object/vm_object_dispatcher.h> |
| |
| #include <ktl/enforce.h> |
| |
| KCOUNTER(dispatcher_stream_create_count, "dispatcher.stream.create") |
| KCOUNTER(dispatcher_stream_destroy_count, "dispatcher.stream.destroy") |
| |
| // static |
| zx_status_t StreamDispatcher::parse_create_syscall_flags(uint32_t flags, uint32_t* out_flags, |
| zx_rights_t* out_required_vmo_rights) { |
| uint32_t res = 0; |
| zx_rights_t required_vmo_rights = ZX_RIGHT_NONE; |
| if (flags & ZX_STREAM_MODE_READ) { |
| res |= kModeRead; |
| required_vmo_rights |= ZX_RIGHT_READ; |
| flags &= ~ZX_STREAM_MODE_READ; |
| } |
| if (flags & ZX_STREAM_MODE_WRITE) { |
| res |= kModeWrite; |
| required_vmo_rights |= ZX_RIGHT_WRITE; |
| flags &= ~ZX_STREAM_MODE_WRITE; |
| } |
| if (flags & ZX_STREAM_MODE_APPEND) { |
| res |= kModeAppend; |
| flags &= ~ZX_STREAM_MODE_APPEND; |
| } |
| |
| if (flags) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| |
| *out_flags = res; |
| *out_required_vmo_rights = required_vmo_rights; |
| return ZX_OK; |
| } |
| |
| // static |
| zx_status_t StreamDispatcher::Create(uint32_t options, fbl::RefPtr<VmObjectPaged> vmo, |
| fbl::RefPtr<StreamSizeManager> ssm, zx_off_t seek, |
| KernelHandle<StreamDispatcher>* handle, zx_rights_t* rights) { |
| fbl::AllocChecker ac; |
| KernelHandle new_handle( |
| fbl::AdoptRef(new (&ac) StreamDispatcher(options, ktl::move(vmo), ktl::move(ssm), seek))); |
| if (!ac.check()) { |
| return ZX_ERR_NO_MEMORY; |
| } |
| |
| zx_rights_t new_rights = default_rights(); |
| |
| if (options & kModeRead) { |
| new_rights |= ZX_RIGHT_READ; |
| } |
| if (options & kModeWrite) { |
| new_rights |= ZX_RIGHT_WRITE; |
| } |
| |
| *rights = new_rights; |
| *handle = ktl::move(new_handle); |
| return ZX_OK; |
| } |
| |
| StreamDispatcher::StreamDispatcher(uint32_t options, fbl::RefPtr<VmObjectPaged> vmo, |
| fbl::RefPtr<StreamSizeManager> stream_size_mgr, zx_off_t seek) |
| : options_(options), |
| vmo_(ktl::move(vmo)), |
| stream_size_mgr_(ktl::move(stream_size_mgr)), |
| seek_(seek) { |
| kcounter_add(dispatcher_stream_create_count, 1); |
| (void)options_; |
| } |
| |
| StreamDispatcher::~StreamDispatcher() { kcounter_add(dispatcher_stream_destroy_count, 1); } |
| |
| ktl::pair<zx_status_t, size_t> StreamDispatcher::ReadVector(user_out_iovec_t user_data) { |
| canary_.Assert(); |
| |
| size_t total_capacity = 0; |
| { |
| zx_status_t status = user_data.GetTotalCapacity(&total_capacity); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| if (total_capacity == 0) { |
| return {ZX_OK, 0}; |
| } |
| } |
| |
| size_t length = 0u; |
| uint64_t offset = 0u; |
| StreamSizeManager::Operation op(stream_size_mgr_.get()); |
| |
| Guard<Mutex> seek_guard{&seek_lock_}; |
| { |
| Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()}; |
| |
| uint64_t size_limit = 0u; |
| stream_size_mgr_->BeginReadLocked(seek_ + total_capacity, &size_limit, &op); |
| if (size_limit <= seek_) { |
| // Return |ZX_OK| since there is nothing to be read. |
| op.CancelLocked(); |
| return {ZX_OK, 0}; |
| } |
| |
| offset = seek_; |
| length = size_limit - offset; |
| } |
| |
| auto [status, read_bytes] = vmo_->ReadUserVector(user_data, offset, length); |
| seek_ += read_bytes; |
| |
| // Reacquire the lock to commit the operation. |
| Guard<Mutex> stream_size_guard{op.lock()}; |
| op.CommitLocked(); |
| |
| return {read_bytes > 0 ? ZX_OK : status, read_bytes}; |
| } |
| |
| ktl::pair<zx_status_t, size_t> StreamDispatcher::ReadVectorAt(user_out_iovec_t user_data, |
| zx_off_t offset) { |
| canary_.Assert(); |
| |
| size_t total_capacity = 0; |
| { |
| zx_status_t status = user_data.GetTotalCapacity(&total_capacity); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| if (total_capacity == 0) { |
| return {ZX_OK, 0}; |
| } |
| } |
| |
| size_t length = 0u; |
| StreamSizeManager::Operation op(stream_size_mgr_.get()); |
| |
| { |
| Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()}; |
| |
| uint64_t size_limit = 0u; |
| stream_size_mgr_->BeginReadLocked(offset + total_capacity, &size_limit, &op); |
| if (size_limit <= offset) { |
| // Return |ZX_OK| since there is nothing to be read. |
| op.CancelLocked(); |
| return {ZX_OK, 0}; |
| } |
| |
| length = size_limit - offset; |
| } |
| |
| auto [status, read_bytes] = vmo_->ReadUserVector(user_data, offset, length); |
| |
| // Reacquire the lock to commit the operation. |
| Guard<Mutex> stream_size_guard{op.lock()}; |
| op.CommitLocked(); |
| |
| return {read_bytes > 0 ? ZX_OK : status, read_bytes}; |
| } |
| |
| ktl::pair<zx_status_t, size_t> StreamDispatcher::WriteVector(user_in_iovec_t user_data) { |
| canary_.Assert(); |
| |
| if (IsInAppendMode()) { |
| return AppendVector(user_data); |
| } |
| |
| size_t total_capacity = 0; |
| { |
| zx_status_t status = user_data.GetTotalCapacity(&total_capacity); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| |
| // Return early if writing zero bytes since there's nothing to do. |
| if (total_capacity == 0) { |
| return {ZX_OK, 0}; |
| } |
| } |
| |
| size_t length = 0u; |
| StreamSizeManager::Operation op(stream_size_mgr_.get()); |
| ktl::optional<uint64_t> prev_stream_size; |
| |
| Guard<Mutex> seek_guard{&seek_lock_}; |
| |
| { |
| zx_status_t status = CreateWriteOp(total_capacity, seek_, &length, &prev_stream_size, &op); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| } |
| |
| auto [status, written] = vmo_->WriteUserVector( |
| user_data, seek_, length, |
| prev_stream_size ? [&prev_stream_size, &op](const uint64_t write_offset, const size_t len) { |
| if (write_offset + len > *prev_stream_size) { |
| op.UpdateStreamSizeFromProgress(write_offset + len); |
| } |
| } : VmObject::OnWriteBytesTransferredCallback()); |
| |
| // Reacquire the lock to potentially shrink and commit the operation. |
| Guard<Mutex> stream_size_guard{op.lock()}; |
| |
| // Update the stream size operation if operation was partially successful. |
| if (written < length) { |
| DEBUG_ASSERT(status != ZX_OK); |
| |
| if (written == 0u) { |
| // Do not commit the operation if nothing was written. |
| op.CancelLocked(); |
| return {status, written}; |
| } else { |
| op.ShrinkSizeLocked(seek_ + written); |
| } |
| } |
| |
| seek_ += written; |
| |
| op.CommitLocked(); |
| return {written > 0 ? ZX_OK : status, written}; |
| } |
| |
| ktl::pair<zx_status_t, size_t> StreamDispatcher::WriteVectorAt(user_in_iovec_t user_data, |
| zx_off_t offset) { |
| canary_.Assert(); |
| |
| size_t total_capacity = 0; |
| { |
| zx_status_t status = user_data.GetTotalCapacity(&total_capacity); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| |
| // Return early if writing zero bytes |
| if (total_capacity == 0) { |
| return {ZX_OK, 0}; |
| } |
| } |
| |
| size_t length = 0u; |
| StreamSizeManager::Operation op(stream_size_mgr_.get()); |
| ktl::optional<uint64_t> prev_stream_size; |
| |
| { |
| zx_status_t status = CreateWriteOp(total_capacity, offset, &length, &prev_stream_size, &op); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| } |
| |
| auto [status, written] = vmo_->WriteUserVector( |
| user_data, offset, length, |
| prev_stream_size ? [&prev_stream_size, &op](const uint64_t write_offset, const size_t len) { |
| if (write_offset + len > *prev_stream_size) { |
| op.UpdateStreamSizeFromProgress(write_offset + len); |
| } |
| } : VmObject::OnWriteBytesTransferredCallback()); |
| |
| // Reacquire the lock to potentially shrink and commit the operation. |
| Guard<Mutex> stream_size_guard{op.lock()}; |
| |
| // Update the stream size operation if operation was partially successful. |
| if (written < length) { |
| DEBUG_ASSERT(status != ZX_OK); |
| |
| if (written == 0u) { |
| // Do not commit the operation if nothing was written. |
| op.CancelLocked(); |
| return {status, written}; |
| } else { |
| op.ShrinkSizeLocked(offset + written); |
| } |
| } |
| |
| op.CommitLocked(); |
| return {written > 0 ? ZX_OK : status, written}; |
| } |
| |
| ktl::pair<zx_status_t, size_t> StreamDispatcher::AppendVector(user_in_iovec_t user_data) { |
| canary_.Assert(); |
| |
| size_t total_capacity = 0; |
| { |
| zx_status_t status = user_data.GetTotalCapacity(&total_capacity); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| |
| // Return early if writing zero bytes since there's nothing to do. |
| if (total_capacity == 0) { |
| return {ZX_OK, 0}; |
| } |
| } |
| |
| size_t length = 0u; |
| uint64_t offset = 0u; |
| StreamSizeManager::Operation op(stream_size_mgr_.get()); |
| Guard<Mutex> seek_guard{&seek_lock_}; |
| |
| // This section expands the VMO if necessary and bumps the |seek_| pointer if successful. |
| { |
| Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()}; |
| |
| zx_status_t status = |
| stream_size_mgr_->BeginAppendLocked(total_capacity, &stream_size_guard, &op); |
| if (status != ZX_OK) { |
| return {status, 0}; |
| } |
| |
| uint64_t new_stream_size = op.GetSizeLocked(); |
| |
| offset = new_stream_size - total_capacity; |
| |
| uint64_t vmo_size = vmo_->size(); |
| if (vmo_size <= offset) { |
| // We can't even perform a partial write |
| op.CancelLocked(); |
| return {ZX_ERR_OUT_OF_RANGE, 0}; |
| } |
| |
| if (vmo_size < new_stream_size) { |
| // Unable to expand to requested size but able to perform a partial write. |
| op.ShrinkSizeLocked(vmo_size); |
| } |
| |
| length = ktl::min(vmo_size, new_stream_size) - offset; |
| } |
| |
| auto [status, written] = vmo_->WriteUserVector( |
| user_data, offset, length, [&op](const uint64_t write_offset, const size_t len) { |
| op.UpdateStreamSizeFromProgress(write_offset + len); |
| }); |
| seek_ = offset + written; |
| |
| // Reacquire the lock to potentially shrink and commit the operation. |
| Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), op.lock()}; |
| |
| // Update the stream size operation if operation was partially successful. |
| if (written < length) { |
| DEBUG_ASSERT(status != ZX_OK); |
| |
| if (written == 0) { |
| // Do not commit the operation if nothing was written. |
| op.CancelLocked(); |
| return {status, written}; |
| } else { |
| op.ShrinkSizeLocked(offset + written); |
| } |
| } |
| |
| op.CommitLocked(); |
| return {written > 0 ? ZX_OK : status, written}; |
| } |
| |
| zx_status_t StreamDispatcher::Seek(zx_stream_seek_origin_t whence, int64_t offset, |
| zx_off_t* out_seek) { |
| canary_.Assert(); |
| |
| Guard<Mutex> seek_guard{&seek_lock_}; |
| |
| zx_off_t target; |
| switch (whence) { |
| case ZX_STREAM_SEEK_ORIGIN_START: { |
| if (offset < 0) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| target = static_cast<zx_off_t>(offset); |
| break; |
| } |
| case ZX_STREAM_SEEK_ORIGIN_CURRENT: { |
| if (add_overflow(seek_, offset, &target)) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| break; |
| } |
| case ZX_STREAM_SEEK_ORIGIN_END: { |
| uint64_t stream_size = stream_size_mgr_->GetStreamSize(); |
| if (add_overflow(stream_size, offset, &target)) { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| break; |
| } |
| default: { |
| return ZX_ERR_INVALID_ARGS; |
| } |
| } |
| |
| seek_ = target; |
| *out_seek = seek_; |
| return ZX_OK; |
| } |
| |
| zx_status_t StreamDispatcher::SetAppendMode(bool value) { |
| Guard<CriticalMutex> guard{get_lock()}; |
| options_ = (options_ & ~kModeAppend) | (value ? kModeAppend : 0); |
| return ZX_OK; |
| } |
| |
| bool StreamDispatcher::IsInAppendMode() const { |
| Guard<CriticalMutex> guard{get_lock()}; |
| return options_ & kModeAppend; |
| } |
| |
| zx_info_stream_t StreamDispatcher::GetInfo() const { |
| canary_.Assert(); |
| |
| Guard<CriticalMutex> options_guard{get_lock()}; |
| Guard<Mutex> seek_guard{&seek_lock_}; |
| |
| uint32_t options = 0; |
| if (options_ & kModeRead) { |
| options |= ZX_STREAM_MODE_READ; |
| } |
| if (options_ & kModeWrite) { |
| options |= ZX_STREAM_MODE_WRITE; |
| } |
| if (options_ & kModeAppend) { |
| options |= ZX_STREAM_MODE_APPEND; |
| } |
| |
| return { |
| .options = options, |
| .seek = seek_, |
| // |content_size| is the legacy name for the stream size |
| .content_size = stream_size_mgr_->GetStreamSize(), |
| }; |
| } |
| |
| bool StreamDispatcher::CanResizeVmo() const { |
| Guard<CriticalMutex> guard{get_lock()}; |
| return options_ & kCanResizeVmo; |
| } |
| |
| zx_status_t StreamDispatcher::CreateWriteOp(size_t total_capacity, zx_off_t offset, |
| uint64_t* out_length, |
| ktl::optional<uint64_t>* out_prev_stream_size, |
| StreamSizeManager::Operation* out_op) { |
| DEBUG_ASSERT(out_op); |
| DEBUG_ASSERT(out_length); |
| |
| zx_status_t status = ZX_OK; |
| |
| { |
| Guard<Mutex> stream_size_guard{AliasedLock, stream_size_mgr_->lock(), out_op->lock()}; |
| |
| size_t requested_stream_size; |
| if (add_overflow(offset, total_capacity, &requested_stream_size)) { |
| return ZX_ERR_FILE_BIG; |
| } |
| |
| stream_size_mgr_->BeginWriteLocked(requested_stream_size, &stream_size_guard, |
| out_prev_stream_size, out_op); |
| |
| uint64_t vmo_size = vmo_->size(); |
| if (vmo_size <= offset) { |
| // We can't even perform a partial write |
| out_op->CancelLocked(); |
| return ZX_ERR_OUT_OF_RANGE; |
| } |
| |
| // Allow writing up to the minimum of the VMO size and requested stream size, since we want to |
| // write at most the requested size but don't want to write beyond the VMO size. |
| const uint64_t target_stream_size = ktl::min(vmo_size, requested_stream_size); |
| *out_length = target_stream_size - offset; |
| |
| if (target_stream_size != requested_stream_size) { |
| out_op->ShrinkSizeLocked(target_stream_size); |
| } |
| } |
| |
| // Zero content between the previous stream size and the start of the write. |
| if (out_prev_stream_size->has_value() && out_prev_stream_size->value() < offset) { |
| status = vmo_->ZeroRange(out_prev_stream_size->value(), offset - out_prev_stream_size->value()); |
| if (status != ZX_OK) { |
| Guard<Mutex> stream_size_guard{out_op->lock()}; |
| out_op->CancelLocked(); |
| return status; |
| } |
| } |
| |
| return ZX_OK; |
| } |