Merge libdispatch-703.1.4
Signed-off-by: Daniel A. Steffen <dsteffen@apple.com>
diff --git a/dispatch/dispatch.h b/dispatch/dispatch.h
index 5fa0456..a26b951 100644
--- a/dispatch/dispatch.h
+++ b/dispatch/dispatch.h
@@ -53,7 +53,7 @@
#endif
#endif
-#define DISPATCH_API_VERSION 20160612
+#define DISPATCH_API_VERSION 20160712
#ifndef __DISPATCH_BUILDING_DISPATCH__
diff --git a/dispatch/group.h b/dispatch/group.h
index 5756a40..c50ad89 100644
--- a/dispatch/group.h
+++ b/dispatch/group.h
@@ -134,8 +134,7 @@
* @discussion
* This function waits for the completion of the blocks associated with the
* given dispatch group, and returns after all blocks have completed or when
- * the specified timeout has elapsed. When a timeout occurs, the group is
- * restored to its original state.
+ * the specified timeout has elapsed.
*
* This function will return immediately if there are no blocks associated
* with the dispatch group (i.e. the group is empty).
@@ -262,7 +261,7 @@
*
* @discussion
* Calling this function indicates block has completed and left the dispatch
- * groupJ by a means other than dispatch_group_async().
+ * group by a means other than dispatch_group_async().
*
* @param group
* The dispatch group to update.
diff --git a/man/dispatch_source_create.3 b/man/dispatch_source_create.3
index 1c1951b..4da708c 100644
--- a/man/dispatch_source_create.3
+++ b/man/dispatch_source_create.3
@@ -271,8 +271,8 @@
handler via a call to
.Fn dispatch_source_merge_data .
The data will be merged with the source's pending data via an atomic add or
-logic OR (based on the source's type), and the event handler block will be
-submitted to the source's target queue. The
+atomic bitwise OR (based on the source's type), and the event handler block will
+be submitted to the source's target queue. The
.Fa data
is application defined. These sources have no
.Fa handle
@@ -297,7 +297,8 @@
.Fn dispatch_source_get_data
indicates which of the events in the
.Fa mask
-were observed. Note that because this source type will request notifications on the provided port, it should not be mixed with the use of
+were observed. Note that because this source type will request notifications on
+the provided port, it should not be mixed with the use of
.Fn mach_port_request_notification
on the same port.
.Pp
@@ -314,8 +315,8 @@
.Pp
.Vt DISPATCH_SOURCE_TYPE_MEMORYPRESSURE
.Pp
-Sources of this type monitor the system memory pressure condition for state changes.
-The
+Sources of this type monitor the system memory pressure condition for state
+changes. The
.Fa handle
is unused and should be zero. The
.Fa mask
diff --git a/os/firehose_buffer_private.h b/os/firehose_buffer_private.h
index 0c85164..2c6466f 100644
--- a/os/firehose_buffer_private.h
+++ b/os/firehose_buffer_private.h
@@ -69,7 +69,7 @@
uint8_t fbc_data[FIREHOSE_BUFFER_CHUNK_SIZE
- sizeof(firehose_buffer_pos_u)
- sizeof(uint64_t)];
-} *firehose_buffer_chunk_t;
+} __attribute__((aligned(8))) *firehose_buffer_chunk_t;
typedef struct firehose_buffer_range_s {
uint16_t fbr_offset; // offset from the start of the buffer
diff --git a/private/private.h b/private/private.h
index 5a75335..3c37bed 100644
--- a/private/private.h
+++ b/private/private.h
@@ -66,7 +66,7 @@
#endif /* !__DISPATCH_BUILDING_DISPATCH__ */
// <rdar://problem/9627726> Check that public and private dispatch headers match
-#if DISPATCH_API_VERSION != 20160612 // Keep in sync with <dispatch/dispatch.h>
+#if DISPATCH_API_VERSION != 20160712 // Keep in sync with <dispatch/dispatch.h>
#error "Dispatch header mismatch between /usr/include and /usr/local/include"
#endif
diff --git a/private/queue_private.h b/private/queue_private.h
index 0acaceb..33de371 100644
--- a/private/queue_private.h
+++ b/private/queue_private.h
@@ -228,6 +228,23 @@
#endif /* __BLOCKS__ */
/*!
+ * @function dispatch_pthread_root_queue_copy_current
+ *
+ * @abstract
+ * Returns a reference to the pthread root queue object that created the
+ * currently executing thread, or NULL if the current thread is not associated
+ * with a pthread root queue.
+ *
+ * @result
+ * A new reference to a pthread root queue object or NULL.
+ */
+__OSX_AVAILABLE(10.12) __IOS_AVAILABLE(10.0)
+__TVOS_AVAILABLE(10.0) __WATCHOS_AVAILABLE(3.0)
+DISPATCH_EXPORT DISPATCH_RETURNS_RETAINED DISPATCH_WARN_RESULT DISPATCH_NOTHROW
+dispatch_queue_t _Nullable
+dispatch_pthread_root_queue_copy_current(void);
+
+/*!
* @constant DISPATCH_APPLY_CURRENT_ROOT_QUEUE
* @discussion Constant to pass to the dispatch_apply() and dispatch_apply_f()
* functions to indicate that the root queue for the current thread should be
diff --git a/src/apply.c b/src/apply.c
index 57021e5..e051a16 100644
--- a/src/apply.c
+++ b/src/apply.c
@@ -87,6 +87,9 @@
_dispatch_thread_event_destroy(&da->da_event);
}
if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) {
+#if DISPATCH_INTROSPECTION
+ _dispatch_continuation_free(da->da_dc);
+#endif
_dispatch_continuation_free((dispatch_continuation_t)da);
}
}
@@ -145,6 +148,9 @@
});
} while (++idx < iter);
+#if DISPATCH_INTROSPECTION
+ _dispatch_continuation_free(da->da_dc);
+#endif
_dispatch_continuation_free((dispatch_continuation_t)da);
}
@@ -262,7 +268,12 @@
da->da_iterations = iterations;
da->da_nested = nested;
da->da_thr_cnt = thr_cnt;
+#if DISPATCH_INTROSPECTION
+ da->da_dc = _dispatch_continuation_alloc();
+ *da->da_dc = dc;
+#else
da->da_dc = &dc;
+#endif
da->da_flags = 0;
if (slowpath(dq->dq_width == 1) || slowpath(thr_cnt <= 1)) {
diff --git a/src/firehose/firehose_buffer.c b/src/firehose/firehose_buffer.c
index 5cdd9a7..1305bde 100644
--- a/src/firehose/firehose_buffer.c
+++ b/src/firehose/firehose_buffer.c
@@ -305,10 +305,10 @@
"Invalid values for MADVISE_CHUNK_COUNT / CHUNK_SIZE");
}
- kr = mach_vm_map(mach_task_self(), &vm_addr, sizeof(*fb),
- 0, VM_FLAGS_ANYWHERE | VM_MAKE_TAG(VM_MEMORY_GENEALOGY),
- MEMORY_OBJECT_NULL, 0, FALSE, VM_PROT_DEFAULT, VM_PROT_ALL,
- VM_INHERIT_NONE);
+ kr = mach_vm_map(mach_task_self(), &vm_addr, sizeof(*fb), 0,
+ VM_FLAGS_ANYWHERE | VM_FLAGS_PURGABLE |
+ VM_MAKE_TAG(VM_MEMORY_GENEALOGY), MEMORY_OBJECT_NULL, 0, FALSE,
+ VM_PROT_DEFAULT, VM_PROT_ALL, VM_INHERIT_NONE);
if (slowpath(kr)) {
if (kr != KERN_NO_SPACE) dispatch_assume_zero(kr);
firehose_mach_port_send_release(logd_port);
diff --git a/src/firehose/firehose_server.c b/src/firehose/firehose_server.c
index 1af06e0..a6be2fa 100644
--- a/src/firehose/firehose_server.c
+++ b/src/firehose/firehose_server.c
@@ -149,10 +149,12 @@
}
#define DRAIN_BATCH_SIZE 4
+#define FIREHOSE_DRAIN_FOR_IO 0x1
+#define FIREHOSE_DRAIN_POLL 0x2
OS_NOINLINE
static void
-firehose_client_drain(firehose_client_t fc, mach_port_t port, bool for_io)
+firehose_client_drain(firehose_client_t fc, mach_port_t port, uint32_t flags)
{
firehose_buffer_t fb = fc->fc_buffer;
firehose_buffer_chunk_t fbc;
@@ -161,6 +163,7 @@
uint16_t flushed, ref, count = 0;
uint16_t client_head, client_flushed, sent_flushed;
firehose_snapshot_t snapshot = NULL;
+ bool for_io = (flags & FIREHOSE_DRAIN_FOR_IO);
if (for_io) {
evt = FIREHOSE_EVENT_IO_BUFFER_RECEIVED;
@@ -201,9 +204,9 @@
}
}
- ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
// see firehose_buffer_ring_enqueue
do {
+ ref = (flushed + count) & FIREHOSE_RING_POS_IDX_MASK;
ref = os_atomic_load(&fbh_ring[ref], relaxed);
ref &= FIREHOSE_RING_POS_IDX_MASK;
} while (fc->fc_is_kernel && !ref);
@@ -251,13 +254,24 @@
firehose_client_notify(fc, port);
}
if (fc->fc_is_kernel) {
- // see firehose_client_kernel_source_handle_event
- dispatch_resume(fc->fc_kernel_source);
- } else if (fc->fc_use_notifs && count >= DRAIN_BATCH_SIZE) {
- // if we hit the drain batch size, the client probably logs a lot
- // and there's more to drain, so optimistically schedule draining again
- // this is cheap since the queue is hot, and is fair for other clients
- firehose_client_push_async_merge(fc, 0, for_io);
+ if (!(flags & FIREHOSE_DRAIN_POLL)) {
+ // see firehose_client_kernel_source_handle_event
+ dispatch_resume(fc->fc_kernel_source);
+ }
+ } else {
+ if (fc->fc_use_notifs && count >= DRAIN_BATCH_SIZE) {
+ // if we hit the drain batch size, the client probably logs a lot
+ // and there's more to drain, so optimistically schedule draining
+ // again; this is cheap since the queue is hot, and is fair for other
+ // clients
+ firehose_client_push_async_merge(fc, 0, for_io);
+ }
+ if (count && server_config.fs_kernel_client) {
+ // the kernel is special because it can drop messages, so if we're
+ // draining, poll the kernel each time while we're bound to a thread
+ firehose_client_drain(server_config.fs_kernel_client,
+ MACH_PORT_NULL, flags | FIREHOSE_DRAIN_POLL);
+ }
}
return;
@@ -277,13 +291,13 @@
static void
firehose_client_drain_io_async(void *ctx)
{
- firehose_client_drain(ctx, MACH_PORT_NULL, true);
+ firehose_client_drain(ctx, MACH_PORT_NULL, FIREHOSE_DRAIN_FOR_IO);
}
static void
firehose_client_drain_mem_async(void *ctx)
{
- firehose_client_drain(ctx, MACH_PORT_NULL, false);
+ firehose_client_drain(ctx, MACH_PORT_NULL, 0);
}
OS_NOINLINE
@@ -751,8 +765,6 @@
{
struct firehose_server_s *fs = &server_config;
- dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
- MACH_PORT_NULL, NULL);
if (fs->fs_kernel_client) {
dispatch_async(fs->fs_io_drain_queue, ^{
struct firehose_client_connected_info_s fcci = {
@@ -761,6 +773,8 @@
firehose_client_resume(fs->fs_kernel_client, &fcci);
});
}
+ dispatch_mach_connect(fs->fs_mach_channel, fs->fs_bootstrap_port,
+ MACH_PORT_NULL, NULL);
}
#pragma mark -
@@ -1035,7 +1049,7 @@
if (extra_info_port && extra_info_size) {
mach_vm_address_t addr = 0;
kr = mach_vm_map(mach_task_self(), &addr, extra_info_size, 0,
- VM_FLAGS_ANYWHERE, mem_port, 0, FALSE,
+ VM_FLAGS_ANYWHERE, extra_info_port, 0, FALSE,
VM_PROT_READ, VM_PROT_READ, VM_INHERIT_NONE);
if (dispatch_assume_zero(kr)) {
mach_vm_deallocate(mach_task_self(), base_addr, mem_size);
@@ -1104,7 +1118,8 @@
}
block = dispatch_block_create_with_qos_class(flags, qos, 0, ^{
- firehose_client_drain(fc, reply_port, for_io);
+ firehose_client_drain(fc, reply_port,
+ for_io ? FIREHOSE_DRAIN_FOR_IO : 0);
});
dispatch_async(q, block);
_Block_release(block);
diff --git a/src/inline_internal.h b/src/inline_internal.h
index 5d941a2..d1c73dd 100644
--- a/src/inline_internal.h
+++ b/src/inline_internal.h
@@ -861,8 +861,6 @@
static inline void _dispatch_set_defaultpriority_override(void);
static inline void _dispatch_reset_defaultpriority(pthread_priority_t pp);
static inline pthread_priority_t _dispatch_get_priority(void);
-static inline void _dispatch_set_priority(pthread_priority_t pp,
- _dispatch_thread_set_self_t flags);
static inline pthread_priority_t _dispatch_set_defaultpriority(
pthread_priority_t pp, pthread_priority_t *new_pp);
@@ -1553,36 +1551,38 @@
}
struct _dispatch_identity_s {
- pthread_priority_t old_pri;
pthread_priority_t old_pp;
};
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queue_identity_assume(struct _dispatch_identity_s *di,
- pthread_priority_t pp, _dispatch_thread_set_self_t flags)
+ pthread_priority_t pp)
{
// assumed_rq was set by the caller, we need to fake the priorities
dispatch_queue_t assumed_rq = _dispatch_queue_get_current();
dispatch_assert(dx_type(assumed_rq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE);
- di->old_pri = _dispatch_get_priority();
- // _dispatch_root_queue_drain_deferred_item() may turn a manager thread
- // into a regular root queue, and we must never try to restore the manager
- // flag once we became a regular work queue thread.
- di->old_pri &= ~(pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
di->old_pp = _dispatch_get_defaultpriority();
- if (!pp) pp = di->old_pri;
- if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
- (assumed_rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
- _dispatch_wqthread_override_start(_dispatch_tid_self(), pp);
- // Ensure that the root queue sees that this thread was overridden.
- _dispatch_set_defaultpriority_override();
+ if (!(assumed_rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG)) {
+ if (!pp) {
+ pp = _dispatch_get_priority();
+ // _dispatch_root_queue_drain_deferred_item() may turn a manager
+ // thread into a regular root queue, and we must never try to
+ // restore the manager flag once we became a regular work queue
+ // thread.
+ pp &= ~(pthread_priority_t)_PTHREAD_PRIORITY_EVENT_MANAGER_FLAG;
+ }
+ if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
+ (assumed_rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
+ _dispatch_wqthread_override_start(_dispatch_tid_self(), pp);
+ // Ensure that the root queue sees that this thread was overridden.
+ _dispatch_set_defaultpriority_override();
+ }
}
_dispatch_reset_defaultpriority(assumed_rq->dq_priority);
- _dispatch_set_priority(assumed_rq->dq_priority, flags);
}
DISPATCH_ALWAYS_INLINE
@@ -1590,7 +1590,6 @@
_dispatch_root_queue_identity_restore(struct _dispatch_identity_s *di)
{
_dispatch_reset_defaultpriority(di->old_pp);
- _dispatch_set_priority(di->old_pri, 0);
}
typedef dispatch_queue_t
@@ -1631,7 +1630,7 @@
if (overriding) {
_dispatch_object_debug(dq, "stolen onto thread 0x%x, 0x%lx",
_dispatch_tid_self(), _dispatch_get_defaultpriority());
- _dispatch_root_queue_identity_assume(&di, 0, 0);
+ _dispatch_root_queue_identity_assume(&di, 0);
}
if (!(flags & DISPATCH_INVOKE_MANAGER_DRAIN)) {
@@ -1819,6 +1818,23 @@
}
#endif
+DISPATCH_ALWAYS_INLINE DISPATCH_CONST
+static inline dispatch_queue_t
+_dispatch_get_root_queue_with_overcommit(dispatch_queue_t rq, bool overcommit)
+{
+ bool rq_overcommit = (rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
+ // root queues in _dispatch_root_queues are not overcommit for even indices
+ // and overcommit for odd ones, so fixing overcommit is either returning
+ // the same queue, or picking its neighbour in _dispatch_root_queues
+ if (overcommit && !rq_overcommit) {
+ return rq + 1;
+ }
+ if (!overcommit && rq_overcommit) {
+ return rq - 1;
+ }
+ return rq;
+}
+
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_queue_set_bound_thread(dispatch_queue_t dq)
@@ -1927,10 +1943,16 @@
#if HAVE_PTHREAD_WORKQUEUE_QOS
const dispatch_priority_t rootqueue_flag = _PTHREAD_PRIORITY_ROOTQUEUE_FLAG;
const dispatch_priority_t inherited_flag = _PTHREAD_PRIORITY_INHERIT_FLAG;
+ const dispatch_priority_t defaultqueue_flag =
+ _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
dispatch_priority_t dqp = dq->dq_priority, tqp = tq->dq_priority;
if ((!(dqp & ~_PTHREAD_PRIORITY_FLAGS_MASK) || (dqp & inherited_flag)) &&
(tqp & rootqueue_flag)) {
- dq->dq_priority = (tqp & ~rootqueue_flag) | inherited_flag;
+ if (tqp & defaultqueue_flag) {
+ dq->dq_priority = 0;
+ } else {
+ dq->dq_priority = (tqp & ~rootqueue_flag) | inherited_flag;
+ }
}
#else
(void)dq; (void)tq;
@@ -2007,10 +2029,11 @@
#if HAVE_PTHREAD_WORKQUEUE_QOS
pthread_priority_t p = pp & ~_PTHREAD_PRIORITY_FLAGS_MASK;
pthread_priority_t rqp = rq->dq_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
- bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
+ pthread_priority_t defaultqueue =
+ rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG;
if (!p || (!defaultqueue && p < rqp)) {
- p = rqp;
+ p = rqp | defaultqueue;
}
return p | (rq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
#else
@@ -2032,12 +2055,11 @@
#endif
}
+#if HAVE_PTHREAD_WORKQUEUE_QOS
DISPATCH_ALWAYS_INLINE
static inline pthread_priority_t
-_dispatch_priority_compute_update(pthread_priority_t pp,
- _dispatch_thread_set_self_t flags)
+_dispatch_priority_compute_update(pthread_priority_t pp)
{
-#if HAVE_PTHREAD_WORKQUEUE_QOS
dispatch_assert(pp != DISPATCH_NO_PRIORITY);
if (!_dispatch_set_qos_class_enabled) return 0;
// the priority in _dispatch_get_priority() only tracks manager-ness
@@ -2051,11 +2073,7 @@
pthread_priority_t cur_priority = _dispatch_get_priority();
pthread_priority_t unbind = _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG;
pthread_priority_t overcommit = _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
- if (flags & DISPATCH_IGNORE_UNBIND) {
- // if DISPATCH_IGNORE_UNBIND is passed, we want to ignore the
- // difference if it is limited to the NEEDS_UNBIND flag
- cur_priority &= ~(unbind | overcommit);
- } else if (unlikely(cur_priority & unbind)) {
+ if (unlikely(cur_priority & unbind)) {
// else we always need an update if the NEEDS_UNBIND flag is set
// the slowpath in _dispatch_set_priority_and_voucher_slow() will
// adjust the priority further with the proper overcommitness
@@ -2064,11 +2082,9 @@
cur_priority &= ~overcommit;
}
if (unlikely(pp != cur_priority)) return pp;
-#else
- (void)pp; (void)flags;
-#endif
return 0;
}
+#endif
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline voucher_t
@@ -2076,7 +2092,7 @@
voucher_t v, _dispatch_thread_set_self_t flags)
{
#if HAVE_PTHREAD_WORKQUEUE_QOS
- pp = _dispatch_priority_compute_update(pp, flags);
+ pp = _dispatch_priority_compute_update(pp);
if (likely(!pp)) {
if (v == DISPATCH_NO_VOUCHER) {
return DISPATCH_NO_VOUCHER;
@@ -2129,21 +2145,6 @@
}
DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_set_priority(pthread_priority_t pp,
- _dispatch_thread_set_self_t flags)
-{
- dispatch_assert(pp != DISPATCH_NO_PRIORITY);
- pp = _dispatch_priority_compute_update(pp, flags);
- if (likely(!pp)) {
- return;
- }
-#if HAVE_PTHREAD_WORKQUEUE_QOS
- _dispatch_set_priority_and_mach_voucher_slow(pp, VOUCHER_NO_MACH_VOUCHER);
-#endif
-}
-
-DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_need_override(dispatch_queue_class_t dqu, pthread_priority_t pp)
{
diff --git a/src/internal.h b/src/internal.h
index 9482850..a9aee11 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -408,8 +408,9 @@
void _dispatch_log(const char *msg, ...);
#endif // DISPATCH_USE_OS_DEBUG_LOG
-#define dsnprintf(...) \
- ({ int _r = snprintf(__VA_ARGS__); _r < 0 ? 0u : (size_t)_r; })
+#define dsnprintf(buf, siz, ...) \
+ ({ size_t _siz = siz; int _r = snprintf(buf, _siz, __VA_ARGS__); \
+ _r < 0 ? 0u : ((size_t)_r > _siz ? _siz : (size_t)_r); })
#if __GNUC__
#define dispatch_static_assert(e) ({ \
@@ -864,12 +865,16 @@
#define DISPATCH_TRACE_SUBCLASS_DEFAULT 0
#define DISPATCH_TRACE_SUBCLASS_VOUCHER 1
#define DISPATCH_TRACE_SUBCLASS_PERF 2
+#define DISPATCH_TRACE_SUBCLASS_MACH_MSG 3
+
#define DISPATCH_PERF_non_leaf_retarget DISPATCH_CODE(PERF, 1)
#define DISPATCH_PERF_post_activate_retarget DISPATCH_CODE(PERF, 2)
#define DISPATCH_PERF_post_activate_mutation DISPATCH_CODE(PERF, 3)
#define DISPATCH_PERF_delayed_registration DISPATCH_CODE(PERF, 4)
#define DISPATCH_PERF_mutable_target DISPATCH_CODE(PERF, 5)
+#define DISPATCH_MACH_MSG_hdr_move DISPATCH_CODE(MACH_MSG, 1)
+
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_ktrace_impl(uint32_t code, uint64_t a, uint64_t b,
@@ -1034,8 +1039,7 @@
DISPATCH_PRIORITY_ENFORCE = 0x1,
DISPATCH_VOUCHER_REPLACE = 0x2,
DISPATCH_VOUCHER_CONSUME = 0x4,
- DISPATCH_IGNORE_UNBIND = 0x8,
- DISPATCH_THREAD_PARK = 0x10,
+ DISPATCH_THREAD_PARK = 0x8,
);
DISPATCH_WARN_RESULT
static inline voucher_t _dispatch_adopt_priority_and_set_voucher(
diff --git a/src/io.c b/src/io.c
index e2a1232..e4f05ae 100644
--- a/src/io.c
+++ b/src/io.c
@@ -24,13 +24,6 @@
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif
-#if DISPATCH_IO_DEBUG
-#define _dispatch_fd_debug(msg, fd, args...) \
- _dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
-#else
-#define _dispatch_fd_debug(msg, fd, args...)
-#endif
-
#if DISPATCH_DATA_IS_BRIDGED_TO_NSDATA
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
@@ -75,7 +68,7 @@
dispatch_operation_t operation, dispatch_data_t data);
static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
dispatch_io_t channel);
-static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
+static void _dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
dispatch_io_t channel);
static void _dispatch_stream_source_handler(void *ctx);
static void _dispatch_stream_queue_handler(void *ctx);
@@ -120,6 +113,38 @@
((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
#pragma mark -
+#pragma mark dispatch_io_debug
+
+#if DISPATCH_IO_DEBUG
+#if !DISPATCH_DEBUG
+#define _dispatch_io_log(x, ...) do { \
+ _dispatch_log("%llu\t%p\t" x, _dispatch_absolute_time(), \
+ (void *)_dispatch_thread_self(), ##__VA_ARGS__); \
+ } while (0)
+#ifdef _dispatch_object_debug
+#undef _dispatch_object_debug
+#define _dispatch_object_debug dispatch_debug
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#endif
+#else
+#define _dispatch_io_log(x, ...) _dispatch_debug(x, ##__VA_ARGS__)
+#endif // DISPATCH_DEBUG
+#else
+#define _dispatch_io_log(x, ...)
+#endif // DISPATCH_IO_DEBUG
+
+#define _dispatch_fd_debug(msg, fd, ...) \
+ _dispatch_io_log("fd[0x%x]: " msg, fd, ##__VA_ARGS__)
+#define _dispatch_op_debug(msg, op, ...) \
+ _dispatch_io_log("op[%p]: " msg, op, ##__VA_ARGS__)
+#define _dispatch_channel_debug(msg, channel, ...) \
+ _dispatch_io_log("channel[%p]: " msg, channel, ##__VA_ARGS__)
+#define _dispatch_fd_entry_debug(msg, fd_entry, ...) \
+ _dispatch_io_log("fd_entry[%p]: " msg, fd_entry, ##__VA_ARGS__)
+#define _dispatch_disk_debug(msg, disk, ...) \
+ _dispatch_io_log("disk[%p]: " msg, disk, ##__VA_ARGS__)
+
+#pragma mark -
#pragma mark dispatch_io_hashtables
// Global hashtable of dev_t -> disk_s mappings
@@ -227,7 +252,8 @@
_dispatch_retain(queue);
dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
dispatch_async(queue, ^{
- _dispatch_fd_debug("cleanup handler invoke", -1);
+ _dispatch_channel_debug("cleanup handler invoke: err %d",
+ channel, err);
cleanup_handler(err);
});
_dispatch_release(queue);
@@ -318,9 +344,9 @@
if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
return DISPATCH_BAD_INPUT;
}
- _dispatch_fd_debug("io create", fd);
dispatch_io_t channel = _dispatch_io_create(type);
channel->fd = fd;
+ _dispatch_channel_debug("create", channel);
channel->fd_actual = fd;
dispatch_suspend(channel->queue);
_dispatch_retain(queue);
@@ -374,9 +400,9 @@
if (!path_data) {
return DISPATCH_OUT_OF_MEMORY;
}
- _dispatch_fd_debug("io create with path %s", -1, path);
dispatch_io_t channel = _dispatch_io_create(type);
channel->fd = -1;
+ _dispatch_channel_debug("create with path %s", channel, path);
channel->fd_actual = -1;
path_data->channel = channel;
path_data->oflag = oflag;
@@ -461,8 +487,8 @@
if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
return DISPATCH_BAD_INPUT;
}
- _dispatch_fd_debug("io create with io %p", -1, in_channel);
dispatch_io_t channel = _dispatch_io_create(type);
+ _dispatch_channel_debug("create with channel %p", channel, in_channel);
dispatch_suspend(channel->queue);
_dispatch_retain(queue);
_dispatch_retain(channel);
@@ -569,7 +595,7 @@
{
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
- _dispatch_fd_debug("io set high water", channel->fd);
+ _dispatch_channel_debug("set high water: %zu", channel, high_water);
if (channel->params.low > high_water) {
channel->params.low = high_water;
}
@@ -583,7 +609,7 @@
{
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
- _dispatch_fd_debug("io set low water", channel->fd);
+ _dispatch_channel_debug("set low water: %zu", channel, low_water);
if (channel->params.high < low_water) {
channel->params.high = low_water ? low_water : 1;
}
@@ -598,7 +624,7 @@
{
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
- _dispatch_fd_debug("io set interval", channel->fd);
+ _dispatch_channel_debug("set interval: %llu", channel, interval);
channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
channel->params.interval_flags = flags;
_dispatch_release(channel);
@@ -642,7 +668,7 @@
static void
_dispatch_io_stop(dispatch_io_t channel)
{
- _dispatch_fd_debug("io stop", channel->fd);
+ _dispatch_channel_debug("stop", channel);
(void)os_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
_dispatch_retain(channel);
dispatch_async(channel->queue, ^{
@@ -650,7 +676,7 @@
_dispatch_object_debug(channel, "%s", __func__);
dispatch_fd_entry_t fd_entry = channel->fd_entry;
if (fd_entry) {
- _dispatch_fd_debug("io stop cleanup", channel->fd);
+ _dispatch_channel_debug("stop cleanup", channel);
_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
if (!(channel->atomic_flags & DIO_CLOSED)) {
channel->fd_entry = NULL;
@@ -661,8 +687,8 @@
_dispatch_retain(channel);
dispatch_async(_dispatch_io_fds_lockq, ^{
_dispatch_object_debug(channel, "%s", __func__);
- _dispatch_fd_debug("io stop after close cleanup",
- channel->fd);
+ _dispatch_channel_debug("stop cleanup after close",
+ channel);
dispatch_fd_entry_t fdi;
uintptr_t hash = DIO_HASH(channel->fd);
TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
@@ -697,7 +723,7 @@
dispatch_async(channel->queue, ^{
dispatch_async(channel->barrier_queue, ^{
_dispatch_object_debug(channel, "%s", __func__);
- _dispatch_fd_debug("io close", channel->fd);
+ _dispatch_channel_debug("close", channel);
if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
(void)os_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
relaxed);
@@ -967,10 +993,6 @@
{
// On channel queue
dispatch_assert(direction < DOP_DIR_MAX);
- _dispatch_fd_debug("operation create", channel->fd);
-#if DISPATCH_IO_DEBUG
- int fd = channel->fd;
-#endif
// Safe to call _dispatch_io_get_error() with channel->fd_entry since
// that can only be NULL if atomic_flags are set rdar://problem/8362514
int err = _dispatch_io_get_error(NULL, channel, false);
@@ -985,7 +1007,8 @@
} else if (direction == DOP_DIR_WRITE && !err) {
d = NULL;
}
- _dispatch_fd_debug("IO handler invoke", fd);
+ _dispatch_channel_debug("IO handler invoke: err %d", channel,
+ err);
handler(true, d, err);
_dispatch_io_data_release(data);
});
@@ -995,6 +1018,7 @@
}
dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
sizeof(struct dispatch_operation_s));
+ _dispatch_channel_debug("operation create: %p", channel, op);
op->do_next = DISPATCH_OBJECT_LISTLESS;
op->do_xref_cnt = -1; // operation object is not exposed externally
op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
@@ -1023,6 +1047,7 @@
_dispatch_operation_dispose(dispatch_operation_t op)
{
_dispatch_object_debug(op, "%s", __func__);
+ _dispatch_op_debug("dispose", op);
// Deliver the data if there's any
if (op->fd_entry) {
_dispatch_operation_deliver_data(op, DOP_DONE);
@@ -1049,6 +1074,7 @@
dispatch_release(op->op_q);
}
Block_release(op->handler);
+ _dispatch_op_debug("disposed", op);
}
static void
@@ -1071,6 +1097,7 @@
handler(true, d, err);
_dispatch_io_data_release(data);
});
+ _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
_dispatch_release(op);
return;
}
@@ -1098,13 +1125,14 @@
dispatch_queue_t tq, dispatch_data_t data)
{
// On stream queue or disk queue
- _dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
+ _dispatch_op_debug("enqueue", op);
_dispatch_io_data_retain(data);
op->data = data;
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
op->err = err;
// Final release
+ _dispatch_op_debug("release -> %d, err %d", op, op->do_ref_cnt, err);
_dispatch_release(op);
return false;
}
@@ -1241,7 +1269,6 @@
dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
_dispatch_io_fds_lockq_init);
dispatch_async(_dispatch_io_fds_lockq, ^{
- _dispatch_fd_debug("fd entry init", fd);
dispatch_fd_entry_t fd_entry = NULL;
// Check to see if there is an existing entry for the given fd
uintptr_t hash = DIO_HASH(fd);
@@ -1257,8 +1284,9 @@
// If we did not find an existing entry, create one
fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
}
+ _dispatch_fd_entry_debug("init", fd_entry);
dispatch_async(fd_entry->barrier_queue, ^{
- _dispatch_fd_debug("fd entry init completion", fd);
+ _dispatch_fd_entry_debug("init completion", fd_entry);
completion_callback(fd_entry);
// stat() is complete, release reference to fd_entry
_dispatch_fd_entry_release(fd_entry);
@@ -1286,16 +1314,16 @@
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
{
// On fds lock queue
- _dispatch_fd_debug("fd entry create", fd);
dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
_dispatch_io_fds_lockq);
+ _dispatch_fd_entry_debug("create: fd %d", fd_entry, fd);
fd_entry->fd = fd;
TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
fd_entry->barrier_queue = dispatch_queue_create(
"com.apple.libdispatch-io.barrierq", NULL);
fd_entry->barrier_group = dispatch_group_create();
dispatch_async(fd_entry->barrier_queue, ^{
- _dispatch_fd_debug("fd entry stat", fd);
+ _dispatch_fd_entry_debug("stat", fd_entry);
int err, orig_flags, orig_nosigpipe = -1;
struct stat st;
_dispatch_io_syscall_switch(err,
@@ -1367,7 +1395,7 @@
// all operations associated with this entry have been freed
dispatch_async(fd_entry->close_queue, ^{
if (!fd_entry->disk) {
- _dispatch_fd_debug("close queue fd_entry cleanup", fd);
+ _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
dispatch_op_direction_t dir;
for (dir = 0; dir < DOP_DIR_MAX; dir++) {
_dispatch_stream_dispose(fd_entry, dir);
@@ -1385,11 +1413,11 @@
// source cancels it and suspends the close queue. Freeing the fd_entry
// structure must happen after the source cancel handler has finished
dispatch_async(fd_entry->close_queue, ^{
- _dispatch_fd_debug("close queue release", fd);
+ _dispatch_fd_entry_debug("close queue release", fd_entry);
dispatch_release(fd_entry->close_queue);
- _dispatch_fd_debug("barrier queue release", fd);
+ _dispatch_fd_entry_debug("barrier queue release", fd_entry);
dispatch_release(fd_entry->barrier_queue);
- _dispatch_fd_debug("barrier group release", fd);
+ _dispatch_fd_entry_debug("barrier group release", fd_entry);
dispatch_release(fd_entry->barrier_group);
if (fd_entry->orig_flags != -1) {
_dispatch_io_syscall(
@@ -1418,9 +1446,9 @@
dev_t dev, mode_t mode)
{
// On devs lock queue
- _dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
path_data->channel->queue);
+ _dispatch_fd_entry_debug("create: path %s", fd_entry, path_data->path);
if (S_ISREG(mode)) {
_dispatch_disk_init(fd_entry, major(dev));
} else {
@@ -1439,7 +1467,7 @@
// that the channel associated with this entry has been closed and that
// all operations associated with this entry have been freed
dispatch_async(fd_entry->close_queue, ^{
- _dispatch_fd_debug("close queue fd_entry cleanup", -1);
+ _dispatch_fd_entry_debug("close queue cleanup", fd_entry);
if (!fd_entry->disk) {
dispatch_op_direction_t dir;
for (dir = 0; dir < DOP_DIR_MAX; dir++) {
@@ -1458,7 +1486,7 @@
}
});
dispatch_async(fd_entry->close_queue, ^{
- _dispatch_fd_debug("close queue release", -1);
+ _dispatch_fd_entry_debug("close queue release", fd_entry);
dispatch_release(fd_entry->close_queue);
dispatch_release(fd_entry->barrier_queue);
dispatch_release(fd_entry->barrier_group);
@@ -1511,7 +1539,7 @@
}
_dispatch_fd_entry_retain(fd_entry);
dispatch_async(fd_entry->disk->pick_queue, ^{
- _dispatch_disk_cleanup_operations(fd_entry->disk, channel);
+ _dispatch_disk_cleanup_inactive_operations(fd_entry->disk, channel);
_dispatch_fd_entry_release(fd_entry);
if (channel) {
_dispatch_release(channel);
@@ -1683,7 +1711,7 @@
{
// On stream queue
_dispatch_object_debug(op, "%s", __func__);
- _dispatch_fd_debug("complete operation", op->fd_entry->fd);
+ _dispatch_op_debug("complete: stream %p", op, stream);
TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
if (op == stream->op) {
stream->op = NULL;
@@ -1692,6 +1720,7 @@
dispatch_source_cancel(op->timer);
}
// Final release will deliver any pending data
+ _dispatch_op_debug("release -> %d (stream complete)", op, op->do_ref_cnt);
_dispatch_release(op);
}
@@ -1700,7 +1729,7 @@
{
// On pick queue
_dispatch_object_debug(op, "%s", __func__);
- _dispatch_fd_debug("complete operation", op->fd_entry->fd);
+ _dispatch_op_debug("complete: disk %p", op, disk);
// Current request is always the last op returned
if (disk->cur_rq == op) {
disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
@@ -1719,6 +1748,7 @@
dispatch_source_cancel(op->timer);
}
// Final release will deliver any pending data
+ _dispatch_op_debug("release -> %d (disk complete)", op, op->do_ref_cnt);
_dispatch_release(op);
}
@@ -1806,18 +1836,34 @@
}
}
-static void
-_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
+static inline void
+_dispatch_disk_cleanup_specified_operations(dispatch_disk_t disk,
+ dispatch_io_t channel, bool inactive_only) // completes queued operations of disk; a NULL channel matches every operation
{
// On pick queue
dispatch_operation_t op, tmp;
TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
+ if (inactive_only && op->active) continue; // when inactive_only is set, operations flagged active are left alone
if (!channel || op->channel == channel) {
+ _dispatch_op_debug("cleanup: disk %p", op, disk);
_dispatch_disk_complete_operation(disk, op);
}
}
}
+static void
+_dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
+{
+ _dispatch_disk_cleanup_specified_operations(disk, channel, false); // inactive_only = false: active operations are completed as well
+}
+
+static void
+_dispatch_disk_cleanup_inactive_operations(dispatch_disk_t disk,
+ dispatch_io_t channel)
+{
+ _dispatch_disk_cleanup_specified_operations(disk, channel, true); // inactive_only = true: operations with op->active set are skipped
+}
+
#pragma mark -
#pragma mark dispatch_stream_handler/dispatch_disk_handler
@@ -1829,7 +1875,7 @@
return stream->source;
}
dispatch_fd_t fd = op->fd_entry->fd;
- _dispatch_fd_debug("stream source create", fd);
+ _dispatch_op_debug("stream source create", op);
dispatch_source_t source = NULL;
if (op->direction == DOP_DIR_READ) {
source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
@@ -1848,7 +1894,7 @@
// unregistered
dispatch_queue_t close_queue = op->fd_entry->close_queue;
dispatch_source_set_cancel_handler(source, ^{
- _dispatch_fd_debug("stream source cancel", fd);
+ _dispatch_op_debug("stream source cancel", op);
dispatch_resume(close_queue);
});
stream->source = source;
@@ -1896,13 +1942,13 @@
goto pick;
}
stream->op = op;
- _dispatch_fd_debug("stream handler", op->fd_entry->fd);
+ _dispatch_op_debug("stream handler", op);
dispatch_fd_entry_t fd_entry = op->fd_entry;
_dispatch_fd_entry_retain(fd_entry);
// For performance analysis
if (!op->total && dispatch_io_defaults.initial_delivery) {
// Empty delivery to signal the start of the operation
- _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
+ _dispatch_op_debug("initial delivery", op);
_dispatch_operation_deliver_data(op, DOP_DELIVER);
}
// TODO: perform on the operation target queue to get correct priority
@@ -1960,7 +2006,7 @@
if (disk->io_active) {
return;
}
- _dispatch_fd_debug("disk handler", -1);
+ _dispatch_disk_debug("disk handler", disk);
dispatch_operation_t op;
size_t i = disk->free_idx, j = disk->req_idx;
if (j <= i) {
@@ -1976,8 +2022,10 @@
continue;
}
_dispatch_retain(op);
+ _dispatch_op_debug("retain -> %d", op, op->do_ref_cnt + 1);
disk->advise_list[i%disk->advise_list_depth] = op;
op->active = true;
+ _dispatch_op_debug("activate: disk %p", op, disk);
_dispatch_object_debug(op, "%s", __func__);
} else {
// No more operations to get
@@ -1989,6 +2037,7 @@
op = disk->advise_list[disk->req_idx];
if (op) {
disk->io_active = true;
+ _dispatch_op_debug("async perform: disk %p", op, disk);
dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
}
}
@@ -1997,8 +2046,8 @@
_dispatch_disk_perform(void *ctxt)
{
dispatch_disk_t disk = ctxt;
+ _dispatch_disk_debug("disk perform", disk);
size_t chunk_size = dispatch_io_defaults.chunk_size;
- _dispatch_fd_debug("disk perform", -1);
dispatch_operation_t op;
size_t i = disk->advise_idx, j = disk->free_idx;
if (j <= i) {
@@ -2022,7 +2071,7 @@
// For performance analysis
if (!op->total && dispatch_io_defaults.initial_delivery) {
// Empty delivery to signal the start of the operation
- _dispatch_fd_debug("initial delivery", op->fd_entry->fd);
+ _dispatch_op_debug("initial delivery", op);
_dispatch_operation_deliver_data(op, DOP_DELIVER);
}
// Advise two chunks if the list only has one element and this is the
@@ -2038,7 +2087,9 @@
int result = _dispatch_operation_perform(op);
disk->advise_list[disk->req_idx] = NULL;
disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
+ _dispatch_op_debug("async perform completion: disk %p", op, disk);
dispatch_async(disk->pick_queue, ^{
+ _dispatch_op_debug("perform completion", op);
switch (result) {
case DISPATCH_OP_DELIVER:
_dispatch_operation_deliver_data(op, DOP_DEFAULT);
@@ -2060,12 +2111,15 @@
dispatch_assert(result);
break;
}
+ _dispatch_op_debug("deactivate: disk %p", op, disk);
op->active = false;
disk->io_active = false;
_dispatch_disk_handler(disk);
// Balancing the retain in _dispatch_disk_handler. Note that op must be
// released at the very end, since it might hold the last reference to
// the disk
+ _dispatch_op_debug("release -> %d (disk perform complete)", op,
+ op->do_ref_cnt);
_dispatch_release(op);
});
}
@@ -2076,6 +2130,8 @@
static void
_dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
{
+ _dispatch_op_debug("advise", op);
+ if (_dispatch_io_get_error(op, NULL, true)) return;
#ifdef __linux__
// linux does not support fcntl (F_RDAVISE)
// define necessary datastructure and use readahead
@@ -2123,6 +2179,7 @@
static int
_dispatch_operation_perform(dispatch_operation_t op)
{
+ _dispatch_op_debug("perform", op);
int err = _dispatch_io_get_error(op, NULL, true);
if (err) {
goto error;
@@ -2151,7 +2208,7 @@
op->buf_siz = max_buf_siz;
}
op->buf = valloc(op->buf_siz);
- _dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
+ _dispatch_op_debug("buffer allocated", op);
} else if (op->direction == DOP_DIR_WRITE) {
// Always write the first data piece, if that is smaller than a
// chunk, accumulate further data pieces until chunk size is reached
@@ -2177,7 +2234,7 @@
op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
NULL);
_dispatch_io_data_release(d);
- _dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
+ _dispatch_op_debug("buffer mapped", op);
}
}
if (op->fd_entry->fd == -1) {
@@ -2214,7 +2271,7 @@
}
// EOF is indicated by two handler invocations
if (processed == 0) {
- _dispatch_fd_debug("EOF", op->fd_entry->fd);
+ _dispatch_op_debug("performed: EOF", op);
return DISPATCH_OP_DELIVER_AND_COMPLETE;
}
op->buf_len += (size_t)processed;
@@ -2230,7 +2287,7 @@
if (err == EAGAIN) {
// For disk based files with blocking I/O we should never get EAGAIN
dispatch_assert(!op->fd_entry->disk);
- _dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
+ _dispatch_op_debug("performed: EAGAIN", op);
if (op->direction == DOP_DIR_READ && op->total &&
op->channel == op->fd_entry->convenience_channel) {
// Convenience read with available data completes on EAGAIN
@@ -2238,6 +2295,7 @@
}
return DISPATCH_OP_RESUME;
}
+ _dispatch_op_debug("performed: err %d", op, err);
op->err = err;
switch (err) {
case ECANCELED:
@@ -2267,7 +2325,7 @@
deliver = true;
} else if (op->buf_len < op->buf_siz) {
// Request buffer is not yet used up
- _dispatch_fd_debug("buffer data", op->fd_entry->fd);
+ _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
return;
}
} else {
@@ -2321,17 +2379,14 @@
}
if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
op->undelivered = undelivered;
- _dispatch_fd_debug("buffer data", op->fd_entry->fd);
+ _dispatch_op_debug("buffer data: undelivered %zu", op, undelivered);
return;
}
op->undelivered = 0;
_dispatch_object_debug(op, "%s", __func__);
- _dispatch_fd_debug("deliver data", op->fd_entry->fd);
+ _dispatch_op_debug("deliver data", op);
dispatch_op_direction_t direction = op->direction;
dispatch_io_handler_t handler = op->handler;
-#if DISPATCH_IO_DEBUG
- int fd = op->fd_entry->fd;
-#endif
dispatch_fd_entry_t fd_entry = op->fd_entry;
_dispatch_fd_entry_retain(fd_entry);
dispatch_io_t channel = op->channel;
@@ -2343,7 +2398,7 @@
if (done) {
if (direction == DOP_DIR_READ && err) {
if (dispatch_data_get_size(d)) {
- _dispatch_fd_debug("IO handler invoke", fd);
+ _dispatch_op_debug("IO handler invoke", op);
handler(false, d, 0);
}
d = NULL;
@@ -2351,7 +2406,7 @@
d = NULL;
}
}
- _dispatch_fd_debug("IO handler invoke", fd);
+ _dispatch_op_debug("IO handler invoke: err %d", op, err);
handler(done, d, err);
_dispatch_release(channel);
_dispatch_fd_entry_release(fd_entry);
diff --git a/src/once.c b/src/once.c
index 82885cc..d7d6a8e 100644
--- a/src/once.c
+++ b/src/once.c
@@ -60,7 +60,7 @@
dispatch_thread_event_t event;
if (os_atomic_cmpxchg(vval, NULL, tail, acquire)) {
- dow.dow_thread = _dispatch_thread_port();
+ dow.dow_thread = _dispatch_tid_self();
_dispatch_client_callout(ctxt, func);
// The next barrier must be long and strong.
diff --git a/src/queue.c b/src/queue.c
index 2602994..58c545b 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -1235,56 +1235,38 @@
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
- _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
- qos_class_t qos = dqa->dqa_qos_class;
- dispatch_queue_flags_t dqf = 0;
- const void *vtable;
+ //
+ // Step 1: Normalize arguments (qos, overcommit, tq)
+ //
+ qos_class_t qos = dqa->dqa_qos_class;
+#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
+ if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
+ !_dispatch_root_queues[
+ DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
+ qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
+ }
+#endif
+ bool maintenance_fallback = false;
+#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
+ maintenance_fallback = true;
+#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
+ if (maintenance_fallback) {
+ if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
+ !_dispatch_root_queues[
+ DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
+ qos = _DISPATCH_QOS_CLASS_BACKGROUND;
+ }
+ }
+
+ _dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
- "a non global target queue");
+ "a non-global target queue");
}
}
- if (legacy) {
- // if any of these attributes is specified, use non legacy classes
- if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency
-#if 0 // <rdar://problem/23211394>
- || overcommit != _dispatch_queue_attr_overcommit_unspecified
-#endif
- ) {
- legacy = false;
- }
- }
- if (legacy) {
- vtable = DISPATCH_VTABLE(queue);
- } else if (dqa->dqa_concurrent) {
- vtable = DISPATCH_VTABLE(queue_concurrent);
- } else {
- vtable = DISPATCH_VTABLE(queue_serial);
- }
- switch (dqa->dqa_autorelease_frequency) {
- case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
- dqf |= DQF_AUTORELEASE_NEVER;
- break;
- case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
- dqf |= DQF_AUTORELEASE_ALWAYS;
- break;
- }
- if (label) {
- const char *tmp = _dispatch_strdup_if_mutable(label);
- if (tmp != label) {
- dqf |= DQF_LABEL_NEEDS_FREE;
- label = tmp;
- }
- }
- dispatch_queue_t dq = _dispatch_alloc(vtable,
- sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
- _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
- DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
-
- dq->dq_label = label;
if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
@@ -1296,15 +1278,8 @@
}
}
if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
- if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
- if (!(tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG)) {
- tq++;
- }
- } else {
- if (tq->dq_priority & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG) {
- tq--;
- }
- }
+ tq = _dispatch_get_root_queue_with_overcommit(tq,
+ overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
@@ -1327,43 +1302,68 @@
_dispatch_queue_attr_overcommit_enabled;
}
}
+ if (!tq) {
+ qos_class_t tq_qos = qos == _DISPATCH_QOS_CLASS_UNSPECIFIED ?
+ _DISPATCH_QOS_CLASS_DEFAULT : qos;
+ tq = _dispatch_get_root_queue(tq_qos, overcommit ==
+ _dispatch_queue_attr_overcommit_enabled);
+ if (slowpath(!tq)) {
+ DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
+ }
+ }
+
+ //
+ // Step 2: Initialize the queue
+ //
+
+ if (legacy) {
+ // if any of these attributes is specified, use non legacy classes
+ if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
+ legacy = false;
+ }
+ }
+
+ const void *vtable;
+ dispatch_queue_flags_t dqf = 0;
+ if (legacy) {
+ vtable = DISPATCH_VTABLE(queue);
+ } else if (dqa->dqa_concurrent) {
+ vtable = DISPATCH_VTABLE(queue_concurrent);
+ } else {
+ vtable = DISPATCH_VTABLE(queue_serial);
+ }
+ switch (dqa->dqa_autorelease_frequency) {
+ case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
+ dqf |= DQF_AUTORELEASE_NEVER;
+ break;
+ case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
+ dqf |= DQF_AUTORELEASE_ALWAYS;
+ break;
+ }
+ if (label) {
+ const char *tmp = _dispatch_strdup_if_mutable(label);
+ if (tmp != label) {
+ dqf |= DQF_LABEL_NEEDS_FREE;
+ label = tmp;
+ }
+ }
+
+ dispatch_queue_t dq = _dispatch_alloc(vtable,
+ sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
+ _dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
+ DISPATCH_QUEUE_WIDTH_MAX : 1, dqa->dqa_inactive);
+
+ dq->dq_label = label;
+
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = (dispatch_priority_t)_pthread_qos_class_encode(qos,
dqa->dqa_relative_priority,
overcommit == _dispatch_queue_attr_overcommit_enabled ?
_PTHREAD_PRIORITY_OVERCOMMIT_FLAG : 0);
#endif
- if (!tq) {
- if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
- qos = _DISPATCH_QOS_CLASS_DEFAULT;
- }
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- if (qos == _DISPATCH_QOS_CLASS_USER_INTERACTIVE &&
- !_dispatch_root_queues[
- DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS].dq_priority) {
- qos = _DISPATCH_QOS_CLASS_USER_INITIATED;
- }
-#endif
- bool maintenance_fallback = false;
-#if DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- maintenance_fallback = true;
-#endif // DISPATCH_USE_NOQOS_WORKQUEUE_FALLBACK
- if (maintenance_fallback) {
- if (qos == _DISPATCH_QOS_CLASS_MAINTENANCE &&
- !_dispatch_root_queues[
- DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS].dq_priority) {
- qos = _DISPATCH_QOS_CLASS_BACKGROUND;
- }
- }
-
- tq = _dispatch_get_root_queue(qos, overcommit ==
- _dispatch_queue_attr_overcommit_enabled);
- if (slowpath(!tq)) {
- DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
- }
- } else {
+ _dispatch_retain(tq);
+ if (qos == _DISPATCH_QOS_CLASS_UNSPECIFIED) {
// legacy way of inherithing the QoS from the target
- _dispatch_retain(tq);
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
@@ -2194,6 +2194,21 @@
}
#endif
+dispatch_queue_t
+dispatch_pthread_root_queue_copy_current(void)
+{ // Returns the root of the current queue's target chain, or NULL when it fails the checks below.
+ dispatch_queue_t dq = _dispatch_queue_get_current();
+ if (!dq) return NULL; // no current queue
+ while (slowpath(dq->do_targetq)) { // follow do_targetq until a queue without a target is reached
+ dq = dq->do_targetq;
+ }
+ if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
+ dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) { // NOTE(review): the refcount test presumably excludes the built-in global root queues - confirm
+ return NULL;
+ }
+ return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj); // NOTE(review): presumably hands the caller a retained reference ("copy") - confirm
+}
+
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
void
@@ -2550,7 +2565,7 @@
if (!pflags) return;
int r = _pthread_set_properties_self(pflags, pp, kv);
if (r == EINVAL) {
- DISPATCH_INTERNAL_CRASH(0, "_pthread_set_properties_self failed");
+ DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
}
(void)dispatch_assume_zero(r);
}
@@ -3174,10 +3189,24 @@
dispatch_thread_frame_s dtf;
struct dispatch_continuation_s *other_dc = dc->dc_other;
dispatch_invoke_flags_t ctxt_flags = (dispatch_invoke_flags_t)dc->dc_ctxt;
- dispatch_queue_t dq = dc->dc_data, rq, old_dq, old_rq = NULL;
+ // if we went through _dispatch_root_queue_push_override,
+ // the "right" root queue was stuffed into dc_func
+ dispatch_queue_t assumed_rq = (dispatch_queue_t)dc->dc_func;
+ dispatch_queue_t dq = dc->dc_data, rq, old_dq;
+ struct _dispatch_identity_s di;
pthread_priority_t op, dp, old_dp;
+ if (ctxt_flags) {
+ flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
+ flags |= ctxt_flags;
+ }
+ old_dq = _dispatch_get_current_queue();
+ if (assumed_rq) {
+ _dispatch_queue_set_current(assumed_rq);
+ _dispatch_root_queue_identity_assume(&di, 0);
+ }
+
old_dp = _dispatch_set_defaultpriority(dq->dq_priority, &dp);
op = dq->dq_override;
if (op > (dp & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
@@ -3186,28 +3215,19 @@
_dispatch_set_defaultpriority_override();
}
- if (ctxt_flags) {
- flags &= ~_DISPATCH_INVOKE_AUTORELEASE_MASK;
- flags |= ctxt_flags;
- }
- if (dc->dc_func) {
- // if we went through _dispatch_root_queue_push_override,
- // the "right" root queue was stuffed into dc_func
- old_rq = _dispatch_get_current_queue();
- _dispatch_queue_set_current((dispatch_queue_t)dc->dc_func);
- }
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_continuation_pop_forwarded(dc, DISPATCH_NO_VOUCHER,
DISPATCH_OBJ_CONSUME_BIT, {
_dispatch_continuation_pop(other_dc, dq, flags);
});
- _dispatch_reset_defaultpriority(old_dp);
_dispatch_thread_frame_pop(&dtf);
- if (old_rq) {
- _dispatch_queue_set_current(old_rq);
+ if (assumed_rq) {
+ _dispatch_root_queue_identity_restore(&di);
+ _dispatch_queue_set_current(old_dq);
}
+ _dispatch_reset_defaultpriority(old_dp);
+
rq = dq->do_targetq;
- old_dq = _dispatch_get_current_queue();
while (slowpath(rq->do_targetq) && rq != old_dq) {
_dispatch_non_barrier_complete(rq);
rq = rq->do_targetq;
@@ -4230,9 +4250,6 @@
int r;
_dispatch_debug_root_queue(dq, __func__);
- dispatch_once_f(&_dispatch_root_queues_pred, NULL,
- _dispatch_root_queues_init_once);
-
#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
if (qc->dgq_kworkqueue != (void*)(~0ul))
@@ -4863,13 +4880,30 @@
DISPATCH_ALWAYS_INLINE
static inline bool
-_dispatch_root_queue_push_needs_override(dispatch_queue_t rq,
+_dispatch_need_global_root_queue_push_override(dispatch_queue_t rq,
pthread_priority_t pp)
{
- if (dx_type(rq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE || !rq->dq_priority) {
- return false;
- }
- return pp > (rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK);
+ pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK; // QoS class bits of the root queue priority
+ bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG; // true when rq's priority carries the DEFAULTQUEUE flag
+
+ if (unlikely(!rqp)) return false; // root queue has no QoS class bits: never override
+
+ pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK; // compare QoS class bits only
+ return defaultqueue ? pp && pp != rqp : pp > rqp; // DEFAULTQUEUE: any non-zero QoS that differs; otherwise only a strictly higher one
+}
+
+DISPATCH_ALWAYS_INLINE
+static inline bool
+_dispatch_need_global_root_queue_push_override_stealer(dispatch_queue_t rq,
+ pthread_priority_t pp)
+{
+ pthread_priority_t rqp = rq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK; // QoS class bits of the root queue priority
+ bool defaultqueue = rq->dq_priority & _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG; // true when rq's priority carries the DEFAULTQUEUE flag
+
+ if (unlikely(!rqp)) return false; // root queue has no QoS class bits: never override
+
+ pp &= _PTHREAD_PRIORITY_QOS_CLASS_MASK; // compare QoS class bits only
+ return defaultqueue || pp > rqp; // DEFAULTQUEUE: always; otherwise only when pp's QoS is strictly higher
}
DISPATCH_NOINLINE
@@ -5006,8 +5040,10 @@
}
apply_again:
- if (_dispatch_root_queue_push_needs_override(tq, pp)) {
- _dispatch_root_queue_push_override_stealer(tq, dq, pp);
+ if (dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
+ if (_dispatch_need_global_root_queue_push_override_stealer(tq, pp)) {
+ _dispatch_root_queue_push_override_stealer(tq, dq, pp);
+ }
} else if (_dispatch_queue_need_override(tq, pp)) {
dx_wakeup(tq, pp, DISPATCH_WAKEUP_OVERRIDING);
}
@@ -5095,8 +5131,7 @@
dq = old_dq;
dou._do = old_dou;
}
- if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
- (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
+ if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
return _dispatch_root_queue_push_override(dq, dou, pp);
}
// bit of cheating: we should really pass `pp` but we know that we are
@@ -5108,22 +5143,36 @@
#endif
DISPATCH_NOINLINE
+static void
+_dispatch_queue_push_slow(dispatch_queue_t dq, dispatch_object_t dou,
+ pthread_priority_t pp)
+{ // Out-of-line path for a push that needs the root queues' one-time initialization first.
+ dispatch_once_f(&_dispatch_root_queues_pred, NULL,
+ _dispatch_root_queues_init_once); // runs the initializer under the _dispatch_root_queues_pred once-predicate
+ _dispatch_queue_push(dq, dou, pp); // then goes back through the regular push entry point
+}
+
+DISPATCH_NOINLINE
void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
pthread_priority_t pp)
{
_dispatch_assert_is_valid_qos_override(pp);
- if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE && dq->dq_priority) {
+ if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) {
#if DISPATCH_USE_KEVENT_WORKQUEUE
dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
if (unlikely(ddi && !(ddi->ddi_stashed_pp &
(dispatch_priority_t)_PTHREAD_PRIORITY_FLAGS_MASK))) {
+ dispatch_assert(_dispatch_root_queues_pred == DLOCK_ONCE_DONE);
return _dispatch_trystash_to_deferred_items(dq, dou, pp, ddi);
}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
- if ((pp & _PTHREAD_PRIORITY_QOS_CLASS_MASK) >
- (dq->dq_priority & _PTHREAD_PRIORITY_QOS_CLASS_MASK)) {
+ // can't use dispatch_once_f() as it would create a frame
+ if (unlikely(_dispatch_root_queues_pred != DLOCK_ONCE_DONE)) {
+ return _dispatch_queue_push_slow(dq, dou, pp);
+ }
+ if (_dispatch_need_global_root_queue_push_override(dq, pp)) {
return _dispatch_root_queue_push_override(dq, dou, pp);
}
#endif
@@ -5213,7 +5262,7 @@
flags ^= DISPATCH_WAKEUP_SLOW_WAITER;
dispatch_assert(!(flags & DISPATCH_WAKEUP_CONSUME));
- os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release,{
+ os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = old_state | bits;
if (_dq_state_drain_pended(old_state)) {
// same as DISPATCH_QUEUE_DRAIN_UNLOCK_PRESERVE_WAITERS_BIT
@@ -5221,21 +5270,22 @@
new_state &= ~DISPATCH_QUEUE_DRAIN_OWNER_MASK;
new_state &= ~DISPATCH_QUEUE_DRAIN_PENDED;
}
- if (likely(_dq_state_is_runnable(new_state) &&
- !_dq_state_drain_locked(new_state))) {
- if (_dq_state_has_pending_barrier(old_state) ||
- new_state + pending_barrier_width <
- DISPATCH_QUEUE_WIDTH_FULL_BIT) {
- // see _dispatch_queue_drain_try_lock
- new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
- new_state ^= xor_owner_and_set_full_width_and_in_barrier;
- } else {
- new_state |= DISPATCH_QUEUE_ENQUEUED;
- }
- } else if (_dq_state_drain_locked(new_state)) {
+ if (unlikely(_dq_state_drain_locked(new_state))) {
#ifdef DLOCK_NOWAITERS_BIT
new_state &= ~(uint64_t)DLOCK_NOWAITERS_BIT;
#endif
+ } else if (unlikely(!_dq_state_is_runnable(new_state) ||
+ !(flags & DISPATCH_WAKEUP_FLUSH))) {
+ // either not runnable, or was not for the first item (26700358)
+ // so we should not try to lock and handle overrides instead
+ } else if (_dq_state_has_pending_barrier(old_state) ||
+ new_state + pending_barrier_width <
+ DISPATCH_QUEUE_WIDTH_FULL_BIT) {
+ // see _dispatch_queue_drain_try_lock
+ new_state &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
+ new_state ^= xor_owner_and_set_full_width_and_in_barrier;
+ } else {
+ new_state |= DISPATCH_QUEUE_ENQUEUED;
}
});
if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
@@ -5400,7 +5450,7 @@
pp = _dispatch_priority_inherit_from_root_queue(pp, dq);
_dispatch_queue_set_current(dq);
- _dispatch_root_queue_identity_assume(&di, pp, DISPATCH_IGNORE_UNBIND);
+ _dispatch_root_queue_identity_assume(&di, pp);
#if DISPATCH_COCOA_COMPAT
void *pool = _dispatch_last_resort_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT
diff --git a/src/shims/lock.h b/src/shims/lock.h
index 5c2dfc5..246c807 100644
--- a/src/shims/lock.h
+++ b/src/shims/lock.h
@@ -497,12 +497,8 @@
DLOCK_GATE_UNLOCKED, tid_self, acquire));
}
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_gate_wait(dispatch_gate_t l, dispatch_lock_options_t flags)
-{
- _dispatch_gate_wait_slow(l, DLOCK_GATE_UNLOCKED, flags);
-}
+#define _dispatch_gate_wait(l, flags) \
+ _dispatch_gate_wait_slow(l, DLOCK_GATE_UNLOCKED, flags) /* expands to a direct _dispatch_gate_wait_slow() call with DLOCK_GATE_UNLOCKED as its second argument */
DISPATCH_ALWAYS_INLINE
static inline void
@@ -523,13 +519,9 @@
DLOCK_ONCE_UNLOCKED, tid_self, acquire));
}
-DISPATCH_ALWAYS_INLINE
-static inline void
-_dispatch_once_gate_wait(dispatch_once_gate_t l)
-{
- _dispatch_gate_wait_slow(&l->dgo_gate, (dispatch_lock)DLOCK_ONCE_DONE,
- DLOCK_LOCK_NONE);
-}
+#define _dispatch_once_gate_wait(l) \
+ _dispatch_gate_wait_slow(&(l)->dgo_gate, (dispatch_lock)DLOCK_ONCE_DONE, \
+ DLOCK_LOCK_NONE) /* expands to _dispatch_gate_wait_slow() on the once gate's dgo_gate, passing DLOCK_ONCE_DONE and DLOCK_LOCK_NONE */
DISPATCH_ALWAYS_INLINE
static inline void
diff --git a/src/source.c b/src/source.c
index b01e9b2..a5a2c94 100644
--- a/src/source.c
+++ b/src/source.c
@@ -2433,7 +2433,7 @@
_dispatch_timer_expired = true;
for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
_dispatch_timers_mask |=
- 1 << DISPATCH_TIMER_INDEX(DISPATCH_TIMER_WALL_CLOCK, qos);
+ 1 << DISPATCH_TIMER_INDEX(DISPATCH_TIMER_KIND_WALL, qos);
}
}
#endif
@@ -4566,6 +4566,9 @@
DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
+ if (hdr == _dispatch_kevent_mach_msg_buf(ke)) {
+ _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr, (uint64_t)dmsg->dmsg_buf);
+ }
dmsg->dmsg_voucher = voucher;
dmsg->dmsg_priority = priority;
dmsg->do_ctxt = ctxt;
@@ -4585,6 +4588,7 @@
}
void *ctxt = dmr->dmr_ctxt;
mach_msg_header_t *hdr, *hdr2 = NULL;
+ void *hdr_copyout_addr;
mach_msg_size_t siz, msgsiz = 0;
mach_msg_return_t kr;
mach_msg_option_t options;
@@ -4602,6 +4606,7 @@
(options & MACH_RCV_TIMEOUT) ? "poll" : "wait");
kr = mach_msg(hdr, options, 0, siz, reply_port, MACH_MSG_TIMEOUT_NONE,
MACH_PORT_NULL);
+ hdr_copyout_addr = hdr;
_dispatch_debug_machport(reply_port);
_dispatch_debug("machport[0x%08x]: MACH_RCV_MSG (size %u, opts 0x%x) "
"returned: %s - 0x%x", reply_port, siz, options,
@@ -4652,18 +4657,20 @@
dispatch_assume_zero(kr);
break;
}
- if (slowpath(dm->dq_atomic_flags & DSF_CANCELED)) {
+ _dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port);
+ hdr->msgh_local_port = MACH_PORT_NULL;
+ if (slowpath((dm->dq_atomic_flags & DSF_CANCELED) || kr)) {
if (!kr) mach_msg_destroy(hdr);
goto out;
}
- _dispatch_mach_msg_reply_received(dm, dmr, hdr->msgh_local_port);
- hdr->msgh_local_port = MACH_PORT_NULL;
- if (kr) goto out;
dispatch_mach_msg_t dmsg;
dispatch_mach_msg_destructor_t destructor = (!hdr2) ?
DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
+ if (!hdr2 || hdr != hdr_copyout_addr) {
+ _dispatch_ktrace2(DISPATCH_MACH_MSG_hdr_move, (uint64_t)hdr_copyout_addr, (uint64_t)_dispatch_mach_msg_get_msg(dmsg));
+ }
dmsg->do_ctxt = ctxt;
return dmsg;
out:
@@ -5902,8 +5909,12 @@
}
if (dm->ds_is_direct_kevent) {
pp &= (~_PTHREAD_PRIORITY_FLAGS_MASK |
+ _PTHREAD_PRIORITY_DEFAULTQUEUE_FLAG |
_PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
// _dispatch_mach_reply_kevent_register assumes this has been done
+ // which is unlike regular sources or queues. The DEFAULTQUEUE flag
+ // is used so that the priority of that channel doesn't act as a floor
+ // QoS for incoming messages (26761457)
dm->dq_priority = (dispatch_priority_t)pp;
}
dm->ds_is_installed = true;