| /* |
| * Copyright (c) 2009-2013 Apple Inc. All rights reserved. |
| * |
| * @APPLE_APACHE_LICENSE_HEADER_START@ |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| * @APPLE_APACHE_LICENSE_HEADER_END@ |
| */ |
| |
| #include "internal.h" |
| |
| #ifndef DISPATCH_IO_DEBUG |
| #define DISPATCH_IO_DEBUG DISPATCH_DEBUG |
| #endif |
| |
| #ifndef PAGE_SIZE |
| #define PAGE_SIZE ((size_t)getpagesize()) |
| #endif |
| |
| #if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA |
| #define _dispatch_io_data_retain(x) _dispatch_objc_retain(x) |
| #define _dispatch_io_data_release(x) _dispatch_objc_release(x) |
| #else |
| #define _dispatch_io_data_retain(x) dispatch_retain(x) |
| #define _dispatch_io_data_release(x) dispatch_release(x) |
| #endif |
| |
| typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry); |
| |
| DISPATCH_EXPORT DISPATCH_NOTHROW |
| void _dispatch_iocntl(uint32_t param, uint64_t value); |
| |
| static dispatch_operation_t _dispatch_operation_create( |
| dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset, |
| size_t length, dispatch_data_t data, dispatch_queue_t queue, |
| dispatch_io_handler_t handler); |
| static void _dispatch_operation_enqueue(dispatch_operation_t op, |
| dispatch_op_direction_t direction, dispatch_data_t data); |
| static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq, |
| dispatch_operation_t op); |
| static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry); |
| static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry); |
| static void _dispatch_fd_entry_init_async(dispatch_fd_t fd, |
| dispatch_fd_entry_init_callback_t completion_callback); |
| static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, |
| uintptr_t hash); |
| static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path( |
| dispatch_io_path_data_t path_data, dev_t dev, mode_t mode); |
| static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, |
| dispatch_io_t channel); |
| static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, |
| dispatch_io_t channel); |
| static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry, |
| dispatch_queue_t tq); |
| static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, |
| dispatch_op_direction_t direction); |
| static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev); |
| static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream, |
| dispatch_operation_t operation, dispatch_data_t data); |
| static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk, |
| dispatch_operation_t operation, dispatch_data_t data); |
| static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream, |
| dispatch_io_t channel); |
| static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk, |
| dispatch_io_t channel); |
| static void _dispatch_stream_source_handler(void *ctx); |
| static void _dispatch_stream_queue_handler(void *ctx); |
| static void _dispatch_stream_handler(void *ctx); |
| static void _dispatch_disk_handler(void *ctx); |
| static void _dispatch_disk_perform(void *ctxt); |
| static void _dispatch_operation_advise(dispatch_operation_t op, |
| size_t chunk_size); |
| static int _dispatch_operation_perform(dispatch_operation_t op); |
| static void _dispatch_operation_deliver_data(dispatch_operation_t op, |
| dispatch_op_flags_t flags); |
| |
| // Macros to wrap syscalls which return -1 on error, and retry on EINTR |
| #define _dispatch_io_syscall_switch_noerr(_err, _syscall, ...) do { \ |
| switch (((_err) = (((_syscall) == -1) ? errno : 0))) { \ |
| case EINTR: continue; \ |
| __VA_ARGS__ \ |
| } \ |
| break; \ |
| } while (1) |
| #define _dispatch_io_syscall_switch(__err, __syscall, ...) do { \ |
| _dispatch_io_syscall_switch_noerr(__err, __syscall, \ |
| case 0: break; \ |
| __VA_ARGS__ \ |
| ); \ |
| } while (0) |
| #define _dispatch_io_syscall(__syscall) do { int __err; \ |
| _dispatch_io_syscall_switch(__err, __syscall); \ |
| } while (0) |
| |
| enum { |
| DISPATCH_OP_COMPLETE = 1, |
| DISPATCH_OP_DELIVER, |
| DISPATCH_OP_DELIVER_AND_COMPLETE, |
| DISPATCH_OP_COMPLETE_RESUME, |
| DISPATCH_OP_RESUME, |
| DISPATCH_OP_ERR, |
| DISPATCH_OP_FD_ERR, |
| }; |
| |
| #define _dispatch_io_Block_copy(x) \ |
| ((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x))) |
| |
| #pragma mark - |
| #pragma mark dispatch_io_debug |
| |
| #if DISPATCH_IO_DEBUG |
| #if !DISPATCH_DEBUG |
| #define _dispatch_io_log(x, ...) do { \ |
| _dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \ |
| (void *)_dispatch_thread_self(), ##__VA_ARGS__); \ |
| } while (0) |
| #ifdef _dispatch_object_debug |
| #undef _dispatch_object_debug |
| #define _dispatch_object_debug dispatch_debug |
| #pragma clang diagnostic ignored "-Wdeprecated-declarations" |
| #endif |
| #else |
| #define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__) |
| #endif // DISPATCH_DEBUG |
| #else |
| #define _dispatch_io_log(x, ...) |
| #endif // DISPATCH_IO_DEBUG |
| |
| #define _dispatch_fd_debug(msg, fd, ...) \ |
| _dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__) |
| #define _dispatch_op_debug(msg, op, ...) \ |
| _dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__) |
| #define _dispatch_channel_debug(msg, channel, ...) \ |
| _dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__) |
| #define _dispatch_fd_entry_debug(msg, fd_entry, ...) \ |
| _dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__) |
| #define _dispatch_disk_debug(msg, disk, ...) \ |
| _dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__) |
| |
| #pragma mark - |
| #pragma mark dispatch_io_hashtables |
| |
| // Global hashtable of dev_t -> disk_s mappings |
| DISPATCH_CACHELINE_ALIGN |
| static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE]; |
| // Global hashtable of fd -> fd_entry_s mappings |
| DISPATCH_CACHELINE_ALIGN |
| static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE]; |
| |
| static dispatch_once_t _dispatch_io_devs_lockq_pred; |
| static dispatch_queue_t _dispatch_io_devs_lockq; |
| static dispatch_queue_t _dispatch_io_fds_lockq; |
| |
| static char const * const _dispatch_io_key = "io"; |
| |
| static void |
| _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED) |
| { |
| _dispatch_io_fds_lockq = dispatch_queue_create( |
| "com.apple.libdispatch-io.fd_lockq", NULL); |
| unsigned int i; |
| for (i = 0; i < DIO_HASH_SIZE; i++) { |
| TAILQ_INIT(&_dispatch_io_fds[i]); |
| } |
| } |
| |
| static void |
| _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED) |
| { |
| _dispatch_io_devs_lockq = dispatch_queue_create( |
| "com.apple.libdispatch-io.dev_lockq", NULL); |
| unsigned int i; |
| for (i = 0; i < DIO_HASH_SIZE; i++) { |
| TAILQ_INIT(&_dispatch_io_devs[i]); |
| } |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_defaults |
| |
| enum { |
| DISPATCH_IOCNTL_CHUNK_PAGES = 1, |
| DISPATCH_IOCNTL_LOW_WATER_CHUNKS, |
| DISPATCH_IOCNTL_INITIAL_DELIVERY, |
| DISPATCH_IOCNTL_MAX_PENDING_IO_REQS, |
| }; |
| |
| static struct dispatch_io_defaults_s { |
| size_t chunk_size, low_water_chunks, max_pending_io_reqs; |
| bool initial_delivery; |
| } dispatch_io_defaults = { |
| .chunk_size = DIO_MAX_CHUNK_SIZE, |
| .low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS, |
| .max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS, |
| }; |
| |
| #define _dispatch_iocntl_set_default(p, v) do { \ |
| dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \ |
| } while (0) |
| |
| void |
| _dispatch_iocntl(uint32_t param, uint64_t value) |
| { |
| switch (param) { |
| case DISPATCH_IOCNTL_CHUNK_PAGES: |
| _dispatch_iocntl_set_default(chunk_size, value * PAGE_SIZE); |
| break; |
| case DISPATCH_IOCNTL_LOW_WATER_CHUNKS: |
| _dispatch_iocntl_set_default(low_water_chunks, value); |
| break; |
| case DISPATCH_IOCNTL_INITIAL_DELIVERY: |
| _dispatch_iocntl_set_default(initial_delivery, value); |
| case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS: |
| _dispatch_iocntl_set_default(max_pending_io_reqs, value); |
| break; |
| } |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_t |
| |
| static dispatch_io_t |
| _dispatch_io_create(dispatch_io_type_t type) |
| { |
| dispatch_io_t channel = _dispatch_object_alloc(DISPATCH_VTABLE(io), |
| sizeof(struct dispatch_io_s)); |
| channel->do_next = DISPATCH_OBJECT_LISTLESS; |
| channel->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true); |
| channel->params.type = type; |
| channel->params.high = SIZE_MAX; |
| channel->params.low = dispatch_io_defaults.low_water_chunks * |
| dispatch_io_defaults.chunk_size; |
| channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq", |
| NULL); |
| return channel; |
| } |
| |
| static void |
| _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry, |
| dispatch_queue_t queue, int err, void (^cleanup_handler)(int)) |
| { |
| // Enqueue the cleanup handler on the suspended close queue |
| if (cleanup_handler) { |
| _dispatch_retain(queue); |
| dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{ |
| dispatch_async(queue, ^{ |
| _dispatch_channel_debug("cleanup handler invoke: err %d", |
| channel, err); |
| cleanup_handler(err); |
| }); |
| _dispatch_release(queue); |
| }); |
| } |
| if (fd_entry) { |
| channel->fd_entry = fd_entry; |
| dispatch_retain(fd_entry->barrier_queue); |
| dispatch_retain(fd_entry->barrier_group); |
| channel->barrier_queue = fd_entry->barrier_queue; |
| channel->barrier_group = fd_entry->barrier_group; |
| } else { |
| // Still need to create a barrier queue, since all operations go |
| // through it |
| channel->barrier_queue = dispatch_queue_create( |
| "com.apple.libdispatch-io.barrierq", NULL); |
| channel->barrier_group = dispatch_group_create(); |
| } |
| } |
| |
| void |
| _dispatch_io_dispose(dispatch_io_t channel, DISPATCH_UNUSED bool *allow_free) |
| { |
| _dispatch_object_debug(channel, "%s", __func__); |
| if (channel->fd_entry && |
| !(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { |
| if (channel->fd_entry->path_data) { |
| // This modification is safe since path_data->channel is checked |
| // only on close_queue (which is still suspended at this point) |
| channel->fd_entry->path_data->channel = NULL; |
| } |
| // Cleanup handlers will only run when all channels related to this |
| // fd are complete |
| _dispatch_fd_entry_release(channel->fd_entry); |
| } |
| if (channel->queue) { |
| dispatch_release(channel->queue); |
| } |
| if (channel->barrier_queue) { |
| dispatch_release(channel->barrier_queue); |
| } |
| if (channel->barrier_group) { |
| dispatch_release(channel->barrier_group); |
| } |
| } |
| |
| static int |
| _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode) |
| { |
| int err = 0; |
| if (S_ISDIR(mode)) { |
| err = EISDIR; |
| } else if (channel->params.type == DISPATCH_IO_RANDOM && |
| (S_ISFIFO(mode) || S_ISSOCK(mode))) { |
| err = ESPIPE; |
| } |
| return err; |
| } |
| |
| static int |
| _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel, |
| bool ignore_closed) |
| { |
| // On _any_ queue |
| int err; |
| if (op) { |
| channel = op->channel; |
| } |
| if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { |
| if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) { |
| err = ECANCELED; |
| } else { |
| err = 0; |
| } |
| } else { |
| err = op ? op->fd_entry->err : channel->err; |
| } |
| return err; |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_channels |
| |
| dispatch_io_t |
| dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd, |
| dispatch_queue_t queue, void (^cleanup_handler)(int)) |
| { |
| if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { |
| return DISPATCH_BAD_INPUT; |
| } |
| dispatch_io_t channel = _dispatch_io_create(type); |
| channel->fd = fd; |
| _dispatch_channel_debug("create", channel); |
| channel->fd_actual = fd; |
| dispatch_suspend(channel->queue); |
| _dispatch_retain(queue); |
| _dispatch_retain(channel); |
| _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { |
| // On barrier queue |
| int err = fd_entry->err; |
| if (!err) { |
| err = _dispatch_io_validate_type(channel, fd_entry->stat.mode); |
| } |
| if (!err && type == DISPATCH_IO_RANDOM) { |
| off_t f_ptr; |
| _dispatch_io_syscall_switch_noerr(err, |
| f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR), |
| case 0: channel->f_ptr = f_ptr; break; |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| } |
| channel->err = err; |
| _dispatch_fd_entry_retain(fd_entry); |
| _dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| }); |
| _dispatch_object_debug(channel, "%s", __func__); |
| return channel; |
| } |
| |
| dispatch_io_t |
| dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd, |
| dispatch_queue_t queue, void *context, |
| void (*cleanup_handler)(void *context, int error)) |
| { |
| return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL : |
| ^(int error){ cleanup_handler(context, error); }); |
| } |
| |
| dispatch_io_t |
| dispatch_io_create_with_path(dispatch_io_type_t type, const char *path, |
| int oflag, mode_t mode, dispatch_queue_t queue, |
| void (^cleanup_handler)(int error)) |
| { |
| if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) || |
| !(*path == '/')) { |
| return DISPATCH_BAD_INPUT; |
| } |
| size_t pathlen = strlen(path); |
| dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1); |
| if (!path_data) { |
| return DISPATCH_OUT_OF_MEMORY; |
| } |
| dispatch_io_t channel = _dispatch_io_create(type); |
| channel->fd = -1; |
| _dispatch_channel_debug("create with path %s", channel, path); |
| channel->fd_actual = -1; |
| path_data->channel = channel; |
| path_data->oflag = oflag; |
| path_data->mode = mode; |
| path_data->pathlen = pathlen; |
| memcpy(path_data->path, path, pathlen + 1); |
| _dispatch_retain(queue); |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| int err = 0; |
| struct stat st; |
| _dispatch_io_syscall_switch_noerr(err, |
| (path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW |
| #ifndef __linux__ |
| || (path_data->oflag & O_SYMLINK) == O_SYMLINK |
| #endif |
| ? lstat(path_data->path, &st) : stat(path_data->path, &st), |
| case 0: |
| err = _dispatch_io_validate_type(channel, st.st_mode); |
| break; |
| default: |
| if ((path_data->oflag & O_CREAT) && |
| (*(path_data->path + path_data->pathlen - 1) != '/')) { |
| // Check parent directory |
| char *c = strrchr(path_data->path, '/'); |
| dispatch_assert(c); |
| *c = 0; |
| int perr; |
| _dispatch_io_syscall_switch_noerr(perr, |
| stat(path_data->path, &st), |
| case 0: |
| // Since the parent directory exists, open() will |
| // create a regular file after the fd_entry has |
| // been filled in |
| st.st_mode = S_IFREG; |
| err = 0; |
| break; |
| ); |
| *c = '/'; |
| } |
| break; |
| ); |
| channel->err = err; |
| if (err) { |
| free(path_data); |
| _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| return; |
| } |
| dispatch_suspend(channel->queue); |
| dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, |
| _dispatch_io_devs_lockq_init); |
| dispatch_async(_dispatch_io_devs_lockq, ^{ |
| dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path( |
| path_data, st.st_dev, st.st_mode); |
| _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| }); |
| }); |
| _dispatch_object_debug(channel, "%s", __func__); |
| return channel; |
| } |
| |
| dispatch_io_t |
| dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path, |
| int oflag, mode_t mode, dispatch_queue_t queue, void *context, |
| void (*cleanup_handler)(void *context, int error)) |
| { |
| return dispatch_io_create_with_path(type, path, oflag, mode, queue, |
| !cleanup_handler ? NULL : |
| ^(int error){ cleanup_handler(context, error); }); |
| } |
| |
| dispatch_io_t |
| dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel, |
| dispatch_queue_t queue, void (^cleanup_handler)(int error)) |
| { |
| if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) { |
| return DISPATCH_BAD_INPUT; |
| } |
| dispatch_io_t channel = _dispatch_io_create(type); |
| _dispatch_channel_debug("create with channel %p", channel, in_channel); |
| dispatch_suspend(channel->queue); |
| _dispatch_retain(queue); |
| _dispatch_retain(channel); |
| _dispatch_retain(in_channel); |
| dispatch_async(in_channel->queue, ^{ |
| int err0 = _dispatch_io_get_error(NULL, in_channel, false); |
| if (err0) { |
| channel->err = err0; |
| _dispatch_io_init(channel, NULL, queue, err0, cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_release(channel); |
| _dispatch_release(in_channel); |
| _dispatch_release(queue); |
| return; |
| } |
| dispatch_async(in_channel->barrier_queue, ^{ |
| int err = _dispatch_io_get_error(NULL, in_channel, false); |
| // If there is no error, the fd_entry for the in_channel is valid. |
| // Since we are running on in_channel's queue, the fd_entry has been |
| // fully resolved and will stay valid for the duration of this block |
| if (!err) { |
| err = in_channel->err; |
| if (!err) { |
| err = in_channel->fd_entry->err; |
| } |
| } |
| if (!err) { |
| err = _dispatch_io_validate_type(channel, |
| in_channel->fd_entry->stat.mode); |
| } |
| if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) { |
| off_t f_ptr; |
| _dispatch_io_syscall_switch_noerr(err, |
| f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR), |
| case 0: channel->f_ptr = f_ptr; break; |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| } |
| channel->err = err; |
| if (err) { |
| _dispatch_io_init(channel, NULL, queue, err, cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_release(channel); |
| _dispatch_release(in_channel); |
| _dispatch_release(queue); |
| return; |
| } |
| if (in_channel->fd == -1) { |
| // in_channel was created from path |
| channel->fd = -1; |
| channel->fd_actual = -1; |
| mode_t mode = in_channel->fd_entry->stat.mode; |
| dev_t dev = in_channel->fd_entry->stat.dev; |
| size_t path_data_len = sizeof(struct dispatch_io_path_data_s) + |
| in_channel->fd_entry->path_data->pathlen + 1; |
| dispatch_io_path_data_t path_data = malloc(path_data_len); |
| memcpy(path_data, in_channel->fd_entry->path_data, |
| path_data_len); |
| path_data->channel = channel; |
| // lockq_io_devs is known to already exist |
| dispatch_async(_dispatch_io_devs_lockq, ^{ |
| dispatch_fd_entry_t fd_entry; |
| fd_entry = _dispatch_fd_entry_create_with_path(path_data, |
| dev, mode); |
| _dispatch_io_init(channel, fd_entry, queue, 0, |
| cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| }); |
| } else { |
| dispatch_fd_entry_t fd_entry = in_channel->fd_entry; |
| channel->fd = in_channel->fd; |
| channel->fd_actual = in_channel->fd_actual; |
| _dispatch_fd_entry_retain(fd_entry); |
| _dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler); |
| dispatch_resume(channel->queue); |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| } |
| _dispatch_release(in_channel); |
| _dispatch_object_debug(channel, "%s", __func__); |
| }); |
| }); |
| _dispatch_object_debug(channel, "%s", __func__); |
| return channel; |
| } |
| |
| dispatch_io_t |
| dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel, |
| dispatch_queue_t queue, void *context, |
| void (*cleanup_handler)(void *context, int error)) |
| { |
| return dispatch_io_create_with_io(type, in_channel, queue, |
| !cleanup_handler ? NULL : |
| ^(int error){ cleanup_handler(context, error); }); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_accessors |
| |
| void |
| dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water) |
| { |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| _dispatch_channel_debug("set high water: %zu", channel, high_water); |
| if (channel->params.low > high_water) { |
| channel->params.low = high_water; |
| } |
| channel->params.high = high_water ? high_water : 1; |
| _dispatch_release(channel); |
| }); |
| } |
| |
| void |
| dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water) |
| { |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| _dispatch_channel_debug("set low water: %zu", channel, low_water); |
| if (channel->params.high < low_water) { |
| channel->params.high = low_water ? low_water : 1; |
| } |
| channel->params.low = low_water; |
| _dispatch_release(channel); |
| }); |
| } |
| |
| void |
| dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval, |
| unsigned long flags) |
| { |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| _dispatch_channel_debug("set interval: %llu", channel, interval); |
| channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX; |
| channel->params.interval_flags = flags; |
| _dispatch_release(channel); |
| }); |
| } |
| |
| void |
| _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq) |
| { |
| _dispatch_retain(dq); |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| dispatch_queue_t prev_dq = channel->do_targetq; |
| channel->do_targetq = dq; |
| _dispatch_release(prev_dq); |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_release(channel); |
| }); |
| } |
| |
| dispatch_fd_t |
| dispatch_io_get_descriptor(dispatch_io_t channel) |
| { |
| if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { |
| return -1; |
| } |
| dispatch_fd_t fd = channel->fd_actual; |
| if (fd == -1 && !_dispatch_io_get_error(NULL, channel, false)) { |
| dispatch_thread_context_t ctxt = |
| _dispatch_thread_context_find(_dispatch_io_key); |
| if (ctxt && ctxt->dtc_io_in_barrier == channel) { |
| (void)_dispatch_fd_entry_open(channel->fd_entry, channel); |
| } |
| } |
| return channel->fd_actual; |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_operations |
| |
| static void |
| _dispatch_io_stop(dispatch_io_t channel) |
| { |
| _dispatch_channel_debug("stop", channel); |
| (void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed); |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| dispatch_async(channel->barrier_queue, ^{ |
| _dispatch_object_debug(channel, "%s", __func__); |
| dispatch_fd_entry_t fd_entry = channel->fd_entry; |
| if (fd_entry) { |
| _dispatch_channel_debug("stop cleanup", channel); |
| _dispatch_fd_entry_cleanup_operations(fd_entry, channel); |
| if (!(channel->atomic_flags & DIO_CLOSED)) { |
| if (fd_entry->path_data) { |
| fd_entry->path_data->channel = NULL; |
| } |
| channel->fd_entry = NULL; |
| _dispatch_fd_entry_release(fd_entry); |
| } |
| } else if (channel->fd != -1) { |
| // Stop after close, need to check if fd_entry still exists |
| _dispatch_retain(channel); |
| dispatch_async(_dispatch_io_fds_lockq, ^{ |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_channel_debug("stop cleanup after close", |
| channel); |
| dispatch_fd_entry_t fdi; |
| uintptr_t hash = DIO_HASH(channel->fd); |
| TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) { |
| if (fdi->fd == channel->fd) { |
| _dispatch_fd_entry_cleanup_operations(fdi, channel); |
| break; |
| } |
| } |
| _dispatch_release(channel); |
| }); |
| } |
| _dispatch_release(channel); |
| }); |
| }); |
| } |
| |
| void |
| dispatch_io_close(dispatch_io_t channel, unsigned long flags) |
| { |
| if (flags & DISPATCH_IO_STOP) { |
| // Don't stop an already stopped channel |
| if (channel->atomic_flags & DIO_STOPPED) { |
| return; |
| } |
| return _dispatch_io_stop(channel); |
| } |
| // Don't close an already closed or stopped channel |
| if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) { |
| return; |
| } |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| dispatch_async(channel->barrier_queue, ^{ |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_channel_debug("close", channel); |
| if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) { |
| (void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED, |
| relaxed); |
| dispatch_fd_entry_t fd_entry = channel->fd_entry; |
| if (fd_entry) { |
| if (fd_entry->path_data) { |
| fd_entry->path_data->channel = NULL; |
| } |
| channel->fd_entry = NULL; |
| _dispatch_fd_entry_release(fd_entry); |
| } |
| } |
| _dispatch_release(channel); |
| }); |
| }); |
| } |
| |
| void |
| dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier) |
| { |
| _dispatch_retain(channel); |
| dispatch_async(channel->queue, ^{ |
| dispatch_queue_t io_q = channel->do_targetq; |
| dispatch_queue_t barrier_queue = channel->barrier_queue; |
| dispatch_group_t barrier_group = channel->barrier_group; |
| dispatch_async(barrier_queue, ^{ |
| dispatch_suspend(barrier_queue); |
| dispatch_group_notify(barrier_group, io_q, ^{ |
| dispatch_thread_context_s io_ctxt = { |
| .dtc_key = _dispatch_io_key, |
| .dtc_io_in_barrier = channel, |
| }; |
| |
| _dispatch_object_debug(channel, "%s", __func__); |
| _dispatch_thread_context_push(&io_ctxt); |
| barrier(); |
| _dispatch_thread_context_pop(&io_ctxt); |
| dispatch_resume(barrier_queue); |
| _dispatch_release(channel); |
| }); |
| }); |
| }); |
| } |
| |
| void |
| dispatch_io_barrier_f(dispatch_io_t channel, void *context, |
| dispatch_function_t barrier) |
| { |
| return dispatch_io_barrier(channel, ^{ barrier(context); }); |
| } |
| |
| void |
| dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length, |
| dispatch_queue_t queue, dispatch_io_handler_t handler) |
| { |
| _dispatch_retain(channel); |
| _dispatch_retain(queue); |
| dispatch_async(channel->queue, ^{ |
| dispatch_operation_t op; |
| op = _dispatch_operation_create(DOP_DIR_READ, channel, offset, |
| length, dispatch_data_empty, queue, handler); |
| if (op) { |
| dispatch_queue_t barrier_q = channel->barrier_queue; |
| dispatch_async(barrier_q, ^{ |
| _dispatch_operation_enqueue(op, DOP_DIR_READ, |
| dispatch_data_empty); |
| }); |
| } |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| }); |
| } |
| |
| void |
| dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length, |
| dispatch_queue_t queue, void *context, |
| dispatch_io_handler_function_t handler) |
| { |
| return dispatch_io_read(channel, offset, length, queue, |
| ^(bool done, dispatch_data_t d, int error){ |
| handler(context, done, d, error); |
| }); |
| } |
| |
| void |
| dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data, |
| dispatch_queue_t queue, dispatch_io_handler_t handler) |
| { |
| _dispatch_io_data_retain(data); |
| _dispatch_retain(channel); |
| _dispatch_retain(queue); |
| dispatch_async(channel->queue, ^{ |
| dispatch_operation_t op; |
| op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset, |
| dispatch_data_get_size(data), data, queue, handler); |
| if (op) { |
| dispatch_queue_t barrier_q = channel->barrier_queue; |
| dispatch_async(barrier_q, ^{ |
| _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); |
| _dispatch_io_data_release(data); |
| }); |
| } else { |
| _dispatch_io_data_release(data); |
| } |
| _dispatch_release(channel); |
| _dispatch_release(queue); |
| }); |
| } |
| |
| void |
| dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data, |
| dispatch_queue_t queue, void *context, |
| dispatch_io_handler_function_t handler) |
| { |
| return dispatch_io_write(channel, offset, data, queue, |
| ^(bool done, dispatch_data_t d, int error){ |
| handler(context, done, d, error); |
| }); |
| } |
| |
| void |
| dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, |
| void (^handler)(dispatch_data_t, int)) |
| { |
| _dispatch_retain(queue); |
| _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { |
| // On barrier queue |
| if (fd_entry->err) { |
| int err = fd_entry->err; |
| dispatch_async(queue, ^{ |
| _dispatch_fd_debug("convenience handler invoke", fd); |
| handler(dispatch_data_empty, err); |
| }); |
| _dispatch_release(queue); |
| return; |
| } |
| // Safe to access fd_entry on barrier queue |
| dispatch_io_t channel = fd_entry->convenience_channel; |
| if (!channel) { |
| channel = _dispatch_io_create(DISPATCH_IO_STREAM); |
| channel->fd = fd; |
| channel->fd_actual = fd; |
| channel->fd_entry = fd_entry; |
| dispatch_retain(fd_entry->barrier_queue); |
| dispatch_retain(fd_entry->barrier_group); |
| channel->barrier_queue = fd_entry->barrier_queue; |
| channel->barrier_group = fd_entry->barrier_group; |
| fd_entry->convenience_channel = channel; |
| } |
| __block dispatch_data_t deliver_data = dispatch_data_empty; |
| __block int err = 0; |
| dispatch_async(fd_entry->close_queue, ^{ |
| dispatch_async(queue, ^{ |
| _dispatch_fd_debug("convenience handler invoke", fd); |
| handler(deliver_data, err); |
| _dispatch_io_data_release(deliver_data); |
| }); |
| _dispatch_release(queue); |
| }); |
| dispatch_operation_t op = |
| _dispatch_operation_create(DOP_DIR_READ, channel, 0, |
| length, dispatch_data_empty, |
| _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false), |
| ^(bool done, dispatch_data_t data, int error) { |
| if (data) { |
| data = dispatch_data_create_concat(deliver_data, data); |
| _dispatch_io_data_release(deliver_data); |
| deliver_data = data; |
| } |
| if (done) { |
| err = error; |
| } |
| }); |
| if (op) { |
| _dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty); |
| } |
| }); |
| } |
| |
| void |
| dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue, |
| void *context, void (*handler)(void *, dispatch_data_t, int)) |
| { |
| return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){ |
| handler(context, d, error); |
| }); |
| } |
| |
| void |
| dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, |
| void (^handler)(dispatch_data_t, int)) |
| { |
| _dispatch_io_data_retain(data); |
| _dispatch_retain(queue); |
| _dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) { |
| // On barrier queue |
| if (fd_entry->err) { |
| int err = fd_entry->err; |
| dispatch_async(queue, ^{ |
| _dispatch_fd_debug("convenience handler invoke", fd); |
| handler(NULL, err); |
| }); |
| _dispatch_release(queue); |
| return; |
| } |
| // Safe to access fd_entry on barrier queue |
| dispatch_io_t channel = fd_entry->convenience_channel; |
| if (!channel) { |
| channel = _dispatch_io_create(DISPATCH_IO_STREAM); |
| channel->fd = fd; |
| channel->fd_actual = fd; |
| channel->fd_entry = fd_entry; |
| dispatch_retain(fd_entry->barrier_queue); |
| dispatch_retain(fd_entry->barrier_group); |
| channel->barrier_queue = fd_entry->barrier_queue; |
| channel->barrier_group = fd_entry->barrier_group; |
| fd_entry->convenience_channel = channel; |
| } |
| __block dispatch_data_t deliver_data = NULL; |
| __block int err = 0; |
| dispatch_async(fd_entry->close_queue, ^{ |
| dispatch_async(queue, ^{ |
| _dispatch_fd_debug("convenience handler invoke", fd); |
| handler(deliver_data, err); |
| if (deliver_data) { |
| _dispatch_io_data_release(deliver_data); |
| } |
| }); |
| _dispatch_release(queue); |
| }); |
| dispatch_operation_t op = |
| _dispatch_operation_create(DOP_DIR_WRITE, channel, 0, |
| dispatch_data_get_size(data), data, |
| _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false), |
| ^(bool done, dispatch_data_t d, int error) { |
| if (done) { |
| if (d) { |
| _dispatch_io_data_retain(d); |
| deliver_data = d; |
| } |
| err = error; |
| } |
| }); |
| if (op) { |
| _dispatch_operation_enqueue(op, DOP_DIR_WRITE, data); |
| } |
| _dispatch_io_data_release(data); |
| }); |
| } |
| |
| void |
| dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue, |
| void *context, void (*handler)(void *, dispatch_data_t, int)) |
| { |
| return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){ |
| handler(context, d, error); |
| }); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_operation_t |
| |
| static dispatch_operation_t |
| _dispatch_operation_create(dispatch_op_direction_t direction, |
| dispatch_io_t channel, off_t offset, size_t length, |
| dispatch_data_t data, dispatch_queue_t queue, |
| dispatch_io_handler_t handler) |
| { |
| // On channel queue |
| dispatch_assert(direction < DOP_DIR_MAX); |
| // Safe to call _dispatch_io_get_error() with channel->fd_entry since |
| // that can only be NULL if atomic_flags are set rdar://problem/8362514 |
| int err = _dispatch_io_get_error(NULL, channel, false); |
| if (err || !length) { |
| _dispatch_io_data_retain(data); |
| _dispatch_retain(queue); |
| dispatch_async(channel->barrier_queue, ^{ |
| dispatch_async(queue, ^{ |
| dispatch_data_t d = data; |
| if (direction == DOP_DIR_READ && err) { |
| d = NULL; |
| } else if (direction == DOP_DIR_WRITE && !err) { |
| d = NULL; |
| } |
| _dispatch_channel_debug("IO handler invoke: err %d", channel, |
| err); |
| handler(true, d, err); |
| _dispatch_io_data_release(data); |
| }); |
| _dispatch_release(queue); |
| }); |
| return NULL; |
| } |
| dispatch_operation_t op = _dispatch_object_alloc(DISPATCH_VTABLE(operation), |
| sizeof(struct dispatch_operation_s)); |
| _dispatch_channel_debug("operation create: %p", channel, op); |
| op->do_next = DISPATCH_OBJECT_LISTLESS; |
| op->do_xref_cnt = -1; // operation object is not exposed externally |
| op->op_q = dispatch_queue_create_with_target("com.apple.libdispatch-io.opq", |
| NULL, queue); |
| op->active = false; |
| op->direction = direction; |
| op->offset = offset + channel->f_ptr; |
| op->length = length; |
| op->handler = _dispatch_io_Block_copy(handler); |
| _dispatch_retain(channel); |
| op->channel = channel; |
| op->params = channel->params; |
| // Take a snapshot of the priority of the channel queue. The actual I/O |
| // for this operation will be performed at this priority |
| dispatch_queue_t targetq = op->channel->do_targetq; |
| while (fastpath(targetq->do_targetq)) { |
| targetq = targetq->do_targetq; |
| } |
| op->do_targetq = targetq; |
| _dispatch_object_debug(op, "%s", __func__); |
| return op; |
| } |
| |
| void |
| _dispatch_operation_dispose(dispatch_operation_t op, |
| DISPATCH_UNUSED bool *allow_free) |
| { |
| _dispatch_object_debug(op, "%s", __func__); |
| _dispatch_op_debug("dispose", op); |
| // Deliver the data if there's any |
| if (op->fd_entry) { |
| _dispatch_operation_deliver_data(op, DOP_DONE); |
| dispatch_group_leave(op->fd_entry->barrier_group); |
| _dispatch_fd_entry_release(op->fd_entry); |
| } |
| if (op->channel) { |
| _dispatch_release(op->channel); |
| } |
| if (op->timer) { |
| dispatch_release(op->timer); |
| } |
| // For write operations, op->buf is owned by op->buf_data |
| if (op->buf && op->direction == DOP_DIR_READ) { |
| free(op->buf); |
| } |
| if (op->buf_data) { |
| _dispatch_io_data_release(op->buf_data); |
| } |
| if (op->data) { |
| _dispatch_io_data_release(op->data); |
| } |
| if (op->op_q) { |
| dispatch_release(op->op_q); |
| } |
| Block_release(op->handler); |
| _dispatch_op_debug("disposed", op); |
| } |
| |
| static void |
| _dispatch_operation_enqueue(dispatch_operation_t op, |
| dispatch_op_direction_t direction, dispatch_data_t data) |
| { |
| // Called from the barrier queue |
| _dispatch_io_data_retain(data); |
| // If channel is closed or stopped, then call the handler immediately |
| int err = _dispatch_io_get_error(NULL, op->channel, false); |
| if (err) { |
| dispatch_io_handler_t handler = op->handler; |
| dispatch_async(op->op_q, ^{ |
| dispatch_data_t d = data; |
| if (direction == DOP_DIR_READ && err) { |
| d = NULL; |
| } else if (direction == DOP_DIR_WRITE && !err) { |
| d = NULL; |
| } |
| handler(true, d, err); |
| _dispatch_io_data_release(data); |
| }); |
| _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err); |
| _dispatch_release(op); |
| return; |
| } |
| // Finish operation init |
| op->fd_entry = op->channel->fd_entry; |
| _dispatch_fd_entry_retain(op->fd_entry); |
| dispatch_group_enter(op->fd_entry->barrier_group); |
| dispatch_disk_t disk = op->fd_entry->disk; |
| if (!disk) { |
| dispatch_stream_t stream = op->fd_entry->streams[direction]; |
| dispatch_async(stream->dq, ^{ |
| _dispatch_stream_enqueue_operation(stream, op, data); |
| _dispatch_io_data_release(data); |
| }); |
| } else { |
| dispatch_async(disk->pick_queue, ^{ |
| _dispatch_disk_enqueue_operation(disk, op, data); |
| _dispatch_io_data_release(data); |
| }); |
| } |
| } |
| |
| static bool |
| _dispatch_operation_should_enqueue(dispatch_operation_t op, |
| dispatch_queue_t tq, dispatch_data_t data) |
| { |
| // On stream queue or disk queue |
| _dispatch_op_debug("enqueue", op); |
| _dispatch_io_data_retain(data); |
| op->data = data; |
| int err = _dispatch_io_get_error(op, NULL, true); |
| if (err) { |
| op->err = err; |
| // Final release |
| _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err); |
| _dispatch_release(op); |
| return false; |
| } |
| if (op->params.interval) { |
| dispatch_resume(_dispatch_operation_timer(tq, op)); |
| } |
| return true; |
| } |
| |
| static dispatch_source_t |
| _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op) |
| { |
| // On stream queue or pick queue |
| if (op->timer) { |
| return op->timer; |
| } |
| dispatch_source_t timer = dispatch_source_create( |
| DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq); |
| dispatch_source_set_timer(timer, |
| dispatch_time(DISPATCH_TIME_NOW, (int64_t)op->params.interval), |
| op->params.interval, 0); |
| dispatch_source_set_event_handler(timer, ^{ |
| // On stream queue or pick queue |
| if (dispatch_source_testcancel(timer)) { |
| // Do nothing. The operation has already completed |
| return; |
| } |
| dispatch_op_flags_t flags = DOP_DEFAULT; |
| if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) { |
| // Deliver even if there is less data than the low-water mark |
| flags |= DOP_DELIVER; |
| } |
| // If the operation is active, dont deliver data |
| if ((op->active) && (flags & DOP_DELIVER)) { |
| op->flags = flags; |
| } else { |
| _dispatch_operation_deliver_data(op, flags); |
| } |
| }); |
| op->timer = timer; |
| return op->timer; |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_fd_entry_t |
| |
| #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD |
| static void |
| _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) |
| { |
| guardid_t guard = fd_entry; |
| const unsigned int guard_flags = GUARD_CLOSE; |
| int err, fd_flags = 0; |
| _dispatch_io_syscall_switch_noerr(err, |
| change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags, |
| &fd_flags), |
| case 0: |
| fd_entry->guard_flags = guard_flags; |
| fd_entry->orig_fd_flags = fd_flags; |
| break; |
| case EPERM: break; |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| } |
| |
| static void |
| _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) |
| { |
| if (!fd_entry->guard_flags) { |
| return; |
| } |
| guardid_t guard = fd_entry; |
| int err, fd_flags = fd_entry->orig_fd_flags; |
| _dispatch_io_syscall_switch(err, |
| change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0, |
| &fd_flags), |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| } |
| #else |
| static inline void |
| _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } |
| static inline void |
| _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; } |
| #endif // DISPATCH_USE_GUARDED_FD |
| |
| static inline int |
| _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path, |
| int oflag, mode_t mode) { |
| #if DISPATCH_USE_GUARDED_FD |
| guardid_t guard = (uintptr_t)fd_entry; |
| const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP | |
| GUARD_SOCKET_IPC | GUARD_FILEPORT; |
| int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC, |
| mode); |
| if (fd != -1) { |
| fd_entry->guard_flags = guard_flags; |
| return fd; |
| } |
| errno = 0; |
| #else |
| (void)fd_entry; |
| #endif |
| return open(path, oflag, mode); |
| } |
| |
| static inline int |
| _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) { |
| #if DISPATCH_USE_GUARDED_FD |
| if (fd_entry->guard_flags) { |
| guardid_t guard = (uintptr_t)fd_entry; |
| return guarded_close_np(fd, &guard); |
| } else |
| #else |
| (void)fd_entry; |
| #endif |
| { |
| return close(fd); |
| } |
| } |
| |
| static inline void |
| _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) { |
| dispatch_suspend(fd_entry->close_queue); |
| } |
| |
| static inline void |
| _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) { |
| dispatch_resume(fd_entry->close_queue); |
| } |
| |
| static void |
| _dispatch_fd_entry_init_async(dispatch_fd_t fd, |
| dispatch_fd_entry_init_callback_t completion_callback) |
| { |
| static dispatch_once_t _dispatch_io_fds_lockq_pred; |
| dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL, |
| _dispatch_io_fds_lockq_init); |
| dispatch_async(_dispatch_io_fds_lockq, ^{ |
| dispatch_fd_entry_t fd_entry = NULL; |
| // Check to see if there is an existing entry for the given fd |
| uintptr_t hash = DIO_HASH(fd); |
| TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) { |
| if (fd_entry->fd == fd) { |
| // Retain the fd_entry to ensure it cannot go away until the |
| // stat() has completed |
| _dispatch_fd_entry_retain(fd_entry); |
| break; |
| } |
| } |
| if (!fd_entry) { |
| // If we did not find an existing entry, create one |
| fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash); |
| } |
| _dispatch_fd_entry_debug("init", fd_entry); |
| dispatch_async(fd_entry->barrier_queue, ^{ |
| _dispatch_fd_entry_debug("init completion", fd_entry); |
| completion_callback(fd_entry); |
| // stat() is complete, release reference to fd_entry |
| _dispatch_fd_entry_release(fd_entry); |
| }); |
| }); |
| } |
| |
| static dispatch_fd_entry_t |
| _dispatch_fd_entry_create(dispatch_queue_t q) |
| { |
| dispatch_fd_entry_t fd_entry; |
| fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s)); |
| // Use target queue to ensure that no concurrent lookups are going on when |
| // the close queue is running |
| fd_entry->close_queue = dispatch_queue_create_with_target( |
| "com.apple.libdispatch-io.closeq", NULL, q); |
| // Suspend the cleanup queue until closing |
| _dispatch_fd_entry_retain(fd_entry); |
| return fd_entry; |
| } |
| |
| static dispatch_fd_entry_t |
| _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash) |
| { |
| // On fds lock queue |
| dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( |
| _dispatch_io_fds_lockq); |
| _dispatch_fd_entry_debug("create: fd %d", fd_entry, fd); |
| fd_entry->fd = fd; |
| TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list); |
| fd_entry->barrier_queue = dispatch_queue_create( |
| "com.apple.libdispatch-io.barrierq", NULL); |
| fd_entry->barrier_group = dispatch_group_create(); |
| dispatch_async(fd_entry->barrier_queue, ^{ |
| _dispatch_fd_entry_debug("stat", fd_entry); |
| int err, orig_flags, orig_nosigpipe = -1; |
| struct stat st; |
| _dispatch_io_syscall_switch(err, |
| fstat(fd, &st), |
| default: fd_entry->err = err; return; |
| ); |
| fd_entry->stat.dev = st.st_dev; |
| fd_entry->stat.mode = st.st_mode; |
| _dispatch_fd_entry_guard(fd_entry); |
| _dispatch_io_syscall_switch(err, |
| orig_flags = fcntl(fd, F_GETFL), |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 |
| if (S_ISFIFO(st.st_mode)) { |
| _dispatch_io_syscall_switch(err, |
| orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE), |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| if (orig_nosigpipe != -1) { |
| _dispatch_io_syscall_switch(err, |
| orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1), |
| default: |
| orig_nosigpipe = -1; |
| (void)dispatch_assume_zero(err); |
| break; |
| ); |
| } |
| } |
| #endif |
| if (S_ISREG(st.st_mode)) { |
| if (orig_flags != -1) { |
| _dispatch_io_syscall_switch(err, |
| fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK), |
| default: |
| orig_flags = -1; |
| (void)dispatch_assume_zero(err); |
| break; |
| ); |
| } |
| dev_t dev = major(st.st_dev); |
| // We have to get the disk on the global dev queue. The |
| // barrier queue cannot continue until that is complete |
| dispatch_suspend(fd_entry->barrier_queue); |
| dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL, |
| _dispatch_io_devs_lockq_init); |
| dispatch_async(_dispatch_io_devs_lockq, ^{ |
| _dispatch_disk_init(fd_entry, dev); |
| dispatch_resume(fd_entry->barrier_queue); |
| }); |
| } else { |
| if (orig_flags != -1) { |
| _dispatch_io_syscall_switch(err, |
| fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK), |
| default: |
| orig_flags = -1; |
| (void)dispatch_assume_zero(err); |
| break; |
| ); |
| } |
| |
| _dispatch_stream_init(fd_entry, |
| _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false)); |
| } |
| fd_entry->orig_flags = orig_flags; |
| fd_entry->orig_nosigpipe = orig_nosigpipe; |
| }); |
| // This is the first item run when the close queue is resumed, indicating |
| // that all channels associated with this entry have been closed and that |
| // all operations associated with this entry have been freed |
| dispatch_async(fd_entry->close_queue, ^{ |
| if (!fd_entry->disk) { |
| _dispatch_fd_entry_debug("close queue cleanup", fd_entry); |
| dispatch_op_direction_t dir; |
| for (dir = 0; dir < DOP_DIR_MAX; dir++) { |
| _dispatch_stream_dispose(fd_entry, dir); |
| } |
| } else { |
| dispatch_disk_t disk = fd_entry->disk; |
| dispatch_async(_dispatch_io_devs_lockq, ^{ |
| _dispatch_release(disk); |
| }); |
| } |
| // Remove this entry from the global fd list |
| TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list); |
| }); |
| // If there was a source associated with this stream, disposing of the |
| // source cancels it and suspends the close queue. Freeing the fd_entry |
| // structure must happen after the source cancel handler has finished |
| dispatch_async(fd_entry->close_queue, ^{ |
| _dispatch_fd_entry_debug("close queue release", fd_entry); |
| dispatch_release(fd_entry->close_queue); |
| _dispatch_fd_entry_debug("barrier queue release", fd_entry); |
| dispatch_release(fd_entry->barrier_queue); |
| _dispatch_fd_entry_debug("barrier group release", fd_entry); |
| dispatch_release(fd_entry->barrier_group); |
| if (fd_entry->orig_flags != -1) { |
| _dispatch_io_syscall( |
| fcntl(fd, F_SETFL, fd_entry->orig_flags) |
| ); |
| } |
| #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123 |
| if (fd_entry->orig_nosigpipe != -1) { |
| _dispatch_io_syscall( |
| fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe) |
| ); |
| } |
| #endif |
| _dispatch_fd_entry_unguard(fd_entry); |
| if (fd_entry->convenience_channel) { |
| fd_entry->convenience_channel->fd_entry = NULL; |
| dispatch_release(fd_entry->convenience_channel); |
| } |
| free(fd_entry); |
| }); |
| return fd_entry; |
| } |
| |
| static dispatch_fd_entry_t |
| _dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data, |
| dev_t dev, mode_t mode) |
| { |
| // On devs lock queue |
| dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create( |
| path_data->channel->queue); |
| _dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path); |
| if (S_ISREG(mode)) { |
| _dispatch_disk_init(fd_entry, major(dev)); |
| } else { |
| _dispatch_stream_init(fd_entry, |
| _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false)); |
| } |
| fd_entry->fd = -1; |
| fd_entry->orig_flags = -1; |
| fd_entry->path_data = path_data; |
| fd_entry->stat.dev = dev; |
| fd_entry->stat.mode = mode; |
| fd_entry->barrier_queue = dispatch_queue_create( |
| "com.apple.libdispatch-io.barrierq", NULL); |
| fd_entry->barrier_group = dispatch_group_create(); |
| // This is the first item run when the close queue is resumed, indicating |
| // that the channel associated with this entry has been closed and that |
| // all operations associated with this entry have been freed |
| dispatch_async(fd_entry->close_queue, ^{ |
| _dispatch_fd_entry_debug("close queue cleanup", fd_entry); |
| if (!fd_entry->disk) { |
| dispatch_op_direction_t dir; |
| for (dir = 0; dir < DOP_DIR_MAX; dir++) { |
| _dispatch_stream_dispose(fd_entry, dir); |
| } |
| } |
| if (fd_entry->fd != -1) { |
| _dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd); |
| } |
| if (fd_entry->path_data->channel) { |
| // If associated channel has not been released yet, mark it as |
| // no longer having an fd_entry (for stop after close). |
| // It is safe to modify channel since we are on close_queue with |
| // target queue the channel queue |
| fd_entry->path_data->channel->fd_entry = NULL; |
| } |
| }); |
| dispatch_async(fd_entry->close_queue, ^{ |
| _dispatch_fd_entry_debug("close queue release", fd_entry); |
| dispatch_release(fd_entry->close_queue); |
| dispatch_release(fd_entry->barrier_queue); |
| dispatch_release(fd_entry->barrier_group); |
| free(fd_entry->path_data); |
| free(fd_entry); |
| }); |
| return fd_entry; |
| } |
| |
| static int |
| _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel) |
| { |
| if (!(fd_entry->fd == -1 && fd_entry->path_data)) { |
| return 0; |
| } |
| if (fd_entry->err) { |
| return fd_entry->err; |
| } |
| int fd = -1; |
| int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK : |
| fd_entry->path_data->oflag | O_NONBLOCK; |
| open: |
| fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path, |
| oflag, fd_entry->path_data->mode); |
| if (fd == -1) { |
| int err = errno; |
| if (err == EINTR) { |
| goto open; |
| } |
| (void)os_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed); |
| return err; |
| } |
| if (!os_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) { |
| // Lost the race with another open |
| _dispatch_fd_entry_guarded_close(fd_entry, fd); |
| } else { |
| channel->fd_actual = fd; |
| } |
| _dispatch_object_debug(channel, "%s", __func__); |
| return 0; |
| } |
| |
| static void |
| _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry, |
| dispatch_io_t channel) |
| { |
| if (fd_entry->disk) { |
| if (channel) { |
| _dispatch_retain(channel); |
| } |
| _dispatch_fd_entry_retain(fd_entry); |
| dispatch_async(fd_entry->disk->pick_queue, ^{ |
| _dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel); |
| _dispatch_fd_entry_release(fd_entry); |
| if (channel) { |
| _dispatch_release(channel); |
| } |
| }); |
| } else { |
| dispatch_op_direction_t direction; |
| for (direction = 0; direction < DOP_DIR_MAX; direction++) { |
| dispatch_stream_t stream = fd_entry->streams[direction]; |
| if (!stream) { |
| continue; |
| } |
| if (channel) { |
| _dispatch_retain(channel); |
| } |
| _dispatch_fd_entry_retain(fd_entry); |
| dispatch_async(stream->dq, ^{ |
| _dispatch_stream_cleanup_operations(stream, channel); |
| _dispatch_fd_entry_release(fd_entry); |
| if (channel) { |
| _dispatch_release(channel); |
| } |
| }); |
| } |
| } |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_stream_t/dispatch_disk_t |
| |
| static void |
| _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq) |
| { |
| dispatch_op_direction_t direction; |
| for (direction = 0; direction < DOP_DIR_MAX; direction++) { |
| dispatch_stream_t stream; |
| stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s)); |
| stream->dq = dispatch_queue_create_with_target( |
| "com.apple.libdispatch-io.streamq", NULL, tq); |
| dispatch_set_context(stream->dq, stream); |
| TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]); |
| TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]); |
| fd_entry->streams[direction] = stream; |
| } |
| } |
| |
| static void |
| _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry, |
| dispatch_op_direction_t direction) |
| { |
| // On close queue |
| dispatch_stream_t stream = fd_entry->streams[direction]; |
| if (!stream) { |
| return; |
| } |
| dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); |
| dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])); |
| if (stream->source) { |
| // Balanced by source cancel handler: |
| _dispatch_fd_entry_retain(fd_entry); |
| dispatch_source_cancel(stream->source); |
| dispatch_resume(stream->source); |
| dispatch_release(stream->source); |
| } |
| dispatch_set_context(stream->dq, NULL); |
| dispatch_release(stream->dq); |
| free(stream); |
| } |
| |
| static void |
| _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev) |
| { |
| // On devs lock queue |
| dispatch_disk_t disk; |
| // Check to see if there is an existing entry for the given device |
| uintptr_t hash = DIO_HASH(dev); |
| TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) { |
| if (disk->dev == dev) { |
| _dispatch_retain(disk); |
| goto out; |
| } |
| } |
| // Otherwise create a new entry |
| size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs; |
| disk = _dispatch_object_alloc(DISPATCH_VTABLE(disk), |
| sizeof(struct dispatch_disk_s) + |
| (pending_reqs_depth * sizeof(dispatch_operation_t))); |
| disk->do_next = DISPATCH_OBJECT_LISTLESS; |
| disk->do_xref_cnt = -1; |
| disk->advise_list_depth = pending_reqs_depth; |
| disk->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false); |
| disk->dev = dev; |
| TAILQ_INIT(&disk->operations); |
| disk->cur_rq = TAILQ_FIRST(&disk->operations); |
| char label[45]; |
| snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", |
| (int)dev); |
| disk->pick_queue = dispatch_queue_create(label, NULL); |
| TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list); |
| out: |
| fd_entry->disk = disk; |
| TAILQ_INIT(&fd_entry->stream_ops); |
| } |
| |
| void |
| _dispatch_disk_dispose(dispatch_disk_t disk, DISPATCH_UNUSED bool *allow_free) |
| { |
| uintptr_t hash = DIO_HASH(disk->dev); |
| TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list); |
| dispatch_assert(TAILQ_EMPTY(&disk->operations)); |
| size_t i; |
| for (i=0; i<disk->advise_list_depth; ++i) { |
| dispatch_assert(!disk->advise_list[i]); |
| } |
| dispatch_release(disk->pick_queue); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_stream_operations/dispatch_disk_operations |
| |
| static inline bool |
| _dispatch_stream_operation_avail(dispatch_stream_t stream) |
| { |
| return !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) || |
| !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])); |
| } |
| |
| static void |
| _dispatch_stream_enqueue_operation(dispatch_stream_t stream, |
| dispatch_operation_t op, dispatch_data_t data) |
| { |
| if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) { |
| return; |
| } |
| _dispatch_object_debug(op, "%s", __func__); |
| bool no_ops = !_dispatch_stream_operation_avail(stream); |
| TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list); |
| if (no_ops) { |
| dispatch_async_f(stream->dq, stream->dq, |
| _dispatch_stream_queue_handler); |
| } |
| } |
| |
| static void |
| _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op, |
| dispatch_data_t data) |
| { |
| if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) { |
| return; |
| } |
| _dispatch_object_debug(op, "%s", __func__); |
| if (op->params.type == DISPATCH_IO_STREAM) { |
| if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) { |
| TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); |
| } |
| TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list); |
| } else { |
| TAILQ_INSERT_TAIL(&disk->operations, op, operation_list); |
| } |
| _dispatch_disk_handler(disk); |
| } |
| |
| static void |
| _dispatch_stream_complete_operation(dispatch_stream_t stream, |
| dispatch_operation_t op) |
| { |
| // On stream queue |
| _dispatch_object_debug(op, "%s", __func__); |
| _dispatch_op_debug("complete: stream %p", op, stream); |
| TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list); |
| if (op == stream->op) { |
| stream->op = NULL; |
| } |
| if (op->timer) { |
| dispatch_source_cancel(op->timer); |
| } |
| // Final release will deliver any pending data |
| _dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt); |
| _dispatch_release(op); |
| } |
| |
| static void |
| _dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op) |
| { |
| // On pick queue |
| _dispatch_object_debug(op, "%s", __func__); |
| _dispatch_op_debug("complete: disk %p", op, disk); |
| // Current request is always the last op returned |
| if (disk->cur_rq == op) { |
| disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s, |
| operation_list); |
| } |
| if (op->params.type == DISPATCH_IO_STREAM) { |
| // Check if there are other pending stream operations behind it |
| dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list); |
| TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list); |
| if (op_next) { |
| TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list); |
| } |
| } |
| TAILQ_REMOVE(&disk->operations, op, operation_list); |
| if (op->timer) { |
| dispatch_source_cancel(op->timer); |
| } |
| // Final release will deliver any pending data |
| _dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt); |
| _dispatch_release(op); |
| } |
| |
| static dispatch_operation_t |
| _dispatch_stream_pick_next_operation(dispatch_stream_t stream, |
| dispatch_operation_t op) |
| { |
| // On stream queue |
| if (!op) { |
| // On the first run through, pick the first operation |
| if (!_dispatch_stream_operation_avail(stream)) { |
| return op; |
| } |
| if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) { |
| op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]); |
| } else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) { |
| op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); |
| } |
| return op; |
| } |
| if (op->params.type == DISPATCH_IO_STREAM) { |
| // Stream operations need to be serialized so continue the current |
| // operation until it is finished |
| return op; |
| } |
| // Get the next random operation (round-robin) |
| if (op->params.type == DISPATCH_IO_RANDOM) { |
| op = TAILQ_NEXT(op, operation_list); |
| if (!op) { |
| op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]); |
| } |
| return op; |
| } |
| return NULL; |
| } |
| |
| static dispatch_operation_t |
| _dispatch_disk_pick_next_operation(dispatch_disk_t disk) |
| { |
| // On pick queue |
| dispatch_operation_t op; |
| if (!TAILQ_EMPTY(&disk->operations)) { |
| if (disk->cur_rq == NULL) { |
| op = TAILQ_FIRST(&disk->operations); |
| } else { |
| op = disk->cur_rq; |
| do { |
| op = TAILQ_NEXT(op, operation_list); |
| if (!op) { |
| op = TAILQ_FIRST(&disk->operations); |
| } |
| // TODO: more involved picking algorithm rdar://problem/8780312 |
| } while (op->active && op != disk->cur_rq); |
| } |
| if (!op->active) { |
| disk->cur_rq = op; |
| return op; |
| } |
| } |
| return NULL; |
| } |
| |
| static void |
| _dispatch_stream_cleanup_operations(dispatch_stream_t stream, |
| dispatch_io_t channel) |
| { |
| // On stream queue |
| dispatch_operation_t op, tmp; |
| typeof(*stream->operations) *operations; |
| operations = &stream->operations[DISPATCH_IO_RANDOM]; |
| TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { |
| if (!channel || op->channel == channel) { |
| _dispatch_stream_complete_operation(stream, op); |
| } |
| } |
| operations = &stream->operations[DISPATCH_IO_STREAM]; |
| TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) { |
| if (!channel || op->channel == channel) { |
| _dispatch_stream_complete_operation(stream, op); |
| } |
| } |
| if (stream->source_running && !_dispatch_stream_operation_avail(stream)) { |
| dispatch_suspend(stream->source); |
| stream->source_running = false; |
| } |
| } |
| |
| static inline void |
| _dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk, |
| dispatch_io_t channel, bool inactive_only) |
| { |
| // On pick queue |
| dispatch_operation_t op, tmp; |
| TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) { |
| if (inactive_only && op->active) continue; |
| if (!channel || op->channel == channel) { |
| _dispatch_op_debug("cleanup: disk %p", op, disk); |
| _dispatch_disk_complete_operation(disk, op); |
| } |
| } |
| } |
| |
| static void |
| _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel) |
| { |
| _dispatch_disk_cleanup_specified_operations(disk, channel, false); |
| } |
| |
| static void |
| _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk, |
| dispatch_io_t channel) |
| { |
| _dispatch_disk_cleanup_specified_operations(disk, channel, true); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_stream_handler/dispatch_disk_handler |
| |
| static dispatch_source_t |
| _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op) |
| { |
| // On stream queue |
| if (stream->source) { |
| return stream->source; |
| } |
| dispatch_fd_t fd = op->fd_entry->fd; |
| _dispatch_op_debug("stream source create", op); |
| dispatch_source_t source = NULL; |
| if (op->direction == DOP_DIR_READ) { |
| source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, |
| (uintptr_t)fd, 0, stream->dq); |
| } else if (op->direction == DOP_DIR_WRITE) { |
| source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, |
| (uintptr_t)fd, 0, stream->dq); |
| } else { |
| dispatch_assert(op->direction < DOP_DIR_MAX); |
| return NULL; |
| } |
| dispatch_set_context(source, stream); |
| dispatch_source_set_event_handler_f(source, |
| _dispatch_stream_source_handler); |
| // Close queue must not run user cleanup handlers until sources are fully |
| // unregistered |
| dispatch_queue_t close_queue = op->fd_entry->close_queue; |
| dispatch_source_set_mandatory_cancel_handler(source, ^{ |
| _dispatch_op_debug("stream source cancel", op); |
| dispatch_resume(close_queue); |
| }); |
| stream->source = source; |
| return stream->source; |
| } |
| |
| static void |
| _dispatch_stream_source_handler(void *ctx) |
| { |
| // On stream queue |
| dispatch_stream_t stream = (dispatch_stream_t)ctx; |
| dispatch_suspend(stream->source); |
| stream->source_running = false; |
| return _dispatch_stream_handler(stream); |
| } |
| |
| static void |
| _dispatch_stream_queue_handler(void *ctx) |
| { |
| // On stream queue |
| dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx); |
| if (!stream) { |
| // _dispatch_stream_dispose has been called |
| return; |
| } |
| return _dispatch_stream_handler(stream); |
| } |
| |
| static void |
| _dispatch_stream_handler(void *ctx) |
| { |
| // On stream queue |
| dispatch_stream_t stream = (dispatch_stream_t)ctx; |
| dispatch_operation_t op; |
| pick: |
| op = _dispatch_stream_pick_next_operation(stream, stream->op); |
| if (!op) { |
| _dispatch_debug("no operation found: stream %p", stream); |
| return; |
| } |
| int err = _dispatch_io_get_error(op, NULL, true); |
| if (err) { |
| op->err = err; |
| _dispatch_stream_complete_operation(stream, op); |
| goto pick; |
| } |
| stream->op = op; |
| _dispatch_op_debug("stream handler", op); |
| dispatch_fd_entry_t fd_entry = op->fd_entry; |
| _dispatch_fd_entry_retain(fd_entry); |
| // For performance analysis |
| if (!op->total && dispatch_io_defaults.initial_delivery) { |
| // Empty delivery to signal the start of the operation |
| _dispatch_op_debug("initial delivery", op); |
| _dispatch_operation_deliver_data(op, DOP_DELIVER); |
| } |
| // TODO: perform on the operation target queue to get correct priority |
| int result = _dispatch_operation_perform(op); |
| dispatch_op_flags_t flags = ~0u; |
| switch (result) { |
| case DISPATCH_OP_DELIVER: |
| flags = DOP_DEFAULT; |
| // Fall through |
| case DISPATCH_OP_DELIVER_AND_COMPLETE: |
| flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY : |
| DOP_DEFAULT; |
| _dispatch_operation_deliver_data(op, flags); |
| // Fall through |
| case DISPATCH_OP_COMPLETE: |
| if (flags != DOP_DEFAULT) { |
| _dispatch_stream_complete_operation(stream, op); |
| } |
| if (_dispatch_stream_operation_avail(stream)) { |
| dispatch_async_f(stream->dq, stream->dq, |
| _dispatch_stream_queue_handler); |
| } |
| break; |
| case DISPATCH_OP_COMPLETE_RESUME: |
| _dispatch_stream_complete_operation(stream, op); |
| // Fall through |
| case DISPATCH_OP_RESUME: |
| if (_dispatch_stream_operation_avail(stream)) { |
| stream->source_running = true; |
| dispatch_resume(_dispatch_stream_source(stream, op)); |
| } |
| break; |
| case DISPATCH_OP_ERR: |
| _dispatch_stream_cleanup_operations(stream, op->channel); |
| break; |
| case DISPATCH_OP_FD_ERR: |
| _dispatch_fd_entry_retain(fd_entry); |
| dispatch_async(fd_entry->barrier_queue, ^{ |
| _dispatch_fd_entry_cleanup_operations(fd_entry, NULL); |
| _dispatch_fd_entry_release(fd_entry); |
| }); |
| break; |
| default: |
| break; |
| } |
| _dispatch_fd_entry_release(fd_entry); |
| return; |
| } |
| |
| static void |
| _dispatch_disk_handler(void *ctx) |
| { |
| // On pick queue |
| dispatch_disk_t disk = (dispatch_disk_t)ctx; |
| if (disk->io_active) { |
| return; |
| } |
| _dispatch_disk_debug("disk handler", disk); |
| dispatch_operation_t op; |
| size_t i = disk->free_idx, j = disk->req_idx; |
| if (j <= i) { |
| j += disk->advise_list_depth; |
| } |
| while (i <= j) { |
| if ((!disk->advise_list[i%disk->advise_list_depth]) && |
| (op = _dispatch_disk_pick_next_operation(disk))) { |
| int err = _dispatch_io_get_error(op, NULL, true); |
| if (err) { |
| op->err = err; |
| _dispatch_disk_complete_operation(disk, op); |
| continue; |
| } |
| _dispatch_retain(op); |
| _dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1); |
| disk->advise_list[i%disk->advise_list_depth] = op; |
| op->active = true; |
| _dispatch_op_debug("activate: disk %p", op, disk); |
| _dispatch_object_debug(op, "%s", __func__); |
| } else { |
| // No more operations to get |
| break; |
| } |
| i++; |
| } |
| disk->free_idx = (i%disk->advise_list_depth); |
| op = disk->advise_list[disk->req_idx]; |
| if (op) { |
| disk->io_active = true; |
| _dispatch_op_debug("async perform: disk %p", op, disk); |
| dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform); |
| } |
| } |
| |
| static void |
| _dispatch_disk_perform(void *ctxt) |
| { |
| dispatch_disk_t disk = ctxt; |
| _dispatch_disk_debug("disk perform", disk); |
| size_t chunk_size = dispatch_io_defaults.chunk_size; |
| dispatch_operation_t op; |
| size_t i = disk->advise_idx, j = disk->free_idx; |
| if (j <= i) { |
| j += disk->advise_list_depth; |
| } |
| do { |
| op = disk->advise_list[i%disk->advise_list_depth]; |
| if (!op) { |
| // Nothing more to advise, must be at free_idx |
| dispatch_assert(i%disk->advise_list_depth == disk->free_idx); |
| break; |
| } |
| if (op->direction == DOP_DIR_WRITE) { |
| // TODO: preallocate writes ? rdar://problem/9032172 |
| continue; |
| } |
| if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry, |
| op->channel)) { |
| continue; |
| } |
| // For performance analysis |
| if (!op->total && dispatch_io_defaults.initial_delivery) { |
| // Empty delivery to signal the start of the operation |
| _dispatch_op_debug("initial delivery", op); |
| _dispatch_operation_deliver_data(op, DOP_DELIVER); |
| } |
| // Advise two chunks if the list only has one element and this is the |
| // first advise on the operation |
| if ((j-i) == 1 && !disk->advise_list[disk->free_idx] && |
| !op->advise_offset) { |
| chunk_size *= 2; |
| } |
| _dispatch_operation_advise(op, chunk_size); |
| } while (++i < j); |
| disk->advise_idx = i%disk->advise_list_depth; |
| op = disk->advise_list[disk->req_idx]; |
| int result = _dispatch_operation_perform(op); |
| disk->advise_list[disk->req_idx] = NULL; |
| disk->req_idx = (++disk->req_idx)%disk->advise_list_depth; |
| _dispatch_op_debug("async perform completion: disk %p", op, disk); |
| dispatch_async(disk->pick_queue, ^{ |
| _dispatch_op_debug("perform completion", op); |
| switch (result) { |
| case DISPATCH_OP_DELIVER: |
| _dispatch_operation_deliver_data(op, DOP_DEFAULT); |
| break; |
| case DISPATCH_OP_COMPLETE: |
| _dispatch_disk_complete_operation(disk, op); |
| break; |
| case DISPATCH_OP_DELIVER_AND_COMPLETE: |
| _dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY); |
| _dispatch_disk_complete_operation(disk, op); |
| break; |
| case DISPATCH_OP_ERR: |
| _dispatch_disk_cleanup_operations(disk, op->channel); |
| break; |
| case DISPATCH_OP_FD_ERR: |
| _dispatch_disk_cleanup_operations(disk, NULL); |
| break; |
| default: |
| dispatch_assert(result); |
| break; |
| } |
| _dispatch_op_debug("deactivate: disk %p", op, disk); |
| op->active = false; |
| disk->io_active = false; |
| _dispatch_disk_handler(disk); |
| // Balancing the retain in _dispatch_disk_handler. Note that op must be |
| // released at the very end, since it might hold the last reference to |
| // the disk |
| _dispatch_op_debug("release -> %d (disk perform complete)", op, |
| op->do_ref_cnt); |
| _dispatch_release(op); |
| }); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_operation_perform |
| |
| static void |
| _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size) |
| { |
| _dispatch_op_debug("advise", op); |
| if (_dispatch_io_get_error(op, NULL, true)) return; |
| #ifdef __linux__ |
| // linux does not support fcntl (F_RDAVISE) |
| // define necessary datastructure and use readahead |
| struct radvisory { |
| off_t ra_offset; |
| int ra_count; |
| }; |
| #endif |
| int err; |
| struct radvisory advise; |
| // No point in issuing a read advise for the next chunk if we are already |
| // a chunk ahead from reading the bytes |
| if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) + |
| chunk_size + PAGE_SIZE)) { |
| return; |
| } |
| _dispatch_object_debug(op, "%s", __func__); |
| advise.ra_count = (int)chunk_size; |
| if (!op->advise_offset) { |
| op->advise_offset = op->offset; |
| // If this is the first time through, align the advised range to a |
| // page boundary |
| size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE; |
| advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0); |
| } |
| advise.ra_offset = op->advise_offset; |
| op->advise_offset += advise.ra_count; |
| #ifdef __linux__ |
| _dispatch_io_syscall_switch(err, |
| readahead(op->fd_entry->fd, advise.ra_offset, (size_t)advise.ra_count), |
| case EINVAL: break; // fd does refer to a non-supported filetype |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| #else |
| _dispatch_io_syscall_switch(err, |
| fcntl(op->fd_entry->fd, F_RDADVISE, &advise), |
| case EFBIG: break; // advised past the end of the file rdar://10415691 |
| case ENOTSUP: break; // not all FS support radvise rdar://13484629 |
| // TODO: set disk status on error |
| default: (void)dispatch_assume_zero(err); break; |
| ); |
| #endif |
| } |
| |
| static int |
| _dispatch_operation_perform(dispatch_operation_t op) |
| { |
| _dispatch_op_debug("perform", op); |
| int err = _dispatch_io_get_error(op, NULL, true); |
| if (err) { |
| goto error; |
| } |
| _dispatch_object_debug(op, "%s", __func__); |
| if (!op->buf) { |
| size_t max_buf_siz = op->params.high; |
| size_t chunk_siz = dispatch_io_defaults.chunk_size; |
| if (op->direction == DOP_DIR_READ) { |
| // If necessary, create a buffer for the ongoing operation, large |
| // enough to fit chunk_size but at most high-water |
| size_t data_siz = dispatch_data_get_size(op->data); |
| if (data_siz) { |
| dispatch_assert(data_siz < max_buf_siz); |
| max_buf_siz -= data_siz; |
| } |
| if (max_buf_siz > chunk_siz) { |
| max_buf_siz = chunk_siz; |
| } |
| if (op->length < SIZE_MAX) { |
| op->buf_siz = op->length - op->total; |
| if (op->buf_siz > max_buf_siz) { |
| op->buf_siz = max_buf_siz; |
| } |
| } else { |
| op->buf_siz = max_buf_siz; |
| } |
| op->buf = valloc(op->buf_siz); |
| _dispatch_op_debug("buffer allocated", op); |
| } else if (op->direction == DOP_DIR_WRITE) { |
| // Always write the first data piece, if that is smaller than a |
| // chunk, accumulate further data pieces until chunk size is reached |
| if (chunk_siz > max_buf_siz) { |
| chunk_siz = max_buf_siz; |
| } |
| op->buf_siz = 0; |
| dispatch_data_apply(op->data, |
| ^(dispatch_data_t region DISPATCH_UNUSED, |
| size_t offset DISPATCH_UNUSED, |
| const void* buf DISPATCH_UNUSED, size_t len) { |
| size_t siz = op->buf_siz + len; |
| if (!op->buf_siz || siz <= chunk_siz) { |
| op->buf_siz = siz; |
| } |
| return (bool)(siz < chunk_siz); |
| }); |
| if (op->buf_siz > max_buf_siz) { |
| op->buf_siz = max_buf_siz; |
| } |
| dispatch_data_t d; |
| d = dispatch_data_create_subrange(op->data, 0, op->buf_siz); |
| op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf, |
| NULL); |
| _dispatch_io_data_release(d); |
| _dispatch_op_debug("buffer mapped", op); |
| } |
| } |
| if (op->fd_entry->fd == -1) { |
| err = _dispatch_fd_entry_open(op->fd_entry, op->channel); |
| if (err) { |
| goto error; |
| } |
| } |
| void *buf = op->buf + op->buf_len; |
| size_t len = op->buf_siz - op->buf_len; |
| off_t off = (off_t)((size_t)op->offset + op->total); |
| ssize_t processed = -1; |
| syscall: |
| if (op->direction == DOP_DIR_READ) { |
| if (op->params.type == DISPATCH_IO_STREAM) { |
| processed = read(op->fd_entry->fd, buf, len); |
| } else if (op->params.type == DISPATCH_IO_RANDOM) { |
| processed = pread(op->fd_entry->fd, buf, len, off); |
| } |
| } else if (op->direction == DOP_DIR_WRITE) { |
| if (op->params.type == DISPATCH_IO_STREAM) { |
| processed = write(op->fd_entry->fd, buf, len); |
| } else if (op->params.type == DISPATCH_IO_RANDOM) { |
| processed = pwrite(op->fd_entry->fd, buf, len, off); |
| } |
| } |
| // Encountered an error on the file descriptor |
| if (processed == -1) { |
| err = errno; |
| if (err == EINTR) { |
| goto syscall; |
| } |
| goto error; |
| } |
| // EOF is indicated by two handler invocations |
| if (processed == 0) { |
| _dispatch_op_debug("performed: EOF", op); |
| return DISPATCH_OP_DELIVER_AND_COMPLETE; |
| } |
| op->buf_len += (size_t)processed; |
| op->total += (size_t)processed; |
| if (op->total == op->length) { |
| // Finished processing all the bytes requested by the operation |
| return DISPATCH_OP_COMPLETE; |
| } else { |
| // Deliver data only if we satisfy the filters |
| return DISPATCH_OP_DELIVER; |
| } |
| error: |
| if (err == EAGAIN || err == EWOULDBLOCK) { |
| // For disk based files with blocking I/O we should never get EAGAIN |
| dispatch_assert(!op->fd_entry->disk); |
| _dispatch_op_debug("performed: EAGAIN/EWOULDBLOCK", op); |
| if (op->direction == DOP_DIR_READ && op->total && |
| op->channel == op->fd_entry->convenience_channel) { |
| // Convenience read with available data completes on EAGAIN |
| return DISPATCH_OP_COMPLETE_RESUME; |
| } |
| return DISPATCH_OP_RESUME; |
| } |
| _dispatch_op_debug("performed: err %d", op, err); |
| op->err = err; |
| switch (err) { |
| case ECANCELED: |
| return DISPATCH_OP_ERR; |
| case EBADF: |
| (void)os_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed); |
| return DISPATCH_OP_FD_ERR; |
| default: |
| return DISPATCH_OP_COMPLETE; |
| } |
| } |
| |
| static void |
| _dispatch_operation_deliver_data(dispatch_operation_t op, |
| dispatch_op_flags_t flags) |
| { |
| // Either called from stream resp. pick queue or when op is finalized |
| dispatch_data_t data = NULL; |
| int err = 0; |
| size_t undelivered = op->undelivered + op->buf_len; |
| bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) || |
| (op->flags & DOP_DELIVER); |
| op->flags = DOP_DEFAULT; |
| if (!deliver) { |
| // Don't deliver data until low water mark has been reached |
| if (undelivered >= op->params.low) { |
| deliver = true; |
| } else if (op->buf_len < op->buf_siz) { |
| // Request buffer is not yet used up |
| _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered); |
| return; |
| } |
| } else { |
| err = op->err; |
| if (!err && (op->channel->atomic_flags & DIO_STOPPED)) { |
| err = ECANCELED; |
| op->err = err; |
| } |
| } |
| // Deliver data or buffer used up |
| if (op->direction == DOP_DIR_READ) { |
| if (op->buf_len) { |
| void *buf = op->buf; |
| data = dispatch_data_create(buf, op->buf_len, NULL, |
| DISPATCH_DATA_DESTRUCTOR_FREE); |
| op->buf = NULL; |
| op->buf_len = 0; |
| dispatch_data_t d = dispatch_data_create_concat(op->data, data); |
| _dispatch_io_data_release(op->data); |
| _dispatch_io_data_release(data); |
| data = d; |
| } else { |
| data = op->data; |
| } |
| op->data = deliver ? dispatch_data_empty : data; |
| } else if (op->direction == DOP_DIR_WRITE) { |
| if (deliver) { |
| data = dispatch_data_create_subrange(op->data, op->buf_len, |
| op->length); |
| } |
| if (op->buf_data && op->buf_len == op->buf_siz) { |
| _dispatch_io_data_release(op->buf_data); |
| op->buf_data = NULL; |
| op->buf = NULL; |
| op->buf_len = 0; |
| // Trim newly written buffer from head of unwritten data |
| dispatch_data_t d; |
| if (deliver) { |
| _dispatch_io_data_retain(data); |
| d = data; |
| } else { |
| d = dispatch_data_create_subrange(op->data, op->buf_siz, |
| op->length); |
| } |
| _dispatch_io_data_release(op->data); |
| op->data = d; |
| } |
| } else { |
| dispatch_assert(op->direction < DOP_DIR_MAX); |
| return; |
| } |
| if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) { |
| op->undelivered = undelivered; |
| _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered); |
| return; |
| } |
| op->undelivered = 0; |
| _dispatch_object_debug(op, "%s", __func__); |
| _dispatch_op_debug("deliver data", op); |
| dispatch_op_direction_t direction = op->direction; |
| dispatch_io_handler_t handler = op->handler; |
| dispatch_fd_entry_t fd_entry = op->fd_entry; |
| _dispatch_fd_entry_retain(fd_entry); |
| dispatch_io_t channel = op->channel; |
| _dispatch_retain(channel); |
| // Note that data delivery may occur after the operation is freed |
| dispatch_async(op->op_q, ^{ |
| bool done = (flags & DOP_DONE); |
| dispatch_data_t d = data; |
| if (done) { |
| if (direction == DOP_DIR_READ && err) { |
| if (dispatch_data_get_size(d)) { |
| _dispatch_op_debug("IO handler invoke", op); |
| handler(false, d, 0); |
| } |
| d = NULL; |
| } else if (direction == DOP_DIR_WRITE && !err) { |
| d = NULL; |
| } |
| } |
| _dispatch_op_debug("IO handler invoke: err %d", op, err); |
| handler(done, d, err); |
| _dispatch_release(channel); |
| _dispatch_fd_entry_release(fd_entry); |
| _dispatch_io_data_release(data); |
| }); |
| } |
| |
| #pragma mark - |
| #pragma mark dispatch_io_debug |
| |
| static size_t |
| _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz) |
| { |
| dispatch_queue_t target = channel->do_targetq; |
| return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, " |
| "queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = " |
| "%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %llu ", |
| channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random", |
| channel->fd_actual, channel->atomic_flags & DIO_STOPPED ? |
| "stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "", |
| channel->fd_entry, channel->queue, target && target->dq_label ? |
| target->dq_label : "", target, channel->barrier_queue, |
| channel->barrier_group, channel->err, channel->params.low, |
| channel->params.high, channel->params.interval_flags & |
| DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", |
| (unsigned long long) channel->params.interval); |
| } |
| |
| size_t |
| _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz) |
| { |
| size_t offset = 0; |
| offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", |
| dx_kind(channel), channel); |
| offset += _dispatch_object_debug_attr(channel, &buf[offset], |
| bufsiz - offset); |
| offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset); |
| offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); |
| return offset; |
| } |
| |
| static size_t |
| _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf, |
| size_t bufsiz) |
| { |
| dispatch_queue_t target = op->do_targetq; |
| dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL; |
| return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, " |
| "channel = %p, queue = %p -> %s[%p], target = %s[%p], " |
| "offset = %lld, length = %zu, done = %zu, undelivered = %zu, " |
| "flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, " |
| "interval%s = %llu ", op->params.type == DISPATCH_IO_STREAM ? |
| "stream" : "random", op->direction == DOP_DIR_READ ? "read" : |
| "write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry, |
| op->channel, op->op_q, oqtarget && oqtarget->dq_label ? |
| oqtarget->dq_label : "", oqtarget, target && target->dq_label ? |
| target->dq_label : "", target, (long long)op->offset, op->length, |
| op->total, op->undelivered + op->buf_len, op->flags, op->err, |
| op->params.low, op->params.high, op->params.interval_flags & |
| DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", |
| (unsigned long long)op->params.interval); |
| } |
| |
| size_t |
| _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz) |
| { |
| size_t offset = 0; |
| offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ", |
| dx_kind(op), op); |
| offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset); |
| offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset); |
| offset += dsnprintf(&buf[offset], bufsiz - offset, "}"); |
| return offset; |
| } |