/*
* Copyright (c) 2008-2013 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @APPLE_APACHE_LICENSE_HEADER_END@
*/
#include "internal.h"
#if HAVE_MACH
#include "protocol.h" // _dispatch_send_wakeup_runloop_thread
#endif
#if HAVE_PTHREAD_WORKQUEUES || DISPATCH_USE_INTERNAL_WORKQUEUE
#define DISPATCH_USE_WORKQUEUES 1
#endif
#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && (!HAVE_PTHREAD_WORKQUEUE_QOS || \
DISPATCH_DEBUG) && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && (DISPATCH_DEBUG || \
(!DISPATCH_USE_KEVENT_WORKQUEUE && !HAVE_PTHREAD_WORKQUEUE_QOS)) && \
!defined(DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP)
#define DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP 1
#endif
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || \
DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || \
DISPATCH_USE_INTERNAL_WORKQUEUE
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#define DISPATCH_USE_WORKQ_PRIORITY 1
#endif
#define DISPATCH_USE_WORKQ_OPTIONS 1
#endif
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif
static void _dispatch_sig_thread(void *ctxt);
static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static void _dispatch_wlh_cleanup(void *ctxt);
static void _dispatch_deferred_items_cleanup(void *ctxt);
static void _dispatch_frame_cleanup(void *ctxt);
static void _dispatch_context_cleanup(void *ctxt);
static void _dispatch_queue_barrier_complete(dispatch_queue_t dq,
dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
static void _dispatch_queue_non_barrier_complete(dispatch_queue_t dq);
static void _dispatch_queue_push_sync_waiter(dispatch_queue_t dq,
dispatch_sync_context_t dsc, dispatch_qos_t qos);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_root_queue_push_override_stealer(dispatch_queue_t orig_rq,
dispatch_queue_t dq, dispatch_qos_t qos);
static inline void _dispatch_queue_class_wakeup_with_override(dispatch_queue_t,
uint64_t dq_state, dispatch_wakeup_flags_t flags);
#endif
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread4(void *context);
#if HAVE_PTHREAD_WORKQUEUE_QOS
static void _dispatch_worker_thread3(pthread_priority_t priority);
#endif
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
#endif
#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_handle_pred;
static void _dispatch_runloop_queue_poke(dispatch_queue_t dq,
dispatch_qos_t qos, dispatch_wakeup_flags_t flags);
static void _dispatch_runloop_queue_handle_init(void *ctxt);
static void _dispatch_runloop_queue_handle_dispose(dispatch_queue_t dq);
#endif
#pragma mark -
#pragma mark dispatch_root_queue
struct dispatch_pthread_root_queue_context_s {
pthread_attr_t dpq_thread_attr;
dispatch_block_t dpq_thread_configure;
struct dispatch_semaphore_s dpq_thread_mediator;
dispatch_pthread_root_queue_observer_hooks_s dpq_observer_hooks;
};
typedef struct dispatch_pthread_root_queue_context_s *
dispatch_pthread_root_queue_context_t;
#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_pthread_root_queue_context_s
_dispatch_pthread_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {
.dpq_thread_mediator = {
DISPATCH_GLOBAL_OBJECT_HEADER(semaphore),
}},
};
#endif
#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
#endif
struct dispatch_root_queue_context_s {
union {
struct {
int volatile dgq_pending;
#if DISPATCH_USE_WORKQUEUES
qos_class_t dgq_qos;
#if DISPATCH_USE_WORKQ_PRIORITY
int dgq_wq_priority;
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
int dgq_wq_options;
#endif
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // DISPATCH_USE_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
void *dgq_ctxt;
int32_t volatile dgq_thread_pool_size;
#endif
};
char _dgq_pad[DISPATCH_CACHELINE_SIZE];
};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;
#define WORKQ_PRIO_INVALID (-1)
#ifndef WORKQ_BG_PRIOQUEUE_CONDITIONAL
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
#ifndef WORKQ_HIGH_PRIOQUEUE_CONDITIONAL
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL WORKQ_PRIO_INVALID
#endif
DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = 0,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS],
#endif
}}},
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
#endif
#if DISPATCH_USE_WORKQ_OPTIONS
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#endif
#if DISPATCH_ENABLE_THREAD_POOL
.dgq_ctxt = &_dispatch_pthread_root_queue_contexts[
DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT],
#endif
}}},
};
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
#define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \
((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \
DISPATCH_ROOT_QUEUE_IDX_##n##_QOS)
#define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \
[_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root), \
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \
.do_ctxt = &_dispatch_root_queue_contexts[ \
_DISPATCH_ROOT_QUEUE_IDX(n, flags)], \
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \
.dq_priority = _dispatch_priority_make(DISPATCH_QOS_##n, 0) | flags | \
DISPATCH_PRIORITY_FLAG_ROOTQUEUE | \
((flags & DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE) ? 0 : \
DISPATCH_QOS_##n << DISPATCH_PRIORITY_OVERRIDE_SHIFT), \
__VA_ARGS__ \
}
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0,
.dq_label = "com.apple.root.maintenance-qos",
.dq_serialnum = 4,
),
_DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.maintenance-qos.overcommit",
.dq_serialnum = 5,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0,
.dq_label = "com.apple.root.background-qos",
.dq_serialnum = 6,
),
_DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.background-qos.overcommit",
.dq_serialnum = 7,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0,
.dq_label = "com.apple.root.utility-qos",
.dq_serialnum = 8,
),
_DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.utility-qos.overcommit",
.dq_serialnum = 9,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE,
.dq_label = "com.apple.root.default-qos",
.dq_serialnum = 10,
),
_DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT,
DISPATCH_PRIORITY_FLAG_DEFAULTQUEUE | DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.default-qos.overcommit",
.dq_serialnum = 11,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0,
.dq_label = "com.apple.root.user-initiated-qos",
.dq_serialnum = 12,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-initiated-qos.overcommit",
.dq_serialnum = 13,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0,
.dq_label = "com.apple.root.user-interactive-qos",
.dq_serialnum = 14,
),
_DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT,
.dq_label = "com.apple.root.user-interactive-qos.overcommit",
.dq_serialnum = 15,
),
};
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS],
[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
&_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT],
[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS],
[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
&_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT],
[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS],
[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
&_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT],
[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS],
[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
&_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT],
};
#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue _dispatch_root_queues[\
DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT]
#endif
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_mgr),
.dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
DISPATCH_QUEUE_ROLE_BASE_ANON,
.do_targetq = &_dispatch_mgr_root_queue,
.dq_label = "com.apple.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(1),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 2,
};
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
return DISPATCH_BAD_INPUT;
}
dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
	if (qos == DISPATCH_QOS_MAINTENANCE) {
		qos = DISPATCH_QOS_BACKGROUND;
	} else if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
		qos = DISPATCH_QOS_USER_INITIATED;
	}
#endif
if (qos == DISPATCH_QOS_UNSPECIFIED) {
return DISPATCH_BAD_INPUT;
}
return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT);
}
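/*
 * Usage sketch (client code, not part of this file): the typical way the
 * global concurrent queues defined above are obtained and used. The QoS
 * choice and the work block are illustrative only.
 *
 *	dispatch_queue_t q = dispatch_get_global_queue(QOS_CLASS_UTILITY, 0);
 *	dispatch_async(q, ^{
 *		// runs on one of the non-overcommitting root queues
 *	});
 *
 * Any unknown priority, or any flag bit other than the private
 * DISPATCH_QUEUE_OVERCOMMIT, makes the call return DISPATCH_BAD_INPUT.
 */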
DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
return _dispatch_queue_get_current() ?:
_dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, true);
}
dispatch_queue_t
dispatch_get_current_queue(void)
{
return _dispatch_get_current_queue();
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
_dispatch_client_assert_fail(
"Block was %sexpected to execute on queue [%s]",
expected ? "" : "not ", dq->dq_label ?: "");
}
DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_assert_queue_barrier_fail(dispatch_queue_t dq)
{
_dispatch_client_assert_fail(
"Block was expected to act as a barrier on queue [%s]",
dq->dq_label ?: "");
}
void
dispatch_assert_queue(dispatch_queue_t dq)
{
unsigned long metatype = dx_metatype(dq);
if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
"dispatch_assert_queue()");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
if (likely(_dq_state_drain_locked_by_self(dq_state))) {
return;
}
// we can look at the width: if it is changing while we read it,
// it means that a barrier is running on `dq` concurrently, which
// proves that we're not on `dq`. Hence reading a stale '1' is ok.
//
	// However, if thread-bound queues are possible, they mess with lock
	// ownership and we always have to take the slowpath
if (likely(DISPATCH_COCOA_COMPAT || dq->dq_width > 1)) {
if (likely(_dispatch_thread_frame_find_queue(dq))) {
return;
}
}
_dispatch_assert_queue_fail(dq, true);
}
void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
unsigned long metatype = dx_metatype(dq);
if (unlikely(metatype != _DISPATCH_QUEUE_TYPE)) {
DISPATCH_CLIENT_CRASH(metatype, "invalid queue passed to "
"dispatch_assert_queue_not()");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
if (likely(!_dq_state_drain_locked_by_self(dq_state))) {
// we can look at the width: if it is changing while we read it,
// it means that a barrier is running on `dq` concurrently, which
// proves that we're not on `dq`. Hence reading a stale '1' is ok.
//
		// However, if thread-bound queues are possible, they mess with lock
		// ownership and we always have to take the slowpath
if (likely(!DISPATCH_COCOA_COMPAT && dq->dq_width == 1)) {
return;
}
if (likely(!_dispatch_thread_frame_find_queue(dq))) {
return;
}
}
_dispatch_assert_queue_fail(dq, false);
}
void
dispatch_assert_queue_barrier(dispatch_queue_t dq)
{
dispatch_assert_queue(dq);
if (likely(dq->dq_width == 1)) {
return;
}
if (likely(dq->do_targetq)) {
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
if (likely(_dq_state_is_in_barrier(dq_state))) {
return;
}
}
_dispatch_assert_queue_barrier_fail(dq);
}
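/*
 * Usage sketch (client code): how the assertion family above is meant to be
 * used. `q` and `other_q` are hypothetical caller-owned queues; a violated
 * precondition crashes with the message built by the
 * _dispatch_assert_queue*_fail() helpers, otherwise the calls are cheap.
 *
 *	dispatch_async(q, ^{
 *		dispatch_assert_queue(q);           // passes: q is draining here
 *		dispatch_assert_queue_not(other_q); // passes when not on other_q
 *		dispatch_assert_queue_barrier(q);   // passes if q is serial or the
 *		                                    // block is a barrier on q
 *	});
 */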
#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif
#pragma mark -
#pragma mark dispatch_init
static inline bool
_dispatch_root_queues_init_workq(int *wq_supported)
{
int r; (void)r;
bool result = false;
*wq_supported = 0;
#if DISPATCH_USE_WORKQUEUES
bool disable_wq = false; (void)disable_wq;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
bool disable_qos = false;
#if DISPATCH_DEBUG
disable_qos = slowpath(getenv("LIBDISPATCH_DISABLE_QOS"));
#endif
#if DISPATCH_USE_KEVENT_WORKQUEUE
bool disable_kevent_wq = false;
#if DISPATCH_DEBUG || DISPATCH_PROFILE
disable_kevent_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KEVENT_WQ"));
#endif
#endif
if (!disable_wq && !disable_qos) {
*wq_supported = _pthread_workqueue_supported();
#if DISPATCH_USE_KEVENT_WORKQUEUE
if (!disable_kevent_wq && (*wq_supported & WORKQ_FEATURE_KEVENT)) {
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread3,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#if DISPATCH_USE_MGR_THREAD
_dispatch_kevent_workqueue_enabled = !r;
#endif
result = !r;
} else
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
if (*wq_supported & WORKQ_FEATURE_FINEPRIO) {
#if DISPATCH_USE_MGR_THREAD
r = _pthread_workqueue_init(_dispatch_worker_thread3,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
result = !r;
#endif
}
if (!(*wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(*wq_supported,
"QoS Maintenance support required");
}
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE || HAVE_PTHREAD_WORKQUEUE_QOS
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
if (!result && !disable_wq) {
pthread_workqueue_setdispatchoffset_np(
offsetof(struct dispatch_queue_s, dq_serialnum));
r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
(void)dispatch_assume_zero(r);
#endif
result = !r;
}
#endif // DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
pthread_workqueue_attr_t pwq_attr;
if (!disable_wq) {
r = pthread_workqueue_attr_init_np(&pwq_attr);
(void)dispatch_assume_zero(r);
}
#endif
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
pthread_workqueue_t pwq = NULL;
dispatch_root_queue_context_t qc;
qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
if (!disable_wq && qc->dgq_wq_priority != WORKQ_PRIO_INVALID) {
r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
qc->dgq_wq_priority);
(void)dispatch_assume_zero(r);
r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
qc->dgq_wq_options &
WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
(void)dispatch_assume_zero(r);
r = pthread_workqueue_create_np(&pwq, &pwq_attr);
(void)dispatch_assume_zero(r);
result = result || dispatch_assume(pwq);
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
if (pwq) {
qc->dgq_kworkqueue = pwq;
} else {
qc->dgq_kworkqueue = (void*)(~0ul);
			// because the fastpath of _dispatch_global_queue_poke didn't
			// yet know that we're using the internal pool implementation,
			// we have to undo its setting of dgq_pending
qc->dgq_pending = 0;
}
}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
if (!disable_wq) {
r = pthread_workqueue_attr_destroy_np(&pwq_attr);
(void)dispatch_assume_zero(r);
}
#endif
}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
#endif // DISPATCH_USE_WORKQUEUES
return result;
}
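/*
 * Note: in DISPATCH_DEBUG (and DISPATCH_PROFILE) builds the
 * LIBDISPATCH_DISABLE_KWQ, LIBDISPATCH_DISABLE_QOS and
 * LIBDISPATCH_DISABLE_KEVENT_WQ environment variables checked above turn off
 * the corresponding kernel workqueue paths. The function then returns false
 * and _dispatch_root_queues_init_once() falls back to the internal pthread
 * pool when DISPATCH_ENABLE_THREAD_POOL is set.
 */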
#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
int32_t pool_size, bool overcommit)
{
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
int32_t thread_pool_size = overcommit ? DISPATCH_WORKQ_MAX_PTHREAD_COUNT :
(int32_t)dispatch_hw_config(active_cpus);
if (slowpath(pool_size) && pool_size < thread_pool_size) {
thread_pool_size = pool_size;
}
qc->dgq_thread_pool_size = thread_pool_size;
#if DISPATCH_USE_WORKQUEUES
if (qc->dgq_qos) {
(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
(void)dispatch_assume_zero(pthread_attr_setdetachstate(
&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
#if HAVE_PTHREAD_WORKQUEUE_QOS
(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(
&pqc->dpq_thread_attr, qc->dgq_qos, 0));
#endif
}
#endif // DISPATCH_USE_WORKQUEUES
_dispatch_sema4_t *sema = &pqc->dpq_thread_mediator.dsema_sema;
_dispatch_sema4_init(sema, _DSEMA4_POLICY_LIFO);
_dispatch_sema4_create(sema, _DSEMA4_POLICY_LIFO);
}
#endif // DISPATCH_USE_PTHREAD_POOL
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
int wq_supported;
_dispatch_fork_becomes_unsafe();
if (!_dispatch_root_queues_init_workq(&wq_supported)) {
#if DISPATCH_ENABLE_THREAD_POOL
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
bool overcommit = true;
#if TARGET_OS_EMBEDDED || (DISPATCH_USE_INTERNAL_WORKQUEUE && HAVE_DISPATCH_WORKQ_MONITORING)
// some software hangs if the non-overcommitting queues do not
// overcommit when threads block. Someday, this behavior should
// apply to all platforms
if (!(i & 1)) {
overcommit = false;
}
#endif
_dispatch_root_queue_init_pthread_pool(
&_dispatch_root_queue_contexts[i], 0, overcommit);
}
#else
DISPATCH_INTERNAL_CRASH((errno << 16) | wq_supported,
"Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
}
}
void
_dispatch_root_queues_init(void)
{
static dispatch_once_t _dispatch_root_queues_pred;
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 2 * DISPATCH_QOS_MAX);
dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
-DISPATCH_QUEUE_PRIORITY_HIGH);
dispatch_assert(countof(_dispatch_root_queues) ==
DISPATCH_ROOT_QUEUE_COUNT);
dispatch_assert(countof(_dispatch_root_queue_contexts) ==
DISPATCH_ROOT_QUEUE_COUNT);
#if DISPATCH_USE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
dispatch_assert(sizeof(_dispatch_wq2root_queues) /
sizeof(_dispatch_wq2root_queues[0][0]) ==
WORKQ_NUM_PRIOQUEUE * 2);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
dispatch_assert(countof(_dispatch_pthread_root_queue_contexts) ==
DISPATCH_ROOT_QUEUE_COUNT);
#endif
dispatch_assert(offsetof(struct dispatch_continuation_s, do_next) ==
offsetof(struct dispatch_object_s, do_next));
dispatch_assert(offsetof(struct dispatch_continuation_s, do_vtable) ==
offsetof(struct dispatch_object_s, do_vtable));
dispatch_assert(sizeof(struct dispatch_apply_s) <=
DISPATCH_CONTINUATION_SIZE);
dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
== 0);
dispatch_assert(offsetof(struct dispatch_queue_s, dq_state) % _Alignof(uint64_t) == 0);
dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
DISPATCH_CACHELINE_SIZE == 0);
#if HAVE_PTHREAD_WORKQUEUE_QOS
dispatch_qos_t qos = _dispatch_qos_from_qos_class(qos_class_main());
dispatch_priority_t pri = _dispatch_priority_make(qos, 0);
_dispatch_main_q.dq_priority = _dispatch_priority_with_override_qos(pri, qos);
#if DISPATCH_DEBUG
if (!slowpath(getenv("LIBDISPATCH_DISABLE_SET_QOS"))) {
_dispatch_set_qos_class_enabled = 1;
}
#endif
#endif
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
_dispatch_thread_key_create(&__dispatch_tsd_key, _libdispatch_tsd_cleanup);
#else
_dispatch_thread_key_create(&dispatch_priority_key, NULL);
_dispatch_thread_key_create(&dispatch_r2k_key, NULL);
_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
_dispatch_thread_key_create(&dispatch_frame_key, _dispatch_frame_cleanup);
_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
_dispatch_thread_key_create(&dispatch_context_key, _dispatch_context_cleanup);
_dispatch_thread_key_create(&dispatch_pthread_root_queue_observer_hooks_key,
NULL);
_dispatch_thread_key_create(&dispatch_basepri_key, NULL);
#if DISPATCH_INTROSPECTION
	_dispatch_thread_key_create(&dispatch_introspection_key, NULL);
#elif DISPATCH_PERF_MON
_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif
_dispatch_thread_key_create(&dispatch_wlh_key, _dispatch_wlh_cleanup);
_dispatch_thread_key_create(&dispatch_voucher_key, _voucher_thread_cleanup);
_dispatch_thread_key_create(&dispatch_deferred_items_key,
_dispatch_deferred_items_cleanup);
#endif
#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
_dispatch_main_q.do_targetq = &_dispatch_root_queues[
DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT];
#endif
_dispatch_queue_set_current(&_dispatch_main_q);
_dispatch_queue_set_bound_thread(&_dispatch_main_q);
#if DISPATCH_USE_PTHREAD_ATFORK
(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
dispatch_atfork_parent, dispatch_atfork_child));
#endif
_dispatch_hw_config_init();
_dispatch_time_init();
_dispatch_vtable_init();
_os_object_init();
_voucher_init();
_dispatch_introspection_init();
}
#if DISPATCH_USE_THREAD_LOCAL_STORAGE
#include <unistd.h>
#include <sys/syscall.h>
#ifndef __ANDROID__
#ifdef SYS_gettid
DISPATCH_ALWAYS_INLINE
static inline pid_t
gettid(void)
{
return (pid_t)syscall(SYS_gettid);
}
#elif defined(__FreeBSD__)
DISPATCH_ALWAYS_INLINE
static inline pid_t
gettid(void)
{
return (pid_t)pthread_getthreadid_np();
}
#else
#error "SYS_gettid unavailable on this system"
#endif /* SYS_gettid */
#endif /* ! __ANDROID__ */
#define _tsd_call_cleanup(k, f) do { \
if ((f) && tsd->k) ((void(*)(void*))(f))(tsd->k); \
} while (0)
#ifdef __ANDROID__
static void (*_dispatch_thread_detach_callback)(void);
void
_dispatch_install_thread_detach_callback(dispatch_function_t cb)
{
if (os_atomic_xchg(&_dispatch_thread_detach_callback, cb, relaxed)) {
DISPATCH_CLIENT_CRASH(0, "Installing a thread detach callback twice");
}
}
#endif
void
_libdispatch_tsd_cleanup(void *ctx)
{
struct dispatch_tsd *tsd = (struct dispatch_tsd*) ctx;
_tsd_call_cleanup(dispatch_priority_key, NULL);
_tsd_call_cleanup(dispatch_r2k_key, NULL);
_tsd_call_cleanup(dispatch_queue_key, _dispatch_queue_cleanup);
_tsd_call_cleanup(dispatch_frame_key, _dispatch_frame_cleanup);
_tsd_call_cleanup(dispatch_cache_key, _dispatch_cache_cleanup);
_tsd_call_cleanup(dispatch_context_key, _dispatch_context_cleanup);
_tsd_call_cleanup(dispatch_pthread_root_queue_observer_hooks_key,
NULL);
_tsd_call_cleanup(dispatch_basepri_key, NULL);
#if DISPATCH_INTROSPECTION
_tsd_call_cleanup(dispatch_introspection_key, NULL);
#elif DISPATCH_PERF_MON
_tsd_call_cleanup(dispatch_bcounter_key, NULL);
#endif
_tsd_call_cleanup(dispatch_wlh_key, _dispatch_wlh_cleanup);
_tsd_call_cleanup(dispatch_voucher_key, _voucher_thread_cleanup);
_tsd_call_cleanup(dispatch_deferred_items_key,
_dispatch_deferred_items_cleanup);
#ifdef __ANDROID__
if (_dispatch_thread_detach_callback) {
_dispatch_thread_detach_callback();
}
#endif
tsd->tid = 0;
}
DISPATCH_NOINLINE
void
libdispatch_tsd_init(void)
{
pthread_setspecific(__dispatch_tsd_key, &__dispatch_tsd);
__dispatch_tsd.tid = gettid();
}
#endif
DISPATCH_NOTHROW
void
_dispatch_queue_atfork_child(void)
{
dispatch_queue_t main_q = &_dispatch_main_q;
void *crash = (void *)0x100;
size_t i;
if (_dispatch_queue_is_thread_bound(main_q)) {
_dispatch_queue_set_bound_thread(main_q);
}
if (!_dispatch_is_multithreaded_inline()) return;
main_q->dq_items_head = crash;
main_q->dq_items_tail = crash;
_dispatch_mgr_q.dq_items_head = crash;
_dispatch_mgr_q.dq_items_tail = crash;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queues[i].dq_items_head = crash;
_dispatch_root_queues[i].dq_items_tail = crash;
}
}
DISPATCH_NOINLINE
void
_dispatch_fork_becomes_unsafe_slow(void)
{
uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
_DISPATCH_UNSAFE_FORK_MULTITHREADED, relaxed);
if (value & _DISPATCH_UNSAFE_FORK_PROHIBIT) {
DISPATCH_CLIENT_CRASH(0, "Transition to multithreaded is prohibited");
}
}
DISPATCH_NOINLINE
void
_dispatch_prohibit_transition_to_multithreaded(bool prohibit)
{
if (prohibit) {
uint8_t value = os_atomic_or(&_dispatch_unsafe_fork,
_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
if (value & _DISPATCH_UNSAFE_FORK_MULTITHREADED) {
DISPATCH_CLIENT_CRASH(0, "The executable is already multithreaded");
}
} else {
os_atomic_and(&_dispatch_unsafe_fork,
(uint8_t)~_DISPATCH_UNSAFE_FORK_PROHIBIT, relaxed);
}
}
#pragma mark -
#pragma mark dispatch_queue_attr_t
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_qos_class_valid(dispatch_qos_class_t qos_class, int relative_priority)
{
qos_class_t qos = (qos_class_t)qos_class;
switch (qos) {
case QOS_CLASS_MAINTENANCE:
case QOS_CLASS_BACKGROUND:
case QOS_CLASS_UTILITY:
case QOS_CLASS_DEFAULT:
case QOS_CLASS_USER_INITIATED:
case QOS_CLASS_USER_INTERACTIVE:
case QOS_CLASS_UNSPECIFIED:
break;
default:
return false;
}
if (relative_priority > 0 || relative_priority < QOS_MIN_RELATIVE_PRIORITY){
return false;
}
return true;
}
#define DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit) \
((overcommit) == _dispatch_queue_attr_overcommit_disabled ? \
DQA_INDEX_NON_OVERCOMMIT : \
((overcommit) == _dispatch_queue_attr_overcommit_enabled ? \
DQA_INDEX_OVERCOMMIT : DQA_INDEX_UNSPECIFIED_OVERCOMMIT))
#define DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent) \
((concurrent) ? DQA_INDEX_CONCURRENT : DQA_INDEX_SERIAL)
#define DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive) \
((inactive) ? DQA_INDEX_INACTIVE : DQA_INDEX_ACTIVE)
#define DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency) \
(frequency)
#define DISPATCH_QUEUE_ATTR_PRIO2IDX(prio) (-(prio))
#define DISPATCH_QUEUE_ATTR_QOS2IDX(qos) (qos)
static inline dispatch_queue_attr_t
_dispatch_get_queue_attr(dispatch_qos_t qos, int prio,
_dispatch_queue_attr_overcommit_t overcommit,
dispatch_autorelease_frequency_t frequency,
bool concurrent, bool inactive)
{
return (dispatch_queue_attr_t)&_dispatch_queue_attrs
[DISPATCH_QUEUE_ATTR_QOS2IDX(qos)]
[DISPATCH_QUEUE_ATTR_PRIO2IDX(prio)]
[DISPATCH_QUEUE_ATTR_OVERCOMMIT2IDX(overcommit)]
[DISPATCH_QUEUE_ATTR_AUTORELEASE_FREQUENCY2IDX(frequency)]
[DISPATCH_QUEUE_ATTR_CONCURRENT2IDX(concurrent)]
[DISPATCH_QUEUE_ATTR_INACTIVE2IDX(inactive)];
}
dispatch_queue_attr_t
_dispatch_get_default_queue_attr(void)
{
return _dispatch_get_queue_attr(DISPATCH_QOS_UNSPECIFIED, 0,
_dispatch_queue_attr_overcommit_unspecified,
DISPATCH_AUTORELEASE_FREQUENCY_INHERIT, false, false);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_qos_class(dispatch_queue_attr_t dqa,
dispatch_qos_class_t qos_class, int relpri)
{
if (!_dispatch_qos_class_valid(qos_class, relpri)) {
return DISPATCH_BAD_INPUT;
}
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
return _dispatch_get_queue_attr(_dispatch_qos_from_qos_class(qos_class),
relpri, dqa->dqa_overcommit, dqa->dqa_autorelease_frequency,
dqa->dqa_concurrent, dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_initially_inactive(dispatch_queue_attr_t dqa)
{
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
_dispatch_priority_relpri(pri), dqa->dqa_overcommit,
dqa->dqa_autorelease_frequency, dqa->dqa_concurrent, true);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_overcommit(dispatch_queue_attr_t dqa,
bool overcommit)
{
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
_dispatch_priority_relpri(pri), overcommit ?
_dispatch_queue_attr_overcommit_enabled :
_dispatch_queue_attr_overcommit_disabled,
dqa->dqa_autorelease_frequency, dqa->dqa_concurrent,
dqa->dqa_inactive);
}
dispatch_queue_attr_t
dispatch_queue_attr_make_with_autorelease_frequency(dispatch_queue_attr_t dqa,
dispatch_autorelease_frequency_t frequency)
{
switch (frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_INHERIT:
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
break;
default:
return DISPATCH_BAD_INPUT;
}
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
dispatch_priority_t pri = dqa->dqa_qos_and_relpri;
return _dispatch_get_queue_attr(_dispatch_priority_qos(pri),
_dispatch_priority_relpri(pri), dqa->dqa_overcommit,
frequency, dqa->dqa_concurrent, dqa->dqa_inactive);
}
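/*
 * Usage sketch (client code): the attribute constructors above compose and
 * return a pointer into the interned _dispatch_queue_attrs table (or
 * DISPATCH_BAD_INPUT on invalid arguments), so the result is never retained
 * or freed by the caller. The label and QoS below are illustrative only.
 *
 *	dispatch_queue_attr_t attr = DISPATCH_QUEUE_CONCURRENT;
 *	attr = dispatch_queue_attr_make_with_qos_class(attr,
 *			QOS_CLASS_UTILITY, -1);
 *	attr = dispatch_queue_attr_make_initially_inactive(attr);
 *	dispatch_queue_t q = dispatch_queue_create("com.example.attr-demo", attr);
 *	// ... further setup while q is still inactive ...
 *	dispatch_activate(q);
 */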
#pragma mark -
#pragma mark dispatch_queue_t
void
dispatch_queue_set_label_nocopy(dispatch_queue_t dq, const char *label)
{
if (dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
return;
}
dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(dq);
if (unlikely(dqf & DQF_LABEL_NEEDS_FREE)) {
DISPATCH_CLIENT_CRASH(dq, "Cannot change label for this queue");
}
dq->dq_label = label;
}
static inline bool
_dispatch_base_queue_is_wlh(dispatch_queue_t dq, dispatch_queue_t tq)
{
(void)dq; (void)tq;
return false;
}
static void
_dispatch_queue_inherit_wlh_from_target(dispatch_queue_t dq,
dispatch_queue_t tq)
{
uint64_t old_state, new_state, role;
if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
role = DISPATCH_QUEUE_ROLE_INNER;
} else if (_dispatch_base_queue_is_wlh(dq, tq)) {
role = DISPATCH_QUEUE_ROLE_BASE_WLH;
} else {
role = DISPATCH_QUEUE_ROLE_BASE_ANON;
}
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
new_state = old_state & ~DISPATCH_QUEUE_ROLE_MASK;
new_state |= role;
if (old_state == new_state) {
os_atomic_rmw_loop_give_up(break);
}
});
dispatch_wlh_t cur_wlh = _dispatch_get_wlh();
if (cur_wlh == (dispatch_wlh_t)dq && !_dq_state_is_base_wlh(new_state)) {
_dispatch_event_loop_leave_immediate(cur_wlh, new_state);
}
if (!dx_hastypeflag(tq, QUEUE_ROOT)) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
_dispatch_queue_atomic_flags_set(tq, DQF_TARGETED);
#else
_dispatch_queue_atomic_flags_set_and_clear(tq, DQF_TARGETED, DQF_LEGACY);
#endif
}
}
unsigned long volatile _dispatch_queue_serial_numbers =
DISPATCH_QUEUE_SERIAL_NUMBER_INIT;
dispatch_priority_t
_dispatch_queue_compute_priority_and_wlh(dispatch_queue_t dq,
dispatch_wlh_t *wlh_out)
{
dispatch_priority_t p = dq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
dispatch_queue_t tq = dq->do_targetq;
dispatch_priority_t tqp = tq->dq_priority &DISPATCH_PRIORITY_REQUESTED_MASK;
dispatch_wlh_t wlh = DISPATCH_WLH_ANON;
if (_dq_state_is_base_wlh(dq->dq_state)) {
wlh = (dispatch_wlh_t)dq;
}
while (unlikely(!dx_hastypeflag(tq, QUEUE_ROOT))) {
if (unlikely(tq == &_dispatch_mgr_q)) {
if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
return DISPATCH_PRIORITY_FLAG_MANAGER;
}
if (unlikely(_dispatch_queue_is_thread_bound(tq))) {
// thread-bound hierarchies are weird, we need to install
// from the context of the thread this hierarchy is bound to
if (wlh_out) *wlh_out = NULL;
return 0;
}
if (unlikely(DISPATCH_QUEUE_IS_SUSPENDED(tq))) {
// this queue may not be activated yet, so the queue graph may not
// have stabilized yet
_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
if (wlh_out) *wlh_out = NULL;
return 0;
}
if (_dq_state_is_base_wlh(tq->dq_state)) {
wlh = (dispatch_wlh_t)tq;
} else if (unlikely(_dispatch_queue_is_legacy(tq))) {
// we're not allowed to dereference tq->do_targetq
_dispatch_ktrace1(DISPATCH_PERF_delayed_registration, dq);
if (wlh_out) *wlh_out = NULL;
return 0;
}
if (!(tq->dq_priority & DISPATCH_PRIORITY_FLAG_INHERIT)) {
if (p < tqp) p = tqp;
}
tq = tq->do_targetq;
tqp = tq->dq_priority & DISPATCH_PRIORITY_REQUESTED_MASK;
}
if (unlikely(!tqp)) {
// pthread root queues opt out of QoS
if (wlh_out) *wlh_out = DISPATCH_WLH_ANON;
return DISPATCH_PRIORITY_FLAG_MANAGER;
}
if (wlh_out) *wlh_out = wlh;
return _dispatch_priority_inherit_from_root_queue(p, tq);
}
DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq, bool legacy)
{
if (!slowpath(dqa)) {
dqa = _dispatch_get_default_queue_attr();
} else if (dqa->do_vtable != DISPATCH_VTABLE(queue_attr)) {
DISPATCH_CLIENT_CRASH(dqa->do_vtable, "Invalid queue attribute");
}
//
// Step 1: Normalize arguments (qos, overcommit, tq)
//
dispatch_qos_t qos = _dispatch_priority_qos(dqa->dqa_qos_and_relpri);
#if !HAVE_PTHREAD_WORKQUEUE_QOS
if (qos == DISPATCH_QOS_USER_INTERACTIVE) {
qos = DISPATCH_QOS_USER_INITIATED;
}
if (qos == DISPATCH_QOS_MAINTENANCE) {
qos = DISPATCH_QOS_BACKGROUND;
}
#endif // !HAVE_PTHREAD_WORKQUEUE_QOS
_dispatch_queue_attr_overcommit_t overcommit = dqa->dqa_overcommit;
if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) {
if (tq->do_targetq) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and "
"a non-global target queue");
}
}
if (tq && !tq->do_targetq &&
tq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
// Handle discrepancies between attr and target queue, attributes win
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) {
overcommit = _dispatch_queue_attr_overcommit_enabled;
} else {
overcommit = _dispatch_queue_attr_overcommit_disabled;
}
}
if (qos == DISPATCH_QOS_UNSPECIFIED) {
dispatch_qos_t tq_qos = _dispatch_priority_qos(tq->dq_priority);
tq = _dispatch_get_root_queue(tq_qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
} else {
tq = NULL;
}
} else if (tq && !tq->do_targetq) {
// target is a pthread or runloop root queue, setting QoS or overcommit
// is disallowed
if (overcommit != _dispatch_queue_attr_overcommit_unspecified) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute "
"and use this kind of target queue");
}
if (qos != DISPATCH_QOS_UNSPECIFIED) {
DISPATCH_CLIENT_CRASH(tq, "Cannot specify a QoS attribute "
"and use this kind of target queue");
}
} else {
if (overcommit == _dispatch_queue_attr_overcommit_unspecified) {
// Serial queues default to overcommit!
overcommit = dqa->dqa_concurrent ?
_dispatch_queue_attr_overcommit_disabled :
_dispatch_queue_attr_overcommit_enabled;
}
}
if (!tq) {
tq = _dispatch_get_root_queue(
qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos,
overcommit == _dispatch_queue_attr_overcommit_enabled);
if (slowpath(!tq)) {
DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute");
}
}
//
// Step 2: Initialize the queue
//
if (legacy) {
// if any of these attributes is specified, use non legacy classes
if (dqa->dqa_inactive || dqa->dqa_autorelease_frequency) {
legacy = false;
}
}
const void *vtable;
dispatch_queue_flags_t dqf = 0;
if (legacy) {
vtable = DISPATCH_VTABLE(queue);
} else if (dqa->dqa_concurrent) {
vtable = DISPATCH_VTABLE(queue_concurrent);
} else {
vtable = DISPATCH_VTABLE(queue_serial);
}
switch (dqa->dqa_autorelease_frequency) {
case DISPATCH_AUTORELEASE_FREQUENCY_NEVER:
dqf |= DQF_AUTORELEASE_NEVER;
break;
case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM:
dqf |= DQF_AUTORELEASE_ALWAYS;
break;
}
if (legacy) {
dqf |= DQF_LEGACY;
}
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
dispatch_queue_t dq = _dispatch_object_alloc(vtable,
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
_dispatch_queue_init(dq, dqf, dqa->dqa_concurrent ?
DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
(dqa->dqa_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
dq->dq_label = label;
#if HAVE_PTHREAD_WORKQUEUE_QOS
dq->dq_priority = dqa->dqa_qos_and_relpri;
if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
}
#endif
_dispatch_retain(tq);
	if (qos == DISPATCH_QOS_UNSPECIFIED) {
		// legacy way of inheriting the QoS from the target
_dispatch_queue_priority_inherit_from_target(dq, tq);
}
if (!dqa->dqa_inactive) {
_dispatch_queue_inherit_wlh_from_target(dq, tq);
}
dq->do_targetq = tq;
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}
dispatch_queue_t
dispatch_queue_create_with_target(const char *label, dispatch_queue_attr_t dqa,
dispatch_queue_t tq)
{
return _dispatch_queue_create_with_target(label, dqa, tq, false);
}
dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return _dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT, true);
}
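/*
 * Usage sketch (client code): dispatch_queue_create() goes through the
 * `legacy == true` path above, while dispatch_queue_create_with_target()
 * uses the non-legacy classes and pins the target queue up front. Labels and
 * the chosen QoS are illustrative only.
 *
 *	dispatch_queue_t tq = dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0);
 *	dispatch_queue_t q1 = dispatch_queue_create_with_target(
 *			"com.example.serial", DISPATCH_QUEUE_SERIAL, tq);
 *	dispatch_queue_t q2 = dispatch_queue_create("com.example.legacy",
 *			DISPATCH_QUEUE_CONCURRENT);
 */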
dispatch_queue_t
dispatch_queue_create_with_accounting_override_voucher(const char *label,
dispatch_queue_attr_t attr, voucher_t voucher)
{
(void)label; (void)attr; (void)voucher;
DISPATCH_CLIENT_CRASH(0, "Unsupported interface");
}
void
_dispatch_queue_destroy(dispatch_queue_t dq, bool *allow_free)
{
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
uint64_t initial_state = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
if (dx_hastypeflag(dq, QUEUE_ROOT)) {
initial_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
}
dq_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
dq_state &= ~DISPATCH_QUEUE_DIRTY;
dq_state &= ~DISPATCH_QUEUE_ROLE_MASK;
if (slowpath(dq_state != initial_state)) {
if (_dq_state_drain_locked(dq_state)) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"Release of a locked queue");
}
#ifndef __LP64__
dq_state >>= 32;
#endif
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"Release of a queue with corrupt state");
}
if (slowpath(dq->dq_items_tail)) {
DISPATCH_CLIENT_CRASH(dq->dq_items_tail,
"Release of a queue while items are enqueued");
}
// trash the queue so that use after free will crash
dq->dq_items_head = (void *)0x200;
dq->dq_items_tail = (void *)0x200;
dispatch_queue_t dqsq = os_atomic_xchg2o(dq, dq_specific_q,
(void *)0x200, relaxed);
if (dqsq) {
_dispatch_release(dqsq);
}
// fastpath for queues that never got their storage retained
if (likely(os_atomic_load2o(dq, dq_sref_cnt, relaxed) == 0)) {
// poison the state with something that is suspended and is easy to spot
dq->dq_state = 0xdead000000000000;
return;
}
// Take over freeing the memory from _dispatch_object_dealloc()
//
// As soon as we call _dispatch_queue_release_storage(), we forfeit
// the possibility for the caller of dx_dispose() to finalize the object
// so that responsibility is ours.
_dispatch_object_finalize(dq);
*allow_free = false;
dq->dq_label = "<released queue, pending free>";
dq->do_targetq = NULL;
dq->do_finalizer = NULL;
dq->do_ctxt = NULL;
return _dispatch_queue_release_storage(dq);
}
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
_dispatch_object_debug(dq, "%s", __func__);
_dispatch_introspection_queue_dispose(dq);
if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
free((void*)dq->dq_label);
}
_dispatch_queue_destroy(dq, allow_free);
}
void
_dispatch_queue_xref_dispose(dispatch_queue_t dq)
{
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
if (unlikely(_dq_state_is_suspended(dq_state))) {
long state = (long)dq_state;
if (sizeof(long) < sizeof(uint64_t)) state = (long)(dq_state >> 32);
if (unlikely(_dq_state_is_inactive(dq_state))) {
// Arguments for and against this assert are within 6705399
DISPATCH_CLIENT_CRASH(state, "Release of an inactive object");
}
DISPATCH_CLIENT_CRASH(dq_state, "Release of a suspended object");
}
os_atomic_or2o(dq, dq_atomic_flags, DQF_RELEASED, relaxed);
}
DISPATCH_NOINLINE
static void
_dispatch_queue_suspend_slow(dispatch_queue_t dq)
{
uint64_t dq_state, value, delta;
_dispatch_queue_sidelock_lock(dq);
// what we want to transfer (remove from dq_state)
delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
// but this is a suspend so add a suspend count at the same time
delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
if (dq->dq_side_suspend_cnt == 0) {
		// we subtract delta from dq_state, and we want to set this bit
delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
}
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
		// unsigned underflow of the subtraction can happen because other
// threads could have touched this value while we were trying to acquire
// the lock, or because another thread raced us to do the same operation
// and got to the lock first.
if (unlikely(os_sub_overflow(dq_state, delta, &value))) {
os_atomic_rmw_loop_give_up(goto retry);
}
});
if (unlikely(os_add_overflow(dq->dq_side_suspend_cnt,
DISPATCH_QUEUE_SUSPEND_HALF, &dq->dq_side_suspend_cnt))) {
DISPATCH_CLIENT_CRASH(0, "Too many nested calls to dispatch_suspend()");
}
return _dispatch_queue_sidelock_unlock(dq);
retry:
_dispatch_queue_sidelock_unlock(dq);
return dx_vtable(dq)->do_suspend(dq);
}
void
_dispatch_queue_suspend(dispatch_queue_t dq)
{
dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
uint64_t dq_state, value;
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
value = DISPATCH_QUEUE_SUSPEND_INTERVAL;
if (unlikely(os_add_overflow(dq_state, value, &value))) {
os_atomic_rmw_loop_give_up({
return _dispatch_queue_suspend_slow(dq);
});
}
if (!_dq_state_drain_locked(dq_state)) {
value |= DLOCK_OWNER_MASK;
}
});
if (!_dq_state_is_suspended(dq_state)) {
// rdar://8181908 we need to extend the queue life for the duration
// of the call to wakeup at _dispatch_queue_resume() time.
_dispatch_retain_2(dq);
}
}
DISPATCH_NOINLINE
static void
_dispatch_queue_resume_slow(dispatch_queue_t dq)
{
uint64_t dq_state, value, delta;
_dispatch_queue_sidelock_lock(dq);
// what we want to transfer
delta = DISPATCH_QUEUE_SUSPEND_HALF * DISPATCH_QUEUE_SUSPEND_INTERVAL;
// but this is a resume so consume a suspend count at the same time
delta -= DISPATCH_QUEUE_SUSPEND_INTERVAL;
switch (dq->dq_side_suspend_cnt) {
case 0:
goto retry;
case DISPATCH_QUEUE_SUSPEND_HALF:
// we will transition the side count to 0, so we want to clear this bit
delta -= DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT;
break;
}
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
// unsigned overflow of the addition can happen because other
// threads could have touched this value while we were trying to acquire
// the lock, or because another thread raced us to do the same operation
// and got to the lock first.
if (unlikely(os_add_overflow(dq_state, delta, &value))) {
os_atomic_rmw_loop_give_up(goto retry);
}
});
dq->dq_side_suspend_cnt -= DISPATCH_QUEUE_SUSPEND_HALF;
return _dispatch_queue_sidelock_unlock(dq);
retry:
_dispatch_queue_sidelock_unlock(dq);
return dx_vtable(dq)->do_resume(dq, false);
}
DISPATCH_NOINLINE
static void
_dispatch_queue_resume_finalize_activation(dispatch_queue_t dq)
{
bool allow_resume = true;
// Step 2: run the activation finalizer
if (dx_vtable(dq)->do_finalize_activation) {
dx_vtable(dq)->do_finalize_activation(dq, &allow_resume);
}
// Step 3: consume the suspend count
if (allow_resume) {
return dx_vtable(dq)->do_resume(dq, false);
}
}
void
_dispatch_queue_resume(dispatch_queue_t dq, bool activate)
{
// covers all suspend and inactive bits, including side suspend bit
const uint64_t suspend_bits = DISPATCH_QUEUE_SUSPEND_BITS_MASK;
uint64_t pending_barrier_width =
(dq->dq_width - 1) * DISPATCH_QUEUE_WIDTH_INTERVAL;
uint64_t set_owner_and_set_full_width_and_in_barrier =
_dispatch_lock_value_for_self() | DISPATCH_QUEUE_WIDTH_FULL_BIT |
DISPATCH_QUEUE_IN_BARRIER;
// backward compatibility: only dispatch sources can abuse
// dispatch_resume() to really mean dispatch_activate()
bool is_source = (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
uint64_t dq_state, value;
dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT);
// Activation is a bit tricky as it needs to finalize before the wakeup.
//
// If after doing its updates to the suspend count and/or inactive bit,
// the last suspension related bit that would remain is the
// NEEDS_ACTIVATION one, then this function:
//
// 1. moves the state to { sc:1 i:0 na:0 } (converts the needs-activate into
// a suspend count)
// 2. runs the activation finalizer
// 3. consumes the suspend count set in (1), and finishes the resume flow
//
// Concurrently, some property setters such as setting dispatch source
// handlers or _dispatch_queue_set_target_queue try to do in-place changes
// before activation. These protect their action by taking a suspend count.
// Step (1) above cannot happen if such a setter has locked the object.
if (activate) {
// relaxed atomic because this doesn't publish anything, this is only
// about picking the thread that gets to finalize the activation
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, relaxed, {
if ((dq_state & suspend_bits) ==
DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
value = dq_state - DISPATCH_QUEUE_INACTIVE
- DISPATCH_QUEUE_NEEDS_ACTIVATION
+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
} else if (_dq_state_is_inactive(dq_state)) {
// { sc:>0 i:1 na:1 } -> { i:0 na:1 }
// simple activation because sc is not 0
// resume will deal with na:1 later
value = dq_state - DISPATCH_QUEUE_INACTIVE;
} else {
// object already active, this is a no-op, just exit
os_atomic_rmw_loop_give_up(return);
}
});
} else {
// release barrier needed to publish the effect of
// - dispatch_set_target_queue()
// - dispatch_set_*_handler()
// - do_finalize_activation()
os_atomic_rmw_loop2o(dq, dq_state, dq_state, value, release, {
if ((dq_state & suspend_bits) == DISPATCH_QUEUE_SUSPEND_INTERVAL
+ DISPATCH_QUEUE_NEEDS_ACTIVATION) {
// { sc:1 i:0 na:1 } -> { sc:1 i:0 na:0 }
value = dq_state - DISPATCH_QUEUE_NEEDS_ACTIVATION;
} else if (is_source && (dq_state & suspend_bits) ==
DISPATCH_QUEUE_NEEDS_ACTIVATION + DISPATCH_QUEUE_INACTIVE) {
// { sc:0 i:1 na:1 } -> { sc:1 i:0 na:0 }
value = dq_state - DISPATCH_QUEUE_INACTIVE
- DISPATCH_QUEUE_NEEDS_ACTIVATION
+ DISPATCH_QUEUE_SUSPEND_INTERVAL;
} else if (unlikely(os_sub_overflow(dq_state,
DISPATCH_QUEUE_SUSPEND_INTERVAL, &value))) {
// underflow means over-resume or a suspend count transfer
// to the side count is needed
os_atomic_rmw_loop_give_up({
if (!(dq_state & DISPATCH_QUEUE_HAS_SIDE_SUSPEND_CNT)) {
goto over_resume;
}
return _dispatch_queue_resume_slow(dq);
});
//
// below this, value = dq_state - DISPATCH_QUEUE_SUSPEND_INTERVAL
//
} else if (!_dq_state_is_runnable(value)) {
// Out of width or still suspended.
// For the former, force _dispatch_queue_non_barrier_complete
// to reconsider whether it has work to do
value |= DISPATCH_QUEUE_DIRTY;
} else if (!_dq_state_drain_locked_by(value, DLOCK_OWNER_MASK)) {
dispatch_assert(_dq_state_drain_locked(value));
// still locked by someone else, make drain_try_unlock() fail
// and reconsider whether it has work to do
value |= DISPATCH_QUEUE_DIRTY;
} else if (!is_source && (_dq_state_has_pending_barrier(value) ||
value + pending_barrier_width <
DISPATCH_QUEUE_WIDTH_FULL_BIT)) {
// if we can, acquire the full width drain lock
// and then perform a lock transfer
//
// However this is never useful for a source where there are no
// sync waiters, so never take the lock and do a plain wakeup
value &= DISPATCH_QUEUE_DRAIN_PRESERVED_BITS_MASK;
value |= set_owner_and_set_full_width_and_in_barrier;
} else {
// clear overrides and force a wakeup
value &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
value &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
}
});
}
if ((dq_state ^ value) & DISPATCH_QUEUE_NEEDS_ACTIVATION) {
// we cleared the NEEDS_ACTIVATION bit and we have a valid suspend count
return _dispatch_queue_resume_finalize_activation(dq);
}
if (activate) {
// if we're still in an activate codepath here we should have
// { sc:>0 na:1 }, if not we've got a corrupt state
if (unlikely(!_dq_state_is_suspended(value))) {
DISPATCH_CLIENT_CRASH(dq, "Invalid suspension state");
}
return;
}
if (_dq_state_is_suspended(value)) {
return;
}
if (_dq_state_is_dirty(dq_state)) {
// <rdar://problem/14637483>
// dependency ordering for dq state changes that were flushed
// and not acted upon
os_atomic_thread_fence(dependency);
dq = os_atomic_force_dependency_on(dq, dq_state);
}
// Balancing the retain_2 done in suspend() for rdar://8181908
dispatch_wakeup_flags_t flags = DISPATCH_WAKEUP_CONSUME_2;
if ((dq_state ^ value) & DISPATCH_QUEUE_IN_BARRIER) {
flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
} else if (!_dq_state_is_runnable(value)) {
if (_dq_state_is_base_wlh(dq_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
return _dispatch_release_2(dq);
}
dispatch_assert(!_dq_state_received_sync_wait(dq_state));
dispatch_assert(!_dq_state_in_sync_transfer(dq_state));
return dx_wakeup(dq, _dq_state_max_qos(dq_state), flags);
over_resume:
if (unlikely(_dq_state_is_inactive(dq_state))) {
DISPATCH_CLIENT_CRASH(dq, "Over-resume of an inactive object");
}
DISPATCH_CLIENT_CRASH(dq, "Over-resume of an object");
}
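/*
 * Usage sketch (client code): the suspend count manipulated above is
 * balanced by matching dispatch_suspend()/dispatch_resume() calls, and
 * dispatch_activate() consumes the inactive/needs-activation bits exactly
 * once for a queue created initially inactive. `q` is a hypothetical
 * caller-owned queue.
 *
 *	dispatch_suspend(q);          // bumps the suspend count, retains +2
 *	dispatch_async(q, ^{ ... });  // enqueued, but not drained yet
 *	dispatch_resume(q);           // drops the count and wakes the queue up
 */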
const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
dq = _dispatch_get_current_queue();
}
return dq->dq_label ? dq->dq_label : "";
}
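//
// Example (sketch): DISPATCH_CURRENT_QUEUE_LABEL resolves to the queue the
// caller is currently running on, so both forms below are equivalent inside
// a block executing on a hypothetical queue "q":
//
//	const char *l1 = dispatch_queue_get_label(q);
//	const char *l2 = dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL);
//	// both return "" rather than NULL when the queue has no label
//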
qos_class_t
dispatch_queue_get_qos_class(dispatch_queue_t dq, int *relpri_ptr)
{
dispatch_qos_class_t qos = _dispatch_priority_qos(dq->dq_priority);
if (relpri_ptr) {
*relpri_ptr = qos ? _dispatch_priority_relpri(dq->dq_priority) : 0;
}
return _dispatch_qos_to_qos_class(qos);
}
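//
// Example (sketch): the QoS reported here is the one captured from the
// queue attributes at creation time, e.g.
//
//	dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(
//			DISPATCH_QUEUE_SERIAL, QOS_CLASS_UTILITY, -4);
//	dispatch_queue_t q = dispatch_queue_create("com.example.util", attr);
//	int relpri;
//	qos_class_t qc = dispatch_queue_get_qos_class(q, &relpri);
//	// qc == QOS_CLASS_UTILITY, relpri == -4
//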
static void
_dispatch_queue_set_width2(void *ctxt)
{
int w = (int)(intptr_t)ctxt; // intentional truncation
uint32_t tmp;
dispatch_queue_t dq = _dispatch_queue_get_current();
if (w >= 0) {
tmp = w ? (unsigned int)w : 1;
} else {
dispatch_qos_t qos = _dispatch_qos_from_pp(_dispatch_get_priority());
switch (w) {
case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
tmp = _dispatch_qos_max_parallelism(qos,
DISPATCH_MAX_PARALLELISM_PHYSICAL);
break;
case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
tmp = _dispatch_qos_max_parallelism(qos,
DISPATCH_MAX_PARALLELISM_ACTIVE);
break;
case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
default:
tmp = _dispatch_qos_max_parallelism(qos, 0);
break;
}
}
if (tmp > DISPATCH_QUEUE_WIDTH_MAX) {
tmp = DISPATCH_QUEUE_WIDTH_MAX;
}
dispatch_queue_flags_t old_dqf, new_dqf;
os_atomic_rmw_loop2o(dq, dq_atomic_flags, old_dqf, new_dqf, relaxed, {
new_dqf = (old_dqf & DQF_FLAGS_MASK) | DQF_WIDTH(tmp);
});
_dispatch_queue_inherit_wlh_from_target(dq, dq->do_targetq);
_dispatch_object_debug(dq, "%s", __func__);
}
void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
if (unlikely(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT ||
dx_hastypeflag(dq, QUEUE_ROOT) ||
dx_hastypeflag(dq, QUEUE_BASE))) {
return;
}
unsigned long type = dx_type(dq);
switch (type) {
case DISPATCH_QUEUE_LEGACY_TYPE:
case DISPATCH_QUEUE_CONCURRENT_TYPE:
break;
case DISPATCH_QUEUE_SERIAL_TYPE:
DISPATCH_CLIENT_CRASH(type, "Cannot set width of a serial queue");
default:
DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
}
if (likely((int)width >= 0)) {
_dispatch_barrier_trysync_or_async_f(dq, (void*)(intptr_t)width,
_dispatch_queue_set_width2);
} else {
// The negative width constants must be resolved on the queue itself
// so that the queue QoS can be queried
_dispatch_barrier_async_detached_f(dq, (void*)(intptr_t)width,
_dispatch_queue_set_width2);
}
}
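//
// Usage sketch (illustrative): the width can only be changed on concurrent
// (or legacy) queues, and the negative constants are resolved against the
// CPU topology on the queue itself:
//
//	dispatch_queue_t q = dispatch_queue_create("com.example.pool",
//			DISPATCH_QUEUE_CONCURRENT);
//	dispatch_queue_set_width(q, 4);                                // fixed
//	dispatch_queue_set_width(q, DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS); // dynamic
//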
static void
_dispatch_queue_legacy_set_target_queue(void *ctxt)
{
dispatch_queue_t dq = _dispatch_queue_get_current();
dispatch_queue_t tq = ctxt;
dispatch_queue_t otq = dq->do_targetq;
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
_dispatch_ktrace3(DISPATCH_PERF_non_leaf_retarget, dq, otq, tq);
_dispatch_bug_deprecated("Changing the target of a queue "
"already targeted by other dispatch objects");
#else
DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
"already targeted by other dispatch objects");
#endif
}
_dispatch_queue_priority_inherit_from_target(dq, tq);
_dispatch_queue_inherit_wlh_from_target(dq, tq);
#if HAVE_PTHREAD_WORKQUEUE_QOS
// see _dispatch_queue_class_wakeup()
_dispatch_queue_sidelock_lock(dq);
#endif
dq->do_targetq = tq;
#if HAVE_PTHREAD_WORKQUEUE_QOS
// see _dispatch_queue_class_wakeup()
_dispatch_queue_sidelock_unlock(dq);
#endif
_dispatch_object_debug(dq, "%s", __func__);
_dispatch_introspection_target_queue_changed(dq);
_dispatch_release_tailcall(otq);
}
void
_dispatch_queue_set_target_queue(dispatch_queue_t dq, dispatch_queue_t tq)
{
dispatch_assert(dq->do_ref_cnt != DISPATCH_OBJECT_GLOBAL_REFCNT &&
dq->do_targetq);
if (unlikely(!tq)) {
bool is_concurrent_q = (dq->dq_width > 1);
tq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, !is_concurrent_q);
}
if (_dispatch_queue_try_inactive_suspend(dq)) {
_dispatch_object_set_target_queue_inline(dq, tq);
return dx_vtable(dq)->do_resume(dq, false);
}
#if !DISPATCH_ALLOW_NON_LEAF_RETARGET
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
"already targeted by other dispatch objects");
}
#endif
if (unlikely(!_dispatch_queue_is_legacy(dq))) {
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
DISPATCH_CLIENT_CRASH(0, "Cannot change the target of a queue "
"already targeted by other dispatch objects");
}
#endif
DISPATCH_CLIENT_CRASH(0, "Cannot change the target of this object "
"after it has been activated");
}
unsigned long type = dx_type(dq);
switch (type) {
case DISPATCH_QUEUE_LEGACY_TYPE:
#if DISPATCH_ALLOW_NON_LEAF_RETARGET
if (_dispatch_queue_atomic_flags(dq) & DQF_TARGETED) {
_dispatch_bug_deprecated("Changing the target of a queue "
"already targeted by other dispatch objects");
}
#endif
break;
case DISPATCH_SOURCE_KEVENT_TYPE:
case DISPATCH_MACH_CHANNEL_TYPE:
_dispatch_ktrace1(DISPATCH_PERF_post_activate_retarget, dq);
_dispatch_bug_deprecated("Changing the target of a source "
"after it has been activated");
break;
default:
DISPATCH_CLIENT_CRASH(type, "Unexpected dispatch object type");
}
_dispatch_retain(tq);
return _dispatch_barrier_trysync_or_async_f(dq, tq,
_dispatch_queue_legacy_set_target_queue);
}
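//
// Usage sketch (illustrative): retargeting is cheapest while the object is
// still inactive; once activated, only legacy queues may be retargeted and
// the deprecated paths above are taken. Setting the target at creation time
// avoids the issue entirely:
//
//	dispatch_queue_t target = dispatch_queue_create("com.example.serial", NULL);
//	dispatch_queue_t q = dispatch_queue_create_with_target(
//			"com.example.worker", DISPATCH_QUEUE_SERIAL, target);
//	// roughly equivalent to dispatch_queue_create() followed by
//	// dispatch_set_target_queue() before the queue ever becomes active
//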
#pragma mark -
#pragma mark dispatch_mgr_queue
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
_dispatch_mgr_root_queue_context = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_kworkqueue = (void*)(~0ul),
#endif
.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_root),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = &_dispatch_mgr_root_queue_context,
.dq_label = "com.apple.root.libdispatch-manager",
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = DISPATCH_PRIORITY_FLAG_MANAGER |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = 3,
};
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
static struct {
volatile int prio;
volatile qos_class_t qos;
int default_prio;
int policy;
pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;
#if HAVE_PTHREAD_WORKQUEUE_QOS
// TODO: switch to "event-reflector thread" property <rdar://problem/18126138>
// Must be kept in sync with list of qos classes in sys/qos.h
static const int _dispatch_mgr_sched_qos2prio[] = {
[QOS_CLASS_MAINTENANCE] = 4,
[QOS_CLASS_BACKGROUND] = 4,
[QOS_CLASS_UTILITY] = 20,
[QOS_CLASS_DEFAULT] = 31,
[QOS_CLASS_USER_INITIATED] = 37,
[QOS_CLASS_USER_INTERACTIVE] = 47,
};
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
struct sched_param param;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
pthread_attr_t *attr;
attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
#else
pthread_attr_t a, *attr = &a;
#endif
(void)dispatch_assume_zero(pthread_attr_init(attr));
(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
&_dispatch_mgr_sched.policy));
(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
qos_class_t qos = qos_class_main();
if (qos == QOS_CLASS_DEFAULT) {
qos = QOS_CLASS_USER_INITIATED; // rdar://problem/17279292
}
if (qos) {
_dispatch_mgr_sched.qos = qos;
param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
}
#endif
_dispatch_mgr_sched.default_prio = param.sched_priority;
_dispatch_mgr_sched.prio = _dispatch_mgr_sched.default_prio;
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_USE_KEVENT_WORKQUEUE
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
DISPATCH_NOINLINE
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
struct sched_param param;
pthread_attr_t *attr;
attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
qos_class_t qos = _dispatch_mgr_sched.qos;
if (qos) {
if (_dispatch_set_qos_class_enabled) {
(void)dispatch_assume_zero(pthread_attr_set_qos_class_np(attr,
qos, 0));
}
}
#endif
param.sched_priority = _dispatch_mgr_sched.prio;
if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
}
return &_dispatch_mgr_sched.tid;
}
static inline void
_dispatch_mgr_priority_apply(void)
{
struct sched_param param;
do {
param.sched_priority = _dispatch_mgr_sched.prio;
if (param.sched_priority > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(pthread_setschedparam(
_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy,
&param));
}
} while (_dispatch_mgr_sched.prio > param.sched_priority);
}
DISPATCH_NOINLINE
void
_dispatch_mgr_priority_init(void)
{
struct sched_param param;
pthread_attr_t *attr;
attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
qos_class_t qos = 0;
(void)pthread_attr_get_qos_class_np(attr, &qos, NULL);
if (_dispatch_mgr_sched.qos > qos && _dispatch_set_qos_class_enabled) {
(void)pthread_set_qos_class_self_np(_dispatch_mgr_sched.qos, 0);
int p = _dispatch_mgr_sched_qos2prio[_dispatch_mgr_sched.qos];
if (p > param.sched_priority) {
param.sched_priority = p;
}
}
#endif
if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
return _dispatch_mgr_priority_apply();
}
}
#endif // DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
DISPATCH_NOINLINE
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
struct sched_param param;
(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
#if HAVE_PTHREAD_WORKQUEUE_QOS
qos_class_t q, qos = 0;
(void)pthread_attr_get_qos_class_np((pthread_attr_t *)attr, &qos, NULL);
if (qos) {
param.sched_priority = _dispatch_mgr_sched_qos2prio[qos];
os_atomic_rmw_loop2o(&_dispatch_mgr_sched, qos, q, qos, relaxed, {
if (q >= qos) os_atomic_rmw_loop_give_up(break);
});
}
#endif
int p, prio = param.sched_priority;
os_atomic_rmw_loop2o(&_dispatch_mgr_sched, prio, p, prio, relaxed, {
if (p >= prio) os_atomic_rmw_loop_give_up(return);
});
#if DISPATCH_USE_KEVENT_WORKQUEUE
_dispatch_root_queues_init();
if (_dispatch_kevent_workqueue_enabled) {
pthread_priority_t pp = 0;
if (prio > _dispatch_mgr_sched.default_prio) {
// The values of _PTHREAD_PRIORITY_SCHED_PRI_FLAG and
// _PTHREAD_PRIORITY_ROOTQUEUE_FLAG overlap, but that is not
// problematic in this case, since the second one is only ever
// used on dq_priority fields.
// We never pass the _PTHREAD_PRIORITY_ROOTQUEUE_FLAG to a syscall;
// it is meaningful to libdispatch only.
pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
} else if (qos) {
pp = _pthread_qos_class_encode(qos, 0, 0);
}
if (pp) {
int r = _pthread_workqueue_set_event_manager_priority(pp);
(void)dispatch_assume_zero(r);
}
return;
}
#endif
#if DISPATCH_USE_MGR_THREAD
if (_dispatch_mgr_sched.tid) {
return _dispatch_mgr_priority_apply();
}
#endif
}
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
#if DISPATCH_USE_KEVENT_WORKQUEUE
void
_dispatch_kevent_workqueue_init(void)
{
// Initialize kevent workqueue support
_dispatch_root_queues_init();
if (!_dispatch_kevent_workqueue_enabled) return;
dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
qos_class_t qos = _dispatch_mgr_sched.qos;
int prio = _dispatch_mgr_sched.prio;
pthread_priority_t pp = 0;
if (qos) {
pp = _pthread_qos_class_encode(qos, 0, 0);
}
if (prio > _dispatch_mgr_sched.default_prio) {
pp = (pthread_priority_t)prio | _PTHREAD_PRIORITY_SCHED_PRI_FLAG;
}
if (pp) {
int r = _pthread_workqueue_set_event_manager_priority(pp);
(void)dispatch_assume_zero(r);
}
}
#endif // DISPATCH_USE_KEVENT_WORKQUEUE
#pragma mark -
#pragma mark dispatch_pthread_root_queue
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static dispatch_queue_t
_dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
const pthread_attr_t *attr, dispatch_block_t configure,
dispatch_pthread_root_queue_observer_hooks_t observer_hooks)
{
dispatch_queue_t dq;
dispatch_root_queue_context_t qc;
dispatch_pthread_root_queue_context_t pqc;
dispatch_queue_flags_t dqf = 0;
size_t dqs;
int32_t pool_size = flags & _DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE ?
(int8_t)(flags & ~_DISPATCH_PTHREAD_ROOT_QUEUE_FLAG_POOL_SIZE) : 0;
dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
dqs = roundup(dqs, _Alignof(struct dispatch_root_queue_context_s));
dq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_root), dqs +
sizeof(struct dispatch_root_queue_context_s) +
sizeof(struct dispatch_pthread_root_queue_context_s));
qc = (void*)dq + dqs;
dispatch_assert((uintptr_t)qc % _Alignof(typeof(*qc)) == 0);
pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);
dispatch_assert((uintptr_t)pqc % _Alignof(typeof(*pqc)) == 0);
if (label) {
const char *tmp = _dispatch_strdup_if_mutable(label);
if (tmp != label) {
dqf |= DQF_LABEL_NEEDS_FREE;
label = tmp;
}
}
_dispatch_queue_init(dq, dqf, DISPATCH_QUEUE_WIDTH_POOL, 0);
dq->dq_label = label;
dq->dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE;
dq->do_ctxt = qc;
dq->dq_priority = DISPATCH_PRIORITY_SATURATED_OVERRIDE;
pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
qc->dgq_ctxt = pqc;
#if DISPATCH_USE_WORKQUEUES
qc->dgq_kworkqueue = (void*)(~0ul);
#endif
_dispatch_root_queue_init_pthread_pool(qc, pool_size, true);
if (attr) {
memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
} else {
(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
}
(void)dispatch_assume_zero(pthread_attr_setdetachstate(
&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
if (configure) {
pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
}
if (observer_hooks) {
pqc->dpq_observer_hooks = *observer_hooks;
}
_dispatch_object_debug(dq, "%s", __func__);
return _dispatch_introspection_queue_create(dq);
}
dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
const pthread_attr_t *attr, dispatch_block_t configure)
{
return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
NULL);
}
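//
// Usage sketch (illustrative): a pthread root queue drains on its own pool
// of threads rather than on the global workqueue threads; the configure
// block runs once on each thread the pool spawns:
//
//	dispatch_queue_t rq = dispatch_pthread_root_queue_create(
//			"com.example.pthread-root", 0, NULL, ^{
//		// per-thread setup, e.g. thread naming or TLS initialization
//	});
//	dispatch_queue_t q = dispatch_queue_create_with_target("com.example.q",
//			DISPATCH_QUEUE_SERIAL, rq);
//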
#if DISPATCH_IOHID_SPI
dispatch_queue_t
_dispatch_pthread_root_queue_create_with_observer_hooks_4IOHID(const char *label,
unsigned long flags, const pthread_attr_t *attr,
dispatch_pthread_root_queue_observer_hooks_t observer_hooks,
dispatch_block_t configure)
{
if (!observer_hooks->queue_will_execute ||
!observer_hooks->queue_did_execute) {
DISPATCH_CLIENT_CRASH(0, "Invalid pthread root queue observer hooks");
}
return _dispatch_pthread_root_queue_create(label, flags, attr, configure,
observer_hooks);
}
#endif
dispatch_queue_t
dispatch_pthread_root_queue_copy_current(void)
{
dispatch_queue_t dq = _dispatch_queue_get_current();
if (!dq) return NULL;
while (unlikely(dq->do_targetq)) {
dq = dq->do_targetq;
}
if (dx_type(dq) != DISPATCH_QUEUE_GLOBAL_ROOT_TYPE ||
dq->do_xref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) {
return NULL;
}
return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
}
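//
// Example (sketch): from a block running on a hierarchy that bottoms out on
// a pthread root queue, the call below returns that root queue with a +1
// reference (and NULL on global root queues or foreign threads):
//
//	dispatch_async(q, ^{
//		dispatch_queue_t root = dispatch_pthread_root_queue_copy_current();
//		if (root) dispatch_release(root);
//	});
//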
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq, bool *allow_free)
{
if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
DISPATCH_INTERNAL_CRASH(dq, "Global root queue disposed");
}
_dispatch_object_debug(dq, "%s", __func__);
_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
dispatch_root_queue_context_t qc = dq->do_ctxt;
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
pthread_attr_destroy(&pqc->dpq_thread_attr);
_dispatch_semaphore_dispose(&pqc->dpq_thread_mediator, NULL);
if (pqc->dpq_thread_configure) {
Block_release(pqc->dpq_thread_configure);
}
dq->do_targetq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
#endif
if (dq->dq_label && _dispatch_queue_label_needs_free(dq)) {
free((void*)dq->dq_label);
}
_dispatch_queue_destroy(dq, allow_free);
}
#pragma mark -
#pragma mark dispatch_queue_specific
struct dispatch_queue_specific_queue_s {
DISPATCH_QUEUE_HEADER(queue_specific_queue);
TAILQ_HEAD(dispatch_queue_specific_head_s,
dispatch_queue_specific_s) dqsq_contexts;
} DISPATCH_ATOMIC64_ALIGN;
struct dispatch_queue_specific_s {
const void *dqs_key;
void *dqs_ctxt;
dispatch_function_t dqs_destructor;
TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);
void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq,
bool *allow_free)
{
dispatch_queue_specific_t dqs, tmp;
dispatch_queue_t rq = _dispatch_get_root_queue(DISPATCH_QOS_DEFAULT, false);
TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
if (dqs->dqs_destructor) {
dispatch_async_f(rq, dqs->dqs_ctxt, dqs->dqs_destructor);
}
free(dqs);
}
_dispatch_queue_destroy(dqsq->_as_dq, allow_free);
}
static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
dispatch_queue_specific_queue_t dqsq;
dqsq = _dispatch_object_alloc(DISPATCH_VTABLE(queue_specific_queue),
sizeof(struct dispatch_queue_specific_queue_s));
_dispatch_queue_init(dqsq->_as_dq, DQF_NONE, DISPATCH_QUEUE_WIDTH_MAX,
DISPATCH_QUEUE_ROLE_BASE_ANON);
dqsq->do_xref_cnt = -1;
dqsq->do_targetq = _dispatch_get_root_queue(
DISPATCH_QOS_USER_INITIATED, true);
dqsq->dq_label = "queue-specific";
TAILQ_INIT(&dqsq->dqsq_contexts);
if (slowpath(!os_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
dqsq->_as_dq, release))) {
_dispatch_release(dqsq->_as_dq);
}
}
static void
_dispatch_queue_set_specific(void *ctxt)
{
dispatch_queue_specific_t dqs, dqsn = ctxt;
dispatch_queue_specific_queue_t dqsq =
(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
if (dqs->dqs_key == dqsn->dqs_key) {
// Destroy previous context for existing key
if (dqs->dqs_destructor) {
dispatch_async_f(_dispatch_get_root_queue(
DISPATCH_QOS_DEFAULT, false), dqs->dqs_ctxt,
dqs->dqs_destructor);
}
if (dqsn->dqs_ctxt) {
// Copy new context for existing key
dqs->dqs_ctxt = dqsn->dqs_ctxt;
dqs->dqs_destructor = dqsn->dqs_destructor;
} else {
// Remove context storage for existing key
TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
free(dqs);
}
return free(dqsn);
}
}
// Insert context storage for new key
TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}
DISPATCH_NOINLINE
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
void *ctxt, dispatch_function_t destructor)
{
if (slowpath(!key)) {
return;
}
dispatch_queue_specific_t dqs;
dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
dqs->dqs_key = key;
dqs->dqs_ctxt = ctxt;
dqs->dqs_destructor = destructor;
if (slowpath(!dq->dq_specific_q)) {
_dispatch_queue_init_specific(dq);
}
_dispatch_barrier_trysync_or_async_f(dq->dq_specific_q, dqs,
_dispatch_queue_set_specific);
}
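//
// Usage sketch (illustrative): keys are compared by pointer, values are
// handed off to the hidden queue-specific queue, and replaced or removed
// values are destroyed asynchronously on a default-QoS root queue:
//
//	static char tag_key;    // the address is the key, contents never used
//	char *ctx = strdup("payload");
//	dispatch_queue_set_specific(q, &tag_key, ctx, free);
//	dispatch_queue_set_specific(q, &tag_key, NULL, NULL); // removes + frees
//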
static void
_dispatch_queue_get_specific(void *ctxt)
{
void **ctxtp = ctxt;
void *key = *ctxtp;
dispatch_queue_specific_queue_t dqsq =
(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
dispatch_queue_specific_t dqs;
TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
if (dqs->dqs_key == key) {
*ctxtp = dqs->dqs_ctxt;
return;
}
}
*ctxtp = NULL;
}
DISPATCH_ALWAYS_INLINE
static inline void *
_dispatch_queue_get_specific_inline(dispatch_queue_t dq, const void *key)
{
void *ctxt = NULL;
if (fastpath(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE && dq->dq_specific_q)){
ctxt = (void *)key;
dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
}
return ctxt;
}
DISPATCH_NOINLINE
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
if (slowpath(!key)) {
return NULL;
}
return _dispatch_queue_get_specific_inline(dq, key);
}
DISPATCH_NOINLINE
void *
dispatch_get_specific(const void *key)
{
if (slowpath(!key)) {
return NULL;
}
void *ctxt = NULL;
dispatch_queue_t dq = _dispatch_queue_get_current();
while (slowpath(dq)) {
ctxt = _dispatch_queue_get_specific_inline(dq, key);
if (ctxt) break;
dq = dq->do_targetq;
}
return ctxt;
}
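//
// Example (sketch): dispatch_queue_get_specific() consults one queue only,
// while dispatch_get_specific() walks the current queue and then its target
// hierarchy, which is the usual way to answer "am I (directly or indirectly)
// running on q?". Assuming tag_key from the sketch above and a hypothetical
// inner_q whose target is q:
//
//	dispatch_queue_set_specific(q, &tag_key, (void *)1, NULL);
//	dispatch_sync(inner_q, ^{
//		bool on_q_hierarchy = (dispatch_get_specific(&tag_key) != NULL);
//	});
//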
#if DISPATCH_IOHID_SPI
bool
_dispatch_queue_is_exclusively_owned_by_current_thread_4IOHID(
dispatch_queue_t dq) // rdar://problem/18033810
{
if (dq->dq_width != 1) {
DISPATCH_CLIENT_CRASH(dq->dq_width, "Invalid queue type");
}
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
return _dq_state_drain_locked_by_self(dq_state);
}
#endif
#pragma mark -
#pragma mark dispatch_queue_debug
size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
size_t offset = 0;
dispatch_queue_t target = dq->do_targetq;
const char *tlabel = target && target->dq_label ? target->dq_label : "";
uint64_t dq_state = os_atomic_load2o(dq, dq_state, relaxed);
offset += dsnprintf(&buf[offset], bufsiz - offset, "sref = %d, "
"target = %s[%p], width = 0x%x, state = 0x%016llx",
dq->dq_sref_cnt + 1, tlabel, target, dq->dq_width,
(unsigned long long)dq_state);
if (_dq_state_is_suspended(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", suspended = %d",
_dq_state_suspend_cnt(dq_state));
}
if (_dq_state_is_inactive(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", inactive");
} else if (_dq_state_needs_activation(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", needs-activation");
}
if (_dq_state_is_enqueued(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", enqueued");
}
if (_dq_state_is_dirty(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", dirty");
}
dispatch_qos_t qos = _dq_state_max_qos(dq_state);
if (qos) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", max qos %d", qos);
}
mach_port_t owner = _dq_state_drain_owner(dq_state);
if (!_dispatch_queue_is_thread_bound(dq) && owner) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", draining on 0x%x",
owner);
}
if (_dq_state_is_in_barrier(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-barrier");
} else {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", in-flight = %d",
_dq_state_used_width(dq_state, dq->dq_width));
}
if (_dq_state_has_pending_barrier(dq_state)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", pending-barrier");
}
if (_dispatch_queue_is_thread_bound(dq)) {
offset += dsnprintf(&buf[offset], bufsiz - offset, ", thread = 0x%x ",
owner);
}
return offset;
}
size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
size_t offset = 0;
offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
return offset;
}
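//
// Example (sketch, internal helper): introspection and crash-reporting paths
// render a queue into a fixed buffer roughly like this:
//
//	char buf[2048];
//	dispatch_queue_debug(q, buf, sizeof(buf));
//	// buf now holds the label, refcounts, target, width and dq_state bits
//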
#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str)
{
if (fastpath(dq)) {
_dispatch_object_debug(dq, "%s", str);
} else {
_dispatch_log("queue[NULL]: %s", str);
}
}
#endif
#if DISPATCH_PERF_MON
#define DISPATCH_PERF_MON_BUCKETS 8
static struct {
uint64_t volatile time_total;
uint64_t volatile count_total;
uint64_t volatile thread_total;
} _dispatch_stats[DISPATCH_PERF_MON_BUCKETS];
DISPATCH_USED static size_t _dispatch_stat_buckets = DISPATCH_PERF_MON_BUCKETS;
void
_dispatch_queue_merge_stats(uint64_t start, bool trace, perfmon_thread_type type)
{
uint64_t delta = _dispatch_absolute_time() - start;
unsigned long count;
int bucket = 0;
count = (unsigned long)_dispatch_thread_getspecific(dispatch_bcounter_key);
_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);
if (count == 0) {
bucket = 0;
if (trace) _dispatch_ktrace1(DISPATCH_PERF_MON_worker_useless, type);
} else {
bucket = MIN(DISPATCH_PERF_MON_BUCKETS - 1,
(int)sizeof(count) * CHAR_BIT - __builtin_clzl(count));
os_atomic_add(&_dispatch_stats[bucket].count_total, count, relaxed);
}
os_atomic_add(&_dispatch_stats[bucket].time_total, delta, relaxed);
os_atomic_inc(&_dispatch_stats[bucket].thread_total, relaxed);
if (trace) {
_dispatch_ktrace3(DISPATCH_PERF_MON_worker_thread_end, count, delta, type);
}
}
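//
// Example (sketch): the bucket index is the bit width of the number of
// continuations the worker drained (i.e. fls(count)), clamped to the last
// bucket; with 8 buckets on a 64-bit platform:
//
//	count = 0   -> bucket 0 (useless wakeup)
//	count = 1   -> bucket 1
//	count = 5   -> bucket 3
//	count = 200 -> bucket 7 (clamped from 8)
//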
#endif
#pragma mark -
#pragma mark _dispatch_set_priority_and_mach_voucher
#if HAVE_PTHREAD_WORKQUEUE_QOS
DISPATCH_NOINLINE
void
_dispatch_set_priority_and_mach_voucher_slow(pthread_priority_t pp,
mach_voucher_t kv)
{
_pthread_set_flags_t pflags = 0;
if (pp && _dispatch_set_qos_class_enabled) {
pthread_priority_t old_pri = _dispatch_get_priority();
if (pp != old_pri) {
if (old_pri & _PTHREAD_PRIORITY_NEEDS_UNBIND_FLAG) {
pflags |= _PTHREAD_SET_SELF_WQ_KEVENT_UNBIND;
// when we unbind, the overcommit flag can flip, so we need to learn
// it from the defaultpri, see _dispatch_priority_compute_update
pp |= (_dispatch_get_basepri() &
DISPATCH_PRIORITY_FLAG_OVERCOMMIT);
} else {
// else we need to keep the one that is set in the current pri
pp |= (old_pri & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG);
}
if (likely(old_pri & ~_PTHREAD_PRIORITY_FLAGS_MASK)) {
pflags |= _PTHREAD_SET_SELF_QOS_FLAG;
}
uint64_t mgr_dq_state =
os_atomic_load2o(&_dispatch_mgr_q, dq_state, relaxed);
if (unlikely(_dq_state_drain_locked_by_self(mgr_dq_state))) {
DISPATCH_INTERNAL_CRASH(pp,
"Changing the QoS while on the manager queue");
}
if (unlikely(pp & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG)) {
DISPATCH_INTERNAL_CRASH(pp, "Cannot raise oneself to manager");
}
if (old_pri & _PTHREAD_PRIORITY_EVENT_MANAGER_FLAG) {
DISPATCH_INTERNAL_CRASH(old_pri,
"Cannot turn a manager thread into a normal one");
}
}
}
if (kv != VOUCHER_NO_MACH_VOUCHER) {
#if VOUCHER_USE_MACH_VOUCHER
pflags |= _PTHREAD_SET_SELF_VOUCHER_FLAG;
#endif
}
if (!pflags) return;
int r = _pthread_set_properties_self(pflags, pp, kv);
if (r == EINVAL) {
DISPATCH_INTERNAL_CRASH(pp, "_pthread_set_properties_self failed");
}
(void)dispatch_assume_zero(r);
}
DISPATCH_NOINLINE
voucher_t
_dispatch_set_priority_and_voucher_slow(pthread_priority_t priority,
voucher_t v, dispatch_thread_set_self_t flags)
{
voucher_t ov = DISPATCH_NO_VOUCHER;
mach_voucher_t kv = VOUCHER_NO_MACH_VOUCHER;
if (v != DISPATCH_NO_VOUCHER) {
bool retained = flags & DISPATCH_VOUCHER_CONSUME;
ov = _voucher_get();
if (ov == v && (flags & DISPATCH_VOUCHER_REPLACE)) {
if (retained && v) _voucher_release_no_dispose(v);
ov = DISPATCH_NO_VOUCHER;
} else {
if (!retained && v) _voucher_retain(v);
kv = _voucher_swap_and_get_mach_voucher(ov, v);
}
}
if (!(flags & DISPATCH_THREAD_PARK)) {
_dispatch_set_priority_and_mach_voucher_slow(priority, kv);
}
if (ov != DISPATCH_NO_VOUCHER && (flags & DISPATCH_VOUCHER_REPLACE)) {
if (ov) _voucher_release(ov);
ov = DISPATCH_NO_VOUCHER;
}
return ov;
}
#endif
#pragma mark -
#pragma mark dispatch_continuation_t
const struct dispatch_continuation_vtable_s _dispatch_continuation_vtables[] = {
DC_VTABLE_ENTRY(ASYNC_REDIRECT,
.do_kind = "dc-redirect",
.do_invoke = _dispatch_async_redirect_invoke),
#if HAVE_MACH
DC_VTABLE_ENTRY(MACH_SEND_BARRRIER_DRAIN,
.do_kind = "dc-mach-send-drain",
.do_invoke = _dispatch_mach_send_barrier_drain_invoke),
DC_VTABLE_ENTRY(MACH_SEND_BARRIER,
.do_kind = "dc-mach-send-barrier",
.do_invoke = _dispatch_mach_barrier_invoke),
DC_VTABLE_ENTRY(MACH_RECV_BARRIER,
.do_kind = "dc-mach-recv-barrier",
.do_invoke = _dispatch_mach_barrier_invoke),
DC_VTABLE_ENTRY(MACH_ASYNC_REPLY,
.do_kind = "dc-mach-async-reply",
.do_invoke = _dispatch_mach_msg_async_reply_invoke),
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
DC_VTABLE_ENTRY(OVERRIDE_STEALING,
.do_kind = "dc-override-stealing",
.do_invoke = _dispatch_queue_override_invoke),
DC_VTABLE_ENTRY(OVERRIDE_OWNING,
.do_kind = "dc-override-owning",
.do_invoke = _dispatch_queue_override_invoke),
#endif
};
static void
_dispatch_force_cache_cleanup(void)
{
dispatch_continuation_t dc;
dc = _dispatch_thread_getspecific(dispatch_cache_key);
if (dc) {
_dispatch_thread_setspecific(dispatch_cache_key, NULL);
_dispatch_cache_cleanup(dc);
}
}