[engine] Limit number of active subprocesses
Previously the number of concurrent subprocess invocations launched by
`ctx.os.exec()` was unbounded, which could place a strain on the system.
Now there's effectively a pool of NumCPU+2 workers for running
subprocesses.
`ctx.os.exec()` still returns immediately; the underlying subprocess is
now started asynchronously, once a worker slot becomes available.
`ba -against main` showed no significant difference in the results of
the `ctx.os.exec()` benchmarks.
Change-Id: I76e4542249783c9a503f0f927e327e9f90f8bb04
Reviewed-on: https://fuchsia-review.googlesource.com/c/shac-project/shac/+/867979
Reviewed-by: Ina Huh <ihuh@google.com>
Commit-Queue: Oliver Newman <olivernewman@google.com>
diff --git a/internal/engine/run.go b/internal/engine/run.go
index 512cd9c..c341870 100644
--- a/internal/engine/run.go
+++ b/internal/engine/run.go
@@ -40,6 +40,7 @@
"go.starlark.net/resolve"
"go.starlark.net/starlark"
"golang.org/x/sync/errgroup"
+ "golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
)
@@ -364,6 +365,8 @@
packages: packages,
}
+ subprocessSem := semaphore.NewWeighted(int64(runtime.NumCPU()) + 2)
+
var vars map[string]string
newState := func(scm scmCheckout, subdir string, idx int) (*shacState, error) {
@@ -404,6 +407,7 @@
sandbox: sb,
scm: scm,
subdir: subdir,
+ subprocessSem: subprocessSem,
tmpdir: filepath.Join(tmpdir, strconv.Itoa(idx)),
writableRoot: doc.WritableRoot,
vars: vars,
@@ -464,7 +468,7 @@
if err != nil {
return err
}
- shacStates = []*shacState{state}
+ shacStates = append(shacStates, state)
}
// Parse the starlark files. Run everything from our errgroup.
@@ -488,6 +492,7 @@
if cb == nil {
loop = false
} else {
+ // Actually run the check.
eg.Go(cb)
}
case <-done:
@@ -644,6 +649,9 @@
filter CheckFilter
passthroughEnv []*PassthroughEnv
+ // Limits the number of concurrent subprocesses launched by ctx.os.exec().
+ subprocessSem *semaphore.Weighted
+
// Set when fail() is called. This happens only during the first phase, thus
// no mutex is needed.
failErr *failure
diff --git a/internal/engine/run_test.go b/internal/engine/run_test.go
index e84ce40..392f459 100644
--- a/internal/engine/run_test.go
+++ b/internal/engine/run_test.go
@@ -2273,7 +2273,7 @@
},
{
name: "ctx-os-exec-parallel.star",
- want: strings.Repeat("[//ctx-os-exec-parallel.star:27] Hello, world\n", 10),
+ want: strings.Repeat("[//ctx-os-exec-parallel.star:28] Hello, world\n", 1000),
},
{
name: "ctx-os-exec-relpath.star",
diff --git a/internal/engine/runtime_ctx_os.go b/internal/engine/runtime_ctx_os.go
index 12e0818..a1735f8 100644
--- a/internal/engine/runtime_ctx_os.go
+++ b/internal/engine/runtime_ctx_os.go
@@ -42,6 +42,7 @@
raiseOnFailure bool
okRetcodes []int
tempDir string
+ startErrs <-chan error
waitCalled bool
}
@@ -80,15 +81,27 @@
return []string{"wait"}
}
-func (s *subprocess) wait() (starlark.Value, error) {
+func (s *subprocess) wait(state *shacState) (starlark.Value, error) {
if s.waitCalled {
return nil, fmt.Errorf("wait was already called")
}
s.waitCalled = true
+ val, err := s.waitInner(state)
+ if err2 := s.cleanup(); err == nil {
+ err = err2
+ }
+ return val, err
+}
- defer s.cleanup()
+func (s *subprocess) waitInner(state *shacState) (starlark.Value, error) {
+ if err := <-s.startErrs; err != nil {
+ // If cmd.Start() failed the semaphore will already have been released,
+ // no need to release it.
+ return nil, err
+ }
err := s.cmd.Wait()
+ state.subprocessSem.Release(1)
retcode := 0
if err != nil {
var errExit *exec.ExitError
@@ -127,16 +140,25 @@
}
func (s *subprocess) cleanup() error {
+ // Wait for the subprocess to launch before trying to kill it. s.startErrs
+ // gets closed after the subprocess starts, so even if the error has already
+ // been received by `wait()`, this receive will return due to the channel
+ // being closed.
+ <-s.startErrs
// Kill the process before doing any other cleanup steps to ensure resources
- // are no longer in use.
- err := s.cmd.Process.Kill()
- // Kill() doesn't block until the process actually completes, so we need to
- // wait before cleaning up resources.
- _ = s.cmd.Wait()
+ // are no longer in use before cleaning them up.
+ var err error
+ if s.cmd.ProcessState == nil {
+ err = s.cmd.Process.Kill()
+ // Kill() is non-blocking, so it's necessary to wait for the process to
+ // exit before cleaning up resources.
+ _ = s.cmd.Wait()
+ }
if err2 := os.RemoveAll(s.tempDir); err == nil {
err = err2
}
+
buffers.push(s.stdout)
buffers.push(s.stderr)
s.stdout, s.stderr = nil, nil
@@ -148,7 +170,7 @@
if err := starlark.UnpackArgs(name, args, kwargs); err != nil {
return nil, err
}
- return self.(*subprocess).wait()
+ return self.(*subprocess).wait(s)
})
// ctxOsExec implements the native function ctx.os.exec().
@@ -212,15 +234,6 @@
return os.RemoveAll(tempDir)
})
- stdout := buffers.get()
- stderr := buffers.get()
-
- cleanupFuncs = append(cleanupFuncs, func() error {
- buffers.push(stdout)
- buffers.push(stderr)
- return nil
- })
-
env := map[string]string{
"PATH": os.Getenv("PATH"),
"TEMP": tempDir,
@@ -387,15 +400,31 @@
cmd := s.sandbox.Command(ctx, config)
- cmd.Stdin = stdin
+ stdout, stderr := buffers.get(), buffers.get()
// TODO(olivernewman): Also handle commands that may output non-utf-8 bytes.
cmd.Stdout = stdout
cmd.Stderr = stderr
+ cmd.Stdin = stdin
- err = execsupport.Start(cmd)
- if err != nil {
- return nil, err
- }
+ startErrs := make(chan error, 1)
+ go func() {
+ // Signals to subprocess.cleanup() that starting the subprocess is done,
+ // whether or not it was successful.
+ defer close(startErrs)
+
+ err := s.subprocessSem.Acquire(ctx, 1)
+ if err != nil {
+ startErrs <- err
+ return
+ }
+ err = execsupport.Start(cmd)
+ if err != nil {
+ // Release early if the process failed to start, no point in
+ // delaying until wait() is called.
+ s.subprocessSem.Release(1)
+ }
+ startErrs <- err
+ }()
proc := &subprocess{
cmd: cmd,
@@ -405,7 +434,9 @@
raiseOnFailure: bool(argraiseOnFailure),
okRetcodes: okRetcodes,
tempDir: tempDir,
+ startErrs: startErrs,
}
+
// Only clean up now if starting the subprocess failed; otherwise it will
// get cleaned up by wait().
cleanupFuncs = cleanupFuncs[:0]
diff --git a/internal/engine/testdata/print/ctx-os-exec-parallel.star b/internal/engine/testdata/print/ctx-os-exec-parallel.star
index 5aaf075..ab9b28f 100644
--- a/internal/engine/testdata/print/ctx-os-exec-parallel.star
+++ b/internal/engine/testdata/print/ctx-os-exec-parallel.star
@@ -18,9 +18,10 @@
else:
cmd = ["./hello_world.sh"]
- procs = []
- for _ in range(10):
- procs.append(ctx.os.exec(cmd))
+ # Launch more parallel subprocesses than any realistic host machine will
+ # have cores (but not too many, or the test will be very slow).
+ num_procs = 1000
+ procs = [ctx.os.exec(cmd) for _ in range(num_procs)]
for proc in procs:
res = proc.wait()
diff --git a/internal/engine/version.go b/internal/engine/version.go
index 877fd9e..8e1d778 100644
--- a/internal/engine/version.go
+++ b/internal/engine/version.go
@@ -26,7 +26,7 @@
// Version is the current tool version.
//
// TODO(maruel): Add proper version, preferably from git tag.
- Version = shacVersion{0, 1, 14}
+ Version = shacVersion{0, 1, 15}
)
func (v shacVersion) String() string {