Reland "[engine] Limit number of active subprocesses"

This reverts commit 0dae038b1f9471122d08633e0e246f8436c24275.

Reason for revert: There was a potential for deadlocks in the first
version of this change. Previously, the semaphore used to limit the
number of running subprocesses wasn't released (allowing another
subprocess to start) until the subprocess had completed *and* the
`wait()` Starlark function had been called on the subprocess object.

This meant it was only safe to launch a large number of subprocesses
and then call `wait()` on them in the same order in which they were
started. Waiting in any other order risked calling `wait()` on a
subprocess that couldn't start until `wait()` had been called on
another subprocess, resulting in a deadlock.

Now the semaphore gets released by a separate goroutine immediately
after the subprocess completes, even if the Starlark code hasn't called
`wait()` yet, making this class of deadlock impossible.
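
The pattern, as a minimal standalone Go sketch (the real change below
goes through shac's sandbox and execsupport helpers; the `start`
helper, the use of `true`, and the count of 100 are illustrative only
and assume a Unix-like system):

    package main

    import (
    	"context"
    	"fmt"
    	"os/exec"
    	"runtime"

    	"golang.org/x/sync/semaphore"
    )

    // start launches cmd in the background, holding a semaphore slot only
    // while the command is actually running. The returned channel yields
    // the command's error (nil on success) and is then closed.
    func start(ctx context.Context, sem *semaphore.Weighted, cmd *exec.Cmd) <-chan error {
    	errs := make(chan error, 1)
    	go func() {
    		errs <- func() error {
    			if err := sem.Acquire(ctx, 1); err != nil {
    				return err
    			}
    			defer sem.Release(1)
    			if err := cmd.Start(); err != nil {
    				return err
    			}
    			return cmd.Wait()
    		}()
    		close(errs)
    	}()
    	return errs
    }

    func main() {
    	ctx := context.Background()
    	sem := semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)

    	// Launch many commands; each call returns immediately with a
    	// handle, like ctx.os.exec() returning a subprocess object.
    	var handles []<-chan error
    	for i := 0; i < 100; i++ {
    		handles = append(handles, start(ctx, sem, exec.Command("true")))
    	}

    	// Waiting in reverse order is safe: each slot is released by the
    	// background goroutine as soon as its command finishes, not by
    	// this loop.
    	for i := len(handles) - 1; i >= 0; i-- {
    		if err := <-handles[i]; err != nil {
    			fmt.Println("command failed:", err)
    		}
    	}
    }

Because the slot is released in the goroutine rather than in `wait()`,
the reverse-order wait loop above cannot deadlock, which is what the
updated ctx-os-exec-parallel.star test exercises.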

Original change's description:
> Revert "[engine] Limit number of active subprocesses"
>
> This reverts commit 612779b564d2ba08167db3f63f4166fa012729bd.
>
> Reason for revert: `shac check --all` is hanging in large repos:
> https://ci.chromium.org/b/8767597391211255425
>
> Original change's description:
> > [engine] Limit number of active subprocesses
> >
> > Previously the number of concurrent subprocess invocations launched by
> > `ctx.os.exec()` was unbounded, which could place a strain on the system.
> > Now there's effectively a pool of NumCPU+2 workers for running
> > subprocesses.
> >
> > `ctx.os.exec()` returns immediately, but the underlying subprocess is
> > started asynchronously.
> >
> > `ba -against main` showed no significant difference in the results of
> > the `ctx.os.exec()` benchmarks.
> >
> > Change-Id: I76e4542249783c9a503f0f927e327e9f90f8bb04
> > Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/867979
> > Reviewed-by: Ina Huh <ihuh@google.com>
> > Commit-Queue: Oliver Newman <olivernewman@google.com>
>
> Change-Id: Icfd3611825b1995948c856170ddc353b7ebfb1eb
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/929633
> Fuchsia-Auto-Submit: Oliver Newman <olivernewman@google.com>
> Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
> Reviewed-by: RubberStamper 🤖 <android-build-ayeaye@system.gserviceaccount.com>

Change-Id: Iefdd7aebc04d03e60f925f136a08eebc28e5bb63
Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/929654
Reviewed-by: Ina Huh <ihuh@google.com>
Fuchsia-Auto-Submit: Oliver Newman <olivernewman@google.com>
Commit-Queue: Auto-Submit <auto-submit@fuchsia-infra.iam.gserviceaccount.com>
diff --git a/internal/engine/run.go b/internal/engine/run.go
index 512cd9c..c341870 100644
--- a/internal/engine/run.go
+++ b/internal/engine/run.go
@@ -40,6 +40,7 @@
 	"go.starlark.net/resolve"
 	"go.starlark.net/starlark"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/semaphore"
 	"google.golang.org/protobuf/encoding/prototext"
 )
 
@@ -364,6 +365,8 @@
 		packages: packages,
 	}
 
+	subprocessSem := semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)
+
 	var vars map[string]string
 
 	newState := func(scm scmCheckout, subdir string, idx int) (*shacState, error) {
@@ -404,6 +407,7 @@
 			sandbox:        sb,
 			scm:            scm,
 			subdir:         subdir,
+			subprocessSem:  subprocessSem,
 			tmpdir:         filepath.Join(tmpdir, strconv.Itoa(idx)),
 			writableRoot:   doc.WritableRoot,
 			vars:           vars,
@@ -464,7 +468,7 @@
 		if err != nil {
 			return err
 		}
-		shacStates = []*shacState{state}
+		shacStates = append(shacStates, state)
 	}
 
 	// Parse the starlark files. Run everything from our errgroup.
@@ -488,6 +492,7 @@
 			if cb == nil {
 				loop = false
 			} else {
+				// Actually run the check.
 				eg.Go(cb)
 			}
 		case <-done:
@@ -644,6 +649,9 @@
 	filter         CheckFilter
 	passthroughEnv []*PassthroughEnv
 
+	// Limits the number of concurrent subprocesses launched by ctx.os.exec().
+	subprocessSem *semaphore.Weighted
+
 	// Set when fail() is called. This happens only during the first phase, thus
 	// no mutex is needed.
 	failErr *failure
diff --git a/internal/engine/run_test.go b/internal/engine/run_test.go
index e84ce40..00d8f9e 100644
--- a/internal/engine/run_test.go
+++ b/internal/engine/run_test.go
@@ -2273,7 +2273,7 @@
 		},
 		{
 			name: "ctx-os-exec-parallel.star",
-			want: strings.Repeat("[//ctx-os-exec-parallel.star:27] Hello, world\n", 10),
+			want: strings.Repeat("[//ctx-os-exec-parallel.star:32] Hello, world\n", 1000),
 		},
 		{
 			name: "ctx-os-exec-relpath.star",
diff --git a/internal/engine/runtime_ctx_os.go b/internal/engine/runtime_ctx_os.go
index 12e0818..6a68f11 100644
--- a/internal/engine/runtime_ctx_os.go
+++ b/internal/engine/runtime_ctx_os.go
@@ -42,6 +42,7 @@
 	raiseOnFailure bool
 	okRetcodes     []int
 	tempDir        string
+	errs           <-chan error
 
 	waitCalled bool
 }
@@ -85,12 +86,16 @@
 		return nil, fmt.Errorf("wait was already called")
 	}
 	s.waitCalled = true
+	val, err := s.waitInner()
+	if err2 := s.cleanup(); err == nil {
+		err = err2
+	}
+	return val, err
+}
 
-	defer s.cleanup()
-
-	err := s.cmd.Wait()
+func (s *subprocess) waitInner() (starlark.Value, error) {
 	retcode := 0
-	if err != nil {
+	if err := <-s.errs; err != nil {
 		var errExit *exec.ExitError
 		if errors.As(err, &errExit) {
 			retcode = errExit.ExitCode()
@@ -127,16 +132,27 @@
 }
 
 func (s *subprocess) cleanup() error {
+	// Wait for the subprocess goroutine to finish before inspecting or killing
+	// the process. s.errs gets closed once the subprocess completes (or fails
+	// to start), so even if the error has already been received by `wait()`,
+	// this receive will return due to the channel being closed.
+	<-s.errs
 	// Kill the process before doing any other cleanup steps to ensure resources
-	// are no longer in use.
-	err := s.cmd.Process.Kill()
-	// Kill() doesn't block until the process actually completes, so we need to
-	// wait before cleaning up resources.
-	_ = s.cmd.Wait()
+	// are no longer in use by the time they are cleaned up.
+	var err error
+	// If Process is not nil then the command successfully started. If
+	// ProcessState is nil then the command hasn't yet completed.
+	if s.cmd.Process != nil && s.cmd.ProcessState == nil {
+		err = s.cmd.Process.Kill()
+		// Kill() is non-blocking, so it's necessary to wait for the process to
+		// exit before cleaning up resources.
+		_ = s.cmd.Wait()
+	}
 
 	if err2 := os.RemoveAll(s.tempDir); err == nil {
 		err = err2
 	}
+
 	buffers.push(s.stdout)
 	buffers.push(s.stderr)
 	s.stdout, s.stderr = nil, nil
@@ -212,15 +228,6 @@
 		return os.RemoveAll(tempDir)
 	})
 
-	stdout := buffers.get()
-	stderr := buffers.get()
-
-	cleanupFuncs = append(cleanupFuncs, func() error {
-		buffers.push(stdout)
-		buffers.push(stderr)
-		return nil
-	})
-
 	env := map[string]string{
 		"PATH":    os.Getenv("PATH"),
 		"TEMP":    tempDir,
@@ -387,15 +394,29 @@
 
 	cmd := s.sandbox.Command(ctx, config)
 
-	cmd.Stdin = stdin
+	stdout, stderr := buffers.get(), buffers.get()
 	// TODO(olivernewman): Also handle commands that may output non-utf-8 bytes.
 	cmd.Stdout = stdout
 	cmd.Stderr = stderr
+	cmd.Stdin = stdin
 
-	err = execsupport.Start(cmd)
-	if err != nil {
-		return nil, err
-	}
+	errs := make(chan error, 1)
+	go func() {
+		errs <- func() error {
+			if err := s.subprocessSem.Acquire(ctx, 1); err != nil {
+				return err
+			}
+			defer s.subprocessSem.Release(1)
+
+			if err := execsupport.Start(cmd); err != nil {
+				return err
+			}
+			return cmd.Wait()
+		}()
+	// Closing the channel signals completion to both wait() and cleanup(),
+	// whether or not the subprocess was successful.
+		close(errs)
+	}()
 
 	proc := &subprocess{
 		cmd:            cmd,
@@ -405,7 +426,9 @@
 		raiseOnFailure: bool(argraiseOnFailure),
 		okRetcodes:     okRetcodes,
 		tempDir:        tempDir,
+		errs:           errs,
 	}
+
 	// Only clean up now if starting the subprocess failed; otherwise it will
 	// get cleaned up by wait().
 	cleanupFuncs = cleanupFuncs[:0]
diff --git a/internal/engine/testdata/print/ctx-os-exec-parallel.star b/internal/engine/testdata/print/ctx-os-exec-parallel.star
index 5aaf075..bbbf0be 100644
--- a/internal/engine/testdata/print/ctx-os-exec-parallel.star
+++ b/internal/engine/testdata/print/ctx-os-exec-parallel.star
@@ -18,11 +18,16 @@
     else:
         cmd = ["./hello_world.sh"]
 
-    procs = []
-    for _ in range(10):
-        procs.append(ctx.os.exec(cmd))
+    # Launch more parallel subprocesses than shac will actually allow to run in
+    # parallel, i.e. more than any realistic machine will have cores (but not
+    # too many, or the test will be very slow).
+    num_procs = 1000
+    procs = [ctx.os.exec(cmd) for _ in range(num_procs)]
 
-    for proc in procs:
+    # It should be possible to wait on the subprocesses in the reverse of the
+    # order in which they were started without deadlocking; each semaphore
+    # slot is released as soon as its subprocess exits, not by calling wait().
+    for proc in reversed(procs):
         res = proc.wait()
         print(res.stdout.strip())
 
diff --git a/internal/engine/version.go b/internal/engine/version.go
index 877fd9e..8e1d778 100644
--- a/internal/engine/version.go
+++ b/internal/engine/version.go
@@ -26,7 +26,7 @@
 	// Version is the current tool version.
 	//
 	// TODO(maruel): Add proper version, preferably from git tag.
-	Version = shacVersion{0, 1, 14}
+	Version = shacVersion{0, 1, 15}
 )
 
 func (v shacVersion) String() string {