/*
 * Copyright (c) 2017-2017 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

#if DISPATCH_USE_INTERNAL_WORKQUEUE

/*
 * dispatch_workq monitors the thread pool that is
 * executing the work enqueued on libdispatch's pthread
 * root queues and dynamically adjusts its size.
 *
 * The dynamic monitoring could be implemented using either
 *   (a) a low-frequency user-level approximation of the number of runnable
 *       worker threads via reading the /proc file system, or
 *   (b) a Linux kernel extension that hooks the process change handler
 *       to accurately track the number of runnable normal worker threads.
 * This file provides an implementation of option (a).
 *
 * Using either form of monitoring, if (i) there appears to be
 * work available in the monitored pthread root queue, (ii) the
 * number of runnable workers is below the target size for the pool,
 * and (iii) the total number of worker threads is below an upper limit,
 * then an additional worker thread will be added to the pool.
 */
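
/*
 * As a non-normative illustration of this policy: on a machine with 4
 * active CPUs, each monitored pool targets 4 runnable workers and the
 * process-wide soft cap on runnable workers is
 * WORKQ_OVERSUBSCRIBE_FACTOR * 4 == 8; a pool with queued work but too
 * few runnable workers is grown by at most one thread per pass of the
 * once-per-second monitor until one of those bounds is reached.
 */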

#pragma mark static data for monitoring subsystem

/*
 * State for the user-level monitoring of a workqueue.
 */
typedef struct dispatch_workq_monitor_s {
	/* The dispatch_queue we are monitoring */
	dispatch_queue_t dq;

	/* The observed number of runnable worker threads */
	int32_t num_runnable;

	/* The desired number of runnable worker threads */
	int32_t target_runnable;

	/*
	 * Tracking of registered workers; all accesses must hold the lock.
	 * Invariant: registered_tids[0]...registered_tids[num_registered_tids-1]
	 * contain the dispatch_tids of the worker threads we are monitoring.
	 */
	dispatch_unfair_lock_s registered_tid_lock;
	dispatch_tid *registered_tids;
	int num_registered_tids;
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;

static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];

#pragma mark Implementation of the monitoring subsystem

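// Hard cap on the number of worker tids tracked per pool; reuses
// libdispatch's overall pthread pool bound.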
#define WORKQ_MAX_TRACKED_TIDS DISPATCH_WORKQ_MAX_PTHREAD_COUNT
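// Multiplier applied to the active CPU count (and to each pool's target)
// to bound how far the monitor will oversubscribe runnable workers.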
#define WORKQ_OVERSUBSCRIBE_FACTOR 2

static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
static dispatch_once_t _dispatch_workq_init_once_pred;

void
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
{
	dispatch_once_f(&_dispatch_workq_init_once_pred, NULL,
			&_dispatch_workq_init_once);

#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
	dispatch_assert(mon->dq == root_q);

	dispatch_tid tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1);
	int worker_id = mon->num_registered_tids++;
	mon->registered_tids[worker_id] = tid;
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}

void
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
	dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
	dispatch_assert(mon->dq == root_q);

	dispatch_tid tid = _dispatch_tid_self();
	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
	for (int i = 0; i < mon->num_registered_tids; i++) {
		if (mon->registered_tids[i] == tid) {
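			// Deregister tid by swapping the last registered tid into
			// its slot; the order of registered_tids is not significant.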
			int last = mon->num_registered_tids - 1;
			mon->registered_tids[i] = mon->registered_tids[last];
			mon->registered_tids[last] = 0;
			mon->num_registered_tids--;
			break;
		}
	}
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}

#if HAVE_DISPATCH_WORKQ_MONITORING

#if defined(__linux__)
/*
 * For the tid of each registered worker, read /proc/[tid]/stat and
 * count how many of those workers are actually runnable.
 * See the proc(5) man page for the format of /proc/[pid]/stat.
 */
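/*
 * For reference, a stat line begins like:
 *     27415 (worker-name) R 27414 ...
 * i.e. pid, comm, state, ppid, ...; a state of 'R' means runnable.
 * Note that the sscanf below assumes comm contains no whitespace.
 */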
static void
_dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
{
	char path[128];
	char buf[4096];
	int running_count = 0;

	_dispatch_unfair_lock_lock(&mon->registered_tid_lock);

	for (int i = 0; i < mon->num_registered_tids; i++) {
		dispatch_tid tid = mon->registered_tids[i];
		int fd;
		ssize_t bytes_read = -1;

		int r = snprintf(path, sizeof(path), "/proc/%d/stat", tid);
		dispatch_assert(r > 0 && r < (int)sizeof(path));

		fd = open(path, O_RDONLY | O_NONBLOCK);
		if (unlikely(fd == -1)) {
			DISPATCH_CLIENT_CRASH(tid,
					"workq: registered worker exited prematurely");
		} else {
			bytes_read = read(fd, buf, sizeof(buf)-1);
			(void)close(fd);
		}

		if (bytes_read > 0) {
			buf[bytes_read] = '\0';
			char state;
			if (sscanf(buf, "%*d %*s %c", &state) == 1) {
				// _dispatch_debug("workq: Worker %d, state %c\n", tid, state);
				if (state == 'R') {
					running_count++;
				}
			} else {
				_dispatch_debug("workq: sscanf of state failed for %d", tid);
			}
		} else {
			_dispatch_debug("workq: Failed to read %s", path);
		}
	}

	mon->num_runnable = running_count;
	_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
}

#else
#error must define _dispatch_workq_count_runnable_workers
#endif

static void
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
{
	int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR *
			(int)dispatch_hw_config(active_cpus);
	int global_runnable = 0;

	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
		dispatch_queue_t dq = mon->dq;

		if (!_dispatch_queue_class_probe(dq)) {
			_dispatch_debug("workq: %s is empty.", dq->dq_label);
			continue;
		}

		_dispatch_workq_count_runnable_workers(mon);
		_dispatch_debug("workq: %s has %d runnable workers (target is %d)",
				dq->dq_label, mon->num_runnable, mon->target_runnable);

		global_runnable += mon->num_runnable;

		if (mon->num_runnable == 0) {
			// We have work, but no worker is runnable.
			// It is likely the program is stalled. Therefore treat
			// this as if dq were an overcommit queue and call poke
			// with the limit being the maximum number of workers for dq.
			int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
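			// floor is a large negative value here, so the poke below is
			// in effect always permitted to add one more worker, up to
			// the absolute cap of WORKQ_MAX_TRACKED_TIDS threads.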
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
global_runnable += 1; // account for poke in global estimate
} else if (mon->num_runnable < mon->target_runnable &&
global_runnable < global_soft_max) {
// We are below target, but some workers are still runnable.
// We want to oversubscribe to hit the desired load target.
// However, this under-utilization may be transitory so set the
// floor as a small multiple of threads per core.
int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
floor = MAX(floor, floor2);
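			// e.g. with a target of 4, floor == MAX(-4, 4 - WORKQ_MAX_TRACKED_TIDS)
			// == -4: the pool may grow to roughly WORKQ_OVERSUBSCRIBE_FACTOR *
			// target_runnable threads, and never past WORKQ_MAX_TRACKED_TIDS.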
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
dq->dq_label, floor);
_dispatch_global_queue_poke(dq, 1, floor);
global_runnable += 1; // account for poke in global estimate
}
}
}
#endif // HAVE_DISPATCH_WORKQ_MONITORING

static void
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
{
#if HAVE_DISPATCH_WORKQ_MONITORING
	int target_runnable = (int)dispatch_hw_config(active_cpus);
	for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
		dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
		mon->dq = _dispatch_get_root_queue(i, false);
		void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
		mon->registered_tids = buf;
		mon->target_runnable = target_runnable;
	}

	// Create a monitoring timer that will periodically run on dispatch_mgr_q.
	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,
			0, 0, &_dispatch_mgr_q);
	dispatch_source_set_timer(ds, dispatch_time(DISPATCH_TIME_NOW, 0),
			NSEC_PER_SEC, 0);
	dispatch_source_set_event_handler_f(ds, _dispatch_workq_monitor_pools);
	dispatch_set_context(ds, ds); // avoid appearing as leaked
	dispatch_activate(ds);
#endif // HAVE_DISPATCH_WORKQ_MONITORING
}

#endif // DISPATCH_USE_INTERNAL_WORKQUEUE