blob: 55393377a8003f05b79376aa52c8cbddb5cf9563 [file] [log] [blame]
//===-- LaneBasedExecutionQueue.cpp ---------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2015 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
#include "llbuild/BuildSystem/BuildExecutionQueue.h"
#include "llbuild/Basic/LLVM.h"
#include "llvm/ADT/ArrayRef.h"
#include "llvm/ADT/SmallString.h"
#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/Twine.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/Program.h"
#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstring>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <spawn.h>
#include <sys/wait.h>
#include <unistd.h>
using namespace llbuild;
using namespace llbuild::buildsystem;
// The environment block of the current process. executeProcess() passes it to
// posix_spawn() unchanged when the caller supplies no explicit environment.
extern "C" {
extern char **environ;
}
namespace {
/// Concrete payload behind the opaque QueueJobContext pointer.
///
/// A lane builds one of these on its stack for each job, passes its address to
/// QueueJob::execute() cast to QueueJobContext*, and executeProcess() casts the
/// pointer back to recover the job that requested the process.
struct LaneBasedExecutionQueueJobContext {
/// The job currently being executed; held by reference, so the context must
/// not outlive the job object it was created from.
QueueJob& job;
};
/// Build execution queue.
//
// FIXME: Consider trying to share this with the Ninja implementation.
class LaneBasedExecutionQueue : public BuildExecutionQueue {
/// The number of lanes the queue was configured with.
unsigned numLanes;
/// A thread for each lane.
std::vector<std::unique_ptr<std::thread>> lanes;
/// The ready queue of jobs to execute.
std::deque<QueueJob> readyJobs;
std::mutex readyJobsMutex;
std::condition_variable readyJobsCondition;
/// The set of spawned processes to terminate if we get cancelled.
std::unordered_set<pid_t> spawnedProcesses;
std::mutex spawnedProcessesMutex;
/// Management of cancellation and SIGKILL escalation
std::unique_ptr<std::thread> killAfterTimeoutThread = nullptr;
std::atomic<bool> cancelled { false };
std::condition_variable stopKillingCondition;
std::mutex stopKillingMutex;
void executeLane(unsigned laneNumber) {
// Set the thread name, if available.
#if defined(__APPLE__)
pthread_setname_np(
(llvm::Twine("org.swift.llbuild Lane-") +
llvm::Twine(laneNumber)).str().c_str());
#elif defined(__linux__)
pthread_setname_np(
pthread_self(),
(llvm::Twine("org.swift.llbuild Lane-") +
llvm::Twine(laneNumber)).str().c_str());
#endif
// Execute items from the queue until shutdown.
while (!cancelled) {
// Take a job from the ready queue.
QueueJob job{};
{
std::unique_lock<std::mutex> lock(readyJobsMutex);
// While the queue is empty, wait for an item.
while (readyJobs.empty()) {
readyJobsCondition.wait(lock);
if (cancelled) {
return;
}
}
// Take an item according to the chosen policy.
job = readyJobs.front();
readyJobs.pop_front();
}
// If we got an empty job, the queue is shutting down.
if (!job.getForCommand())
break;
// Process the job.
LaneBasedExecutionQueueJobContext context{ job };
getDelegate().commandJobStarted(job.getForCommand());
job.execute(reinterpret_cast<QueueJobContext*>(&context));
getDelegate().commandJobFinished(job.getForCommand());
}
}
void killAfterTimeout() {
std::unique_lock<std::mutex> lock(stopKillingMutex);
stopKillingCondition.wait_for(lock, std::chrono::seconds(10));
sendSignalToProcesses(SIGKILL);
}
void sendSignalToProcesses(int signal) {
std::unique_lock<std::mutex> lock(spawnedProcessesMutex);
for (pid_t pid: spawnedProcesses) {
// We are killing the whole process group here, this depends on us spawning each process in its own group earlier
::kill(-pid, signal);
}
}
public:
LaneBasedExecutionQueue(BuildExecutionQueueDelegate& delegate,
unsigned numLanes)
: BuildExecutionQueue(delegate), numLanes(numLanes)
{
for (unsigned i = 0; i != numLanes; ++i) {
lanes.push_back(std::unique_ptr<std::thread>(
new std::thread(
&LaneBasedExecutionQueue::executeLane, this, i)));
}
}
virtual ~LaneBasedExecutionQueue() {
// Shut down the lanes.
cancelled = true;
readyJobsCondition.notify_all();
for (unsigned i = 0; i != numLanes; ++i) {
lanes[i]->join();
}
if (killAfterTimeoutThread) {
stopKillingCondition.notify_all();
killAfterTimeoutThread->join();
}
}
virtual void addJob(QueueJob job) override {
if (cancelled) {
// FIXME: We should eventually raise an error here as new work should not be enqueued after cancellation
return;
}
std::lock_guard<std::mutex> guard(readyJobsMutex);
readyJobs.push_back(job);
readyJobsCondition.notify_one();
}
virtual void cancelAllJobs() override {
auto wasAlreadyCancelled = cancelled.exchange(true);
// If we were already cancelled, do nothing.
if (wasAlreadyCancelled) {
return;
}
readyJobsCondition.notify_all();
sendSignalToProcesses(SIGINT);
killAfterTimeoutThread = llvm::make_unique<std::thread>(&LaneBasedExecutionQueue::killAfterTimeout, this);
}
virtual bool
executeProcess(QueueJobContext* opaqueContext,
ArrayRef<StringRef> commandLine,
ArrayRef<std::pair<StringRef,
StringRef>> environment) override {
// Assign a process handle, which just needs to be unique for as long as we
// are communicating with the delegate.
struct BuildExecutionQueueDelegate::ProcessHandle handle;
handle.id = reinterpret_cast<uintptr_t>(&handle);
// Whether or not we are capturing output.
const bool shouldCaptureOutput = true;
LaneBasedExecutionQueueJobContext& context =
*reinterpret_cast<LaneBasedExecutionQueueJobContext*>(opaqueContext);
getDelegate().commandProcessStarted(context.job.getForCommand(), handle);
// Initialize the spawn attributes.
posix_spawnattr_t attributes;
posix_spawnattr_init(&attributes);
// Unmask all signals.
sigset_t noSignals;
sigemptyset(&noSignals);
posix_spawnattr_setsigmask(&attributes, &noSignals);
// Reset all signals to default behavior.
//
// On Linux, this can only be used to reset signals that are legal to
// modify, so we have to take care about the set we use.
#if defined(__linux__)
sigset_t mostSignals;
sigemptyset(&mostSignals);
for (int i = 1; i < SIGUNUSED; ++i) {
if (i == SIGKILL || i == SIGSTOP) continue;
sigaddset(&mostSignals, i);
}
posix_spawnattr_setsigdefault(&attributes, &mostSignals);
#else
sigset_t mostSignals;
sigfillset(&mostSignals);
sigdelset(&mostSignals, SIGKILL);
sigdelset(&mostSignals, SIGSTOP);
posix_spawnattr_setsigdefault(&attributes, &mostSignals);
#endif
// Establish a separate process group.
posix_spawnattr_setpgroup(&attributes, 0);
// Set the attribute flags.
unsigned flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF;
flags |= POSIX_SPAWN_SETPGROUP;
// Close all other files by default.
//
// FIXME: Note that this is an Apple-specific extension, and we will have to
// do something else on other platforms (and unfortunately, there isn't
// really an easy answer other than using a stub executable).
#ifdef __APPLE__
flags |= POSIX_SPAWN_CLOEXEC_DEFAULT;
#endif
posix_spawnattr_setflags(&attributes, flags);
// Setup the file actions.
posix_spawn_file_actions_t fileActions;
posix_spawn_file_actions_init(&fileActions);
// Open /dev/null as stdin.
posix_spawn_file_actions_addopen(
&fileActions, 0, "/dev/null", O_RDONLY, 0);
// If we are capturing output, create a pipe and appropriate spawn actions.
int outputPipe[2]{ -1, -1 };
if (shouldCaptureOutput) {
if (::pipe(outputPipe) < 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to open output pipe (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(),
handle, -1);
return false;
}
// Open the write end of the pipe as stdout and stderr.
posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 1);
posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 2);
// Close the read and write ends of the pipe.
posix_spawn_file_actions_addclose(&fileActions, outputPipe[0]);
posix_spawn_file_actions_addclose(&fileActions, outputPipe[1]);
} else {
// Otherwise, propagate the current stdout/stderr.
posix_spawn_file_actions_adddup2(&fileActions, 1, 1);
posix_spawn_file_actions_adddup2(&fileActions, 2, 2);
}
// Form the complete C-string command line.
std::vector<std::string> argsStorage(
commandLine.begin(), commandLine.end());
std::vector<const char*> args(argsStorage.size() + 1);
for (size_t i = 0; i != argsStorage.size(); ++i) {
args[i] = argsStorage[i].c_str();
}
args[argsStorage.size()] = nullptr;
// Form the complete environment.
std::vector<std::string> envStorage;
for (const auto& entry: environment) {
SmallString<256> assignment;
assignment += entry.first;
assignment += '=';
assignment += entry.second;
assignment += '\0';
envStorage.emplace_back(assignment.str());
}
std::vector<const char*> env(environment.size() + 1);
char* const* envp = nullptr;
if (environment.empty()) {
envp = ::environ;
} else {
for (size_t i = 0; i != envStorage.size(); ++i) {
env[i] = envStorage[i].c_str();
}
env[envStorage.size()] = nullptr;
envp = const_cast<char**>(env.data());
}
// Resolve the executable path, if necessary.
//
// FIXME: This should be cached.
if (!llvm::sys::path::is_absolute(args[0])) {
auto res = llvm::sys::findProgramByName(args[0]);
if (!res.getError()) {
argsStorage[0] = *res;
args[0] = argsStorage[0].c_str();
}
}
// Spawn the command.
pid_t pid;
{
// We need to hold the spawn processes lock when we spawn, to ensure that
// we don't create a process in between when we are cancelled.
std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
if (posix_spawn(&pid, args[0], /*file_actions=*/&fileActions,
/*attrp=*/&attributes, const_cast<char**>(args.data()),
envp) != 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to spawn process (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
-1);
return false;
}
spawnedProcesses.insert(pid);
}
posix_spawn_file_actions_destroy(&fileActions);
posix_spawnattr_destroy(&attributes);
// Read the command output, if capturing.
SmallString<1024> outputData;
if (shouldCaptureOutput) {
// Close the write end of the output pipe.
::close(outputPipe[1]);
// Read all the data from the output pipe.
while (true) {
char buf[4096];
ssize_t numBytes = read(outputPipe[0], buf, sizeof(buf));
if (numBytes < 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to read process output (") + strerror(errno) + ")");
break;
}
if (numBytes == 0)
break;
outputData.insert(outputData.end(), &buf[0], &buf[numBytes]);
}
// Close the read end of the pipe.
::close(outputPipe[0]);
}
// Wait for the command to complete.
int status, result = waitpid(pid, &status, 0);
while (result == -1 && errno == EINTR)
result = waitpid(pid, &status, 0);
// Update the set of spawned processes.
{
std::lock_guard<std::mutex> guard(spawnedProcessesMutex);
spawnedProcesses.erase(pid);
}
if (result == -1) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to wait for process (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
-1);
return false;
}
// Notify the client of the output, if buffering.
if (shouldCaptureOutput) {
getDelegate().commandProcessHadOutput(context.job.getForCommand(), handle,
outputData);
}
// Notify of the process completion.
//
// FIXME: Need to communicate more information on the process exit status.
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
status);
return (status == 0);
}
};
}
/// Factory for the lane-based execution queue.
///
/// \param delegate Delegate forwarded to the queue's constructor.
/// \param numLanes Number of lanes to create; converted to unsigned.
///        NOTE(review): a negative value would wrap to a very large lane
///        count -- callers are presumably expected to pass a positive number.
/// \returns A heap-allocated queue; the caller receives the raw pointer.
BuildExecutionQueue*
llbuild::buildsystem::createLaneBasedExecutionQueue(
    BuildExecutionQueueDelegate& delegate, int numLanes) {
  // The queue stores its lane count as unsigned; spell the conversion out.
  const unsigned laneCount = static_cast<unsigned>(numLanes);
  BuildExecutionQueue* queue = new LaneBasedExecutionQueue(delegate, laneCount);
  return queue;
}