| package engine |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "sync" |
| ) |
| |
// Output fans data written to it out to a set of destination writers.
type Output struct {
	// Mutex guards dests and used; the methods visible in this file
	// take it around every access to those fields.
	sync.Mutex
	// dests holds the writers that receive a copy of each Write.
	dests []io.Writer
	// tasks counts background goroutines that Close waits for.
	tasks sync.WaitGroup
	// used is set to true by the first call to Write.
	used bool
}
| |
// Tail returns the last n lines held in buffer as a single string.
// One trailing '\n', if present, is stripped from the result.
// If n <= 0 the result is the empty string; if the buffer holds fewer
// than n lines, its whole (trimmed) content is returned.
func Tail(buffer *bytes.Buffer, n int) string {
	if n <= 0 {
		return ""
	}
	// Drop a single final newline so it is not counted as a line separator.
	data := bytes.TrimSuffix(buffer.Bytes(), []byte{'\n'})
	// Walk backwards from separator to separator; the n-th one found marks
	// the start of the tail.
	end := len(data)
	for {
		idx := bytes.LastIndexByte(data[:end], '\n')
		if idx < 0 {
			// Fewer than n separators: everything belongs to the tail.
			return string(data)
		}
		n--
		if n == 0 {
			return string(data[idx+1:])
		}
		end = idx
	}
}
| |
| // NewOutput returns a new Output object with no destinations attached. |
| // Writing to an empty Output will cause the written data to be discarded. |
| func NewOutput() *Output { |
| return &Output{} |
| } |
| |
| // Return true if something was written on this output |
| func (o *Output) Used() bool { |
| o.Lock() |
| defer o.Unlock() |
| return o.used |
| } |
| |
| // Add attaches a new destination to the Output. Any data subsequently written |
| // to the output will be written to the new destination in addition to all the others. |
| // This method is thread-safe. |
| func (o *Output) Add(dst io.Writer) { |
| o.Lock() |
| defer o.Unlock() |
| o.dests = append(o.dests, dst) |
| } |
| |
| // Set closes and remove existing destination and then attaches a new destination to |
| // the Output. Any data subsequently written to the output will be written to the new |
| // destination in addition to all the others. This method is thread-safe. |
| func (o *Output) Set(dst io.Writer) { |
| o.Close() |
| o.Lock() |
| defer o.Unlock() |
| o.dests = []io.Writer{dst} |
| } |
| |
| // AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination, |
| // and returns its reading end for consumption by the caller. |
| // This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package. |
| // This method is thread-safe. |
| func (o *Output) AddPipe() (io.Reader, error) { |
| r, w := io.Pipe() |
| o.Add(w) |
| return r, nil |
| } |
| |
| // Write writes the same data to all registered destinations. |
| // This method is thread-safe. |
| func (o *Output) Write(p []byte) (n int, err error) { |
| o.Lock() |
| defer o.Unlock() |
| o.used = true |
| var firstErr error |
| for _, dst := range o.dests { |
| _, err := dst.Write(p) |
| if err != nil && firstErr == nil { |
| firstErr = err |
| } |
| } |
| return len(p), firstErr |
| } |
| |
| // Close unregisters all destinations and waits for all background |
| // AddTail and AddString tasks to complete. |
| // The Close method of each destination is called if it exists. |
| func (o *Output) Close() error { |
| o.Lock() |
| defer o.Unlock() |
| var firstErr error |
| for _, dst := range o.dests { |
| if closer, ok := dst.(io.Closer); ok { |
| err := closer.Close() |
| if err != nil && firstErr == nil { |
| firstErr = err |
| } |
| } |
| } |
| o.tasks.Wait() |
| return firstErr |
| } |
| |
// Input wraps a single, optional source reader.
type Input struct {
	// src is the reader that Read delegates to; it stays nil until Add
	// attaches a source.
	src io.Reader
	// Mutex serialises Read and Add. Close deliberately does not take it.
	sync.Mutex
}
| |
| // NewInput returns a new Input object with no source attached. |
| // Reading to an empty Input will return io.EOF. |
| func NewInput() *Input { |
| return &Input{} |
| } |
| |
| // Read reads from the input in a thread-safe way. |
| func (i *Input) Read(p []byte) (n int, err error) { |
| i.Mutex.Lock() |
| defer i.Mutex.Unlock() |
| if i.src == nil { |
| return 0, io.EOF |
| } |
| return i.src.Read(p) |
| } |
| |
| // Closes the src |
| // Not thread safe on purpose |
| func (i *Input) Close() error { |
| if i.src != nil { |
| if closer, ok := i.src.(io.Closer); ok { |
| return closer.Close() |
| } |
| } |
| return nil |
| } |
| |
| // Add attaches a new source to the input. |
| // Add can only be called once per input. Subsequent calls will |
| // return an error. |
| func (i *Input) Add(src io.Reader) error { |
| i.Mutex.Lock() |
| defer i.Mutex.Unlock() |
| if i.src != nil { |
| return fmt.Errorf("Maximum number of sources reached: 1") |
| } |
| i.src = src |
| return nil |
| } |
| |
| // AddEnv starts a new goroutine which will decode all subsequent data |
| // as a stream of json-encoded objects, and point `dst` to the last |
| // decoded object. |
| // The result `env` can be queried using the type-neutral Env interface. |
| // It is not safe to query `env` until the Output is closed. |
| func (o *Output) AddEnv() (dst *Env, err error) { |
| src, err := o.AddPipe() |
| if err != nil { |
| return nil, err |
| } |
| dst = &Env{} |
| o.tasks.Add(1) |
| go func() { |
| defer o.tasks.Done() |
| decoder := NewDecoder(src) |
| for { |
| env, err := decoder.Decode() |
| if err != nil { |
| return |
| } |
| *dst = *env |
| } |
| }() |
| return dst, nil |
| } |
| |
| func (o *Output) AddListTable() (dst *Table, err error) { |
| src, err := o.AddPipe() |
| if err != nil { |
| return nil, err |
| } |
| dst = NewTable("", 0) |
| o.tasks.Add(1) |
| go func() { |
| defer o.tasks.Done() |
| content, err := ioutil.ReadAll(src) |
| if err != nil { |
| return |
| } |
| if _, err := dst.ReadListFrom(content); err != nil { |
| return |
| } |
| }() |
| return dst, nil |
| } |
| |
| func (o *Output) AddTable() (dst *Table, err error) { |
| src, err := o.AddPipe() |
| if err != nil { |
| return nil, err |
| } |
| dst = NewTable("", 0) |
| o.tasks.Add(1) |
| go func() { |
| defer o.tasks.Done() |
| if _, err := dst.ReadFrom(src); err != nil { |
| return |
| } |
| }() |
| return dst, nil |
| } |