| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package botanist |
| |
| import ( |
| "bytes" |
| "context" |
| "errors" |
| "io" |
| "math" |
| "time" |
| |
| "go.fuchsia.dev/fuchsia/tools/lib/logger" |
| "go.fuchsia.dev/fuchsia/tools/lib/streams" |
| ) |
| |
// Experiments is a set of experiment names. Membership is represented by the
// presence of a key; the empty-struct values carry no information.
type Experiments map[string]struct{}

// GetExperiments builds an Experiments set from the given names. Duplicate
// names collapse into a single entry, and a nil or empty slice yields an
// empty, non-nil set.
func GetExperiments(experiments []string) Experiments {
	set := make(Experiments, len(experiments))
	for i := range experiments {
		set[experiments[i]] = struct{}{}
	}
	return set
}

// Contains reports whether experiment is present in the set.
func (e Experiments) Contains(experiment Experiment) bool {
	_, found := e[string(experiment)]
	return found
}

// Experiment represents a supported botanist experiment.
type Experiment string

// Names of the experiments that botanist knows about.
const (
	UseFFXTestParallel Experiment = "use_ffx_test_parallel"
	UseFFXMonitor      Experiment = "use_ffx_monitor"
	ForceFFXUSB        Experiment = "force_ffx_usb"
	UseFFXRepository   Experiment = "use_ffx_repository"
)

// SupportedExperiments lists every Experiment constant declared above.
var SupportedExperiments = []Experiment{
	UseFFXTestParallel,
	UseFFXMonitor,
	ForceFFXUSB,
	UseFFXRepository,
}
| |
| // GetLoggerCtx returns a new context with the logger of the provided ctx. |
| func GetLoggerCtx(ctx context.Context) context.Context { |
| return logger.WithLogger(context.Background(), logger.LoggerFromContext(ctx)) |
| } |
| |
| // WaitForProcess launches a long-running process in the background and returns a cleanup |
| // function to cancel the context and wait for the process to finish. |
| func WaitForProcess(ctx context.Context, process func(context.Context) error, processName string) func() { |
| // Use a new context so that the subprocess can only be terminated by |
| // a direct call to the cancel function. |
| processCtx, cancel := context.WithCancel(GetLoggerCtx(ctx)) |
| cmdWait := make(chan error) |
| go func() { |
| err := process(processCtx) |
| if err != nil && !errors.Is(err, context.Canceled) { |
| logger.Errorf(ctx, "%s process finished with err: %s", processName, err) |
| } else { |
| logger.Debugf(ctx, "%s process finished", processName) |
| } |
| close(cmdWait) |
| }() |
| cleanup := func() { |
| cancel() |
| <-cmdWait |
| } |
| return cleanup |
| } |
| |
// LockedWriter wraps a writer and serializes calls to Write so that
// concurrent writes to the underlying writer never overlap.
//
// Serialization is done by a background goroutine (started by
// NewLockedWriter) that receives one writeLock at a time from locks and lets
// its owner run to completion before taking the next one.
type LockedWriter struct {
	locks  chan *writeLock
	writer io.Writer
}

// writeLock is the pair of signals exchanged between one Write call and the
// serializing goroutine: start grants the turn, end hands it back.
type writeLock struct {
	start chan struct{}
	end   chan struct{}
}

// NewLockedWriter returns a LockedWriter around writer and starts the
// goroutine that hands out write turns. That goroutine runs until Close is
// called. The ctx argument is not used by this implementation.
func NewLockedWriter(ctx context.Context, writer io.Writer) *LockedWriter {
	lw := &LockedWriter{
		writer: writer,
		locks:  make(chan *writeLock),
	}

	go func(queue <-chan *writeLock) {
		for {
			turn, open := <-queue
			if !open {
				return
			}
			// Let the queued write begin, then block until it reports done.
			turn.start <- struct{}{}
			<-turn.end
		}
	}(lw.locks)
	return lw
}

// Write waits for its turn, forwards data to the underlying writer, and
// returns that writer's result unchanged. Calling Write after Close panics
// because the queue channel has been closed.
func (w *LockedWriter) Write(data []byte) (int, error) {
	turn := &writeLock{
		start: make(chan struct{}),
		end:   make(chan struct{}),
	}
	// Enqueue and wait until the serializing goroutine grants the turn.
	w.locks <- turn
	<-turn.start
	// Always hand the turn back, even if the underlying Write panics.
	defer func() { turn.end <- struct{}{} }()
	return w.writer.Write(data)
}

// Close stops the serializing goroutine by closing the queue channel. It
// must be called at most once.
func (w *LockedWriter) Close() {
	close(w.locks)
}
| |
// LineWriter wraps a writer and forwards data to it one complete line at a
// time, so that several LineWriters sharing the same underlying writer do
// not interleave their output midline. Bytes that are not yet terminated by
// a newline are held in line until the rest of the line arrives. When prefix
// is non-empty, every forwarded line starts with "<prefix>: ".
type LineWriter struct {
	writer io.Writer
	line   []byte
	prefix string
}

// NewLineWriter returns a LineWriter that forwards complete lines to writer,
// labelling each one with prefix when prefix is non-empty.
func NewLineWriter(writer io.Writer, prefix string) *LineWriter {
	lw := new(LineWriter)
	lw.writer = writer
	lw.prefix = prefix
	return lw
}

// Write splits data after every newline. Each newline-terminated piece is
// sent to the underlying writer in a single Write call, preceded by the
// optional prefix and any bytes buffered from earlier calls. A trailing
// piece without a newline is buffered for a later call.
//
// On success it returns len(data), signalling that all of data was accepted
// even if part of it is only buffered. If the underlying Write fails, it
// returns that error together with the number of bytes of data (excluding
// prefix and previously buffered bytes) written so far.
func (w *LineWriter) Write(data []byte) (int, error) {
	newline := []byte("\n")
	consumed := 0
	for _, piece := range bytes.SplitAfter(data, newline) {
		if !bytes.HasSuffix(piece, newline) {
			// Incomplete line: keep it until its newline shows up.
			w.line = append(w.line, piece...)
			continue
		}

		// head is everything emitted before the bytes of this piece.
		var head []byte
		if w.prefix != "" {
			head = append(head, w.prefix+": "...)
		}
		head = append(head, w.line...)
		headLen := len(head)

		n, err := w.writer.Write(append(head, piece...))
		// Only count bytes that belong to data, never the head.
		consumed += int(math.Max(0, float64(n-headLen)))
		if err != nil {
			return consumed, err
		}
		w.line = []byte{}
	}
	return len(data), nil
}
| |
// TimestampWriter is a wrapper around a writer that prepends its writes
// with the current host timestamp. This will allow all botanist logs
// (kernel/serial/syslog/test) to be reliably lined up when reading them.
//
// writer receives the timestamp and then the data; format is the
// time.Format layout used to render the timestamp.
type TimestampWriter struct {
	writer io.Writer
	format string
}

// NewTimestampWriter returns a TimestampWriter around writer whose
// timestamps use the layout "2006-01-02 15:04:05.000000 " (microsecond
// precision followed by a single space).
func NewTimestampWriter(writer io.Writer) *TimestampWriter {
	return &TimestampWriter{
		writer: writer,
		format: "2006-01-02 15:04:05.000000 ",
	}
}

// Write emits the current time, formatted with w.format, to the underlying
// writer and then forwards data in a second Write call.
//
// The returned count refers to data only, as io.Writer requires. If writing
// the timestamp fails, no byte of data has been written, so Write returns
// 0 and the error. Otherwise it returns the result of writing data.
func (w *TimestampWriter) Write(data []byte) (int, error) {
	stamp := []byte(time.Now().Format(w.format))
	if _, err := w.writer.Write(stamp); err != nil {
		// Timestamp bytes are not part of data and must not be counted.
		return 0, err
	}
	return w.writer.Write(data)
}
| |
| // NewStiodWriters returns a new LineWriter for the stdout and stderr associated |
| // with the provided context. It also returns a function to flush out any |
| // remaining data not written by Write because it didn't end with a newline. |
| func NewStdioWriters(ctx context.Context, id string) (io.Writer, io.Writer, func()) { |
| stdoutWriter := NewLineWriter(streams.Stdout(ctx), id) |
| stderrWriter := NewLineWriter(streams.Stderr(ctx), id) |
| flush := func() { |
| // Flush out the rest of the data stored by the writers. |
| if len(stdoutWriter.line) > 0 { |
| if _, err := stdoutWriter.Write([]byte("\n")); err != nil { |
| logger.Debugf(ctx, "failed to flush out data to stdout %q: %s", string(stdoutWriter.line), err) |
| } |
| } |
| if len(stderrWriter.line) > 0 { |
| if _, err := stderrWriter.Write([]byte("\n")); err != nil { |
| logger.Debugf(ctx, "failed to flush out data to stderr %q: %s", string(stderrWriter.line), err) |
| } |
| } |
| } |
| return stdoutWriter, stderrWriter, flush |
| } |