blob: 667c847959744b1d624576526d830b1e26414ef1 [file] [log] [blame]
//===--- Concurrent.cpp - Concurrent data structure tests -----------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
#ifndef THREADING_HELPERS_H
#define THREADING_HELPERS_H
#include <thread>
// When true many of the threaded tests log activity to help triage issues.
static bool trace = false;
template <typename ThreadBody, typename AfterSpinRelease>
void threadedExecute(int threadCount, ThreadBody threadBody,
AfterSpinRelease afterSpinRelease) {
std::vector<std::thread> threads;
// Block the threads we are about to create.
std::atomic<bool> spinWait(true);
std::atomic<int> readyCount(0);
std::atomic<int> activeCount(0);
for (int i = 0; i < threadCount; ++i) {
threads.push_back(std::thread([&, i] {
readyCount++;
while (spinWait) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
activeCount++;
threadBody(i);
}));
}
while (readyCount < threadCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// Allow our threads to fight for the lock.
spinWait = false;
while (activeCount < threadCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
afterSpinRelease();
// Wait until all of our threads have finished.
for (auto &thread : threads) {
thread.join();
}
}
template <typename ThreadBody>
void threadedExecute(int threadCount, ThreadBody threadBody) {
threadedExecute(threadCount, threadBody, [] {});
}
template <typename M, typename C, typename ConsumerBody, typename ProducerBody>
void threadedExecute(M &mutex, C &condition, bool &doneCondition,
ConsumerBody consumerBody, ProducerBody producerBody) {
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
// Block the threads we are about to create.
std::atomic<bool> spinWait(true);
for (int i = 1; i <= 8; ++i) {
consumers.push_back(std::thread([&, i] {
while (spinWait) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
consumerBody(i);
if (trace)
printf("### Consumer[%d] thread exiting.\n", i);
}));
}
for (int i = 1; i <= 5; ++i) {
producers.push_back(std::thread([&, i] {
while (spinWait) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
producerBody(i);
if (trace)
printf("### Producer[%d] thread exiting.\n", i);
}));
}
// Poor mans attempt to get as many threads ready as possible before
// dropping spinWait, it doesn't have to be perfect.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Allow our threads to fight for the lock.
spinWait = false;
// Wait until all of our producer threads have finished.
for (auto &thread : producers) {
thread.join();
}
// Inform consumers that producers are done.
mutex.withLockThenNotifyAll(condition, [&] {
if (trace)
printf("### Informing consumers we are done.\n");
doneCondition = true;
});
// Wait for consumers to finish.
for (auto &thread : consumers) {
thread.join();
}
}
#endif