| //===--- TaskQueue.inc - Unix-specific TaskQueue ----------------*- C++ -*-===// |
| // |
| // This source file is part of the Swift.org open source project |
| // |
| // Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors |
| // Licensed under Apache License v2.0 with Runtime Library Exception |
| // |
| // See https://swift.org/LICENSE.txt for license information |
| // See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors |
| // |
| //===----------------------------------------------------------------------===// |
| |
#include "swift/Basic/TaskQueue.h"

#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/DenseSet.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/ErrorHandling.h"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <string>
#include <vector>
| |
| #if HAVE_POSIX_SPAWN |
| #include <spawn.h> |
| #endif |
| |
| #if HAVE_UNISTD_H |
| #include <unistd.h> |
| #endif |
| |
| #include <poll.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| |
| #if !defined(__APPLE__) |
| extern char **environ; |
| #else |
| #include <crt_externs.h> // for _NSGetEnviron |
| #endif |
| |
| namespace swift { |
| namespace sys { |
| |
| class Task { |
| /// The path to the executable which this Task will execute. |
| const char *ExecPath; |
| |
| /// Any arguments which should be passed during execution. |
| ArrayRef<const char *> Args; |
| |
| /// The environment which will be used during execution. If empty, uses |
| /// this process's environment. |
| ArrayRef<const char *> Env; |
| |
| /// Context which should be associated with this task. |
| void *Context; |
| |
| /// The pid of this Task when executing. |
| pid_t Pid; |
| |
| /// A pipe for reading output from the child process. |
| int Pipe; |
| |
| /// The current state of the Task. |
| enum { |
| Preparing, |
| Executing, |
| Finished |
| } State; |
| |
| /// Once the Task has finished, this contains the buffered output of the Task. |
| std::string Output; |
| |
| public: |
| Task(const char *ExecPath, ArrayRef<const char *> Args, |
| ArrayRef<const char *> Env, void *Context) |
| : ExecPath(ExecPath), Args(Args), Env(Env), Context(Context), |
| Pid(-1), Pipe(-1), State(Preparing) { |
| assert((Env.empty() || Env.back() == nullptr) && |
| "Env must either be empty or null-terminated!"); |
| } |
| |
| const char *getExecPath() const { return ExecPath; } |
| ArrayRef<const char *> getArgs() const { return Args; } |
| StringRef getOutput() const { return Output; } |
| void *getContext() const { return Context; } |
| pid_t getPid() const { return Pid; } |
| int getPipe() const { return Pipe; } |
| |
| /// \brief Begins execution of this Task. |
| /// \returns true on error, false on success |
| bool execute(); |
| |
| /// \brief Reads data from the pipe, if any is available. |
| /// \returns true on error, false on success |
| bool readFromPipe(); |
| |
| /// \brief Performs any post-execution work for this Task, such as reading |
| /// piped output and closing the pipe. |
| void finishExecution(); |
| }; |
| |
| } // end namespace sys |
| } // end namespace swift |
| |
| bool Task::execute() { |
| assert(State < Executing && "This Task cannot be executed twice!"); |
| State = Executing; |
| |
| // Construct argv. |
| SmallVector<const char *, 128> Argv; |
| Argv.push_back(ExecPath); |
| Argv.append(Args.begin(), Args.end()); |
| Argv.push_back(0); // argv is expected to be null-terminated. |
| |
| // Set up the pipe. |
| int FullPipe[2]; |
| pipe(FullPipe); |
| Pipe = FullPipe[0]; |
| |
| // Get the environment to pass down to the subtask. |
| const char *const *envp = Env.empty() ? nullptr : Env.data(); |
| if (!envp) { |
| #if __APPLE__ |
| envp = *_NSGetEnviron(); |
| #else |
| envp = environ; |
| #endif |
| } |
| |
| const char **argvp = Argv.data(); |
| |
| #if HAVE_POSIX_SPAWN |
| posix_spawn_file_actions_t FileActions; |
| posix_spawn_file_actions_init(&FileActions); |
| |
| posix_spawn_file_actions_adddup2(&FileActions, FullPipe[1], STDOUT_FILENO); |
| posix_spawn_file_actions_adddup2(&FileActions, STDOUT_FILENO, STDERR_FILENO); |
| posix_spawn_file_actions_addclose(&FileActions, FullPipe[0]); |
| |
| // Spawn the subtask. |
| int spawnErr = posix_spawn(&Pid, ExecPath, &FileActions, nullptr, |
| const_cast<char **>(argvp), |
| const_cast<char **>(envp)); |
| |
| posix_spawn_file_actions_destroy(&FileActions); |
| close(FullPipe[1]); |
| |
| if (spawnErr != 0 || Pid == 0) { |
| close(FullPipe[0]); |
| State = Finished; |
| return true; |
| } |
| #else |
| Pid = fork(); |
| switch (Pid) { |
| case -1: { |
| close(FullPipe[0]); |
| State = Finished; |
| Pid = 0; |
| break; |
| } |
| case 0: { |
| // Child process: Execute the program. |
| dup2(FullPipe[1], STDOUT_FILENO); |
| dup2(STDOUT_FILENO, STDERR_FILENO); |
| close(FullPipe[0]); |
| execve(ExecPath, const_cast<char **>(argvp), const_cast<char **>(envp)); |
| |
| // If the execve() failed, we should exit. Follow Unix protocol and |
| // return 127 if the executable was not found, and 126 otherwise. |
| // Use _exit rather than exit so that atexit functions and static |
| // object destructors cloned from the parent process aren't |
| // redundantly run, and so that any data buffered in stdio buffers |
| // cloned from the parent aren't redundantly written out. |
| _exit(errno == ENOENT ? 127 : 126); |
| } |
| default: |
| // Parent process: Break out of the switch to do our processing. |
| break; |
| } |
| |
| close(FullPipe[1]); |
| |
| if (Pid == 0) |
| return true; |
| #endif |
| |
| return false; |
| } |
| |
| bool Task::readFromPipe() { |
| char outputBuffer[1024]; |
| ssize_t readBytes = 0; |
| while ((readBytes = read(Pipe, outputBuffer, sizeof(outputBuffer))) != 0) { |
| if (readBytes < 0) { |
| if (errno == EINTR) |
| // read() was interrupted, so try again. |
| continue; |
| return true; |
| } |
| |
| Output.append(outputBuffer, readBytes); |
| } |
| |
| return false; |
| } |
| |
| void Task::finishExecution() { |
| assert(State == Executing && |
| "This Task must be executing to finish execution!"); |
| |
| State = Finished; |
| |
| // Read the output of the command, so we can use it later. |
| readFromPipe(); |
| |
| close(Pipe); |
| } |
| |
| bool TaskQueue::supportsBufferingOutput() { |
| // The Unix implementation supports buffering output. |
| return true; |
| } |
| |
| bool TaskQueue::supportsParallelExecution() { |
| // The Unix implementation supports parallel execution. |
| return true; |
| } |
| |
| unsigned TaskQueue::getNumberOfParallelTasks() const { |
| // TODO: add support for choosing a better default value for |
| // MaxNumberOfParallelTasks if NumberOfParallelTasks is 0. (Optimally, this |
| // should choose a value > 1 tailored to the current system.) |
| return NumberOfParallelTasks > 0 ? NumberOfParallelTasks : 1; |
| } |
| |
| void TaskQueue::addTask(const char *ExecPath, ArrayRef<const char *> Args, |
| ArrayRef<const char *> Env, void *Context) { |
| std::unique_ptr<Task> T(new Task(ExecPath, Args, Env, Context)); |
| QueuedTasks.push(std::move(T)); |
| } |
| |
| bool TaskQueue::execute(TaskBeganCallback Began, TaskFinishedCallback Finished, |
| TaskSignalledCallback Signalled) { |
| typedef llvm::DenseMap<pid_t, std::unique_ptr<Task>> PidToTaskMap; |
| |
| // Stores the current executing Tasks, organized by pid. |
| PidToTaskMap ExecutingTasks; |
| |
| // Maintains the current fds we're checking with poll. |
| std::vector<struct pollfd> PollFds; |
| |
| bool SubtaskFailed = false; |
| |
| unsigned MaxNumberOfParallelTasks = getNumberOfParallelTasks(); |
| |
| if (MaxNumberOfParallelTasks == 0) |
| MaxNumberOfParallelTasks = 1; |
| |
| while ((!QueuedTasks.empty() && !SubtaskFailed) || |
| !ExecutingTasks.empty()) { |
| // Enqueue additional tasks, if we have additional tasks, we aren't |
| // already at the parallel limit, and no earlier subtasks have failed. |
| while (!SubtaskFailed && !QueuedTasks.empty() && |
| ExecutingTasks.size() < MaxNumberOfParallelTasks) { |
| std::unique_ptr<Task> T(QueuedTasks.front().release()); |
| QueuedTasks.pop(); |
| if (T->execute()) |
| return true; |
| |
| pid_t Pid = T->getPid(); |
| |
| if (Began) { |
| Began(Pid, T->getContext()); |
| } |
| |
| PollFds.push_back({ T->getPipe(), POLLIN | POLLPRI | POLLHUP, 0 }); |
| ExecutingTasks[Pid] = std::move(T); |
| } |
| |
| assert(PollFds.size() > 0 && |
| "We should only call poll() if we have fds to watch!"); |
| int ReadyFdCount = poll(PollFds.data(), PollFds.size(), -1); |
| if (ReadyFdCount == -1) { |
| // Recover from error, if possible. |
| if (errno == EAGAIN || errno == EINTR) |
| continue; |
| return true; |
| } |
| |
| // Holds all fds which have finished during this loop iteration. |
| std::vector<int> FinishedFds; |
| |
| for (struct pollfd &fd : PollFds) { |
| if (fd.revents & POLLIN || fd.revents & POLLPRI || fd.revents & POLLHUP || |
| fd.revents & POLLERR) { |
| // An event which we care about occurred. Find the appropriate Task. |
| auto predicate = [&fd] (PidToTaskMap::value_type &value) -> bool { |
| return value.second->getPipe() == fd.fd; |
| }; |
| |
| auto iter = std::find_if(ExecutingTasks.begin(), ExecutingTasks.end(), |
| predicate); |
| assert(iter != ExecutingTasks.end() && |
| "All outstanding fds must be associated with an executing Task"); |
| Task &T = *iter->second; |
| if (fd.revents & POLLIN || fd.revents & POLLPRI) { |
| // There's data available to read. |
| T.readFromPipe(); |
| } |
| |
| if (fd.revents & POLLHUP || fd.revents & POLLERR) { |
| // This fd was "hung up" or had an error, so we need to wait for the |
| // Task and then clean up. |
| pid_t Pid; |
| int Status; |
| do { |
| Status = 0; |
| Pid = waitpid(T.getPid(), &Status, 0); |
| assert(Pid != 0 && |
| "We do not pass WNOHANG, so we should always get a pid"); |
| if (Pid < 0 && (errno == ECHILD || errno == EINVAL)) |
| return true; |
| } while (Pid < 0); |
| |
| assert(Pid == T.getPid() && |
| "We asked to wait for this Task, but we got another Pid!"); |
| |
| T.finishExecution(); |
| |
| if (WIFEXITED(Status)) { |
| int Result = WEXITSTATUS(Status); |
| |
| if (Finished) { |
| // If we have a TaskFinishedCallback, only set SubtaskFailed to |
| // true if the callback returns StopExecution. |
| SubtaskFailed = Finished(T.getPid(), Result, T.getOutput(), |
| T.getContext()) == |
| TaskFinishedResponse::StopExecution; |
| } else if (Result != 0) { |
| // Since we don't have a TaskFinishedCallback, treat a subtask |
| // which returned a nonzero exit code as having failed. |
| SubtaskFailed = true; |
| } |
| } else if (WIFSIGNALED(Status)) { |
| // The process exited due to a signal. |
| int Signal = WTERMSIG(Status); |
| |
| StringRef ErrorMsg = strsignal(Signal); |
| |
| if (Signalled) { |
| TaskFinishedResponse Response = Signalled(T.getPid(), ErrorMsg, |
| T.getOutput(), |
| T.getContext()); |
| if (Response == TaskFinishedResponse::StopExecution) |
| // If we have a TaskCrashedCallback, only set SubtaskFailed to |
| // true if the callback returns StopExecution. |
| SubtaskFailed = true; |
| } else { |
| // Since we don't have a TaskCrashedCallback, treat a crashing |
| // subtask as having failed. |
| SubtaskFailed = true; |
| } |
| } |
| |
| ExecutingTasks.erase(Pid); |
| FinishedFds.push_back(fd.fd); |
| } |
| } else if (fd.revents & POLLNVAL) { |
| // We passed an invalid fd; this should never happen, |
| // since we always mark fds as finished after calling |
| // Task::finishExecution() (which closes the Task's fd). |
| llvm_unreachable("Asked poll() to watch a closed fd"); |
| } |
| |
| fd.revents = 0; |
| } |
| |
| // Remove any fds which we've closed from PollFds. |
| for (int fd : FinishedFds) { |
| auto predicate = [&fd] (struct pollfd &i) { |
| return i.fd == fd; |
| }; |
| |
| auto iter = std::find_if(PollFds.begin(), PollFds.end(), predicate); |
| assert(iter != PollFds.end() && "The finished fd must be in PollFds!"); |
| PollFds.erase(iter); |
| } |
| } |
| |
| return SubtaskFailed; |
| } |