Merge pull request #179 from ddunbar/stop-lane-buffering
[BuildSystem] Buffer command output at frontend level.
diff --git a/include/llbuild/BuildSystem/BuildExecutionQueue.h b/include/llbuild/BuildSystem/BuildExecutionQueue.h
index 8231d42..fb80fb8 100644
--- a/include/llbuild/BuildSystem/BuildExecutionQueue.h
+++ b/include/llbuild/BuildSystem/BuildExecutionQueue.h
@@ -150,6 +150,9 @@
/// All delegate interfaces are invoked synchronously by the execution queue,
/// and should defer any long running operations to avoid blocking the queue
/// unnecessarily.
+///
+/// NOTE: The delegate *MUST* be thread-safe with respect to all calls, which
+/// will arrive concurrently and on an unspecified thread.
class BuildExecutionQueueDelegate {
// DO NOT COPY
BuildExecutionQueueDelegate(const BuildExecutionQueueDelegate&)
diff --git a/lib/BuildSystem/BuildSystemFrontend.cpp b/lib/BuildSystem/BuildSystemFrontend.cpp
index 4a9948a..0dadb2c 100644
--- a/lib/BuildSystem/BuildSystemFrontend.cpp
+++ b/lib/BuildSystem/BuildSystemFrontend.cpp
@@ -20,6 +20,7 @@
#include "llbuild/BuildSystem/BuildFile.h"
#include "llbuild/BuildSystem/BuildKey.h"
+#include "llvm/ADT/DenseMap.h"
#include "llvm/ADT/SmallString.h"
#include "llvm/Support/Format.h"
#include "llvm/Support/Path.h"
@@ -28,6 +29,7 @@
#include <atomic>
#include <memory>
+#include <mutex>
#include <thread>
using namespace llbuild;
@@ -252,6 +254,12 @@
BuildSystemFrontend* frontend = nullptr;
BuildSystem* system = nullptr;
+
+ /// The set of active command output buffers, keyed by process handle ID.
+ llvm::DenseMap<uintptr_t, std::vector<uint8_t>> processOutputBuffers;
+
+ /// The lock protecting `processOutputBuffers`.
+ std::mutex processOutputBuffersMutex;
BuildSystemFrontendDelegateImpl(llvm::SourceMgr& sourceMgr,
const BuildSystemInvocation& invocation)
@@ -523,15 +531,30 @@
void BuildSystemFrontendDelegate::
commandProcessHadOutput(Command* command, ProcessHandle handle,
StringRef data) {
- // FIXME: Design the logging and status output APIs.
- fwrite(data.data(), data.size(), 1, stdout);
- fflush(stdout);
+ auto impl = static_cast<BuildSystemFrontendDelegateImpl*>(this->impl);
+ std::unique_lock<std::mutex> lock(impl->processOutputBuffersMutex);
+
+ // Append to the output buffer.
+ auto& buffer = impl->processOutputBuffers[handle.id];
+ buffer.insert(buffer.end(), data.begin(), data.end());
}
void BuildSystemFrontendDelegate::
commandProcessFinished(Command*, ProcessHandle handle,
CommandResult result,
int exitStatus) {
+ auto impl = static_cast<BuildSystemFrontendDelegateImpl*>(this->impl);
+ std::unique_lock<std::mutex> lock(impl->processOutputBuffersMutex);
+
+ // If there was an output buffer, flush it.
+ auto it = impl->processOutputBuffers.find(handle.id);
+ if (it == impl->processOutputBuffers.end())
+ return;
+
+ fwrite(it->second.data(), it->second.size(), 1, stdout);
+ fflush(stdout);
+
+ impl->processOutputBuffers.erase(it);
}
#pragma mark - BuildSystemFrontend implementation
diff --git a/lib/BuildSystem/LaneBasedExecutionQueue.cpp b/lib/BuildSystem/LaneBasedExecutionQueue.cpp
index c92fcf9..f154b45 100644
--- a/lib/BuildSystem/LaneBasedExecutionQueue.cpp
+++ b/lib/BuildSystem/LaneBasedExecutionQueue.cpp
@@ -430,7 +430,6 @@
}
// Read the command output, if capturing.
- SmallString<1024> outputData;
if (shouldCaptureOutput) {
// Close the write end of the output pipe.
::close(outputPipe[1]);
@@ -449,7 +448,10 @@
if (numBytes == 0)
break;
- outputData.insert(outputData.end(), &buf[0], &buf[numBytes]);
+ // Notify the client of the output.
+ getDelegate().commandProcessHadOutput(
+ context.job.getForCommand(), handle,
+ StringRef(buf, numBytes));
}
// Close the read end of the pipe.
@@ -475,12 +477,6 @@
CommandResult::Failed, -1);
return CommandResult::Failed;
}
-
- // Notify the client of the output, if buffering.
- if (shouldCaptureOutput) {
- getDelegate().commandProcessHadOutput(context.job.getForCommand(), handle,
- outputData);
- }
// Notify of the process completion.
bool cancelled = WIFSIGNALED(status) && (WTERMSIG(status) == SIGINT || WTERMSIG(status) == SIGKILL);