|  | // Copyright 2019 The Fuchsia Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | package runner | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "io" | 
|  | "sync" | 
|  | ) | 
|  |  | 
// Runner abstracts a way of executing a command, for example over SSH, in a
// local shell, or across a serial connection.
type Runner interface {
	// Run executes command under ctx. BatchRunner.Enqueue passes the job's
	// stdout writer followed by its stderr writer as the two io.Writer
	// arguments. A non-nil error reports that the command failed.
	Run(ctx context.Context, command []string, stdout, stderr io.Writer) error
}
|  |  | 
|  | // BatchRunner allows many tasks to be run in paralell using a Runner. | 
|  | // BatchRunner will give every process the same stderr of your choice | 
|  | // but will save the contents of every stdout for later debugging. | 
|  | type BatchRunner struct { | 
|  | ctx        context.Context | 
|  | cancel     func() | 
|  | wg         sync.WaitGroup | 
|  | canEnqueue chan struct{} | 
|  | runner     Runner | 
|  | errs       chan error | 
|  | } | 
|  |  | 
|  | // NewBatchRunner creates a BatchRunner that can use runner to run many jobs | 
|  | // in paralell. At most maxBatchSize jobs will be active at once. Enqueue will | 
|  | // block when maxBatchSize is exceeded. When ctx is Done, all jobs will terminate | 
|  | // without error. | 
|  | func NewBatchRunner(ctx context.Context, runner Runner, maxBatchSize int) *BatchRunner { | 
|  | ctxCancel, cancel := context.WithCancel(ctx) | 
|  | return &BatchRunner{ | 
|  | ctx:        ctxCancel, | 
|  | cancel:     cancel, | 
|  | canEnqueue: make(chan struct{}, maxBatchSize), | 
|  | runner:     runner, | 
|  | errs:       make(chan error, 1), | 
|  | } | 
|  | } | 
|  |  | 
|  | // Enqueue is similair to Run in Runner except that it's async. The call returns | 
|  | // immediately and the enqueued command starts running. If however the maximum | 
|  | // batch size has been reached Enqueue will block until a new job opens up. | 
|  | func (b *BatchRunner) Enqueue(command []string, stdout, stderr io.Writer, closers ...func()) { | 
|  | // Note that the order of the wg.Add and the canEnqueue send below is important. | 
|  | // We'd like the property to hold that if any part of BatchRunner's Wait method | 
|  | // has executed then no matter where we are in this function, we either panic or | 
|  | // function correctly (e.g. the job is enqueued and will be waited on). If we flip | 
|  | // the order of wg.Add and the canEnqueue send there are interleavings that | 
|  | // allow Wait and Enqueue to race such that Wait returns before an Enqueued job | 
|  | // completes yet Enqueue doesn't panic. | 
|  | b.wg.Add(1) | 
|  | // This dynamically prevents Enqueue being run after Wait and sets the maximum batch | 
|  | // size. | 
|  | b.canEnqueue <- struct{}{} | 
|  | go func() { | 
|  | defer func() { | 
|  | b.wg.Done() | 
|  | // This should always succeed instantly. | 
|  | <-b.canEnqueue | 
|  | for _, closer := range closers { | 
|  | closer() | 
|  | } | 
|  | }() | 
|  | // Now this goroutine blocks until Run finishes. | 
|  | if err := b.runner.Run(b.ctx, command, stdout, stderr); err != nil { | 
|  | // If an error has already been sent out don't bother sending another. | 
|  | select { | 
|  | case b.errs <- err: | 
|  | // Stop all other jobs. | 
|  | b.cancel() | 
|  | return | 
|  | default: | 
|  | return | 
|  | } | 
|  | } | 
|  | }() | 
|  | } | 
|  |  | 
|  | // Wait blocks on all previouslly Enqueued tasks to finish. It is invalid for | 
|  | // Enqueue to be called after Wait is called. | 
|  | func (b *BatchRunner) Wait() error { | 
|  | close(b.canEnqueue) | 
|  | b.wg.Wait() | 
|  | select { | 
|  | case err := <-b.errs: | 
|  | return err | 
|  | default: | 
|  | return nil | 
|  | } | 
|  | } |