Merge pull request #253 from dgrove-oss/linux-qos-prioritty

Convert dispatch_workq from legacy priorities to QoS
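
Workers now register and unregister with the monitoring subsystem using a
qos_class_t instead of a legacy WORKQ_* priority. The class is converted
with _dispatch_qos_from_qos_class(), and qos-1 indexes the monitor array,
which is now sized DISPATCH_QOS_MAX. The monitoring loop walks the pools
from highest to lowest QoS and keeps a running global_runnable count
against a soft cap of WORKQ_OVERSUBSCRIBE_FACTOR * active_cpus, so
lower-QoS pools are no longer eligible for over-subscription once the
higher-QoS pools have already consumed the budget. The legacy
dgq_wq_priority field is kept only when DISPATCH_USE_WORKQ_PRIORITY is
defined, i.e. when one of the legacy pthread-workqueue paths is compiled
in.

The standalone sketch below is illustration only, not libdispatch source:
it models the over-subscription policy with made-up per-pool numbers and
uses placeholder constants (NUM_QOS_BUCKETS, OVERSUBSCRIBE_FACTOR,
active_cpus) in place of the real DISPATCH_QOS_MAX,
WORKQ_OVERSUBSCRIBE_FACTOR and dispatch_hw_config(active_cpus).

    #include <stdio.h>

    /* Illustrative model of _dispatch_workq_monitor_pools() -- the
     * constants and per-pool counts are made up for this example. */
    #define NUM_QOS_BUCKETS      6   /* stands in for DISPATCH_QOS_MAX */
    #define OVERSUBSCRIBE_FACTOR 2   /* stands in for WORKQ_OVERSUBSCRIBE_FACTOR */

    int main(void)
    {
        int active_cpus = 4;
        int target_runnable = active_cpus;
        int global_soft_max = OVERSUBSCRIBE_FACTOR * active_cpus;
        int global_runnable = 0;

        /* Pretend per-QoS state, highest QoS first, mirroring the
         * top-down loop over the monitor array. */
        int runnable[NUM_QOS_BUCKETS] = { 3, 2, 0, 1, 0, 0 };
        int has_work[NUM_QOS_BUCKETS] = { 1, 1, 1, 1, 0, 0 };

        for (int i = 0; i < NUM_QOS_BUCKETS; i++) {
            if (!has_work[i]) continue;        /* nothing enqueued: skip */
            global_runnable += runnable[i];
            if (runnable[i] == 0) {
                /* Work but no runnable worker: likely stalled, poke
                 * regardless of the global budget. */
                printf("bucket %d: stalled, poke\n", i);
                global_runnable += 1;          /* count the thread we ask for */
            } else if (runnable[i] < target_runnable &&
                       global_runnable < global_soft_max) {
                /* Under target and still under the global soft cap:
                 * over-subscribe to hit the load target. */
                printf("bucket %d: under target, poke\n", i);
                global_runnable += 1;
            } else {
                printf("bucket %d: leave alone\n", i);
            }
        }
        printf("global_runnable estimate: %d (soft max %d)\n",
                global_runnable, global_soft_max);
        return 0;
    }
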
diff --git a/src/event/workqueue.c b/src/event/workqueue.c
index 0b9bc0a..dbc6593 100644
--- a/src/event/workqueue.c
+++ b/src/event/workqueue.c
@@ -69,7 +69,7 @@
 	int num_registered_tids;
 } dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
 
-static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
+static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
 
 #pragma mark Implementation of the monitoring subsystem.
 
@@ -80,12 +80,13 @@
 static dispatch_once_t _dispatch_workq_init_once_pred;
 
 void
-_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
+_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
 {
 	dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
 
 #if HAVE_DISPATCH_WORKQ_MONITORING
-	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
 	dispatch_assert(mon->dq == root_q);
 	dispatch_tid tid = _dispatch_thread_getspecific(tid);
 	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@
 }
 
 void
-_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
+_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
 {
 #if HAVE_DISPATCH_WORKQ_MONITORING
-	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
+	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
+	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
 	dispatch_tid tid = _dispatch_thread_getspecific(tid);
 	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
 	for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +179,10 @@
 static void
 _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
 {
-	// TODO: Once we switch away from the legacy priorities to
-	//       newer QoS, we can loop in order of decreasing QoS
-	//       and track the total number of runnable threads seen
-	//       across pools.  We can then use that number to
-	//       implement a global policy where low QoS queues
-	//       are not eligible for over-subscription if the higher
-	//       QoS queues have already consumed the target
-	//       number of threads.
-	for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
-		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
+	int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
+	int global_runnable = 0;
+	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
 		dispatch_queue_t dq = mon->dq;
 
 		if (!_dispatch_queue_class_probe(dq)) {
@@ -198,8 +194,10 @@
 		_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
 				dq->dq_label, mon->num_runnable, mon->target_runnable);
 
+		global_runnable += mon->num_runnable;
+
 		if (mon->num_runnable == 0) {
-			// We are below target, and no worker is runnable.
+			// We have work, but no worker is runnable.
 			// It is likely the program is stalled. Therefore treat
 			// this as if dq were an overcommit queue and call poke
 			// with the limit being the maximum number of workers for dq.
@@ -207,7 +205,9 @@
 			_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
 					dq->dq_label, floor);
 			_dispatch_global_queue_poke(dq, 1, floor);
-		} else if (mon->num_runnable < mon->target_runnable) {
+			global_runnable += 1; // account for poke in global estimate
+		} else if (mon->num_runnable < mon->target_runnable &&
+				   global_runnable < global_soft_max) {
 			// We are below target, but some workers are still runnable.
 			// We want to oversubscribe to hit the desired load target.
 			// However, this under-utilization may be transitory so set the
@@ -218,42 +218,20 @@
 			_dispatch_debug("workq: %s under utilization target; poking with floor %d",
 					dq->dq_label, floor);
 			_dispatch_global_queue_poke(dq, 1, floor);
+			global_runnable += 1; // account for poke in global estimate
 		}
 	}
 }
 #endif // HAVE_DISPATCH_WORKQ_MONITORING
 
-
-// temporary until we switch over to QoS based interface.
-static dispatch_queue_t
-get_root_queue_from_legacy_priority(int priority)
-{
-	switch (priority) {
-	case WORKQ_HIGH_PRIOQUEUE:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
-	case WORKQ_DEFAULT_PRIOQUEUE:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
-	case WORKQ_LOW_PRIOQUEUE:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
-	case WORKQ_BG_PRIOQUEUE:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
-	case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
-	case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
-		return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
-	default:
-		return NULL;
-	}
-}
-
 static void
 _dispatch_workq_init_once(void *context DISPATCH_UNUSED)
 {
 #if HAVE_DISPATCH_WORKQ_MONITORING
 	int target_runnable = dispatch_hw_config(active_cpus);
-	for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
-		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
-		mon->dq = get_root_queue_from_legacy_priority(i);
+	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
+		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
+		mon->dq = _dispatch_get_root_queue(i, false);
 		void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
 		mon->registered_tids = buf;
 		mon->target_runnable = target_runnable;
diff --git a/src/event/workqueue_internal.h b/src/event/workqueue_internal.h
index 9f8fc3a..94dfe4e 100644
--- a/src/event/workqueue_internal.h
+++ b/src/event/workqueue_internal.h
@@ -27,22 +27,12 @@
 #ifndef __DISPATCH_WORKQUEUE_INTERNAL__
 #define __DISPATCH_WORKQUEUE_INTERNAL__
 
-/* Work queue priority attributes. */
-#define WORKQ_HIGH_PRIOQUEUE             0
-#define WORKQ_DEFAULT_PRIOQUEUE          1
-#define WORKQ_LOW_PRIOQUEUE              2
-#define WORKQ_BG_PRIOQUEUE               3
-#define WORKQ_BG_PRIOQUEUE_CONDITIONAL   4
-#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
-
-#define WORKQ_NUM_PRIORITIES 6
-
 #define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1
 
 #define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
 
-void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
-void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
+void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
+void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
 
 #if defined(__linux__)
 #define HAVE_DISPATCH_WORKQ_MONITORING 1
diff --git a/src/queue.c b/src/queue.c
index 435ac96..4d506ef 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -37,6 +37,9 @@
 		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
 #define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
 #endif
+#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
+#define DISPATCH_USE_WORKQ_PRIORITY 1
+#endif
 #if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
 		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
 #define pthread_workqueue_t void*
@@ -158,7 +161,10 @@
 			int volatile dgq_pending;
 #if DISPATCH_USE_WORKQUEUES
 			qos_class_t dgq_qos;
-			int dgq_wq_priority, dgq_wq_options;
+#if DISPATCH_USE_WORKQ_PRIORITY
+			int dgq_wq_priority;
+#endif
+			int dgq_wq_options;
 #if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
 			pthread_workqueue_t dgq_kworkqueue;
 #endif
@@ -186,7 +192,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -197,7 +205,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_MAINTENANCE,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -208,7 +218,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -219,7 +231,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_BACKGROUND,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -230,7 +244,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -241,7 +257,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_UTILITY,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -252,7 +270,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -263,7 +283,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_DEFAULT,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -274,7 +296,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -285,7 +309,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_USER_INITIATED,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -296,7 +322,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
 		.dgq_wq_options = 0,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -307,7 +335,9 @@
 	[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
 #if DISPATCH_USE_WORKQUEUES
 		.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
+#if DISPATCH_USE_WORKQ_PRIORITY
 		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
+#endif
 		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
 #endif
 #if DISPATCH_ENABLE_THREAD_POOL
@@ -5809,7 +5839,7 @@
 	bool manager = (dq == &_dispatch_mgr_root_queue);
 	bool monitored = !(overcommit || manager);
 	if (monitored) {
-		_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
+		_dispatch_workq_worker_register(dq, qc->dgq_qos);
 	}
 #endif
 
@@ -5823,7 +5853,7 @@
 
 #if DISPATCH_USE_INTERNAL_WORKQUEUE
 	if (monitored) {
-		_dispatch_workq_worker_unregister(dq,  qc->dgq_wq_priority);
+		_dispatch_workq_worker_unregister(dq,  qc->dgq_qos);
 	}
 #endif
 	(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);