Merge pull request #253 from dgrove-oss/linux-qos-prioritty
Convert dispatch_workq from legacy priorities to qos
diff --git a/src/event/workqueue.c b/src/event/workqueue.c
index 0b9bc0a..dbc6593 100644
--- a/src/event/workqueue.c
+++ b/src/event/workqueue.c
@@ -69,7 +69,7 @@
int num_registered_tids;
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
-static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
+static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
#pragma mark Implementation of the monitoring subsystem.
@@ -80,12 +80,13 @@
static dispatch_once_t _dispatch_workq_init_once_pred;
void
-_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
+_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
{
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
#if HAVE_DISPATCH_WORKQ_MONITORING
- dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+ dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
dispatch_assert(mon->dq == root_q);
dispatch_tid tid = _dispatch_thread_getspecific(tid);
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@
}
void
-_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
+_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
- dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+ dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
dispatch_tid tid = _dispatch_thread_getspecific(tid);
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +179,10 @@
static void
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
{
- // TODO: Once we switch away from the legacy priorities to
- // newer QoS, we can loop in order of decreasing QoS
- // and track the total number of runnable threads seen
- // across pools. We can then use that number to
- // implement a global policy where low QoS queues
- // are not eligible for over-subscription if the higher
- // QoS queues have already consumed the target
- // number of threads.
- for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
- dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
+ int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
+ int global_runnable = 0;
+ for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
dispatch_queue_t dq = mon->dq;
if (!_dispatch_queue_class_probe(dq)) {
@@ -198,8 +194,10 @@
        _dispatch_debug("workq: %s has %d runnable workers (target is %d)",
dq->dq_label, mon->num_runnable, mon->target_runnable);
+ global_runnable += mon->num_runnable;
+
if (mon->num_runnable == 0) {
- // We are below target, and no worker is runnable.
+ // We have work, but no worker is runnable.
// It is likely the program is stalled. Therefore treat
// this as if dq were an overcommit queue and call poke
// with the limit being the maximum number of workers for dq.
@@ -207,7 +205,9 @@
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
- } else if (mon->num_runnable < mon->target_runnable) {
+ global_runnable += 1; // account for poke in global estimate
+ } else if (mon->num_runnable < mon->target_runnable &&
+ global_runnable < global_soft_max) {
// We are below target, but some workers are still runnable.
// We want to oversubscribe to hit the desired load target.
// However, this under-utilization may be transitory so set the
@@ -218,42 +218,20 @@
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
+ global_runnable += 1; // account for poke in global estimate
}
}
}
#endif // HAVE_DISPATCH_WORKQ_MONITORING
-
-// temporary until we switch over to QoS based interface.
-static dispatch_queue_t
-get_root_queue_from_legacy_priority(int priority)
-{
- switch (priority) {
- case WORKQ_HIGH_PRIOQUEUE:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
- case WORKQ_DEFAULT_PRIOQUEUE:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
- case WORKQ_LOW_PRIOQUEUE:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
- case WORKQ_BG_PRIOQUEUE:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
- case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
- case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
- return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
- default:
- return NULL;
- }
-}
-
static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
int target_runnable = dispatch_hw_config(active_cpus);
- for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
- dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
- mon->dq = get_root_queue_from_legacy_priority(i);
+ for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+ dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
+ mon->dq = _dispatch_get_root_queue(i, false);
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
mon->registered_tids = buf;
mon->target_runnable = target_runnable;
diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h
index 9f8fc3a..94dfe4e 100644
--- a/src/event/workqueue_internal.h
+++ b/src/event/workqueue_internal.h
@@ -27,22 +27,12 @@
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
#define __DISPATCH_WORKQUEUE_INTERNAL__
-/* Work queue priority attributes. */
-#define WORKQ_HIGH_PRIOQUEUE 0
-#define WORKQ_DEFAULT_PRIOQUEUE 1
-#define WORKQ_LOW_PRIOQUEUE 2
-#define WORKQ_BG_PRIOQUEUE 3
-#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
-#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
-
-#define WORKQ_NUM_PRIORITIES 6
-
#define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
-void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
-void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
+void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
+void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
#if defined(__linux__)
#define HAVE_DISPATCH_WORKQ_MONITORING 1
diff --git a/src/queue.c b/src/queue.c
index 435ac96..4d506ef 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -37,6 +37,9 @@
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
+#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
+#define DISPATCH_USE_WORKQ_PRIORITY 1
+#endif
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
@@ -158,7 +161,10 @@
int volatile dgq_pending;
#if DISPATCH_USE_WORKQUEUES
qos_class_t dgq_qos;
- int dgq_wq_priority, dgq_wq_options;
+#if DISPATCH_USE_WORKQ_PRIORITY
+ int dgq_wq_priority;
+#endif
+ int dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
pthread_workqueue_t dgq_kworkqueue;
#endif
@@ -186,7 +192,9 @@
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -197,7 +205,9 @@
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -208,7 +218,9 @@
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -219,7 +231,9 @@
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -230,7 +244,9 @@
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -241,7 +257,9 @@
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -252,7 +270,9 @@
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -263,7 +283,9 @@
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -274,7 +296,9 @@
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -285,7 +309,9 @@
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -296,7 +322,9 @@
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -307,7 +335,9 @@
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
#if DISPATCH_USE_WORKQUEUES
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5809,7 +5839,7 @@
bool manager = (dq == &_dispatch_mgr_root_queue);
bool monitored = !(overcommit || manager);
if (monitored) {
- _dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
+ _dispatch_workq_worker_register(dq, qc->dgq_qos);
}
#endif
@@ -5823,7 +5853,7 @@
#if DISPATCH_USE_INTERNAL_WORKQUEUE
if (monitored) {
- _dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
+ _dispatch_workq_worker_unregister(dq, qc->dgq_qos);
}
#endif
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);