blob: 74b6c0c8f5b69beb14dc2957f2d3dc6ebf3401b2 [file] [log] [blame]
/*
* Copyright (c) 2010-2011 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @APPLE_APACHE_LICENSE_HEADER_END@
*/
#include <dispatch/dispatch.h>
#include <dispatch/private.h>
#include <stdlib.h>
#include <stdio.h>
#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
#include <sys/types.h>
#include <unistd.h>
#ifdef __ANDROID__
#include <linux/sysctl.h>
#else
#include <sys/sysctl.h>
#endif /* __ANDROID__ */
#endif
#include <bsdtests.h>
#include "dispatch_test.h"
static volatile size_t done, concur;
static int use_group_async;
static uint32_t activecpu;
static uint32_t min_acceptable_concurrency;
static dispatch_queue_t q;
static dispatch_group_t g, gw;
const size_t workers = 4;
static void
nop(void* ctxt __attribute__((unused)))
{
return;
}
static void
work(void* ctxt __attribute__((unused)))
{
usleep(1000);
__sync_add_and_fetch(&done, 1);
if (!use_group_async) dispatch_group_leave(gw);
}
static void
submit_work(void* ctxt)
{
size_t c = __sync_add_and_fetch(&concur, 1), *m = (size_t *)ctxt, i;
if (c > *m) *m = c;
for (i = 0; i < workers; ++i) {
if (use_group_async) {
dispatch_group_async_f(gw, q, NULL, work);
} else {
dispatch_group_enter(gw);
dispatch_async_f(q, NULL, work);
}
}
usleep(10000);
__sync_sub_and_fetch(&concur, 1);
if (!use_group_async) dispatch_group_leave(g);
}
static void
test_concur_async(size_t n, size_t qw)
{
size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
done = concur = 0;
dispatch_suspend(q);
for (i = 0, mc = mcs; i < n; i++, mc++) {
if (use_group_async) {
dispatch_group_async_f(g, q, mc, submit_work);
} else {
dispatch_group_enter(g);
dispatch_async_f(q, mc, submit_work);
}
}
dispatch_resume(q);
dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
if (qw > 1) {
size_t concurrency = MIN(n * workers, qw);
if (done > min_acceptable_concurrency) {
test_sizet_less_than_or_equal("concurrently completed workers", done, concurrency);
} else {
test_sizet("concurrently completed workers", done, concurrency);
}
} else {
test_sizet_less_than_or_equal("concurrently completed workers", done, 1);
}
for (i = 0, mc = mcs; i < n; i++, mc++) {
if (*mc > max_concur) max_concur = *mc;
}
free(mcs);
size_t expect = MIN(n, qw);
if (max_concur > min_acceptable_concurrency) {
test_sizet_less_than_or_equal("max submission concurrency", max_concur, expect);
} else {
test_sizet("max submission concurrency", max_concur, expect);
}
dispatch_group_wait(gw, DISPATCH_TIME_FOREVER);
usleep(1000);
}
static void
sync_work(void* ctxt)
{
size_t c = __sync_add_and_fetch(&concur, 1), *m = (size_t *)ctxt;
if (c > *m) *m = c;
usleep(10000);
__sync_sub_and_fetch(&concur, 1);
}
static void
test_concur_sync(size_t n, size_t qw)
{
size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
concur = 0;
for (i = 0, mc = mcs; i < n; i++, mc++) {
dispatch_group_async(g,
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
DISPATCH_QUEUE_OVERCOMMIT), ^{
usleep(100000);
dispatch_sync_f(q, mc, sync_work);
});
}
dispatch_group_wait(g, DISPATCH_TIME_FOREVER);
for (i = 0, mc = mcs; i < n; i++, mc++) {
if (*mc > max_concur) max_concur = *mc;
}
free(mcs);
size_t expect = qw == 1 ? 1 : n;
if (max_concur > min_acceptable_concurrency) {
test_sizet_less_than_or_equal("max sync concurrency", max_concur, expect);
} else {
test_sizet("max sync concurrency", max_concur, expect);
}
}
static void
apply_work(void* ctxt, size_t i)
{
size_t c = __sync_add_and_fetch(&concur, 1), *m = ((size_t *)ctxt) + i;
if (c > *m) *m = c;
usleep(100000);
__sync_sub_and_fetch(&concur, 1);
}
static void
test_concur_apply(size_t n, size_t qw)
{
size_t i, max_concur = 0, *mcs = calloc(n, sizeof(size_t)), *mc;
concur = 0;
dispatch_apply_f(n, q, mcs, apply_work);
for (i = 0, mc = mcs; i < n; i++, mc++) {
if (*mc > max_concur) max_concur = *mc;
}
free(mcs);
size_t expect = MIN(n, qw);
if (max_concur > min_acceptable_concurrency) {
test_sizet_less_than_or_equal("max apply concurrency", max_concur, expect);
} else {
test_sizet("max apply concurrency", max_concur, expect);
}
}
static dispatch_queue_t
create_queue(long width, dispatch_queue_t tq, long *qw, const char **ql)
{
if (!width) {
*qw = LONG_MAX;
*ql = "global";
return dispatch_get_global_queue(0, 0);
};
dispatch_queue_t queue;
dispatch_queue_attr_t qattr = NULL;
*qw = width;
*ql = width < LONG_MAX ? ( width == 1 ? "serial": "wide" ) : "concurrent";
#if DISPATCH_API_VERSION >= 20100518 // <rdar://problem/7790099>
qattr = width < LONG_MAX ? NULL : DISPATCH_QUEUE_CONCURRENT;
#endif
queue = dispatch_queue_create(*ql, qattr);
if (!qattr) {
dispatch_queue_set_width(queue, width);
}
if (tq) {
dispatch_set_target_queue(queue, tq);
}
if (!qattr || tq) {
dispatch_barrier_sync_f(queue, NULL, nop); // wait for changes to take effect
}
return queue;
}
int
main(int argc __attribute__((unused)), char* argv[] __attribute__((unused)))
{
dispatch_test_start("Dispatch Private Concurrent/Wide Queue"); // <rdar://problem/8049506&8169448&8186485>
#ifdef __linux__
activecpu = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(_WIN32)
SYSTEM_INFO si;
GetSystemInfo(&si);
activecpu = si.dwNumberOfProcessors;
#else
size_t s = sizeof(activecpu);
sysctlbyname("hw.activecpu", &activecpu, &s, NULL, 0);
#endif
size_t n = activecpu / 2 > 1 ? activecpu / 2 : 1, w = activecpu * 2;
min_acceptable_concurrency = (uint32_t)n;
dispatch_queue_t tq, ttq;
long qw, tqw, ttqw;
const char *ql, *tql, *ttql;
size_t qi, tqi, ttqi;
long qws[] = {
0, LONG_MAX, (long)w, 1, // 0 <=> global queue
};
g = dispatch_group_create();
gw = dispatch_group_create();
for (ttqi = 0; ttqi < sizeof(qws)/sizeof(*qws); ttqi++) {
ttq = create_queue(qws[ttqi], NULL, &ttqw, &ttql);
for (tqi = 0; tqi < sizeof(qws)/sizeof(*qws); tqi++) {
if (!qws[tqi] && qws[ttqi]) continue;
tq = create_queue(qws[tqi], ttq, &tqw, &tql);
for (qi = 0; qi < sizeof(qws)/sizeof(*qws); qi++) {
if (!qws[qi] && qws[tqi]) continue;
q = create_queue(qws[qi], tq, &qw, &ql);
for (use_group_async = 0; use_group_async < 2; use_group_async++) {
fprintf(stdout, "Testing dispatch%s_async on "
"queue hierarchy: %s -> %s -> %s\n",
use_group_async ? "_group" : "", ql, tql, ttql);
fflush(stdout);
test_concur_async(n, (size_t)MIN(qw, MIN(tqw, ttqw)));
}
fprintf(stdout, "Testing dispatch_sync on "
"queue hierarchy: %s -> %s -> %s\n", ql, tql, ttql);
fflush(stdout);
test_concur_sync(w, (size_t)MIN(qw, MIN(tqw, ttqw)));
fprintf(stdout, "Testing dispatch_apply on "
"queue hierarchy: %s -> %s -> %s\n", ql, tql, ttql);
fflush(stdout);
test_concur_apply(activecpu, (size_t)MIN(qw, MIN(tqw, ttqw)));
dispatch_release(q);
}
dispatch_release(tq);
}
dispatch_release(ttq);
}
dispatch_release(g);
dispatch_release(gw);
test_stop();
return 0;
}