Merge pull request #34162 from cpuguy83/move_logread_logic
Move jsonlog read logic
diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go
index 177c070..7aa92f3 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog.go
@@ -7,7 +7,6 @@
"bytes"
"encoding/json"
"fmt"
- "io"
"strconv"
"sync"
@@ -24,12 +23,9 @@
// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
- extra []byte // json-encoded extra attributes
-
- mu sync.RWMutex
- buf *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
+ mu sync.Mutex
closed bool
- writer *loggerutils.RotateFileWriter
+ writer *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
}
@@ -65,11 +61,6 @@
}
}
- writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles)
- if err != nil {
- return nil, err
- }
-
var extra []byte
attrs, err := info.ExtraAttributes(nil)
if err != nil {
@@ -83,33 +74,35 @@
}
}
+ buf := bytes.NewBuffer(nil)
+ marshalFunc := func(msg *logger.Message) ([]byte, error) {
+ if err := marshalMessage(msg, extra, buf); err != nil {
+ return nil, err
+ }
+ b := buf.Bytes()
+ buf.Reset()
+ return b, nil
+ }
+
+ writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc)
+ if err != nil {
+ return nil, err
+ }
+
return &JSONFileLogger{
- buf: bytes.NewBuffer(nil),
writer: writer,
readers: make(map[*logger.LogWatcher]struct{}),
- extra: extra,
}, nil
}
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
l.mu.Lock()
- err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
- l.buf.Reset()
+ err := l.writer.WriteLogEntry(msg)
l.mu.Unlock()
return err
}
-func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
- if err := marshalMessage(m, extra, buf); err != nil {
- logger.PutMessage(m)
- return err
- }
- logger.PutMessage(m)
- _, err := w.Write(buf.Bytes())
- return errors.Wrap(err, "error writing log entry")
-}
-
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
logLine := msg.Line
if !msg.Partial {
diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go
index 2b2b2b5..893c054 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog_test.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go
@@ -82,7 +82,7 @@
}
buf := bytes.NewBuffer(nil)
- require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+ require.NoError(b, marshalMessage(msg, nil, buf))
b.SetBytes(int64(buf.Len()))
b.ResetTimer()
diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index 09eaaf0..f190e01 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -1,33 +1,45 @@
package jsonfilelog
import (
- "bytes"
"encoding/json"
- "fmt"
"io"
- "os"
- "time"
-
- "github.com/fsnotify/fsnotify"
- "golang.org/x/net/context"
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
- "github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
- "github.com/docker/docker/pkg/filenotify"
- "github.com/docker/docker/pkg/tailfile"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
)
const maxJSONDecodeRetry = 20000
+// ReadLogs implements the logger's LogReader interface for the logs
+// created by this driver.
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+ logWatcher := logger.NewLogWatcher()
+
+ go l.readLogs(logWatcher, config)
+ return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
+ defer close(watcher.Msg)
+
+ l.mu.Lock()
+ l.readers[watcher] = struct{}{}
+ l.mu.Unlock()
+
+ l.writer.ReadLogs(config, watcher)
+
+ l.mu.Lock()
+ delete(l.readers, watcher)
+ l.mu.Unlock()
+}
+
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
l.Reset()
if err := dec.Decode(l); err != nil {
return nil, err
}
+
var attrs []backend.LogAttr
if len(l.Attrs) != 0 {
attrs = make([]backend.LogAttr, 0, len(l.Attrs))
@@ -44,318 +56,34 @@
return msg, nil
}
-// ReadLogs implements the logger's LogReader interface for the logs
-// created by this driver.
-func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- logWatcher := logger.NewLogWatcher()
-
- go l.readLogs(logWatcher, config)
- return logWatcher
-}
-
-func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
- defer close(logWatcher.Msg)
-
- // lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
- // This will block writes!!!
- l.mu.RLock()
-
- // TODO it would be nice to move a lot of this reader implementation to the rotate logger object
- pth := l.writer.LogPath()
- var files []io.ReadSeeker
- for i := l.writer.MaxFiles(); i > 1; i-- {
- f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
- if err != nil {
- if !os.IsNotExist(err) {
- logWatcher.Err <- err
- l.mu.RUnlock()
- return
- }
- continue
- }
- defer f.Close()
- files = append(files, f)
- }
-
- latestFile, err := os.Open(pth)
- if err != nil {
- logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
- l.mu.RUnlock()
- return
- }
- defer latestFile.Close()
-
- latestChunk, err := newSectionReader(latestFile)
-
- // Now we have the reader sectioned, all fd's opened, we can unlock.
- // New writes/rotates will not affect seeking through these files
- l.mu.RUnlock()
-
- if err != nil {
- logWatcher.Err <- err
- return
- }
-
- if config.Tail != 0 {
- tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
- tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
- }
-
- // close all the rotated files
- for _, f := range files {
- if err := f.(io.Closer).Close(); err != nil {
- logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
- }
- }
-
- if !config.Follow || l.closed {
- return
- }
-
- notifyRotate := l.writer.NotifyRotate()
- defer l.writer.NotifyRotateEvict(notifyRotate)
-
- l.mu.Lock()
- l.readers[logWatcher] = struct{}{}
- l.mu.Unlock()
-
- followLogs(latestFile, logWatcher, notifyRotate, config)
-
- l.mu.Lock()
- delete(l.readers, logWatcher)
- l.mu.Unlock()
-}
-
-func newSectionReader(f *os.File) (*io.SectionReader, error) {
- // seek to the end to get the size
- // we'll leave this at the end of the file since section reader does not advance the reader
- size, err := f.Seek(0, os.SEEK_END)
- if err != nil {
- return nil, errors.Wrap(err, "error getting current file size")
- }
- return io.NewSectionReader(f, 0, size), nil
-}
-
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
- rdr := io.Reader(f)
- if tail > 0 {
- ls, err := tailfile.TailFile(f, tail)
- if err != nil {
- logWatcher.Err <- err
- return
- }
- rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
- }
- dec := json.NewDecoder(rdr)
- for {
- msg, err := decodeLogLine(dec, &jsonlog.JSONLog{})
- if err != nil {
- if err != io.EOF {
- logWatcher.Err <- err
- }
- return
- }
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- if !until.IsZero() && msg.Timestamp.After(until) {
- return
- }
- select {
- case <-logWatcher.WatchClose():
- return
- case logWatcher.Msg <- msg:
- }
- }
-}
-
-func watchFile(name string) (filenotify.FileWatcher, error) {
- fileWatcher, err := filenotify.New()
- if err != nil {
- return nil, err
- }
-
- if err := fileWatcher.Add(name); err != nil {
- logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
- fileWatcher.Close()
- fileWatcher = filenotify.NewPollingWatcher()
-
- if err := fileWatcher.Add(name); err != nil {
- fileWatcher.Close()
- logrus.Debugf("error watching log file for modifications: %v", err)
- return nil, err
- }
- }
- return fileWatcher, nil
-}
-
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
- dec := json.NewDecoder(f)
+// decodeFunc is used to create a decoder for the log file reader
+func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
l := &jsonlog.JSONLog{}
-
- name := f.Name()
- fileWatcher, err := watchFile(name)
- if err != nil {
- logWatcher.Err <- err
- return
- }
- defer func() {
- f.Close()
- fileWatcher.Remove(name)
- fileWatcher.Close()
- }()
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go func() {
- select {
- case <-logWatcher.WatchClose():
- fileWatcher.Remove(name)
- cancel()
- case <-ctx.Done():
- return
- }
- }()
-
- var retries int
- handleRotate := func() error {
- f.Close()
- fileWatcher.Remove(name)
-
- // retry when the file doesn't exist
- for retries := 0; retries <= 5; retries++ {
- f, err = os.Open(name)
- if err == nil || !os.IsNotExist(err) {
+ dec := json.NewDecoder(rdr)
+ return func() (msg *logger.Message, err error) {
+ for retries := 0; retries < maxJSONDecodeRetry; retries++ {
+ msg, err = decodeLogLine(dec, l)
+ if err == nil {
break
}
- }
- if err != nil {
- return err
- }
- if err := fileWatcher.Add(name); err != nil {
- return err
- }
- dec = json.NewDecoder(f)
- return nil
- }
- errRetry := errors.New("retry")
- errDone := errors.New("done")
- waitRead := func() error {
- select {
- case e := <-fileWatcher.Events():
- switch e.Op {
- case fsnotify.Write:
- dec = json.NewDecoder(f)
- return nil
- case fsnotify.Rename, fsnotify.Remove:
- select {
- case <-notifyRotate:
- case <-ctx.Done():
- return errDone
- }
- if err := handleRotate(); err != nil {
- return err
- }
- return nil
- }
- return errRetry
- case err := <-fileWatcher.Errors():
- logrus.Debug("logger got error watching file: %v", err)
- // Something happened, let's try and stay alive and create a new watcher
- if retries <= 5 {
- fileWatcher.Close()
- fileWatcher, err = watchFile(name)
- if err != nil {
- return err
- }
+ // try again, could be due to a an incomplete json object as we read
+ if _, ok := err.(*json.SyntaxError); ok {
+ dec = json.NewDecoder(rdr)
retries++
- return errRetry
+ continue
}
- return err
- case <-ctx.Done():
- return errDone
- }
- }
- handleDecodeErr := func(err error) error {
- if err == io.EOF {
- for {
- err := waitRead()
- if err == nil {
- break
- }
- if err == errRetry {
- continue
- }
- return err
- }
- return nil
- }
- // try again because this shouldn't happen
- if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- dec = json.NewDecoder(f)
- retries++
- return nil
- }
- // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- // remaining data in the parser's buffer while an io.EOF occurs.
- // If the json logger writes a partial json log entry to the disk
- // while at the same time the decoder tries to decode it, the race condition happens.
- if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
- reader := io.MultiReader(dec.Buffered(), f)
- dec = json.NewDecoder(reader)
- retries++
- return nil
- }
- return err
- }
-
- // main loop
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- if err := handleDecodeErr(err); err != nil {
- if err == errDone {
- return
- }
- // we got an unrecoverable error, so return
- logWatcher.Err <- err
- return
- }
- // ready to try again
- continue
- }
-
- since := config.Since
- until := config.Until
-
- retries = 0 // reset retries since we've succeeded
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- if !until.IsZero() && msg.Timestamp.After(until) {
- return
- }
- select {
- case logWatcher.Msg <- msg:
- case <-ctx.Done():
- logWatcher.Msg <- msg
- // This for loop is used when the logger is closed (ie, container
- // stopped) but the consumer is still waiting for logs.
- for {
- msg, err := decodeLogLine(dec, l)
- if err != nil {
- return
- }
- if !since.IsZero() && msg.Timestamp.Before(since) {
- continue
- }
- if !until.IsZero() && msg.Timestamp.After(until) {
- return
- }
- logWatcher.Msg <- msg
+ // io.ErrUnexpectedEOF is returned from json.Decoder when there is
+ // remaining data in the parser's buffer while an io.EOF occurs.
+ // If the json logger writes a partial json log entry to the disk
+ // while at the same time the decoder tries to decode it, the race condition happens.
+ if err == io.ErrUnexpectedEOF {
+ reader := io.MultiReader(dec.Buffered(), rdr)
+ dec = json.NewDecoder(reader)
+ retries++
}
}
+ return msg, err
}
}
diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go
index 01a05c4..599fdf9 100644
--- a/daemon/logger/jsonfilelog/read_test.go
+++ b/daemon/logger/jsonfilelog/read_test.go
@@ -35,7 +35,7 @@
}
buf := bytes.NewBuffer(nil)
- require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+ require.NoError(b, marshalMessage(msg, nil, buf))
b.SetBytes(int64(buf.Len()))
b.ResetTimer()
diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go
index 6edbf73..dc25beb 100644
--- a/daemon/logger/logger.go
+++ b/daemon/logger/logger.go
@@ -140,3 +140,6 @@
// Determines if a log driver can read back logs
ReadLogs bool
}
+
+// MarshalFunc is a func that marshals a message into an arbitrary format
+type MarshalFunc func(*Message) ([]byte, error)
diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go
new file mode 100644
index 0000000..6a7a689
--- /dev/null
+++ b/daemon/logger/loggerutils/logfile.go
@@ -0,0 +1,454 @@
+package loggerutils
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/docker/docker/daemon/logger"
+ "github.com/docker/docker/daemon/logger/loggerutils/multireader"
+ "github.com/docker/docker/pkg/filenotify"
+ "github.com/docker/docker/pkg/pubsub"
+ "github.com/docker/docker/pkg/tailfile"
+ "github.com/fsnotify/fsnotify"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+)
+
+// LogFile is Logger implementation for default Docker logging.
+type LogFile struct {
+ f *os.File // store for closing
+ closed bool
+ mu sync.RWMutex
+ capacity int64 //maximum size of each file
+ currentSize int64 // current size of the latest file
+ maxFiles int //maximum number of files
+ notifyRotate *pubsub.Publisher
+ marshal logger.MarshalFunc
+ createDecoder makeDecoderFunc
+}
+
+type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
+
+//NewLogFile creates new LogFile
+func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
+ log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+ if err != nil {
+ return nil, err
+ }
+
+ size, err := log.Seek(0, os.SEEK_END)
+ if err != nil {
+ return nil, err
+ }
+
+ return &LogFile{
+ f: log,
+ capacity: capacity,
+ currentSize: size,
+ maxFiles: maxFiles,
+ notifyRotate: pubsub.NewPublisher(0, 1),
+ marshal: marshaller,
+ createDecoder: decodeFunc,
+ }, nil
+}
+
+// WriteLogEntry writes the provided log message to the current log file.
+// This may trigger a rotation event if the max file/capacity limits are hit.
+func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
+ b, err := w.marshal(msg)
+ if err != nil {
+ return errors.Wrap(err, "error marshalling log message")
+ }
+
+ logger.PutMessage(msg)
+
+ w.mu.Lock()
+ if w.closed {
+ w.mu.Unlock()
+ return errors.New("cannot write because the output file was closed")
+ }
+
+ if err := w.checkCapacityAndRotate(); err != nil {
+ w.mu.Unlock()
+ return err
+ }
+
+ n, err := w.f.Write(b)
+ if err == nil {
+ w.currentSize += int64(n)
+ }
+ w.mu.Unlock()
+ return err
+}
+
+func (w *LogFile) checkCapacityAndRotate() error {
+ if w.capacity == -1 {
+ return nil
+ }
+
+ if w.currentSize >= w.capacity {
+ name := w.f.Name()
+ if err := w.f.Close(); err != nil {
+ return errors.Wrap(err, "error closing file")
+ }
+ if err := rotate(name, w.maxFiles); err != nil {
+ return err
+ }
+ file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
+ if err != nil {
+ return err
+ }
+ w.f = file
+ w.currentSize = 0
+ w.notifyRotate.Publish(struct{}{})
+ }
+
+ return nil
+}
+
+func rotate(name string, maxFiles int) error {
+ if maxFiles < 2 {
+ return nil
+ }
+ for i := maxFiles - 1; i > 1; i-- {
+ toPath := name + "." + strconv.Itoa(i)
+ fromPath := name + "." + strconv.Itoa(i-1)
+ if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
+ return errors.Wrap(err, "error rotating old log entries")
+ }
+ }
+
+ if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
+ return errors.Wrap(err, "error rotating current log")
+ }
+ return nil
+}
+
+// LogPath returns the location the given writer logs to.
+func (w *LogFile) LogPath() string {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ return w.f.Name()
+}
+
+// MaxFiles return maximum number of files
+func (w *LogFile) MaxFiles() int {
+ return w.maxFiles
+}
+
+// Close closes underlying file and signals all readers to stop.
+func (w *LogFile) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if w.closed {
+ return nil
+ }
+ if err := w.f.Close(); err != nil {
+ return err
+ }
+ w.closed = true
+ return nil
+}
+
+// ReadLogs decodes entries from log files and sends them the passed in watcher
+func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
+ w.mu.RLock()
+ files, err := w.openRotatedFiles()
+ if err != nil {
+ w.mu.RUnlock()
+ watcher.Err <- err
+ return
+ }
+ defer func() {
+ for _, f := range files {
+ f.Close()
+ }
+ }()
+
+ currentFile, err := os.Open(w.f.Name())
+ if err != nil {
+ w.mu.RUnlock()
+ watcher.Err <- err
+ return
+ }
+ defer currentFile.Close()
+
+ currentChunk, err := newSectionReader(currentFile)
+ w.mu.RUnlock()
+
+ if err != nil {
+ watcher.Err <- err
+ return
+ }
+
+ if config.Tail != 0 {
+ seekers := make([]io.ReadSeeker, 0, len(files)+1)
+ for _, f := range files {
+ seekers = append(seekers, f)
+ }
+ seekers = append(seekers, currentChunk)
+ tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
+ }
+
+ w.mu.RLock()
+ if !config.Follow || w.closed {
+ w.mu.RUnlock()
+ return
+ }
+ w.mu.RUnlock()
+
+ notifyRotate := w.notifyRotate.Subscribe()
+ defer w.notifyRotate.Evict(notifyRotate)
+ followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
+}
+
+func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
+ defer func() {
+ if err == nil {
+ return
+ }
+ for _, f := range files {
+ f.Close()
+ }
+ }()
+
+ for i := w.maxFiles; i > 1; i-- {
+ f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
+ if err != nil {
+ if !os.IsNotExist(err) {
+ return nil, err
+ }
+ continue
+ }
+ files = append(files, f)
+ }
+
+ return files, nil
+}
+
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
+ // seek to the end to get the size
+ // we'll leave this at the end of the file since section reader does not advance the reader
+ size, err := f.Seek(0, os.SEEK_END)
+ if err != nil {
+ return nil, errors.Wrap(err, "error getting current file size")
+ }
+ return io.NewSectionReader(f, 0, size), nil
+}
+
+type decodeFunc func() (*logger.Message, error)
+
+func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
+ var rdr io.Reader = f
+ if config.Tail > 0 {
+ ls, err := tailfile.TailFile(f, config.Tail)
+ if err != nil {
+ watcher.Err <- err
+ return
+ }
+ rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+ }
+
+ decodeLogLine := createDecoder(rdr)
+ for {
+ msg, err := decodeLogLine()
+ if err != nil {
+ if err != io.EOF {
+ watcher.Err <- err
+ }
+ return
+ }
+ if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
+ continue
+ }
+ if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
+ return
+ }
+ select {
+ case <-watcher.WatchClose():
+ return
+ case watcher.Msg <- msg:
+ }
+ }
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
+ decodeLogLine := createDecoder(f)
+
+ name := f.Name()
+ fileWatcher, err := watchFile(name)
+ if err != nil {
+ logWatcher.Err <- err
+ return
+ }
+ defer func() {
+ f.Close()
+ fileWatcher.Remove(name)
+ fileWatcher.Close()
+ }()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ select {
+ case <-logWatcher.WatchClose():
+ fileWatcher.Remove(name)
+ cancel()
+ case <-ctx.Done():
+ return
+ }
+ }()
+
+ var retries int
+ handleRotate := func() error {
+ f.Close()
+ fileWatcher.Remove(name)
+
+ // retry when the file doesn't exist
+ for retries := 0; retries <= 5; retries++ {
+ f, err = os.Open(name)
+ if err == nil || !os.IsNotExist(err) {
+ break
+ }
+ }
+ if err != nil {
+ return err
+ }
+ if err := fileWatcher.Add(name); err != nil {
+ return err
+ }
+ decodeLogLine = createDecoder(f)
+ return nil
+ }
+
+ errRetry := errors.New("retry")
+ errDone := errors.New("done")
+ waitRead := func() error {
+ select {
+ case e := <-fileWatcher.Events():
+ switch e.Op {
+ case fsnotify.Write:
+ decodeLogLine = createDecoder(f)
+ return nil
+ case fsnotify.Rename, fsnotify.Remove:
+ select {
+ case <-notifyRotate:
+ case <-ctx.Done():
+ return errDone
+ }
+ if err := handleRotate(); err != nil {
+ return err
+ }
+ return nil
+ }
+ return errRetry
+ case err := <-fileWatcher.Errors():
+ logrus.Debug("logger got error watching file: %v", err)
+ // Something happened, let's try and stay alive and create a new watcher
+ if retries <= 5 {
+ fileWatcher.Close()
+ fileWatcher, err = watchFile(name)
+ if err != nil {
+ return err
+ }
+ retries++
+ return errRetry
+ }
+ return err
+ case <-ctx.Done():
+ return errDone
+ }
+ }
+
+ handleDecodeErr := func(err error) error {
+ if err != io.EOF {
+ return err
+ }
+
+ for {
+ err := waitRead()
+ if err == nil {
+ break
+ }
+ if err == errRetry {
+ continue
+ }
+ return err
+ }
+ return nil
+ }
+
+ // main loop
+ for {
+ msg, err := decodeLogLine()
+ if err != nil {
+ if err := handleDecodeErr(err); err != nil {
+ if err == errDone {
+ return
+ }
+ // we got an unrecoverable error, so return
+ logWatcher.Err <- err
+ return
+ }
+ // ready to try again
+ continue
+ }
+
+ retries = 0 // reset retries since we've succeeded
+ if !since.IsZero() && msg.Timestamp.Before(since) {
+ continue
+ }
+ if !until.IsZero() && msg.Timestamp.After(until) {
+ return
+ }
+ select {
+ case logWatcher.Msg <- msg:
+ case <-ctx.Done():
+ logWatcher.Msg <- msg
+ for {
+ msg, err := decodeLogLine()
+ if err != nil {
+ return
+ }
+ if !since.IsZero() && msg.Timestamp.Before(since) {
+ continue
+ }
+ if !until.IsZero() && msg.Timestamp.After(until) {
+ return
+ }
+ logWatcher.Msg <- msg
+ }
+ }
+ }
+}
+
+func watchFile(name string) (filenotify.FileWatcher, error) {
+ fileWatcher, err := filenotify.New()
+ if err != nil {
+ return nil, err
+ }
+
+ logger := logrus.WithFields(logrus.Fields{
+ "module": "logger",
+ "fille": name,
+ })
+
+ if err := fileWatcher.Add(name); err != nil {
+ logger.WithError(err).Warnf("falling back to file poller")
+ fileWatcher.Close()
+ fileWatcher = filenotify.NewPollingWatcher()
+
+ if err := fileWatcher.Add(name); err != nil {
+ fileWatcher.Close()
+ logger.WithError(err).Debugf("error watching log file for modifications")
+ return nil, err
+ }
+ }
+ return fileWatcher, nil
+}
diff --git a/daemon/logger/jsonfilelog/multireader/multireader.go b/daemon/logger/loggerutils/multireader/multireader.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader.go
rename to daemon/logger/loggerutils/multireader/multireader.go
diff --git a/daemon/logger/jsonfilelog/multireader/multireader_test.go b/daemon/logger/loggerutils/multireader/multireader_test.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader_test.go
rename to daemon/logger/loggerutils/multireader/multireader_test.go
diff --git a/daemon/logger/loggerutils/rotatefilewriter.go b/daemon/logger/loggerutils/rotatefilewriter.go
deleted file mode 100644
index 457a39b..0000000
--- a/daemon/logger/loggerutils/rotatefilewriter.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package loggerutils
-
-import (
- "errors"
- "os"
- "strconv"
- "sync"
-
- "github.com/docker/docker/pkg/pubsub"
-)
-
-// RotateFileWriter is Logger implementation for default Docker logging.
-type RotateFileWriter struct {
- f *os.File // store for closing
- closed bool
- mu sync.Mutex
- capacity int64 //maximum size of each file
- currentSize int64 // current size of the latest file
- maxFiles int //maximum number of files
- notifyRotate *pubsub.Publisher
-}
-
-//NewRotateFileWriter creates new RotateFileWriter
-func NewRotateFileWriter(logPath string, capacity int64, maxFiles int) (*RotateFileWriter, error) {
- log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
- if err != nil {
- return nil, err
- }
-
- size, err := log.Seek(0, os.SEEK_END)
- if err != nil {
- return nil, err
- }
-
- return &RotateFileWriter{
- f: log,
- capacity: capacity,
- currentSize: size,
- maxFiles: maxFiles,
- notifyRotate: pubsub.NewPublisher(0, 1),
- }, nil
-}
-
-//WriteLog write log message to File
-func (w *RotateFileWriter) Write(message []byte) (int, error) {
- w.mu.Lock()
- if w.closed {
- w.mu.Unlock()
- return -1, errors.New("cannot write because the output file was closed")
- }
- if err := w.checkCapacityAndRotate(); err != nil {
- w.mu.Unlock()
- return -1, err
- }
-
- n, err := w.f.Write(message)
- if err == nil {
- w.currentSize += int64(n)
- }
- w.mu.Unlock()
- return n, err
-}
-
-func (w *RotateFileWriter) checkCapacityAndRotate() error {
- if w.capacity == -1 {
- return nil
- }
-
- if w.currentSize >= w.capacity {
- name := w.f.Name()
- if err := w.f.Close(); err != nil {
- return err
- }
- if err := rotate(name, w.maxFiles); err != nil {
- return err
- }
- file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
- if err != nil {
- return err
- }
- w.f = file
- w.currentSize = 0
- w.notifyRotate.Publish(struct{}{})
- }
-
- return nil
-}
-
-func rotate(name string, maxFiles int) error {
- if maxFiles < 2 {
- return nil
- }
- for i := maxFiles - 1; i > 1; i-- {
- toPath := name + "." + strconv.Itoa(i)
- fromPath := name + "." + strconv.Itoa(i-1)
- if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
- return err
- }
- }
-
- if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
- return err
- }
- return nil
-}
-
-// LogPath returns the location the given writer logs to.
-func (w *RotateFileWriter) LogPath() string {
- w.mu.Lock()
- defer w.mu.Unlock()
- return w.f.Name()
-}
-
-// MaxFiles return maximum number of files
-func (w *RotateFileWriter) MaxFiles() int {
- return w.maxFiles
-}
-
-//NotifyRotate returns the new subscriber
-func (w *RotateFileWriter) NotifyRotate() chan interface{} {
- return w.notifyRotate.Subscribe()
-}
-
-//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
-func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
- w.notifyRotate.Evict(sub)
-}
-
-// Close closes underlying file and signals all readers to stop.
-func (w *RotateFileWriter) Close() error {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return nil
- }
- if err := w.f.Close(); err != nil {
- return err
- }
- w.closed = true
- return nil
-}
diff --git a/daemon/logs.go b/daemon/logs.go
index babf07e..131360b 100644
--- a/daemon/logs.go
+++ b/daemon/logs.go
@@ -123,7 +123,7 @@
}
return
case <-ctx.Done():
- lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
+ lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
return
case msg, ok := <-logs.Msg:
// there is some kind of pool or ring buffer in the logger that