cmWorkerPool: Factor our worker thread class (internals)
This moves the `cmWorkerPoolInternal::WorkerT` class to `cmWorkerPoolWorker`
and changes the thread start interface to make it independent of the
`cmWorkerPoolInternal` type.
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx
index 75ca47a..cbf070e 100644
--- a/Source/cmWorkerPool.cxx
+++ b/Source/cmWorkerPool.cxx
@@ -371,87 +371,143 @@
}
/**
+ * @brief Worker pool worker thread
+ */
+class cmWorkerPoolWorker
+{
+public:
+ cmWorkerPoolWorker(uv_loop_t& uvLoop);
+ ~cmWorkerPoolWorker();
+
+ cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
+ cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
+
+ /**
+ * Set the internal thread
+ */
+ void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
+
+ /**
+ * Run an external process
+ */
+ bool RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory);
+
+private:
+ // -- Libuv callbacks
+ static void UVProcessStart(uv_async_t* handle);
+ void UVProcessFinished();
+
+private:
+ // -- Process management
+ struct
+ {
+ std::mutex Mutex;
+ cm::uv_async_ptr Request;
+ std::condition_variable Condition;
+ std::unique_ptr<cmUVReadOnlyProcess> ROP;
+ } Proc_;
+ // -- System thread
+ std::thread Thread_;
+};
+
+cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
+{
+ Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
+}
+
+cmWorkerPoolWorker::~cmWorkerPoolWorker()
+{
+ if (Thread_.joinable()) {
+ Thread_.join();
+ }
+}
+
+bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
+ std::vector<std::string> const& command,
+ std::string const& workingDirectory)
+{
+ if (command.empty()) {
+ return false;
+ }
+ // Create process instance
+ {
+ std::lock_guard<std::mutex> lock(Proc_.Mutex);
+ Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
+ Proc_.ROP->setup(&result, true, command, workingDirectory);
+ }
+ // Send asynchronous process start request to libuv loop
+ Proc_.Request.send();
+ // Wait until the process has been finished and destroyed
+ {
+ std::unique_lock<std::mutex> ulock(Proc_.Mutex);
+ while (Proc_.ROP) {
+ Proc_.Condition.wait(ulock);
+ }
+ }
+ return !result.error();
+}
+
+void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
+{
+ auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
+ bool startFailed = false;
+ {
+ auto& Proc = wrk->Proc_;
+ std::lock_guard<std::mutex> lock(Proc.Mutex);
+ if (Proc.ROP && !Proc.ROP->IsStarted()) {
+ startFailed =
+ !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
+ }
+ }
+ // Clean up if starting of the process failed
+ if (startFailed) {
+ wrk->UVProcessFinished();
+ }
+}
+
+void cmWorkerPoolWorker::UVProcessFinished()
+{
+ {
+ std::lock_guard<std::mutex> lock(Proc_.Mutex);
+ if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
+ Proc_.ROP.reset();
+ }
+ }
+ // Notify idling thread
+ Proc_.Condition.notify_one();
+}
+
+/**
* @brief Private worker pool internals
*/
class cmWorkerPoolInternal
{
public:
- // -- Types
-
- /**
- * @brief Worker thread
- */
- class WorkerT
- {
- public:
- WorkerT(unsigned int index);
- ~WorkerT();
-
- WorkerT(WorkerT const&) = delete;
- WorkerT& operator=(WorkerT const&) = delete;
-
- /**
- * Start the thread
- */
- void Start(cmWorkerPoolInternal* internal);
-
- /**
- * @brief Run an external process
- */
- bool RunProcess(cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory);
-
- // -- Accessors
- unsigned int Index() const { return Index_; }
- cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; }
-
- private:
- // -- Libuv callbacks
- static void UVProcessStart(uv_async_t* handle);
- void UVProcessFinished();
-
- private:
- //! @brief Job handle
- cmWorkerPool::JobHandleT JobHandle_;
- //! @brief Worker index
- unsigned int Index_;
- // -- Process management
- struct
- {
- std::mutex Mutex;
- cm::uv_async_ptr Request;
- std::condition_variable Condition;
- std::unique_ptr<cmUVReadOnlyProcess> ROP;
- } Proc_;
- // -- System thread
- std::thread Thread_;
- };
-
-public:
// -- Constructors
cmWorkerPoolInternal(cmWorkerPool* pool);
~cmWorkerPoolInternal();
/**
- * @brief Runs the libuv loop
+ * Runs the libuv loop.
*/
bool Process();
/**
- * @brief Clear queue and abort threads
+ * Clear queue and abort threads.
*/
void Abort();
/**
- * @brief Push a job to the queue and notify a worker
+ * Push a job to the queue and notify a worker.
*/
bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
/**
- * @brief Worker thread main loop method
+ * Worker thread main loop method.
*/
- void Work(WorkerT* worker);
+ void Work(unsigned int workerIndex);
// -- Request slots
static void UVSlotBegin(uv_async_t* handle);
@@ -476,85 +532,12 @@
unsigned int JobsProcessing = 0;
std::deque<cmWorkerPool::JobHandleT> Queue;
std::condition_variable Condition;
- std::vector<std::unique_ptr<WorkerT>> Workers;
+ std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
// -- References
cmWorkerPool* Pool = nullptr;
};
-cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index)
- : Index_(index)
-{
-}
-
-cmWorkerPoolInternal::WorkerT::~WorkerT()
-{
- if (Thread_.joinable()) {
- Thread_.join();
- }
-}
-
-void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal)
-{
- Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this);
- Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this);
-}
-
-bool cmWorkerPoolInternal::WorkerT::RunProcess(
- cmWorkerPool::ProcessResultT& result,
- std::vector<std::string> const& command, std::string const& workingDirectory)
-{
- if (command.empty()) {
- return false;
- }
- // Create process instance
- {
- std::lock_guard<std::mutex> lock(Proc_.Mutex);
- Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
- Proc_.ROP->setup(&result, true, command, workingDirectory);
- }
- // Send asynchronous process start request to libuv loop
- Proc_.Request.send();
- // Wait until the process has been finished and destroyed
- {
- std::unique_lock<std::mutex> ulock(Proc_.Mutex);
- while (Proc_.ROP) {
- Proc_.Condition.wait(ulock);
- }
- }
- return !result.error();
-}
-
-void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
-{
- auto* wrk = reinterpret_cast<WorkerT*>(handle->data);
- bool startFailed = false;
- {
- auto& Proc = wrk->Proc_;
- std::lock_guard<std::mutex> lock(Proc.Mutex);
- if (Proc.ROP && !Proc.ROP->IsStarted()) {
- startFailed =
- !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
- }
- }
- // Clean up if starting of the process failed
- if (startFailed) {
- wrk->UVProcessFinished();
- }
-}
-
-void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
-{
- {
- std::lock_guard<std::mutex> lock(Proc_.Mutex);
- if (Proc_.ROP && (Proc_.ROP->IsFinished() || !Proc_.ROP->IsStarted())) {
- Proc_.ROP.reset();
- }
- }
- // Notify idling thread
- Proc_.Condition.notify_one();
-}
-
void cmWorkerPool::ProcessResultT::reset()
{
ExitStatus = 0;
@@ -652,11 +635,13 @@
// Create workers
gint.Workers.reserve(num);
for (unsigned int ii = 0; ii != num; ++ii) {
- gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii));
+ gint.Workers.emplace_back(
+ cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
}
- // Start workers
- for (auto& wrk : gint.Workers) {
- wrk->Start(&gint);
+ // Start worker threads
+ for (unsigned int ii = 0; ii != num; ++ii) {
+ gint.Workers[ii]->SetThread(
+ std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
}
}
// Destroy begin request
@@ -672,8 +657,9 @@
gint.UVRequestEnd.reset();
}
-void cmWorkerPoolInternal::Work(WorkerT* worker)
+void cmWorkerPoolInternal::Work(unsigned int workerIndex)
{
+ cmWorkerPool::JobHandleT jobHandle;
std::unique_lock<std::mutex> uLock(Mutex);
// Increment running workers count
++WorkersRunning;
@@ -702,15 +688,15 @@
}
// Pop next job from queue
- worker->JobHandle() = std::move(Queue.front());
+ jobHandle = std::move(Queue.front());
Queue.pop_front();
// Unlocked scope for job processing
++JobsProcessing;
{
uLock.unlock();
- worker->JobHandle()->Work(Pool, worker->Index()); // Process job
- worker->JobHandle().reset(); // Destroy job
+ jobHandle->Work(Pool, workerIndex); // Process job
+ jobHandle.reset(); // Destroy job
uLock.lock();
}
--JobsProcessing;