blob: 895edf840fa425d6ecf6a06c47b2b57667f4d1a3 [file] [log] [blame]
#include "test/jemalloc_test.h"
#include "jemalloc/internal/mpsc_queue.h"
typedef struct elem_s elem_t;
typedef ql_head(elem_t) elem_list_t;
typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
struct elem_s {
int thread;
int idx;
ql_elm(elem_t) link;
};
/* Include both proto and gen to make sure they match up. */
mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
elem_list_t);
mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
elem_list_t, link);
static void
init_elems_simple(elem_t *elems, int nelems, int thread) {
for (int i = 0; i < nelems; i++) {
elems[i].thread = thread;
elems[i].idx = i;
ql_elm_new(&elems[i], link);
}
}
static void
check_elems_simple(elem_list_t *list, int nelems, int thread) {
elem_t *elem;
int next_idx = 0;
ql_foreach(elem, list, link) {
expect_d_lt(next_idx, nelems, "Too many list items");
expect_d_eq(thread, elem->thread, "");
expect_d_eq(next_idx, elem->idx, "List out of order");
next_idx++;
}
}
TEST_BEGIN(test_simple) {
enum {NELEMS = 10};
elem_t elems[NELEMS];
elem_list_t list;
elem_mpsc_queue_t queue;
/* Pop empty queue onto empty list -> empty list */
ql_new(&list);
elem_mpsc_queue_new(&queue);
elem_mpsc_queue_pop_batch(&queue, &list);
expect_true(ql_empty(&list), "");
/* Pop empty queue onto nonempty list -> list unchanged */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS; i++) {
ql_tail_insert(&list, &elems[i], link);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
/* Pop nonempty queue onto empty list -> list takes queue contents */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[i]);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
/* Pop nonempty queue onto nonempty list -> list gains queue contents */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS / 2; i++) {
ql_tail_insert(&list, &elems[i], link);
}
for (int i = NELEMS / 2; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[i]);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
}
TEST_END
TEST_BEGIN(test_push_single_or_batch) {
enum {
BATCH_MAX = 10,
/*
* We'll push i items one-at-a-time, then i items as a batch,
* then i items as a batch again, as i ranges from 1 to
* BATCH_MAX. So we need 3 times the sum of the numbers from 1
* to BATCH_MAX elements total.
*/
NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
};
elem_t elems[NELEMS];
init_elems_simple(elems, NELEMS, 0);
elem_list_t list;
ql_new(&list);
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
int next_idx = 0;
for (int i = 1; i < 10; i++) {
/* Push i items 1 at a time. */
for (int j = 0; j < i; j++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Push i items in batch. */
for (int j = 0; j < i; j++) {
ql_tail_insert(&list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &list);
expect_true(ql_empty(&list), "Batch push should empty source");
/*
* Push i items in batch, again. This tests two batches
* proceeding one after the other.
*/
for (int j = 0; j < i; j++) {
ql_tail_insert(&list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &list);
expect_true(ql_empty(&list), "Batch push should empty source");
}
expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
expect_true(ql_empty(&list), "");
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
}
TEST_END
TEST_BEGIN(test_multi_op) {
enum {NELEMS = 20};
elem_t elems[NELEMS];
init_elems_simple(elems, NELEMS, 0);
elem_list_t push_list;
ql_new(&push_list);
elem_list_t result_list;
ql_new(&result_list);
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
int next_idx = 0;
/* Push first quarter 1-at-a-time. */
for (int i = 0; i < NELEMS / 4; i++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Push second quarter in batch. */
for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
ql_tail_insert(&push_list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &push_list);
/* Batch pop all pushed elements. */
elem_mpsc_queue_pop_batch(&queue, &result_list);
/* Push third quarter in batch. */
for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
ql_tail_insert(&push_list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &push_list);
/* Push last quarter one-at-a-time. */
for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Pop them again. Order of existing list should be preserved. */
elem_mpsc_queue_pop_batch(&queue, &result_list);
check_elems_simple(&result_list, NELEMS, 0);
}
TEST_END
typedef struct pusher_arg_s pusher_arg_t;
struct pusher_arg_s {
elem_mpsc_queue_t *queue;
int thread;
elem_t *elems;
int nelems;
};
typedef struct popper_arg_s popper_arg_t;
struct popper_arg_s {
elem_mpsc_queue_t *queue;
int npushers;
int nelems_per_pusher;
int *pusher_counts;
};
static void *
thd_pusher(void *void_arg) {
pusher_arg_t *arg = (pusher_arg_t *)void_arg;
int next_idx = 0;
while (next_idx < arg->nelems) {
/* Push 10 items in batch. */
elem_list_t list;
ql_new(&list);
int limit = next_idx + 10;
while (next_idx < arg->nelems && next_idx < limit) {
ql_tail_insert(&list, &arg->elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(arg->queue, &list);
/* Push 10 items one-at-a-time. */
limit = next_idx + 10;
while (next_idx < arg->nelems && next_idx < limit) {
elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
next_idx++;
}
}
return NULL;
}
static void *
thd_popper(void *void_arg) {
popper_arg_t *arg = (popper_arg_t *)void_arg;
int done_pushers = 0;
while (done_pushers < arg->npushers) {
elem_list_t list;
ql_new(&list);
elem_mpsc_queue_pop_batch(arg->queue, &list);
elem_t *elem;
ql_foreach(elem, &list, link) {
int thread = elem->thread;
int idx = elem->idx;
expect_d_eq(arg->pusher_counts[thread], idx,
"Thread's pushes reordered");
arg->pusher_counts[thread]++;
if (arg->pusher_counts[thread]
== arg->nelems_per_pusher) {
done_pushers++;
}
}
}
return NULL;
}
TEST_BEGIN(test_multiple_threads) {
enum {
NPUSHERS = 4,
NELEMS_PER_PUSHER = 1000*1000,
};
thd_t pushers[NPUSHERS];
pusher_arg_t pusher_arg[NPUSHERS];
thd_t popper;
popper_arg_t popper_arg;
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
elem_t *elem_iter = elems;
for (int i = 0; i < NPUSHERS; i++) {
pusher_arg[i].queue = &queue;
pusher_arg[i].thread = i;
pusher_arg[i].elems = elem_iter;
pusher_arg[i].nelems = NELEMS_PER_PUSHER;
init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
elem_iter += NELEMS_PER_PUSHER;
}
popper_arg.queue = &queue;
popper_arg.npushers = NPUSHERS;
popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
int pusher_counts[NPUSHERS] = {0};
popper_arg.pusher_counts = pusher_counts;
thd_create(&popper, thd_popper, (void *)&popper_arg);
for (int i = 0; i < NPUSHERS; i++) {
thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
}
thd_join(popper, NULL);
for (int i = 0; i < NPUSHERS; i++) {
thd_join(pushers[i], NULL);
}
for (int i = 0; i < NPUSHERS; i++) {
expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
}
free(elems);
}
TEST_END
int
main(void) {
return test_no_reentrancy(
test_simple,
test_push_single_or_batch,
test_multi_op,
test_multiple_threads);
}