Merge pull request #179 from ddunbar/stop-lane-buffering

[BuildSystem] Buffer command output at frontend level.
diff --git a/include/llbuild/BuildSystem/BuildExecutionQueue.h b/include/llbuild/BuildSystem/BuildExecutionQueue.h
index 8231d42..fb80fb8 100644
--- a/include/llbuild/BuildSystem/BuildExecutionQueue.h
+++ b/include/llbuild/BuildSystem/BuildExecutionQueue.h
@@ -150,6 +150,9 @@
 /// All delegate interfaces are invoked synchronously by the execution queue,
 /// and should defer any long running operations to avoid blocking the queue
 /// unnecessarily.
+///
+/// NOTE: The delegate *MUST* be thread-safe with respect to all calls, which
+/// will arrive concurrently and without any specified thread.
 class BuildExecutionQueueDelegate {
   // DO NOT COPY
   BuildExecutionQueueDelegate(const BuildExecutionQueueDelegate&)
diff --git a/lib/BuildSystem/BuildSystemFrontend.cpp b/lib/BuildSystem/BuildSystemFrontend.cpp
index 4a9948a..0dadb2c 100644
--- a/lib/BuildSystem/BuildSystemFrontend.cpp
+++ b/lib/BuildSystem/BuildSystemFrontend.cpp
@@ -20,6 +20,7 @@
 #include "llbuild/BuildSystem/BuildFile.h"
 #include "llbuild/BuildSystem/BuildKey.h"
 
+#include "llvm/ADT/DenseMap.h"
 #include "llvm/ADT/SmallString.h"
 #include "llvm/Support/Format.h"
 #include "llvm/Support/Path.h"
@@ -28,6 +29,7 @@
 
 #include <atomic>
 #include <memory>
+#include <mutex>
 #include <thread>
 
 using namespace llbuild;
@@ -252,6 +254,12 @@
 
   BuildSystemFrontend* frontend = nullptr;
   BuildSystem* system = nullptr;
+
+  /// The set of active command output buffers, by process handle.
+  llvm::DenseMap<uintptr_t, std::vector<uint8_t>> processOutputBuffers;
+
+  /// The lock protecting `processOutputBuffers`.
+  std::mutex processOutputBuffersMutex;
   
   BuildSystemFrontendDelegateImpl(llvm::SourceMgr& sourceMgr,
                                   const BuildSystemInvocation& invocation)
@@ -523,15 +531,30 @@
 void BuildSystemFrontendDelegate::
 commandProcessHadOutput(Command* command, ProcessHandle handle,
                         StringRef data) {
-  // FIXME: Design the logging and status output APIs.
-  fwrite(data.data(), data.size(), 1, stdout);
-  fflush(stdout);
+  auto impl = static_cast<BuildSystemFrontendDelegateImpl*>(this->impl);
+  std::unique_lock<std::mutex> lock(impl->processOutputBuffersMutex);
+
+  // Append to the output buffer.
+  auto& buffer = impl->processOutputBuffers[handle.id];
+  buffer.insert(buffer.end(), data.begin(), data.end());
 }
 
 void BuildSystemFrontendDelegate::
 commandProcessFinished(Command*, ProcessHandle handle,
                        CommandResult result,
                        int exitStatus) {
+  auto impl = static_cast<BuildSystemFrontendDelegateImpl*>(this->impl);
+  std::unique_lock<std::mutex> lock(impl->processOutputBuffersMutex);
+
+  // If there was an output buffer, flush it.
+  auto it = impl->processOutputBuffers.find(handle.id);
+  if (it == impl->processOutputBuffers.end())
+    return;
+
+  fwrite(it->second.data(), it->second.size(), 1, stdout);
+  fflush(stdout);
+
+  impl->processOutputBuffers.erase(it);  
 }
 
 #pragma mark - BuildSystemFrontend implementation
diff --git a/lib/BuildSystem/LaneBasedExecutionQueue.cpp b/lib/BuildSystem/LaneBasedExecutionQueue.cpp
index c92fcf9..f154b45 100644
--- a/lib/BuildSystem/LaneBasedExecutionQueue.cpp
+++ b/lib/BuildSystem/LaneBasedExecutionQueue.cpp
@@ -430,7 +430,6 @@
     }
 
     // Read the command output, if capturing.
-    SmallString<1024> outputData;
     if (shouldCaptureOutput) {
       // Close the write end of the output pipe.
       ::close(outputPipe[1]);
@@ -449,7 +448,10 @@
         if (numBytes == 0)
           break;
 
-        outputData.insert(outputData.end(), &buf[0], &buf[numBytes]);
+        // Notify the client of the output.
+        getDelegate().commandProcessHadOutput(
+            context.job.getForCommand(), handle,
+            StringRef(buf, numBytes));
       }
 
       // Close the read end of the pipe.
@@ -475,12 +477,6 @@
                                            CommandResult::Failed, -1);
       return CommandResult::Failed;
     }
-
-    // Notify the client of the output, if buffering.
-    if (shouldCaptureOutput) {
-      getDelegate().commandProcessHadOutput(context.job.getForCommand(), handle,
-                                            outputData);
-    }
     
     // Notify of the process completion.
     bool cancelled = WIFSIGNALED(status) && (WTERMSIG(status) == SIGINT || WTERMSIG(status) == SIGKILL);