| package logger // import "github.com/docker/docker/daemon/logger" |
| |
| import ( |
| "io" |
| "os" |
| "path/filepath" |
| "sync" |
| "time" |
| |
| "github.com/docker/docker/api/types/plugins/logdriver" |
| "github.com/docker/docker/pkg/plugingetter" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| ) |
| |
| // pluginAdapter takes a plugin and implements the Logger interface for logger |
| // instances |
| type pluginAdapter struct { |
| driverName string |
| id string |
| plugin logPlugin |
| fifoPath string |
| capabilities Capability |
| logInfo Info |
| |
| // synchronize access to the log stream and shared buffer |
| mu sync.Mutex |
| enc logdriver.LogEntryEncoder |
| stream io.WriteCloser |
| // buf is shared for each `Log()` call to reduce allocations. |
| // buf must be protected by mutex |
| buf logdriver.LogEntry |
| } |
| |
| func (a *pluginAdapter) Log(msg *Message) error { |
| a.mu.Lock() |
| |
| a.buf.Line = msg.Line |
| a.buf.TimeNano = msg.Timestamp.UnixNano() |
| a.buf.Partial = msg.PLogMetaData != nil |
| a.buf.Source = msg.Source |
| |
| err := a.enc.Encode(&a.buf) |
| a.buf.Reset() |
| |
| a.mu.Unlock() |
| |
| PutMessage(msg) |
| return err |
| } |
| |
| func (a *pluginAdapter) Name() string { |
| return a.driverName |
| } |
| |
| func (a *pluginAdapter) Close() error { |
| a.mu.Lock() |
| defer a.mu.Unlock() |
| |
| if err := a.plugin.StopLogging(filepath.Join("/", "run", "docker", "logging", a.id)); err != nil { |
| return err |
| } |
| |
| if err := a.stream.Close(); err != nil { |
| logrus.WithError(err).Error("error closing plugin fifo") |
| } |
| if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) { |
| logrus.WithError(err).Error("error cleaning up plugin fifo") |
| } |
| |
| // may be nil, especially for unit tests |
| if pluginGetter != nil { |
| pluginGetter.Get(a.Name(), extName, plugingetter.Release) |
| } |
| return nil |
| } |
| |
| type pluginAdapterWithRead struct { |
| *pluginAdapter |
| } |
| |
| func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { |
| watcher := NewLogWatcher() |
| |
| go func() { |
| defer close(watcher.Msg) |
| stream, err := a.plugin.ReadLogs(a.logInfo, config) |
| if err != nil { |
| watcher.Err <- errors.Wrap(err, "error getting log reader") |
| return |
| } |
| defer stream.Close() |
| |
| dec := logdriver.NewLogEntryDecoder(stream) |
| for { |
| select { |
| case <-watcher.WatchClose(): |
| return |
| default: |
| } |
| |
| var buf logdriver.LogEntry |
| if err := dec.Decode(&buf); err != nil { |
| if err == io.EOF { |
| return |
| } |
| select { |
| case watcher.Err <- errors.Wrap(err, "error decoding log message"): |
| case <-watcher.WatchClose(): |
| } |
| return |
| } |
| |
| msg := &Message{ |
| Timestamp: time.Unix(0, buf.TimeNano), |
| Line: buf.Line, |
| Source: buf.Source, |
| } |
| |
| // plugin should handle this, but check just in case |
| if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) { |
| continue |
| } |
| if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { |
| return |
| } |
| |
| select { |
| case watcher.Msg <- msg: |
| case <-watcher.WatchClose(): |
| // make sure the message we consumed is sent |
| watcher.Msg <- msg |
| return |
| } |
| } |
| }() |
| |
| return watcher |
| } |