Merge topic 'cmWorkerPool_Tweaks'

56890ede2a cmWorkerPool: Factor out worker thread class (internals)
9794b72d38 cmWorkerPool: Set worker thread count separately to Process()

Acked-by: Kitware Robot <kwrobot@kitware.com>
Merge-request: !3260
diff --git a/Source/cmQtAutoMocUic.cxx b/Source/cmQtAutoMocUic.cxx
index 75c5d8a..005c27d 100644
--- a/Source/cmQtAutoMocUic.cxx
+++ b/Source/cmQtAutoMocUic.cxx
@@ -1186,6 +1186,7 @@
       num = std::min<unsigned long>(num, ParallelMax);
       Base_.NumThreads = static_cast<unsigned int>(num);
     }
+    WorkerPool_.SetThreadCount(Base_.NumThreads);
   }
 
   // - Files and directories
@@ -1482,15 +1483,12 @@
   if (!CreateDirectories()) {
     return false;
   }
-
-  if (!WorkerPool_.Process(Base().NumThreads, this)) {
+  if (!WorkerPool_.Process(this)) {
     return false;
   }
-
   if (JobError_) {
     return false;
   }
-
   return SettingsFileWrite();
 }
 
diff --git a/Source/cmWorkerPool.cxx b/Source/cmWorkerPool.cxx
index 464182c..cbf070e 100644
--- a/Source/cmWorkerPool.cxx
+++ b/Source/cmWorkerPool.cxx
@@ -371,137 +371,62 @@
 }
 
 /**
- * @brief Private worker pool internals
+ * @brief Worker pool worker thread
  */
-class cmWorkerPoolInternal
+class cmWorkerPoolWorker
 {
 public:
-  // -- Types
+  cmWorkerPoolWorker(uv_loop_t& uvLoop);
+  ~cmWorkerPoolWorker();
+
+  cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
+  cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
 
   /**
-   * @brief Worker thread
+   * Set the internal thread
    */
-  class WorkerT
+  void SetThread(std::thread&& aThread) { Thread_ = std::move(aThread); }
+
+  /**
+   * Run an external process
+   */
+  bool RunProcess(cmWorkerPool::ProcessResultT& result,
+                  std::vector<std::string> const& command,
+                  std::string const& workingDirectory);
+
+private:
+  // -- Libuv callbacks
+  static void UVProcessStart(uv_async_t* handle);
+  void UVProcessFinished();
+
+private:
+  // -- Process management
+  struct
   {
-  public:
-    WorkerT(unsigned int index);
-    ~WorkerT();
-
-    WorkerT(WorkerT const&) = delete;
-    WorkerT& operator=(WorkerT const&) = delete;
-
-    /**
-     * Start the thread
-     */
-    void Start(cmWorkerPoolInternal* internal);
-
-    /**
-     * @brief Run an external process
-     */
-    bool RunProcess(cmWorkerPool::ProcessResultT& result,
-                    std::vector<std::string> const& command,
-                    std::string const& workingDirectory);
-
-    // -- Accessors
-    unsigned int Index() const { return Index_; }
-    cmWorkerPool::JobHandleT& JobHandle() { return JobHandle_; }
-
-  private:
-    // -- Libuv callbacks
-    static void UVProcessStart(uv_async_t* handle);
-    void UVProcessFinished();
-
-  private:
-    //! @brief Job handle
-    cmWorkerPool::JobHandleT JobHandle_;
-    //! @brief Worker index
-    unsigned int Index_;
-    // -- Process management
-    struct
-    {
-      std::mutex Mutex;
-      cm::uv_async_ptr Request;
-      std::condition_variable Condition;
-      std::unique_ptr<cmUVReadOnlyProcess> ROP;
-    } Proc_;
-    // -- System thread
-    std::thread Thread_;
-  };
-
-public:
-  // -- Constructors
-  cmWorkerPoolInternal(cmWorkerPool* pool);
-  ~cmWorkerPoolInternal();
-
-  /**
-   * @brief Runs the libuv loop
-   */
-  bool Process();
-
-  /**
-   * @brief Clear queue and abort threads
-   */
-  void Abort();
-
-  /**
-   * @brief Push a job to the queue and notify a worker
-   */
-  bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
-
-  /**
-   * @brief Worker thread main loop method
-   */
-  void Work(WorkerT* worker);
-
-  // -- Request slots
-  static void UVSlotBegin(uv_async_t* handle);
-  static void UVSlotEnd(uv_async_t* handle);
-
-public:
-  // -- UV loop
-#ifdef CMAKE_UV_SIGNAL_HACK
-  std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
-#endif
-  std::unique_ptr<uv_loop_t> UVLoop;
-  cm::uv_async_ptr UVRequestBegin;
-  cm::uv_async_ptr UVRequestEnd;
-
-  // -- Thread pool and job queue
-  std::mutex Mutex;
-  bool Aborting = false;
-  bool FenceProcessing = false;
-  unsigned int WorkersRunning = 0;
-  unsigned int WorkersIdle = 0;
-  unsigned int JobsProcessing = 0;
-  std::deque<cmWorkerPool::JobHandleT> Queue;
-  std::condition_variable Condition;
-  std::vector<std::unique_ptr<WorkerT>> Workers;
-
-  // -- References
-  cmWorkerPool* Pool = nullptr;
+    std::mutex Mutex;
+    cm::uv_async_ptr Request;
+    std::condition_variable Condition;
+    std::unique_ptr<cmUVReadOnlyProcess> ROP;
+  } Proc_;
+  // -- System thread
+  std::thread Thread_;
 };
 
-cmWorkerPoolInternal::WorkerT::WorkerT(unsigned int index)
-  : Index_(index)
+cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
 {
+  Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
 }
 
-cmWorkerPoolInternal::WorkerT::~WorkerT()
+cmWorkerPoolWorker::~cmWorkerPoolWorker()
 {
   if (Thread_.joinable()) {
     Thread_.join();
   }
 }
 
-void cmWorkerPoolInternal::WorkerT::Start(cmWorkerPoolInternal* internal)
-{
-  Proc_.Request.init(*(internal->UVLoop), &WorkerT::UVProcessStart, this);
-  Thread_ = std::thread(&cmWorkerPoolInternal::Work, internal, this);
-}
-
-bool cmWorkerPoolInternal::WorkerT::RunProcess(
-  cmWorkerPool::ProcessResultT& result,
-  std::vector<std::string> const& command, std::string const& workingDirectory)
+bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
+                                    std::vector<std::string> const& command,
+                                    std::string const& workingDirectory)
 {
   if (command.empty()) {
     return false;
@@ -524,9 +449,9 @@
   return !result.error();
 }
 
-void cmWorkerPoolInternal::WorkerT::UVProcessStart(uv_async_t* handle)
+void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
 {
-  auto* wrk = reinterpret_cast<WorkerT*>(handle->data);
+  auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
   bool startFailed = false;
   {
     auto& Proc = wrk->Proc_;
@@ -542,7 +467,7 @@
   }
 }
 
-void cmWorkerPoolInternal::WorkerT::UVProcessFinished()
+void cmWorkerPoolWorker::UVProcessFinished()
 {
   {
     std::lock_guard<std::mutex> lock(Proc_.Mutex);
@@ -554,6 +479,65 @@
   Proc_.Condition.notify_one();
 }
 
+/**
+ * @brief Private worker pool internals
+ */
+class cmWorkerPoolInternal
+{
+public:
+  // -- Constructors
+  cmWorkerPoolInternal(cmWorkerPool* pool);
+  ~cmWorkerPoolInternal();
+
+  /**
+   * Runs the libuv loop.
+   */
+  bool Process();
+
+  /**
+   * Clear queue and abort threads.
+   */
+  void Abort();
+
+  /**
+   * Push a job to the queue and notify a worker.
+   */
+  bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
+
+  /**
+   * Worker thread main loop method.
+   */
+  void Work(unsigned int workerIndex);
+
+  // -- Request slots
+  static void UVSlotBegin(uv_async_t* handle);
+  static void UVSlotEnd(uv_async_t* handle);
+
+public:
+  // -- UV loop
+#ifdef CMAKE_UV_SIGNAL_HACK
+  std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
+#endif
+  std::unique_ptr<uv_loop_t> UVLoop;
+  cm::uv_async_ptr UVRequestBegin;
+  cm::uv_async_ptr UVRequestEnd;
+
+  // -- Thread pool and job queue
+  std::mutex Mutex;
+  bool Processing = false;
+  bool Aborting = false;
+  bool FenceProcessing = false;
+  unsigned int WorkersRunning = 0;
+  unsigned int WorkersIdle = 0;
+  unsigned int JobsProcessing = 0;
+  std::deque<cmWorkerPool::JobHandleT> Queue;
+  std::condition_variable Condition;
+  std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
+
+  // -- References
+  cmWorkerPool* Pool = nullptr;
+};
+
 void cmWorkerPool::ProcessResultT::reset()
 {
   ExitStatus = 0;
@@ -591,7 +575,8 @@
 
 bool cmWorkerPoolInternal::Process()
 {
-  // Reset state
+  // Reset state flags
+  Processing = true;
   Aborting = false;
   // Initialize libuv asynchronous request
   UVRequestBegin.init(*UVLoop, &cmWorkerPoolInternal::UVSlotBegin, this);
@@ -599,23 +584,27 @@
   // Send begin request
   UVRequestBegin.send();
   // Run libuv loop
-  return (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
+  bool success = (uv_run(UVLoop.get(), UV_RUN_DEFAULT) == 0);
+  // Update state flags
+  Processing = false;
+  Aborting = false;
+  return success;
 }
 
 void cmWorkerPoolInternal::Abort()
 {
-  bool firstCall = false;
+  bool notifyThreads = false;
   // Clear all jobs and set abort flag
   {
     std::lock_guard<std::mutex> guard(Mutex);
-    if (!Aborting) {
+    if (Processing && !Aborting) {
       // Register abort and clear queue
       Aborting = true;
       Queue.clear();
-      firstCall = true;
+      notifyThreads = true;
     }
   }
-  if (firstCall) {
+  if (notifyThreads) {
     // Wake threads
     Condition.notify_all();
   }
@@ -627,15 +616,13 @@
   if (Aborting) {
     return false;
   }
-
   // Append the job to the queue
   Queue.emplace_back(std::move(jobHandle));
-
   // Notify an idle worker if there's one
   if (WorkersIdle != 0) {
     Condition.notify_one();
   }
-
+  // Return success
   return true;
 }
 
@@ -648,11 +635,13 @@
     // Create workers
     gint.Workers.reserve(num);
     for (unsigned int ii = 0; ii != num; ++ii) {
-      gint.Workers.emplace_back(cm::make_unique<WorkerT>(ii));
+      gint.Workers.emplace_back(
+        cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
     }
-    // Start workers
-    for (auto& wrk : gint.Workers) {
-      wrk->Start(&gint);
+    // Start worker threads
+    for (unsigned int ii = 0; ii != num; ++ii) {
+      gint.Workers[ii]->SetThread(
+        std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
     }
   }
   // Destroy begin request
@@ -668,8 +657,9 @@
   gint.UVRequestEnd.reset();
 }
 
-void cmWorkerPoolInternal::Work(WorkerT* worker)
+void cmWorkerPoolInternal::Work(unsigned int workerIndex)
 {
+  cmWorkerPool::JobHandleT jobHandle;
   std::unique_lock<std::mutex> uLock(Mutex);
   // Increment running workers count
   ++WorkersRunning;
@@ -698,15 +688,15 @@
     }
 
     // Pop next job from queue
-    worker->JobHandle() = std::move(Queue.front());
+    jobHandle = std::move(Queue.front());
     Queue.pop_front();
 
     // Unlocked scope for job processing
     ++JobsProcessing;
     {
       uLock.unlock();
-      worker->JobHandle()->Work(Pool, worker->Index()); // Process job
-      worker->JobHandle().reset();                      // Destroy job
+      jobHandle->Work(Pool, workerIndex); // Process job
+      jobHandle.reset();                  // Destroy job
       uLock.lock();
     }
     --JobsProcessing;
@@ -743,19 +733,22 @@
 
 cmWorkerPool::~cmWorkerPool() = default;
 
-bool cmWorkerPool::Process(unsigned int threadCount, void* userData)
+void cmWorkerPool::SetThreadCount(unsigned int threadCount)
+{
+  if (!Int_->Processing) {
+    ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
+  }
+}
+
+bool cmWorkerPool::Process(void* userData)
 {
   // Setup user data
   UserData_ = userData;
-  ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
-
   // Run libuv loop
   bool success = Int_->Process();
-
   // Clear user data
   UserData_ = nullptr;
-  ThreadCount_ = 0;
-
+  // Return
   return success;
 }
 
diff --git a/Source/cmWorkerPool.h b/Source/cmWorkerPool.h
index 71c7d84..f08bb4f 100644
--- a/Source/cmWorkerPool.h
+++ b/Source/cmWorkerPool.h
@@ -50,12 +50,12 @@
     JobT& operator=(JobT const&) = delete;
 
     /**
-     * @brief Virtual destructor.
+     * Virtual destructor.
      */
     virtual ~JobT();
 
     /**
-     * @brief Fence job flag
+     * Fence job flag
      *
      * Fence jobs require that:
      * - all jobs before in the queue have been processed
@@ -66,7 +66,7 @@
 
   protected:
     /**
-     * @brief Protected default constructor
+     * Protected default constructor
      */
     JobT(bool fence = false)
       : Fence_(fence)
@@ -125,12 +125,12 @@
   };
 
   /**
-   * @brief Job handle type
+   * Job handle type
    */
   typedef std::unique_ptr<JobT> JobHandleT;
 
   /**
-   * @brief Fence job base class
+   * Fence job base class
    */
   class JobFenceT : public JobT
   {
@@ -144,8 +144,9 @@
   };
 
   /**
-   * @brief Fence job that aborts the worker pool.
-   * This class is useful as the last job in the job queue.
+   * Fence job that aborts the worker pool.
+   *
+   * Useful as the last job in the job queue.
    */
   class JobEndT : JobFenceT
   {
@@ -160,23 +161,29 @@
   ~cmWorkerPool();
 
   /**
-   * @brief Blocking function that starts threads to process all Jobs in
-   *        the queue.
+   * Number of worker threads.
+   */
+  unsigned int ThreadCount() const { return ThreadCount_; }
+
+  /**
+   * Set the number of worker threads.
+   *
+   * Calling this method during Process() has no effect.
+   */
+  void SetThreadCount(unsigned int threadCount);
+
+  /**
+   * Blocking function that starts threads to process all Jobs in the queue.
    *
    * This method blocks until a job calls the Abort() method.
    * @arg threadCount Number of threads to process jobs.
    * @arg userData Common user data pointer available in all Jobs.
    */
-  bool Process(unsigned int threadCount, void* userData = nullptr);
-
-  /**
-   * Number of worker threads passed to Process().
-   * Only valid during Process().
-   */
-  unsigned int ThreadCount() const { return ThreadCount_; }
+  bool Process(void* userData = nullptr);
 
   /**
    * User data reference passed to Process().
+   *
    * Only valid during Process().
    */
   void* UserData() const { return UserData_; }
@@ -184,14 +191,14 @@
   // -- Job processing interface
 
   /**
-   * @brief Clears the job queue and aborts all worker threads.
+   * Clears the job queue and aborts all worker threads.
    *
    * This method is thread safe and can be called from inside a job.
    */
   void Abort();
 
   /**
-   * @brief Push job to the queue.
+   * Push job to the queue.
    *
    * This method is thread safe and can be called from inside a job or before
    * Process().
@@ -199,7 +206,7 @@
   bool PushJob(JobHandleT&& jobHandle);
 
   /**
-   * @brief Push job to the queue
+   * Push job to the queue
    *
    * This method is thread safe and can be called from inside a job or before
    * Process().
@@ -212,7 +219,7 @@
 
 private:
   void* UserData_ = nullptr;
-  unsigned int ThreadCount_ = 0;
+  unsigned int ThreadCount_ = 1;
   std::unique_ptr<cmWorkerPoolInternal> Int_;
 };