blob: 7e82fd8cd8c3cbd2eb31f2a512ac0389c225af1c [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package runner
import (
"context"
"io"
"sync"
)
// Runner defines the interface for running commands by many means such as via SSH
// or via Shell or serial or some other such means.
type Runner interface {
Run(context.Context, []string, io.Writer, io.Writer) error
}
// BatchRunner allows many tasks to be run in paralell using a Runner.
// BatchRunner will give every process the same stderr of your choice
// but will save the contents of every stdout for later debugging.
type BatchRunner struct {
ctx context.Context
cancel func()
wg sync.WaitGroup
canEnqueue chan struct{}
runner Runner
errs chan error
}
// NewBatchRunner creates a BatchRunner that can use runner to run many jobs
// in paralell. At most maxBatchSize jobs will be active at once. Enqueue will
// block when maxBatchSize is exceeded. When ctx is Done, all jobs will terminate
// without error.
func NewBatchRunner(ctx context.Context, runner Runner, maxBatchSize int) *BatchRunner {
ctxCancel, cancel := context.WithCancel(ctx)
return &BatchRunner{
ctx: ctxCancel,
cancel: cancel,
canEnqueue: make(chan struct{}, maxBatchSize),
runner: runner,
errs: make(chan error, 1),
}
}
// Enqueue is similair to Run in Runner except that it's async. The call returns
// immediately and the enqueued command starts running. If however the maximum
// batch size has been reached Enqueue will block until a new job opens up.
func (b *BatchRunner) Enqueue(command []string, stdout, stderr io.Writer, closers ...func()) {
// Note that the order of the wg.Add and the canEnqueue send below is important.
// We'd like the property to hold that if any part of BatchRunner's Wait method
// has executed then no matter where we are in this function, we either panic or
// function correctly (e.g. the job is enqueued and will be waited on). If we flip
// the order of wg.Add and the canEnqueue send there are interleavings that
// allow Wait and Enqueue to race such that Wait returns before an Enqueued job
// completes yet Enqueue doesn't panic.
b.wg.Add(1)
// This dynamically prevents Enqueue being run after Wait and sets the maximum batch
// size.
b.canEnqueue <- struct{}{}
go func() {
defer func() {
b.wg.Done()
// This should always succeed instantly.
<-b.canEnqueue
for _, closer := range closers {
closer()
}
}()
// Now this goroutine blocks until Run finishes.
if err := b.runner.Run(b.ctx, command, stdout, stderr); err != nil {
// If an error has already been sent out don't bother sending another.
select {
case b.errs <- err:
// Stop all other jobs.
b.cancel()
return
default:
return
}
}
}()
}
// Wait blocks on all previouslly Enqueued tasks to finish. It is invalid for
// Enqueue to be called after Wait is called.
func (b *BatchRunner) Wait() error {
close(b.canEnqueue)
b.wg.Wait()
select {
case err := <-b.errs:
return err
default:
return nil
}
}