| /* Distributed under the OSI-approved BSD 3-Clause License.  See accompanying | 
 |    file Copyright.txt or https://cmake.org/licensing for details.  */ | 
 | #include "cmWorkerPool.h" | 
 |  | 
 | #include <algorithm> | 
 | #include <array> | 
 | #include <condition_variable> | 
 | #include <cstddef> | 
 | #include <deque> | 
 | #include <functional> | 
 | #include <mutex> | 
 | #include <thread> | 
 |  | 
 | #include <cm/memory> | 
 |  | 
 | #include <cm3p/uv.h> | 
 |  | 
 | #include "cmRange.h" | 
 | #include "cmStringAlgorithms.h" | 
 | #include "cmUVHandlePtr.h" | 
 | #include "cmUVSignalHackRAII.h" // IWYU pragma: keep | 
 |  | 
 | /** | 
 |  * @brief libuv pipe buffer class | 
 |  */ | 
 | class cmUVPipeBuffer | 
 | { | 
 | public: | 
 |   using DataRange = cmRange<const char*>; | 
 |   using DataFunction = std::function<void(DataRange)>; | 
 |   /// On error the ssize_t argument is a non zero libuv error code | 
 |   using EndFunction = std::function<void(ssize_t)>; | 
 |  | 
 |   /** | 
 |    * Reset to construction state | 
 |    */ | 
 |   void reset(); | 
 |  | 
 |   /** | 
 |    * Initializes uv_pipe(), uv_stream() and uv_handle() | 
 |    * @return true on success | 
 |    */ | 
 |   bool init(uv_loop_t* uv_loop); | 
 |  | 
 |   /** | 
 |    * Start reading | 
 |    * @return true on success | 
 |    */ | 
 |   bool startRead(DataFunction dataFunction, EndFunction endFunction); | 
 |  | 
 |   //! libuv pipe | 
 |   uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); } | 
 |   //! uv_pipe() casted to libuv stream | 
 |   uv_stream_t* uv_stream() const | 
 |   { | 
 |     return static_cast<uv_stream_t*>(this->UVPipe_); | 
 |   } | 
 |   //! uv_pipe() casted to libuv handle | 
 |   uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); } | 
 |  | 
 | private: | 
 |   // -- Libuv callbacks | 
 |   static void UVAlloc(uv_handle_t* handle, size_t suggestedSize, | 
 |                       uv_buf_t* buf); | 
 |   static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); | 
 |  | 
 |   cm::uv_pipe_ptr UVPipe_; | 
 |   std::vector<char> Buffer_; | 
 |   DataFunction DataFunction_; | 
 |   EndFunction EndFunction_; | 
 | }; | 
 |  | 
 | void cmUVPipeBuffer::reset() | 
 | { | 
 |   if (this->UVPipe_.get() != nullptr) { | 
 |     this->EndFunction_ = nullptr; | 
 |     this->DataFunction_ = nullptr; | 
 |     this->Buffer_.clear(); | 
 |     this->Buffer_.shrink_to_fit(); | 
 |     this->UVPipe_.reset(); | 
 |   } | 
 | } | 
 |  | 
 | bool cmUVPipeBuffer::init(uv_loop_t* uv_loop) | 
 | { | 
 |   this->reset(); | 
 |   if (uv_loop == nullptr) { | 
 |     return false; | 
 |   } | 
 |   int ret = this->UVPipe_.init(*uv_loop, 0, this); | 
 |   return (ret == 0); | 
 | } | 
 |  | 
 | bool cmUVPipeBuffer::startRead(DataFunction dataFunction, | 
 |                                EndFunction endFunction) | 
 | { | 
 |   if (this->UVPipe_.get() == nullptr) { | 
 |     return false; | 
 |   } | 
 |   if (!dataFunction || !endFunction) { | 
 |     return false; | 
 |   } | 
 |   this->DataFunction_ = std::move(dataFunction); | 
 |   this->EndFunction_ = std::move(endFunction); | 
 |   int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc, | 
 |                           &cmUVPipeBuffer::UVData); | 
 |   return (ret == 0); | 
 | } | 
 |  | 
 | void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize, | 
 |                              uv_buf_t* buf) | 
 | { | 
 |   auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data); | 
 |   pipe.Buffer_.resize(suggestedSize); | 
 |   buf->base = pipe.Buffer_.data(); | 
 |   buf->len = static_cast<unsigned long>(pipe.Buffer_.size()); | 
 | } | 
 |  | 
 | void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread, | 
 |                             const uv_buf_t* buf) | 
 | { | 
 |   auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data); | 
 |   if (nread > 0) { | 
 |     if (buf->base != nullptr) { | 
 |       // Call data function | 
 |       pipe.DataFunction_(DataRange(buf->base, buf->base + nread)); | 
 |     } | 
 |   } else if (nread < 0) { | 
 |     // Save the end function on the stack before resetting the pipe | 
 |     EndFunction efunc; | 
 |     efunc.swap(pipe.EndFunction_); | 
 |     // Reset pipe before calling the end function | 
 |     pipe.reset(); | 
 |     // Call end function | 
 |     efunc((nread == UV_EOF) ? 0 : nread); | 
 |   } | 
 | } | 
 |  | 
 | /** | 
 |  * @brief External process management class | 
 |  */ | 
 | class cmUVReadOnlyProcess | 
 | { | 
 | public: | 
 |   // -- Types | 
 |   //! @brief Process settings | 
 |   struct SetupT | 
 |   { | 
 |     std::string WorkingDirectory; | 
 |     std::vector<std::string> Command; | 
 |     cmWorkerPool::ProcessResultT* Result = nullptr; | 
 |     bool MergedOutput = false; | 
 |   }; | 
 |  | 
 |   // -- Const accessors | 
 |   SetupT const& Setup() const { return this->Setup_; } | 
 |   cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; } | 
 |   bool IsStarted() const { return this->IsStarted_; } | 
 |   bool IsFinished() const { return this->IsFinished_; } | 
 |  | 
 |   // -- Runtime | 
 |   void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput, | 
 |              std::vector<std::string> const& command, | 
 |              std::string const& workingDirectory = std::string()); | 
 |   bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback); | 
 |  | 
 | private: | 
 |   // -- Libuv callbacks | 
 |   static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal); | 
 |   void UVPipeOutData(cmUVPipeBuffer::DataRange data) const; | 
 |   void UVPipeOutEnd(ssize_t error); | 
 |   void UVPipeErrData(cmUVPipeBuffer::DataRange data) const; | 
 |   void UVPipeErrEnd(ssize_t error); | 
 |   void UVTryFinish(); | 
 |  | 
 |   // -- Setup | 
 |   SetupT Setup_; | 
 |   // -- Runtime | 
 |   bool IsStarted_ = false; | 
 |   bool IsFinished_ = false; | 
 |   std::function<void()> FinishedCallback_; | 
 |   std::vector<const char*> CommandPtr_; | 
 |   std::array<uv_stdio_container_t, 3> UVOptionsStdIO_; | 
 |   uv_process_options_t UVOptions_; | 
 |   cm::uv_process_ptr UVProcess_; | 
 |   cmUVPipeBuffer UVPipeOut_; | 
 |   cmUVPipeBuffer UVPipeErr_; | 
 | }; | 
 |  | 
 | void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result, | 
 |                                 bool mergedOutput, | 
 |                                 std::vector<std::string> const& command, | 
 |                                 std::string const& workingDirectory) | 
 | { | 
 |   this->Setup_.WorkingDirectory = workingDirectory; | 
 |   this->Setup_.Command = command; | 
 |   this->Setup_.Result = result; | 
 |   this->Setup_.MergedOutput = mergedOutput; | 
 | } | 
 |  | 
 | bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop, | 
 |                                 std::function<void()> finishedCallback) | 
 | { | 
 |   if (this->IsStarted() || (this->Result() == nullptr)) { | 
 |     return false; | 
 |   } | 
 |  | 
 |   // Reset result before the start | 
 |   this->Result()->reset(); | 
 |  | 
 |   // Fill command string pointers | 
 |   if (!this->Setup().Command.empty()) { | 
 |     this->CommandPtr_.reserve(this->Setup().Command.size() + 1); | 
 |     for (std::string const& arg : this->Setup().Command) { | 
 |       this->CommandPtr_.push_back(arg.c_str()); | 
 |     } | 
 |     this->CommandPtr_.push_back(nullptr); | 
 |   } else { | 
 |     this->Result()->ErrorMessage = "Empty command"; | 
 |   } | 
 |  | 
 |   if (!this->Result()->error()) { | 
 |     if (!this->UVPipeOut_.init(uv_loop)) { | 
 |       this->Result()->ErrorMessage = "libuv stdout pipe initialization failed"; | 
 |     } | 
 |   } | 
 |   if (!this->Result()->error()) { | 
 |     if (!this->UVPipeErr_.init(uv_loop)) { | 
 |       this->Result()->ErrorMessage = "libuv stderr pipe initialization failed"; | 
 |     } | 
 |   } | 
 |   if (!this->Result()->error()) { | 
 |     // -- Setup process stdio options | 
 |     // stdin | 
 |     this->UVOptionsStdIO_[0].flags = UV_IGNORE; | 
 |     this->UVOptionsStdIO_[0].data.stream = nullptr; | 
 |     // stdout | 
 |     this->UVOptionsStdIO_[1].flags = | 
 |       static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE); | 
 |     this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream(); | 
 |     // stderr | 
 |     this->UVOptionsStdIO_[2].flags = | 
 |       static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE); | 
 |     this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream(); | 
 |  | 
 |     // -- Setup process options | 
 |     std::fill_n(reinterpret_cast<char*>(&this->UVOptions_), | 
 |                 sizeof(this->UVOptions_), 0); | 
 |     this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit; | 
 |     this->UVOptions_.file = this->CommandPtr_[0]; | 
 |     this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data()); | 
 |     this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str(); | 
 |     this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE; | 
 |     this->UVOptions_.stdio_count = | 
 |       static_cast<int>(this->UVOptionsStdIO_.size()); | 
 |     this->UVOptions_.stdio = this->UVOptionsStdIO_.data(); | 
 |  | 
 |     // -- Spawn process | 
 |     int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this); | 
 |     if (uvErrorCode != 0) { | 
 |       this->Result()->ErrorMessage = "libuv process spawn failed"; | 
 |       if (const char* uvErr = uv_strerror(uvErrorCode)) { | 
 |         this->Result()->ErrorMessage += ": "; | 
 |         this->Result()->ErrorMessage += uvErr; | 
 |       } | 
 |     } | 
 |   } | 
 |   // -- Start reading from stdio streams | 
 |   if (!this->Result()->error()) { | 
 |     if (!this->UVPipeOut_.startRead( | 
 |           [this](cmUVPipeBuffer::DataRange range) { | 
 |             this->UVPipeOutData(range); | 
 |           }, | 
 |           [this](ssize_t error) { this->UVPipeOutEnd(error); })) { | 
 |       this->Result()->ErrorMessage = | 
 |         "libuv start reading from stdout pipe failed"; | 
 |     } | 
 |   } | 
 |   if (!this->Result()->error()) { | 
 |     if (!this->UVPipeErr_.startRead( | 
 |           [this](cmUVPipeBuffer::DataRange range) { | 
 |             this->UVPipeErrData(range); | 
 |           }, | 
 |           [this](ssize_t error) { this->UVPipeErrEnd(error); })) { | 
 |       this->Result()->ErrorMessage = | 
 |         "libuv start reading from stderr pipe failed"; | 
 |     } | 
 |   } | 
 |  | 
 |   if (!this->Result()->error()) { | 
 |     this->IsStarted_ = true; | 
 |     this->FinishedCallback_ = std::move(finishedCallback); | 
 |   } else { | 
 |     // Clear libuv handles and finish | 
 |     this->UVProcess_.reset(); | 
 |     this->UVPipeOut_.reset(); | 
 |     this->UVPipeErr_.reset(); | 
 |     this->CommandPtr_.clear(); | 
 |   } | 
 |  | 
 |   return this->IsStarted(); | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus, | 
 |                                  int termSignal) | 
 | { | 
 |   auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data); | 
 |   if (proc.IsStarted() && !proc.IsFinished()) { | 
 |     // Set error message on demand | 
 |     proc.Result()->ExitStatus = exitStatus; | 
 |     proc.Result()->TermSignal = termSignal; | 
 |     if (!proc.Result()->error()) { | 
 |       if (termSignal != 0) { | 
 |         proc.Result()->ErrorMessage = cmStrCat( | 
 |           "Process was terminated by signal ", proc.Result()->TermSignal); | 
 |       } else if (exitStatus != 0) { | 
 |         proc.Result()->ErrorMessage = cmStrCat( | 
 |           "Process failed with return value ", proc.Result()->ExitStatus); | 
 |       } | 
 |     } | 
 |  | 
 |     // Reset process handle | 
 |     proc.UVProcess_.reset(); | 
 |     // Try finish | 
 |     proc.UVTryFinish(); | 
 |   } | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const | 
 | { | 
 |   this->Result()->StdOut.append(data.begin(), data.end()); | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error) | 
 | { | 
 |   // Process pipe error | 
 |   if ((error != 0) && !this->Result()->error()) { | 
 |     this->Result()->ErrorMessage = cmStrCat( | 
 |       "Reading from stdout pipe failed with libuv error code ", error); | 
 |   } | 
 |   // Try finish | 
 |   this->UVTryFinish(); | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const | 
 | { | 
 |   std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut | 
 |                                                : &this->Result()->StdErr; | 
 |   str->append(data.begin(), data.end()); | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error) | 
 | { | 
 |   // Process pipe error | 
 |   if ((error != 0) && !this->Result()->error()) { | 
 |     this->Result()->ErrorMessage = cmStrCat( | 
 |       "Reading from stderr pipe failed with libuv error code ", error); | 
 |   } | 
 |   // Try finish | 
 |   this->UVTryFinish(); | 
 | } | 
 |  | 
 | void cmUVReadOnlyProcess::UVTryFinish() | 
 | { | 
 |   // There still might be data in the pipes after the process has finished. | 
 |   // Therefore check if the process is finished AND all pipes are closed | 
 |   // before signaling the worker thread to continue. | 
 |   if ((this->UVProcess_.get() != nullptr) || | 
 |       (this->UVPipeOut_.uv_pipe() != nullptr) || | 
 |       (this->UVPipeErr_.uv_pipe() != nullptr)) { | 
 |     return; | 
 |   } | 
 |   this->IsFinished_ = true; | 
 |   this->FinishedCallback_(); | 
 | } | 
 |  | 
 | /** | 
 |  * @brief Worker pool worker thread | 
 |  */ | 
 | class cmWorkerPoolWorker | 
 | { | 
 | public: | 
 |   cmWorkerPoolWorker(uv_loop_t& uvLoop); | 
 |   ~cmWorkerPoolWorker(); | 
 |  | 
 |   cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete; | 
 |   cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete; | 
 |  | 
 |   /** | 
 |    * Set the internal thread | 
 |    */ | 
 |   void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); } | 
 |  | 
 |   /** | 
 |    * Run an external process | 
 |    */ | 
 |   bool RunProcess(cmWorkerPool::ProcessResultT& result, | 
 |                   std::vector<std::string> const& command, | 
 |                   std::string const& workingDirectory); | 
 |  | 
 | private: | 
 |   // -- Libuv callbacks | 
 |   static void UVProcessStart(uv_async_t* handle); | 
 |   void UVProcessFinished(); | 
 |  | 
 |   // -- Process management | 
 |   struct | 
 |   { | 
 |     std::mutex Mutex; | 
 |     cm::uv_async_ptr Request; | 
 |     std::condition_variable Condition; | 
 |     std::unique_ptr<cmUVReadOnlyProcess> ROP; | 
 |   } Proc_; | 
 |   // -- System thread | 
 |   std::thread Thread_; | 
 | }; | 
 |  | 
 | cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop) | 
 | { | 
 |   this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this); | 
 | } | 
 |  | 
 | cmWorkerPoolWorker::~cmWorkerPoolWorker() | 
 | { | 
 |   if (this->Thread_.joinable()) { | 
 |     this->Thread_.join(); | 
 |   } | 
 | } | 
 |  | 
 | bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result, | 
 |                                     std::vector<std::string> const& command, | 
 |                                     std::string const& workingDirectory) | 
 | { | 
 |   if (command.empty()) { | 
 |     return false; | 
 |   } | 
 |   // Create process instance | 
 |   { | 
 |     std::lock_guard<std::mutex> lock(this->Proc_.Mutex); | 
 |     this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>(); | 
 |     this->Proc_.ROP->setup(&result, true, command, workingDirectory); | 
 |   } | 
 |   // Send asynchronous process start request to libuv loop | 
 |   this->Proc_.Request.send(); | 
 |   // Wait until the process has been finished and destroyed | 
 |   { | 
 |     std::unique_lock<std::mutex> ulock(this->Proc_.Mutex); | 
 |     while (this->Proc_.ROP) { | 
 |       this->Proc_.Condition.wait(ulock); | 
 |     } | 
 |   } | 
 |   return !result.error(); | 
 | } | 
 |  | 
 | void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle) | 
 | { | 
 |   auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data); | 
 |   bool startFailed = false; | 
 |   { | 
 |     auto& Proc = wrk->Proc_; | 
 |     std::lock_guard<std::mutex> lock(Proc.Mutex); | 
 |     if (Proc.ROP && !Proc.ROP->IsStarted()) { | 
 |       startFailed = | 
 |         !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); }); | 
 |     } | 
 |   } | 
 |   // Clean up if starting of the process failed | 
 |   if (startFailed) { | 
 |     wrk->UVProcessFinished(); | 
 |   } | 
 | } | 
 |  | 
 | void cmWorkerPoolWorker::UVProcessFinished() | 
 | { | 
 |   std::lock_guard<std::mutex> lock(this->Proc_.Mutex); | 
 |   if (this->Proc_.ROP && | 
 |       (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) { | 
 |     this->Proc_.ROP.reset(); | 
 |   } | 
 |   // Notify idling thread | 
 |   this->Proc_.Condition.notify_one(); | 
 | } | 
 |  | 
 | /** | 
 |  * @brief Private worker pool internals | 
 |  */ | 
 | class cmWorkerPoolInternal | 
 | { | 
 | public: | 
 |   // -- Constructors | 
 |   cmWorkerPoolInternal(cmWorkerPool* pool); | 
 |   ~cmWorkerPoolInternal(); | 
 |  | 
 |   /** | 
 |    * Runs the libuv loop. | 
 |    */ | 
 |   bool Process(); | 
 |  | 
 |   /** | 
 |    * Clear queue and abort threads. | 
 |    */ | 
 |   void Abort(); | 
 |  | 
 |   /** | 
 |    * Push a job to the queue and notify a worker. | 
 |    */ | 
 |   bool PushJob(cmWorkerPool::JobHandleT&& jobHandle); | 
 |  | 
 |   /** | 
 |    * Worker thread main loop method. | 
 |    */ | 
 |   void Work(unsigned int workerIndex); | 
 |  | 
 |   // -- Request slots | 
 |   static void UVSlotBegin(uv_async_t* handle); | 
 |   static void UVSlotEnd(uv_async_t* handle); | 
 |  | 
 |   // -- UV loop | 
 | #ifdef CMAKE_UV_SIGNAL_HACK | 
 |   std::unique_ptr<cmUVSignalHackRAII> UVHackRAII; | 
 | #endif | 
 |   std::unique_ptr<uv_loop_t> UVLoop; | 
 |   cm::uv_async_ptr UVRequestBegin; | 
 |   cm::uv_async_ptr UVRequestEnd; | 
 |  | 
 |   // -- Thread pool and job queue | 
 |   std::mutex Mutex; | 
 |   bool Processing = false; | 
 |   bool Aborting = false; | 
 |   bool FenceProcessing = false; | 
 |   unsigned int WorkersRunning = 0; | 
 |   unsigned int WorkersIdle = 0; | 
 |   unsigned int JobsProcessing = 0; | 
 |   std::deque<cmWorkerPool::JobHandleT> Queue; | 
 |   std::condition_variable Condition; | 
 |   std::condition_variable ConditionFence; | 
 |   std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers; | 
 |  | 
 |   // -- References | 
 |   cmWorkerPool* Pool = nullptr; | 
 | }; | 
 |  | 
 | void cmWorkerPool::ProcessResultT::reset() | 
 | { | 
 |   this->ExitStatus = 0; | 
 |   this->TermSignal = 0; | 
 |   if (!this->StdOut.empty()) { | 
 |     this->StdOut.clear(); | 
 |     this->StdOut.shrink_to_fit(); | 
 |   } | 
 |   if (!this->StdErr.empty()) { | 
 |     this->StdErr.clear(); | 
 |     this->StdErr.shrink_to_fit(); | 
 |   } | 
 |   if (!this->ErrorMessage.empty()) { | 
 |     this->ErrorMessage.clear(); | 
 |     this->ErrorMessage.shrink_to_fit(); | 
 |   } | 
 | } | 
 |  | 
 | cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool) | 
 |   : Pool(pool) | 
 | { | 
 |   // Initialize libuv loop | 
 |   uv_disable_stdio_inheritance(); | 
 | #ifdef CMAKE_UV_SIGNAL_HACK | 
 |   UVHackRAII = cm::make_unique<cmUVSignalHackRAII>(); | 
 | #endif | 
 |   this->UVLoop = cm::make_unique<uv_loop_t>(); | 
 |   uv_loop_init(this->UVLoop.get()); | 
 | } | 
 |  | 
 | cmWorkerPoolInternal::~cmWorkerPoolInternal() | 
 | { | 
 |   uv_loop_close(this->UVLoop.get()); | 
 | } | 
 |  | 
 | bool cmWorkerPoolInternal::Process() | 
 | { | 
 |   // Reset state flags | 
 |   this->Processing = true; | 
 |   this->Aborting = false; | 
 |   // Initialize libuv asynchronous request | 
 |   this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin, | 
 |                             this); | 
 |   this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd, | 
 |                           this); | 
 |   // Send begin request | 
 |   this->UVRequestBegin.send(); | 
 |   // Run libuv loop | 
 |   bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0); | 
 |   // Update state flags | 
 |   this->Processing = false; | 
 |   this->Aborting = false; | 
 |   return success; | 
 | } | 
 |  | 
 | void cmWorkerPoolInternal::Abort() | 
 | { | 
 |   // Clear all jobs and set abort flag | 
 |   std::lock_guard<std::mutex> guard(this->Mutex); | 
 |   if (!this->Aborting) { | 
 |     // Register abort and clear queue | 
 |     this->Aborting = true; | 
 |     this->Queue.clear(); | 
 |     this->Condition.notify_all(); | 
 |   } | 
 | } | 
 |  | 
 | inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle) | 
 | { | 
 |   std::lock_guard<std::mutex> guard(this->Mutex); | 
 |   if (this->Aborting) { | 
 |     return false; | 
 |   } | 
 |   // Append the job to the queue | 
 |   this->Queue.emplace_back(std::move(jobHandle)); | 
 |   // Notify an idle worker if there's one | 
 |   if (this->WorkersIdle != 0) { | 
 |     this->Condition.notify_one(); | 
 |   } | 
 |   // Return success | 
 |   return true; | 
 | } | 
 |  | 
 | void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle) | 
 | { | 
 |   auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data); | 
 |   // Create worker threads | 
 |   { | 
 |     unsigned int const num = gint.Pool->ThreadCount(); | 
 |     // Create workers | 
 |     gint.Workers.reserve(num); | 
 |     for (unsigned int ii = 0; ii != num; ++ii) { | 
 |       gint.Workers.emplace_back( | 
 |         cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop)); | 
 |     } | 
 |     // Start worker threads | 
 |     for (unsigned int ii = 0; ii != num; ++ii) { | 
 |       gint.Workers[ii]->SetThread( | 
 |         std::thread(&cmWorkerPoolInternal::Work, &gint, ii)); | 
 |     } | 
 |   } | 
 |   // Destroy begin request | 
 |   gint.UVRequestBegin.reset(); | 
 | } | 
 |  | 
 | void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle) | 
 | { | 
 |   auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data); | 
 |   // Join and destroy worker threads | 
 |   gint.Workers.clear(); | 
 |   // Destroy end request | 
 |   gint.UVRequestEnd.reset(); | 
 | } | 
 |  | 
 | void cmWorkerPoolInternal::Work(unsigned int workerIndex) | 
 | { | 
 |   cmWorkerPool::JobHandleT jobHandle; | 
 |   std::unique_lock<std::mutex> uLock(this->Mutex); | 
 |   // Increment running workers count | 
 |   ++this->WorkersRunning; | 
 |   // Enter worker main loop | 
 |   while (true) { | 
 |     // Abort on request | 
 |     if (this->Aborting) { | 
 |       break; | 
 |     } | 
 |     // Wait for new jobs on the main CV | 
 |     if (this->Queue.empty()) { | 
 |       ++this->WorkersIdle; | 
 |       this->Condition.wait(uLock); | 
 |       --this->WorkersIdle; | 
 |       continue; | 
 |     } | 
 |  | 
 |     // If there is a fence currently active or waiting, | 
 |     // sleep on the main CV and try again. | 
 |     if (this->FenceProcessing) { | 
 |       this->Condition.wait(uLock); | 
 |       continue; | 
 |     } | 
 |  | 
 |     // Pop next job from queue | 
 |     jobHandle = std::move(this->Queue.front()); | 
 |     this->Queue.pop_front(); | 
 |  | 
 |     // Check for fence jobs | 
 |     bool raisedFence = false; | 
 |     if (jobHandle->IsFence()) { | 
 |       this->FenceProcessing = true; | 
 |       raisedFence = true; | 
 |       // Wait on the Fence CV until all pending jobs are done. | 
 |       while (this->JobsProcessing != 0 && !this->Aborting) { | 
 |         this->ConditionFence.wait(uLock); | 
 |       } | 
 |       // When aborting, explicitly kick all threads alive once more. | 
 |       if (this->Aborting) { | 
 |         this->FenceProcessing = false; | 
 |         this->Condition.notify_all(); | 
 |         break; | 
 |       } | 
 |     } | 
 |  | 
 |     // Unlocked scope for job processing | 
 |     ++this->JobsProcessing; | 
 |     { | 
 |       uLock.unlock(); | 
 |       jobHandle->Work(this->Pool, workerIndex); // Process job | 
 |       jobHandle.reset();                        // Destroy job | 
 |       uLock.lock(); | 
 |     } | 
 |     --this->JobsProcessing; | 
 |  | 
 |     // If this was the thread that entered fence processing | 
 |     // originally, notify all idling workers that the fence | 
 |     // is done. | 
 |     if (raisedFence) { | 
 |       this->FenceProcessing = false; | 
 |       this->Condition.notify_all(); | 
 |     } | 
 |     // If fence processing is still not done, notify the | 
 |     // the fencing worker when all active jobs are done. | 
 |     if (this->FenceProcessing && this->JobsProcessing == 0) { | 
 |       this->ConditionFence.notify_all(); | 
 |     } | 
 |   } | 
 |  | 
 |   // Decrement running workers count | 
 |   if (--this->WorkersRunning == 0) { | 
 |     // Last worker thread about to finish. Send libuv event. | 
 |     this->UVRequestEnd.send(); | 
 |   } | 
 | } | 
 |  | 
 | cmWorkerPool::JobT::~JobT() = default; | 
 |  | 
 | bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result, | 
 |                                     std::vector<std::string> const& command, | 
 |                                     std::string const& workingDirectory) | 
 | { | 
 |   // Get worker by index | 
 |   auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get(); | 
 |   return wrk->RunProcess(result, command, workingDirectory); | 
 | } | 
 |  | 
 | cmWorkerPool::cmWorkerPool() | 
 |   : Int_(cm::make_unique<cmWorkerPoolInternal>(this)) | 
 | { | 
 | } | 
 |  | 
 | cmWorkerPool::~cmWorkerPool() = default; | 
 |  | 
 | void cmWorkerPool::SetThreadCount(unsigned int threadCount) | 
 | { | 
 |   if (!this->Int_->Processing) { | 
 |     this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u; | 
 |   } | 
 | } | 
 |  | 
 | bool cmWorkerPool::Process(void* userData) | 
 | { | 
 |   // Setup user data | 
 |   this->UserData_ = userData; | 
 |   // Run libuv loop | 
 |   bool success = this->Int_->Process(); | 
 |   // Clear user data | 
 |   this->UserData_ = nullptr; | 
 |   // Return | 
 |   return success; | 
 | } | 
 |  | 
 | bool cmWorkerPool::PushJob(JobHandleT&& jobHandle) | 
 | { | 
 |   return this->Int_->PushJob(std::move(jobHandle)); | 
 | } | 
 |  | 
 | void cmWorkerPool::Abort() | 
 | { | 
 |   this->Int_->Abort(); | 
 | } |