blob: 69bea5120610d3ff9f3c50e2e3bb303e906c095b [file] [log] [blame]
//===-- LaneBasedExecutionQueue.cpp ---------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2015 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
#include "llbuild/BuildSystem/BuildExecutionQueue.h"
#include "llbuild/Basic/LLVM.h"
#include "llvm/ADT/ArrayRef.h"
#include "llvm/ADT/SmallString.h"
#include "llvm/Support/Path.h"
#include "llvm/Support/Program.h"
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <memory>
#include <thread>
#include <vector>
#include <string>
#include <fcntl.h>
#include <unistd.h>
#include <signal.h>
#include <spawn.h>
#include <sys/wait.h>
using namespace llbuild;
using namespace llbuild::buildsystem;
extern "C" {
extern char **environ;
}
namespace {
struct LaneBasedExecutionQueueJobContext {
QueueJob& job;
};
/// Build execution queue.
//
// FIXME: Consider trying to share this with the Ninja implementation.
class LaneBasedExecutionQueue : public BuildExecutionQueue {
/// The number of lanes the queue was configured with.
unsigned numLanes;
/// A thread for each lane.
std::vector<std::unique_ptr<std::thread>> lanes;
/// The ready queue of jobs to execute.
std::deque<QueueJob> readyJobs;
std::mutex readyJobsMutex;
std::condition_variable readyJobsCondition;
void executeLane(unsigned laneNumber) {
// Execute items from the queue until shutdown.
while (true) {
// Take a job from the ready queue.
QueueJob job{};
{
std::unique_lock<std::mutex> lock(readyJobsMutex);
// While the queue is empty, wait for an item.
while (readyJobs.empty()) {
readyJobsCondition.wait(lock);
}
// Take an item according to the chosen policy.
job = readyJobs.front();
readyJobs.pop_front();
}
// If we got an empty job, the queue is shutting down.
if (!job.getForCommand())
break;
// Process the job.
LaneBasedExecutionQueueJobContext context{ job };
getDelegate().commandJobStarted(job.getForCommand());
job.execute(reinterpret_cast<QueueJobContext*>(&context));
getDelegate().commandJobFinished(job.getForCommand());
}
}
public:
LaneBasedExecutionQueue(BuildExecutionQueueDelegate& delegate,
unsigned numLanes)
: BuildExecutionQueue(delegate), numLanes(numLanes)
{
for (unsigned i = 0; i != numLanes; ++i) {
lanes.push_back(std::unique_ptr<std::thread>(
new std::thread(
&LaneBasedExecutionQueue::executeLane, this, i)));
}
}
virtual ~LaneBasedExecutionQueue() {
// Shut down the lanes.
for (unsigned i = 0; i != numLanes; ++i) {
addJob({});
}
for (unsigned i = 0; i != numLanes; ++i) {
lanes[i]->join();
}
}
virtual void addJob(QueueJob job) override {
std::lock_guard<std::mutex> guard(readyJobsMutex);
readyJobs.push_back(job);
readyJobsCondition.notify_one();
}
virtual bool
executeProcess(QueueJobContext* opaqueContext,
ArrayRef<StringRef> commandLine,
ArrayRef<std::pair<StringRef,
StringRef>> environment) override {
// Assign a process handle, which just needs to be unique for as long as we
// are communicating with the delegate.
struct BuildExecutionQueueDelegate::ProcessHandle handle;
handle.id = reinterpret_cast<uintptr_t>(&handle);
// Whether or not we are capturing output.
const bool shouldCaptureOutput = true;
LaneBasedExecutionQueueJobContext& context =
*reinterpret_cast<LaneBasedExecutionQueueJobContext*>(opaqueContext);
getDelegate().commandProcessStarted(context.job.getForCommand(), handle);
// Initialize the spawn attributes.
posix_spawnattr_t attributes;
posix_spawnattr_init(&attributes);
// Unmask all signals.
sigset_t noSignals;
sigemptyset(&noSignals);
posix_spawnattr_setsigmask(&attributes, &noSignals);
// Reset all signals to default behavior.
//
// On Linux, this can only be used to reset signals that are legal to
// modify, so we have to take care about the set we use.
#if defined(__linux__)
sigset_t mostSignals;
sigemptyset(&mostSignals);
for (int i = 1; i < SIGUNUSED; ++i) {
if (i == SIGKILL || i == SIGSTOP) continue;
sigaddset(&mostSignals, i);
}
posix_spawnattr_setsigdefault(&attributes, &mostSignals);
#else
sigset_t mostSignals;
sigfillset(&mostSignals);
sigdelset(&mostSignals, SIGKILL);
sigdelset(&mostSignals, SIGSTOP);
posix_spawnattr_setsigdefault(&attributes, &mostSignals);
#endif
// Establish a separate process group.
posix_spawnattr_setpgroup(&attributes, 0);
// Set the attribute flags.
unsigned flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF;
flags |= POSIX_SPAWN_SETPGROUP;
// Close all other files by default.
//
// FIXME: Note that this is an Apple-specific extension, and we will have to
// do something else on other platforms (and unfortunately, there isn't
// really an easy answer other than using a stub executable).
#ifdef __APPLE__
flags |= POSIX_SPAWN_CLOEXEC_DEFAULT;
#endif
posix_spawnattr_setflags(&attributes, flags);
// Setup the file actions.
posix_spawn_file_actions_t fileActions;
posix_spawn_file_actions_init(&fileActions);
// Open /dev/null as stdin.
posix_spawn_file_actions_addopen(
&fileActions, 0, "/dev/null", O_RDONLY, 0);
// If we are capturing output, create a pipe and appropriate spawn actions.
int outputPipe[2]{ -1, -1 };
if (shouldCaptureOutput) {
if (::pipe(outputPipe) < 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to open output pipe (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(),
handle, -1);
return false;
}
// Open the write end of the pipe as stdout and stderr.
posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 1);
posix_spawn_file_actions_adddup2(&fileActions, outputPipe[1], 2);
// Close the read and write ends of the pipe.
posix_spawn_file_actions_addclose(&fileActions, outputPipe[0]);
posix_spawn_file_actions_addclose(&fileActions, outputPipe[1]);
} else {
// Otherwise, propagate the current stdout/stderr.
posix_spawn_file_actions_adddup2(&fileActions, 1, 1);
posix_spawn_file_actions_adddup2(&fileActions, 2, 2);
}
// Form the complete C-string command line.
std::vector<std::string> argsStorage(
commandLine.begin(), commandLine.end());
std::vector<const char*> args(argsStorage.size() + 1);
for (size_t i = 0; i != argsStorage.size(); ++i) {
args[i] = argsStorage[i].c_str();
}
args[argsStorage.size()] = nullptr;
// Form the complete environment.
std::vector<std::string> envStorage;
for (const auto& entry: environment) {
SmallString<256> assignment;
assignment += entry.first;
assignment += '=';
assignment += entry.second;
assignment += '\0';
envStorage.emplace_back(assignment.str());
}
std::vector<const char*> env(environment.size() + 1);
char* const* envp = nullptr;
if (environment.empty()) {
envp = ::environ;
} else {
for (size_t i = 0; i != envStorage.size(); ++i) {
env[i] = envStorage[i].c_str();
}
env[envStorage.size()] = nullptr;
envp = const_cast<char**>(env.data());
}
// Resolve the executable path, if necessary.
//
// FIXME: This should be cached.
if (!llvm::sys::path::is_absolute(args[0])) {
auto res = llvm::sys::findProgramByName(args[0]);
if (!res.getError()) {
argsStorage[0] = *res;
args[0] = argsStorage[0].c_str();
}
}
// Spawn the command.
//
// FIXME: Need to track spawned processes for the purposes of cancellation.
pid_t pid;
if (posix_spawn(&pid, args[0], /*file_actions=*/&fileActions,
/*attrp=*/&attributes, const_cast<char**>(args.data()),
envp) != 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to spawn process (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
-1);
return false;
}
posix_spawn_file_actions_destroy(&fileActions);
posix_spawnattr_destroy(&attributes);
// Read the command output, if capturing.
SmallString<1024> outputData;
if (shouldCaptureOutput) {
// Close the write end of the output pipe.
::close(outputPipe[1]);
// Read all the data from the output pipe.
while (true) {
char buf[4096];
ssize_t numBytes = read(outputPipe[0], buf, sizeof(buf));
if (numBytes < 0) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to read process output (") + strerror(errno) + ")");
break;
}
if (numBytes == 0)
break;
outputData.insert(outputData.end(), &buf[0], &buf[numBytes]);
}
// Close the read end of the pipe.
::close(outputPipe[0]);
}
// Wait for the command to complete.
int status, result = waitpid(pid, &status, 0);
while (result == -1 && errno == EINTR)
result = waitpid(pid, &status, 0);
if (result == -1) {
getDelegate().commandProcessHadError(
context.job.getForCommand(), handle,
Twine("unable to wait for process (") + strerror(errno) + ")");
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
-1);
return false;
}
// Notify the client of the output, if buffering.
if (shouldCaptureOutput) {
getDelegate().commandProcessHadOutput(context.job.getForCommand(), handle,
outputData);
}
// Notify of the process completion.
//
// FIXME: Need to communicate more information on the process exit status.
getDelegate().commandProcessFinished(context.job.getForCommand(), handle,
status);
return (status == 0);
}
};
}
BuildExecutionQueue*
llbuild::buildsystem::createLaneBasedExecutionQueue(
BuildExecutionQueueDelegate& delegate, int numLanes) {
return new LaneBasedExecutionQueue(delegate, numLanes);
}