| //===--- TaskQueue.inc - Unix-specific TaskQueue ----------------*- C++ -*-===// |
| // |
| // This source file is part of the Swift.org open source project |
| // |
| // Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors |
| // Licensed under Apache License v2.0 with Runtime Library Exception |
| // |
| // See https://swift.org/LICENSE.txt for license information |
| // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors |
| // |
| //===----------------------------------------------------------------------===// |
| |
| #include "swift/Basic/TaskQueue.h" |
| #include "swift/Basic/Statistic.h" |
| |
| #include "llvm/ADT/StringRef.h" |
| #include "llvm/ADT/DenseMap.h" |
| #include "llvm/ADT/DenseSet.h" |
| #include "llvm/Support/ErrorHandling.h" |
| |
| #include <string> |
| #include <cerrno> |
| |
| #if HAVE_POSIX_SPAWN |
| #include <spawn.h> |
| #endif |
| |
| #if HAVE_UNISTD_H |
| #include <unistd.h> |
| #endif |
| |
| #if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__) |
| #include <sys/resource.h> |
| #endif |
| |
| #include <poll.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| |
| #if !defined(__APPLE__) |
| extern char **environ; |
| #else |
| extern "C" { |
| // _NSGetEnviron is from crt_externs.h which is missing in the iOS SDK. |
| extern char ***_NSGetEnviron(void); |
| } |
| #endif |
| |
| namespace swift { |
| namespace sys { |
| |
| #if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__) |
| TaskProcessInformation::TaskProcessInformation(ProcessId Pid, struct rusage Usage) |
| : TaskProcessInformation(Pid, |
| uint64_t(Usage.ru_utime.tv_sec) * 1000000 + |
| uint64_t(Usage.ru_utime.tv_usec), |
| uint64_t(Usage.ru_stime.tv_sec) * 1000000 + |
| uint64_t(Usage.ru_stime.tv_usec), |
| Usage.ru_maxrss) { |
| #ifndef __APPLE__ |
| // Apple systems report bytes; everything else appears to report KB. |
| this->ProcessUsage.getValue().Maxrss <<= 10; |
| #endif // __APPLE__ |
| } |
| #endif // defined(HAVE_GETRUSAGE) && !defined(__HAIKU__) |
| |
| class Task { |
| /// The path to the executable which this Task will execute. |
| const char *ExecPath; |
| |
| /// Any arguments which should be passed during execution. |
| ArrayRef<const char *> Args; |
| |
| /// The environment which will be used during execution. If empty, uses |
| /// this process's environment. |
| ArrayRef<const char *> Env; |
| |
| /// Context which should be associated with this task. |
| void *Context; |
| |
| /// True if the errors of the Task should be stored in Errors instead of Output. |
| bool SeparateErrors; |
| |
| /// The pid of this Task when executing. |
| pid_t Pid; |
| |
| /// A pipe for reading output from the child process. |
| int Pipe; |
| |
| /// A pipe for reading errors from the child prcess, if SeparateErrors is true. |
| int ErrorPipe; |
| |
| /// The current state of the Task. |
| enum class TaskState { Preparing, Executing, Finished } State; |
| |
| /// Once the Task has finished, this contains the buffered output of the Task. |
| std::string Output; |
| |
| /// Once the Task has finished, if SeparateErrors is true, this contains the |
| /// errors from the Task. |
| std::string Errors; |
| |
| /// Optional place to count I/O and subprocess events. |
| UnifiedStatsReporter *Stats; |
| |
| public: |
| Task(const char *ExecPath, ArrayRef<const char *> Args, |
| ArrayRef<const char *> Env, void *Context, bool SeparateErrors, |
| UnifiedStatsReporter *USR) |
| : ExecPath(ExecPath), Args(Args), Env(Env), Context(Context), |
| SeparateErrors(SeparateErrors), Pid(-1), Pipe(-1), ErrorPipe(-1), |
| State(TaskState::Preparing), Stats(USR) { |
| assert((Env.empty() || Env.back() == nullptr) && |
| "Env must either be empty or null-terminated!"); |
| } |
| |
| const char *getExecPath() const { return ExecPath; } |
| ArrayRef<const char *> getArgs() const { return Args; } |
| StringRef getOutput() const { return Output; } |
| StringRef getErrors() const { return Errors; } |
| void *getContext() const { return Context; } |
| pid_t getPid() const { return Pid; } |
| int getPipe() const { return Pipe; } |
| int getErrorPipe() const { return ErrorPipe; } |
| |
| /// \brief Begins execution of this Task. |
| /// \returns true on error. |
| bool execute(); |
| |
| /// \brief Reads data from the pipes, if any is available. |
| /// |
| /// If \p UntilEnd is true, reads until the end of the stream; otherwise reads |
| /// once (possibly with a retry on EINTR), and returns. |
| /// \returns true on error. |
| bool readFromPipes(bool UntilEnd); |
| |
| /// \brief Performs any post-execution work for this Task, such as reading |
| /// piped output and closing the pipe. |
| void finishExecution(); |
| }; |
| |
| } // end namespace sys |
| } // end namespace swift |
| |
| bool Task::execute() { |
| assert(State < TaskState::Executing && "This Task cannot be executed twice!"); |
| State = TaskState::Executing; |
| |
| // Construct argv. |
| SmallVector<const char *, 128> Argv; |
| Argv.push_back(ExecPath); |
| Argv.append(Args.begin(), Args.end()); |
| Argv.push_back(0); // argv is expected to be null-terminated. |
| |
| // Set up the pipe. |
| int FullPipe[2]; |
| pipe(FullPipe); |
| Pipe = FullPipe[0]; |
| |
| int FullErrorPipe[2]; |
| if (SeparateErrors) { |
| pipe(FullErrorPipe); |
| ErrorPipe = FullErrorPipe[0]; |
| } |
| |
| // Get the environment to pass down to the subtask. |
| const char *const *envp = Env.empty() ? nullptr : Env.data(); |
| if (!envp) { |
| #if __APPLE__ |
| envp = *_NSGetEnviron(); |
| #else |
| envp = environ; |
| #endif |
| } |
| |
| const char **argvp = Argv.data(); |
| |
| #if HAVE_POSIX_SPAWN |
| posix_spawn_file_actions_t FileActions; |
| posix_spawn_file_actions_init(&FileActions); |
| |
| posix_spawn_file_actions_adddup2(&FileActions, FullPipe[1], STDOUT_FILENO); |
| |
| if (SeparateErrors) { |
| posix_spawn_file_actions_adddup2(&FileActions, FullErrorPipe[1], |
| STDERR_FILENO); |
| } else { |
| posix_spawn_file_actions_adddup2(&FileActions, STDOUT_FILENO, |
| STDERR_FILENO); |
| } |
| |
| posix_spawn_file_actions_addclose(&FileActions, FullPipe[0]); |
| if (SeparateErrors) { |
| posix_spawn_file_actions_addclose(&FileActions, FullErrorPipe[0]); |
| } |
| |
| // Spawn the subtask. |
| int spawnErr = |
| posix_spawn(&Pid, ExecPath, &FileActions, nullptr, |
| const_cast<char **>(argvp), const_cast<char **>(envp)); |
| |
| posix_spawn_file_actions_destroy(&FileActions); |
| close(FullPipe[1]); |
| if (SeparateErrors) { |
| close(FullErrorPipe[1]); |
| } |
| |
| if (spawnErr != 0 || Pid == 0) { |
| close(FullPipe[0]); |
| if (SeparateErrors) { |
| close(FullErrorPipe[0]); |
| } |
| State = TaskState::Finished; |
| return true; |
| } |
| #else |
| Pid = fork(); |
| switch (Pid) { |
| case -1: { |
| close(FullPipe[0]); |
| if (SeparateErrors) { |
| close(FullErrorPipe[0]); |
| } |
| State = TaskState::Finished; |
| Pid = 0; |
| break; |
| } |
| case 0: { |
| // Child process: Execute the program. |
| dup2(FullPipe[1], STDOUT_FILENO); |
| if (SeparateErrors) { |
| dup2(FullErrorPipe[1], STDERR_FILENO); |
| } else { |
| dup2(STDOUT_FILENO, STDERR_FILENO); |
| } |
| close(FullPipe[0]); |
| if (SeparateErrors) { |
| close(FullErrorPipe[0]); |
| } |
| execve(ExecPath, const_cast<char **>(argvp), const_cast<char **>(envp)); |
| |
| // If the execve() failed, we should exit. Follow Unix protocol and |
| // return 127 if the executable was not found, and 126 otherwise. |
| // Use _exit rather than exit so that atexit functions and static |
| // object destructors cloned from the parent process aren't |
| // redundantly run, and so that any data buffered in stdio buffers |
| // cloned from the parent aren't redundantly written out. |
| _exit(errno == ENOENT ? 127 : 126); |
| } |
| default: |
| // Parent process: Break out of the switch to do our processing. |
| break; |
| } |
| |
| close(FullPipe[1]); |
| if (SeparateErrors) { |
| close(FullErrorPipe[1]); |
| } |
| |
| if (Pid == 0) |
| return true; |
| #endif |
| |
| return false; |
| } |
| |
| /// Read the data in \p Pipe, and append it to \p Output. |
| /// \p Pipe must be in blocking mode, and must contain unread data. |
| /// If \p UntilEnd is true, keep reading, and possibly blocking, till the pipe |
| /// is closed. If \p UntilEnd is false, just read once. Return true if error |
| static bool readFromAPipe(std::string &Output, int Pipe, |
| UnifiedStatsReporter *Stats, bool UntilEnd) { |
| char outputBuffer[1024]; |
| ssize_t readBytes = 0; |
| while ((readBytes = read(Pipe, outputBuffer, sizeof(outputBuffer))) != 0) { |
| if (readBytes < 0) { |
| if (errno == EINTR) |
| // read() was interrupted, so try again. |
| // Q: Why isn't there a counter to break out of this loop if there are |
| // more than some number of EINTRs? |
| // A: EINTR on a blocking read means only one thing: the syscall was |
| // interrupted and the program should retry. So there is no need to |
| // stop retrying after any particular number of interruptions (any |
| // more than the program would stop reading after a particular number |
| // of bytes or whatever). |
| continue; |
| return true; |
| } |
| Output.append(outputBuffer, readBytes); |
| if (Stats) |
| Stats->getDriverCounters().NumDriverPipeReads++; |
| if (!UntilEnd) |
| break; |
| } |
| return false; |
| } |
| |
| bool Task::readFromPipes(bool UntilEnd) { |
| bool Ret = readFromAPipe(Output, Pipe, Stats, UntilEnd); |
| if (SeparateErrors) { |
| Ret |= readFromAPipe(Errors, ErrorPipe, Stats, UntilEnd); |
| } |
| return Ret; |
| } |
| |
| void Task::finishExecution() { |
| assert(State == TaskState::Executing && |
| "This Task must be executing to finish execution!"); |
| |
| State = TaskState::Finished; |
| |
| // Read the output of the command, so we can use it later. |
| readFromPipes(/*UntilEnd*/ false); |
| |
| close(Pipe); |
| if (SeparateErrors) { |
| close(ErrorPipe); |
| } |
| } |
| |
| bool TaskQueue::supportsBufferingOutput() { |
| // The Unix implementation supports buffering output. |
| return true; |
| } |
| |
| bool TaskQueue::supportsParallelExecution() { |
| // The Unix implementation supports parallel execution. |
| return true; |
| } |
| |
| unsigned TaskQueue::getNumberOfParallelTasks() const { |
| // TODO: add support for choosing a better default value for |
| // MaxNumberOfParallelTasks if NumberOfParallelTasks is 0. (Optimally, this |
| // should choose a value > 1 tailored to the current system.) |
| return NumberOfParallelTasks > 0 ? NumberOfParallelTasks : 1; |
| } |
| |
| void TaskQueue::addTask(const char *ExecPath, ArrayRef<const char *> Args, |
| ArrayRef<const char *> Env, void *Context, |
| bool SeparateErrors) { |
| std::unique_ptr<Task> T( |
| new Task(ExecPath, Args, Env, Context, SeparateErrors, Stats)); |
| QueuedTasks.push(std::move(T)); |
| } |
| |
| /// Owns Tasks, handles correspondence between Tasks, file descriptors, and |
| /// process IDs. |
| /// FIXME: only handles stdout pipes, ignores stderr pipes. |
| class TaskMap { |
| using PidToTaskMap = llvm::DenseMap<pid_t, std::unique_ptr<Task>>; |
| PidToTaskMap TasksByPid; |
| |
| public: |
| TaskMap() = default; |
| |
| bool empty() const { return TasksByPid.empty(); } |
| unsigned size() const { return TasksByPid.size(); } |
| |
| void add(std::unique_ptr<Task> T) { TasksByPid[T->getPid()] = std::move(T); } |
| |
| Task &findTaskForFd(const int fd) { |
| auto predicate = [&fd](PidToTaskMap::value_type &value) -> bool { |
| return value.second->getPipe() == fd; |
| }; |
| auto iter = std::find_if(TasksByPid.begin(), TasksByPid.end(), predicate); |
| assert(iter != TasksByPid.end() && |
| "All outstanding fds must be associated with a Task"); |
| return *iter->second; |
| } |
| |
| void destroyTask(Task &T) { TasksByPid.erase(T.getPid()); } |
| }; |
| |
| /// Concurrently execute the tasks in the TaskQueue, collecting the outputs from |
| /// each task. |
| /// Maintain invarients connecting tasks to execute, tasks currently executing, |
| /// and fds being polled. These invarients include: |
| /// A task is not in both TasksToBeExecuted and TasksBeingExecuted, |
| /// A task is executing iff it is in TasksBeingExecuted, |
| /// A task is executing iff any of its fds being polled are in FdsBeingPolled |
| /// (These should be all of its output fds, but today is only stdout.) |
| /// When a task has finished executing, wait for it to die, takes |
| /// action appropriate to the cause of death, then reclaim its |
| /// storage. |
| class TaskMonitor { |
| std::queue<std::unique_ptr<Task>> &TasksToBeExecuted; |
| TaskMap TasksBeingExecuted; |
| |
| std::vector<struct pollfd> FdsBeingPolled; |
| |
| const unsigned MaxNumberOfParallelTasks; |
| |
| public: |
| struct Callbacks { |
| const TaskQueue::TaskBeganCallback TaskBegan; |
| const TaskQueue::TaskFinishedCallback TaskFinished; |
| const TaskQueue::TaskSignalledCallback TaskSignalled; |
| const std::function<void()> PolledAnFd; |
| }; |
| |
| private: |
| Callbacks callbacks; |
| |
| public: |
| TaskMonitor(std::queue<std::unique_ptr<Task>> &TasksToBeExecuted, |
| const unsigned NumberOfParallelTasks, const Callbacks &callbacks) |
| : TasksToBeExecuted(TasksToBeExecuted), |
| MaxNumberOfParallelTasks( |
| NumberOfParallelTasks == 0 ? 1 : NumberOfParallelTasks), |
| callbacks(callbacks) {} |
| |
| /// Run the tasks to be executed. |
| /// \return true on error. |
| bool executeTasks(); |
| |
| private: |
| bool isFinishedExecutingTasks() const { |
| return TasksBeingExecuted.empty() && TasksToBeExecuted.empty(); |
| } |
| |
| /// Start up tasks if we aren't already at the parallel limit, and no earlier |
| /// subtasks have failed. |
| /// \return true on error. |
| bool startUpSomeTasks(); |
| |
| /// \return true on error. |
| bool beginExecutingATask(Task &T); |
| |
| /// Enter the task and its outputs in this TaskMonitor's data structures so |
| /// it can be polled. |
| void startPollingFdsOfTask(const Task &T); |
| |
| void stopPolling(ArrayRef<int> FinishedFds); |
| |
| enum class PollResult { HardError, SoftError, NoError }; |
| PollResult pollTheFds(); |
| |
| /// \return None on error. |
| Optional<std::vector<int>> readFromReadyFdsReturningFinishedOnes(); |
| |
| /// Ensure that events bits returned from polling are what's expected. |
| void verifyEvents(short events) const; |
| |
| void readDataIfAvailable(short events, int fd, Task &T) const; |
| |
| bool didTaskHangup(short events) const; |
| }; |
| |
| bool TaskMonitor::executeTasks() { |
| while (!isFinishedExecutingTasks()) { |
| if (startUpSomeTasks()) |
| return true; |
| |
| switch (pollTheFds()) { |
| case PollResult::HardError: |
| return true; |
| case PollResult::SoftError: |
| continue; |
| case PollResult::NoError: |
| break; |
| } |
| Optional<std::vector<int>> FinishedFds = |
| readFromReadyFdsReturningFinishedOnes(); |
| if (!FinishedFds) |
| return true; |
| |
| stopPolling(*FinishedFds); |
| } |
| return false; |
| } |
| |
| bool TaskMonitor::startUpSomeTasks() { |
| while (!TasksToBeExecuted.empty() && |
| TasksBeingExecuted.size() < MaxNumberOfParallelTasks) { |
| std::unique_ptr<Task> T(TasksToBeExecuted.front().release()); |
| TasksToBeExecuted.pop(); |
| if (beginExecutingATask(*T)) |
| return true; |
| startPollingFdsOfTask(*T); |
| TasksBeingExecuted.add(std::move(T)); |
| } |
| return false; |
| } |
| |
| void TaskMonitor::startPollingFdsOfTask(const Task &T) { |
| FdsBeingPolled.push_back({T.getPipe(), POLLIN | POLLPRI | POLLHUP, 0}); |
| // We should also poll T->getErrorPipe(), but this introduces timing |
| // issues with shutting down the task after reading getPipe(). |
| } |
| |
| TaskMonitor::PollResult TaskMonitor::pollTheFds() { |
| assert(!FdsBeingPolled.empty() && |
| "We should only call poll() if we have fds to watch!"); |
| int ReadyFdCount = poll(FdsBeingPolled.data(), FdsBeingPolled.size(), -1); |
| if (callbacks.PolledAnFd) |
| callbacks.PolledAnFd(); |
| if (ReadyFdCount != -1) |
| return PollResult::NoError; |
| return errno == EAGAIN || errno == EINTR ? PollResult::SoftError |
| : PollResult::HardError; |
| } |
| |
| bool TaskMonitor::beginExecutingATask(Task &T) { |
| if (T.execute()) |
| return true; |
| if (callbacks.TaskBegan) |
| callbacks.TaskBegan(T.getPid(), T.getContext()); |
| return false; |
| } |
| |
| static bool |
| cleanUpAHungUpTask(Task &T, |
| const TaskQueue::TaskFinishedCallback FinishedCallback, |
| TaskQueue::TaskSignalledCallback SignalledCallback); |
| |
| /** |
| Wait for the process with a given pid to finish. |
| |
| @param pidToWaitFor the pid of the process to wait for |
| @return Status information of the wait call and information about process |
| */ |
| static std::pair<Optional<int>, TaskProcessInformation> waitForPid(const pid_t pidToWaitFor); |
| static bool |
| cleanUpAfterSignal(int Status, const Task &T, TaskProcessInformation ProcInfo, |
| const TaskQueue::TaskSignalledCallback SignalledCallback); |
| static bool |
| cleanUpAfterExit(int Status, const Task &T, TaskProcessInformation ProcInfo, |
| const TaskQueue::TaskFinishedCallback FinishedCallback); |
| |
| Optional<std::vector<int>> |
| TaskMonitor::readFromReadyFdsReturningFinishedOnes() { |
| std::vector<int> finishedFds; |
| for (struct pollfd &fd : FdsBeingPolled) { |
| const int fileDes = fd.fd; |
| const short receivedEvents = fd.revents; |
| fd.revents = 0; |
| verifyEvents(receivedEvents); |
| Task &T = TasksBeingExecuted.findTaskForFd(fileDes); |
| readDataIfAvailable(receivedEvents, fileDes, T); |
| if (!didTaskHangup(receivedEvents)) |
| continue; |
| finishedFds.push_back(fileDes); |
| const bool hadError = |
| cleanUpAHungUpTask(T, callbacks.TaskFinished, callbacks.TaskSignalled); |
| TasksBeingExecuted.destroyTask(T); |
| if (hadError) |
| return None; |
| } |
| return finishedFds; |
| } |
| |
| void TaskMonitor::verifyEvents(const short events) const { |
| // We passed an invalid fd; this should never happen, |
| // since we always mark fds as finished after calling |
| // Task::finishExecution() (which closes the Task's fd). |
| assert((events & POLLNVAL) == 0 && "Asked poll() to watch a closed fd"); |
| const short expectedEvents = POLLIN | POLLPRI | POLLHUP | POLLERR; |
| assert((events & ~expectedEvents) == 0 && "Received unexpected event"); |
| } |
| |
| void TaskMonitor::readDataIfAvailable(const short events, const int fd, |
| Task &T) const { |
| if (events & (POLLIN | POLLPRI)) { |
| // There's data available to read. Read _some_ of it here, but not |
| // necessarily _all_, since the pipe is in blocking mode and we might |
| // have other input pending (or soon -- before this subprocess is done |
| // writing) from other subprocesses. |
| // |
| // FIXME: longer term, this should probably either be restructured to |
| // use O_NONBLOCK, or at very least poll the stderr file descriptor as |
| // well; the whole loop here is a bit of a mess. |
| T.readFromPipes(/*UntilEnd*/ false); |
| } |
| } |
| |
| bool TaskMonitor::didTaskHangup(const short events) const { |
| return (events & (POLLHUP | POLLERR)) != 0; |
| } |
| |
| static bool |
| cleanUpAHungUpTask(Task &T, |
| const TaskQueue::TaskFinishedCallback FinishedCallback, |
| const TaskQueue::TaskSignalledCallback SignalledCallback) { |
| const auto StatusAndProcessInformation = waitForPid(T.getPid()); |
| if (!StatusAndProcessInformation.first) |
| return true; |
| |
| T.finishExecution(); |
| int Status = *(StatusAndProcessInformation.first); |
| TaskProcessInformation ProcInfo = StatusAndProcessInformation.second; |
| return WIFEXITED(Status) |
| ? cleanUpAfterExit(Status, T, ProcInfo, FinishedCallback) |
| : WIFSIGNALED(Status) |
| ? cleanUpAfterSignal(Status, T, ProcInfo, SignalledCallback) |
| : false /* Can this case ever happen? */; |
| } |
| |
| static std::pair<Optional<int>, TaskProcessInformation> waitForPid(const pid_t pidToWaitFor) { |
| for (;;) { |
| int Status = 0; |
| |
| #if defined(HAVE_GETRUSAGE) && !defined(__HAIKU__) && defined(HAVE_WAIT4) |
| struct rusage Usage; |
| const pid_t pidFromWait = wait4(pidToWaitFor, &Status, 0, &Usage); |
| TaskProcessInformation ProcInfo(pidToWaitFor, Usage); |
| #else |
| const pid_t pidFromWait = waitpid(pidToWaitFor, &Status, 0); |
| TaskProcessInformation ProcInfo(pidToWaitFor); |
| #endif |
| |
| if (pidFromWait == pidToWaitFor) |
| return std::make_pair(Status, ProcInfo); |
| assert(pidFromWait == -1 && |
| "Did not pass WNOHANG, should only get pidToWaitFor or -1"); |
| if (errno == ECHILD || errno == EINVAL) |
| return std::make_pair(None, TaskProcessInformation(pidToWaitFor)); |
| } |
| } |
| |
| static bool |
| cleanUpAfterExit(int Status, const Task &T, TaskProcessInformation ProcInfo, |
| const TaskQueue::TaskFinishedCallback FinishedCallback) { |
| const int Result = WEXITSTATUS(Status); |
| if (!FinishedCallback) { |
| // Since we don't have a TaskFinishedCallback, treat a subtask |
| // which returned a nonzero exit code as having failed. |
| return Result != 0; |
| } |
| // If we have a TaskFinishedCallback, only have an error if the callback |
| // returns StopExecution. |
| return TaskFinishedResponse::StopExecution == |
| FinishedCallback(T.getPid(), Result, T.getOutput(), T.getErrors(), ProcInfo, |
| T.getContext()); |
| } |
| |
| static bool |
| cleanUpAfterSignal(int Status, const Task &T, TaskProcessInformation ProcInfo, |
| const TaskQueue::TaskSignalledCallback SignalledCallback) { |
| // The process exited due to a signal. |
| const int Signal = WTERMSIG(Status); |
| StringRef ErrorMsg = strsignal(Signal); |
| |
| if (!SignalledCallback) { |
| // Since we don't have a TaskCrashedCallback, treat a crashing |
| // subtask as having failed. |
| return true; |
| } |
| // If we have a TaskCrashedCallback, only return an error if the callback |
| // returns StopExecution. |
| return TaskFinishedResponse::StopExecution == |
| SignalledCallback(T.getPid(), ErrorMsg, T.getOutput(), T.getErrors(), |
| T.getContext(), Signal, ProcInfo); |
| } |
| |
| void TaskMonitor::stopPolling(ArrayRef<int> FinishedFds) { |
| // Remove any fds which we've closed from FdsBeingPolled. |
| for (int fd : FinishedFds) { |
| auto predicate = [&fd](struct pollfd &i) { return i.fd == fd; }; |
| auto iter = |
| std::find_if(FdsBeingPolled.begin(), FdsBeingPolled.end(), predicate); |
| assert(iter != FdsBeingPolled.end() && |
| "The finished fd must be in FdsBeingPolled!"); |
| FdsBeingPolled.erase(iter); |
| } |
| } |
| |
| bool TaskQueue::execute(TaskBeganCallback BeganCallback, |
| TaskFinishedCallback FinishedCallback, |
| TaskSignalledCallback SignalledCallback) { |
| TaskMonitor::Callbacks callbacks{ |
| BeganCallback, FinishedCallback, SignalledCallback, [&] { |
| if (Stats) |
| ++Stats->getDriverCounters().NumDriverPipePolls; |
| }}; |
| |
| TaskMonitor TE(QueuedTasks, getNumberOfParallelTasks(), callbacks); |
| return TE.executeTasks(); |
| } |