Merge pull request #34162 from cpuguy83/move_logread_logic

Move jsonlog read logic
diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go
index 177c070..7aa92f3 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog.go
@@ -7,7 +7,6 @@
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"io"
 	"strconv"
 	"sync"
 
@@ -24,12 +23,9 @@
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	extra []byte // json-encoded extra attributes
-
-	mu      sync.RWMutex
-	buf     *bytes.Buffer // avoids allocating a new buffer on each call to `Log()`
+	mu      sync.Mutex
 	closed  bool
-	writer  *loggerutils.RotateFileWriter
+	writer  *loggerutils.LogFile
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
 }
 
@@ -65,11 +61,6 @@
 		}
 	}
 
-	writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles)
-	if err != nil {
-		return nil, err
-	}
-
 	var extra []byte
 	attrs, err := info.ExtraAttributes(nil)
 	if err != nil {
@@ -83,33 +74,35 @@
 		}
 	}
 
+	buf := bytes.NewBuffer(nil)
+	marshalFunc := func(msg *logger.Message) ([]byte, error) {
+		if err := marshalMessage(msg, extra, buf); err != nil {
+			return nil, err
+		}
+		b := buf.Bytes()
+		buf.Reset()
+		return b, nil
+	}
+
+	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc)
+	if err != nil {
+		return nil, err
+	}
+
 	return &JSONFileLogger{
-		buf:     bytes.NewBuffer(nil),
 		writer:  writer,
 		readers: make(map[*logger.LogWatcher]struct{}),
-		extra:   extra,
 	}, nil
 }
 
 // Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
 func (l *JSONFileLogger) Log(msg *logger.Message) error {
 	l.mu.Lock()
-	err := writeMessageBuf(l.writer, msg, l.extra, l.buf)
-	l.buf.Reset()
+	err := l.writer.WriteLogEntry(msg)
 	l.mu.Unlock()
 	return err
 }
 
-func writeMessageBuf(w io.Writer, m *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
-	if err := marshalMessage(m, extra, buf); err != nil {
-		logger.PutMessage(m)
-		return err
-	}
-	logger.PutMessage(m)
-	_, err := w.Write(buf.Bytes())
-	return errors.Wrap(err, "error writing log entry")
-}
-
 func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
 	logLine := msg.Line
 	if !msg.Partial {
diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go
index 2b2b2b5..893c054 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog_test.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go
@@ -82,7 +82,7 @@
 	}
 
 	buf := bytes.NewBuffer(nil)
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+	require.NoError(b, marshalMessage(msg, nil, buf))
 	b.SetBytes(int64(buf.Len()))
 
 	b.ResetTimer()
diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index 09eaaf0..f190e01 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -1,33 +1,45 @@
 package jsonfilelog
 
 import (
-	"bytes"
 	"encoding/json"
-	"fmt"
 	"io"
-	"os"
-	"time"
-
-	"github.com/fsnotify/fsnotify"
-	"golang.org/x/net/context"
 
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
-	"github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
-	"github.com/docker/docker/pkg/filenotify"
-	"github.com/docker/docker/pkg/tailfile"
-	"github.com/pkg/errors"
-	"github.com/sirupsen/logrus"
 )
 
 const maxJSONDecodeRetry = 20000
 
+// ReadLogs implements the logger's LogReader interface for the logs
+// created by this driver. The returned watcher is fed by a background goroutine.
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+	logWatcher := logger.NewLogWatcher()
+
+	go l.readLogs(logWatcher, config)
+	return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
+	defer close(watcher.Msg) // Msg is closed when this goroutine returns
+
+	l.mu.Lock()
+	l.readers[watcher] = struct{}{} // registered for the whole read, whether or not it follows
+	l.mu.Unlock()
+
+	l.writer.ReadLogs(config, watcher) // runs synchronously; returns once tailing/following has ended
+	// The read is over, so stop tracking the watcher.
+	l.mu.Lock()
+	delete(l.readers, watcher)
+	l.mu.Unlock()
+}
+
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 	l.Reset()
 	if err := dec.Decode(l); err != nil {
 		return nil, err
 	}
+
 	var attrs []backend.LogAttr
 	if len(l.Attrs) != 0 {
 		attrs = make([]backend.LogAttr, 0, len(l.Attrs))
@@ -44,318 +56,34 @@
 	return msg, nil
 }
 
-// ReadLogs implements the logger's LogReader interface for the logs
-// created by this driver.
-func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
-	logWatcher := logger.NewLogWatcher()
-
-	go l.readLogs(logWatcher, config)
-	return logWatcher
-}
-
-func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
-	defer close(logWatcher.Msg)
-
-	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
-	// This will block writes!!!
-	l.mu.RLock()
-
-	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
-	pth := l.writer.LogPath()
-	var files []io.ReadSeeker
-	for i := l.writer.MaxFiles(); i > 1; i-- {
-		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
-		if err != nil {
-			if !os.IsNotExist(err) {
-				logWatcher.Err <- err
-				l.mu.RUnlock()
-				return
-			}
-			continue
-		}
-		defer f.Close()
-		files = append(files, f)
-	}
-
-	latestFile, err := os.Open(pth)
-	if err != nil {
-		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
-		l.mu.RUnlock()
-		return
-	}
-	defer latestFile.Close()
-
-	latestChunk, err := newSectionReader(latestFile)
-
-	// Now we have the reader sectioned, all fd's opened, we can unlock.
-	// New writes/rotates will not affect seeking through these files
-	l.mu.RUnlock()
-
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-
-	if config.Tail != 0 {
-		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
-		tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
-	}
-
-	// close all the rotated files
-	for _, f := range files {
-		if err := f.(io.Closer).Close(); err != nil {
-			logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
-		}
-	}
-
-	if !config.Follow || l.closed {
-		return
-	}
-
-	notifyRotate := l.writer.NotifyRotate()
-	defer l.writer.NotifyRotateEvict(notifyRotate)
-
-	l.mu.Lock()
-	l.readers[logWatcher] = struct{}{}
-	l.mu.Unlock()
-
-	followLogs(latestFile, logWatcher, notifyRotate, config)
-
-	l.mu.Lock()
-	delete(l.readers, logWatcher)
-	l.mu.Unlock()
-}
-
-func newSectionReader(f *os.File) (*io.SectionReader, error) {
-	// seek to the end to get the size
-	// we'll leave this at the end of the file since section reader does not advance the reader
-	size, err := f.Seek(0, os.SEEK_END)
-	if err != nil {
-		return nil, errors.Wrap(err, "error getting current file size")
-	}
-	return io.NewSectionReader(f, 0, size), nil
-}
-
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
-	rdr := io.Reader(f)
-	if tail > 0 {
-		ls, err := tailfile.TailFile(f, tail)
-		if err != nil {
-			logWatcher.Err <- err
-			return
-		}
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
-	}
-	dec := json.NewDecoder(rdr)
-	for {
-		msg, err := decodeLogLine(dec, &jsonlog.JSONLog{})
-		if err != nil {
-			if err != io.EOF {
-				logWatcher.Err <- err
-			}
-			return
-		}
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		if !until.IsZero() && msg.Timestamp.After(until) {
-			return
-		}
-		select {
-		case <-logWatcher.WatchClose():
-			return
-		case logWatcher.Msg <- msg:
-		}
-	}
-}
-
-func watchFile(name string) (filenotify.FileWatcher, error) {
-	fileWatcher, err := filenotify.New()
-	if err != nil {
-		return nil, err
-	}
-
-	if err := fileWatcher.Add(name); err != nil {
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-		fileWatcher.Close()
-		fileWatcher = filenotify.NewPollingWatcher()
-
-		if err := fileWatcher.Add(name); err != nil {
-			fileWatcher.Close()
-			logrus.Debugf("error watching log file for modifications: %v", err)
-			return nil, err
-		}
-	}
-	return fileWatcher, nil
-}
-
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
-	dec := json.NewDecoder(f)
+// decodeFunc returns a decoder over rdr that yields one message per call and retries on partially written JSON.
+func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
 	l := &jsonlog.JSONLog{}
-
-	name := f.Name()
-	fileWatcher, err := watchFile(name)
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-	defer func() {
-		f.Close()
-		fileWatcher.Remove(name)
-		fileWatcher.Close()
-	}()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go func() {
-		select {
-		case <-logWatcher.WatchClose():
-			fileWatcher.Remove(name)
-			cancel()
-		case <-ctx.Done():
-			return
-		}
-	}()
-
-	var retries int
-	handleRotate := func() error {
-		f.Close()
-		fileWatcher.Remove(name)
-
-		// retry when the file doesn't exist
-		for retries := 0; retries <= 5; retries++ {
-			f, err = os.Open(name)
-			if err == nil || !os.IsNotExist(err) {
+	dec := json.NewDecoder(rdr)
+	return func() (msg *logger.Message, err error) {
+		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
+			msg, err = decodeLogLine(dec, l)
+			if err == nil || err == io.EOF { // EOF will not go away on retry; report it at once instead of spinning
 				break
 			}
-		}
-		if err != nil {
-			return err
-		}
-		if err := fileWatcher.Add(name); err != nil {
-			return err
-		}
-		dec = json.NewDecoder(f)
-		return nil
-	}
 
-	errRetry := errors.New("retry")
-	errDone := errors.New("done")
-	waitRead := func() error {
-		select {
-		case e := <-fileWatcher.Events():
-			switch e.Op {
-			case fsnotify.Write:
-				dec = json.NewDecoder(f)
-				return nil
-			case fsnotify.Rename, fsnotify.Remove:
-				select {
-				case <-notifyRotate:
-				case <-ctx.Done():
-					return errDone
-				}
-				if err := handleRotate(); err != nil {
-					return err
-				}
-				return nil
-			}
-			return errRetry
-		case err := <-fileWatcher.Errors():
-			logrus.Debug("logger got error watching file: %v", err)
-			// Something happened, let's try and stay alive and create a new watcher
-			if retries <= 5 {
-				fileWatcher.Close()
-				fileWatcher, err = watchFile(name)
-				if err != nil {
-					return err
-				}
+			// try again, could be due to an incomplete json object as we read
+			if _, ok := err.(*json.SyntaxError); ok {
+				dec = json.NewDecoder(rdr)
 				retries++
-				return errRetry
+				continue
 			}
-			return err
-		case <-ctx.Done():
-			return errDone
-		}
-	}
 
-	handleDecodeErr := func(err error) error {
-		if err == io.EOF {
-			for {
-				err := waitRead()
-				if err == nil {
-					break
-				}
-				if err == errRetry {
-					continue
-				}
-				return err
-			}
-			return nil
-		}
-		// try again because this shouldn't happen
-		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-			dec = json.NewDecoder(f)
-			retries++
-			return nil
-		}
-		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-		// remaining data in the parser's buffer while an io.EOF occurs.
-		// If the json logger writes a partial json log entry to the disk
-		// while at the same time the decoder tries to decode it, the race condition happens.
-		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-			reader := io.MultiReader(dec.Buffered(), f)
-			dec = json.NewDecoder(reader)
-			retries++
-			return nil
-		}
-		return err
-	}
-
-	// main loop
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err := handleDecodeErr(err); err != nil {
-				if err == errDone {
-					return
-				}
-				// we got an unrecoverable error, so return
-				logWatcher.Err <- err
-				return
-			}
-			// ready to try again
-			continue
-		}
-
-		since := config.Since
-		until := config.Until
-
-		retries = 0 // reset retries since we've succeeded
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		if !until.IsZero() && msg.Timestamp.After(until) {
-			return
-		}
-		select {
-		case logWatcher.Msg <- msg:
-		case <-ctx.Done():
-			logWatcher.Msg <- msg
-			// This for loop is used when the logger is closed (ie, container
-			// stopped) but the consumer is still waiting for logs.
-			for {
-				msg, err := decodeLogLine(dec, l)
-				if err != nil {
-					return
-				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
-				}
-				if !until.IsZero() && msg.Timestamp.After(until) {
-					return
-				}
-				logWatcher.Msg <- msg
+			// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+			// remaining data in the parser's buffer while an io.EOF occurs.
+			// If the json logger writes a partial json log entry to the disk
+			// while at the same time the decoder tries to decode it, the race condition happens.
+			if err == io.ErrUnexpectedEOF {
+				reader := io.MultiReader(dec.Buffered(), rdr)
+				dec = json.NewDecoder(reader)
+				retries++
 			}
 		}
+		return msg, err
 	}
 }
diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go
index 01a05c4..599fdf9 100644
--- a/daemon/logger/jsonfilelog/read_test.go
+++ b/daemon/logger/jsonfilelog/read_test.go
@@ -35,7 +35,7 @@
 	}
 
 	buf := bytes.NewBuffer(nil)
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+	require.NoError(b, marshalMessage(msg, nil, buf))
 	b.SetBytes(int64(buf.Len()))
 
 	b.ResetTimer()
diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go
index 6edbf73..dc25beb 100644
--- a/daemon/logger/logger.go
+++ b/daemon/logger/logger.go
@@ -140,3 +140,6 @@
 	// Determines if a log driver can read back logs
 	ReadLogs bool
 }
+
+// MarshalFunc is a func that marshals a message into an arbitrary format
+type MarshalFunc func(*Message) ([]byte, error)
diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go
new file mode 100644
index 0000000..6a7a689
--- /dev/null
+++ b/daemon/logger/loggerutils/logfile.go
@@ -0,0 +1,454 @@
+package loggerutils
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/loggerutils/multireader"
+	"github.com/docker/docker/pkg/filenotify"
+	"github.com/docker/docker/pkg/pubsub"
+	"github.com/docker/docker/pkg/tailfile"
+	"github.com/fsnotify/fsnotify"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// LogFile writes marshalled log entries to a size-capped, rotating set of files and reads them back for log watchers.
+type LogFile struct {
+	f             *os.File           // current log file being written; stored for closing
+	closed        bool               // set by Close; writes are rejected afterwards
+	mu            sync.RWMutex       // held around access to f, closed and currentSize
+	capacity      int64              // maximum size of each file; -1 disables rotation
+	currentSize   int64              // current size of the latest file
+	maxFiles      int                // maximum number of files
+	notifyRotate  *pubsub.Publisher  // publishes an event after each rotation
+	marshal       logger.MarshalFunc // encodes a message into the bytes written to the file
+	createDecoder makeDecoderFunc    // builds a message decoder over a reader when reading logs
+}
+
+type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
+
+// NewLogFile opens logPath for appending (creating it if needed) and returns a LogFile whose size starts at the file's current length.
+func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
+	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	if err != nil {
+		return nil, err
+	}
+	// Seek to the end so currentSize accounts for data already present in the file.
+	size, err := log.Seek(0, io.SeekEnd)
+	if err != nil {
+		log.Close() // best-effort: don't leak the descriptor; the seek error is the one reported
+		return nil, err
+	}
+	return &LogFile{
+		f:             log,
+		capacity:      capacity,
+		currentSize:   size,
+		maxFiles:      maxFiles,
+		notifyRotate:  pubsub.NewPublisher(0, 1),
+		marshal:       marshaller,
+		createDecoder: decodeFunc,
+	}, nil
+}
+
+// WriteLogEntry writes the provided log message to the current log file.
+// This may trigger a rotation event if the max file/capacity limits are hit.
+func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
+	b, err := w.marshal(msg)
+	// msg is not used past this point, so release it whether or not marshalling succeeded.
+	logger.PutMessage(msg)
+	if err != nil {
+		return errors.Wrap(err, "error marshalling log message")
+	}
+
+	w.mu.Lock()
+	if w.closed {
+		w.mu.Unlock()
+		return errors.New("cannot write because the output file was closed")
+	}
+
+	if err := w.checkCapacityAndRotate(); err != nil {
+		w.mu.Unlock()
+		return err
+	}
+
+	n, err := w.f.Write(b)
+	if err == nil {
+		w.currentSize += int64(n)
+	}
+	w.mu.Unlock()
+	return err
+}
+
+func (w *LogFile) checkCapacityAndRotate() error {
+	if w.capacity == -1 { // -1 disables the size limit, so there is nothing to rotate
+		return nil
+	}
+	// Called from WriteLogEntry with w.mu held. At capacity: close, shift the old files, reopen truncated.
+	if w.currentSize >= w.capacity {
+		name := w.f.Name()
+		if err := w.f.Close(); err != nil {
+			return errors.Wrap(err, "error closing file")
+		}
+		if err := rotate(name, w.maxFiles); err != nil {
+			return err
+		}
+		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
+		if err != nil {
+			return err
+		}
+		w.f = file
+		w.currentSize = 0
+		w.notifyRotate.Publish(struct{}{}) // delivered to the subscriptions taken out in ReadLogs
+	}
+
+	return nil
+}
+
+func rotate(name string, maxFiles int) error {
+	if maxFiles < 2 { // fewer than two files: nothing is renamed
+		return nil
+	}
+	for i := maxFiles - 1; i > 1; i-- { // shift name.(i-1) to name.i, starting from the highest index
+		toPath := name + "." + strconv.Itoa(i)
+		fromPath := name + "." + strconv.Itoa(i-1)
+		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) { // missing source files are ignored
+			return errors.Wrap(err, "error rotating old log entries")
+		}
+	}
+	// Finally move the live file into the first rotated slot.
+	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
+		return errors.Wrap(err, "error rotating current log")
+	}
+	return nil
+}
+
+// LogPath returns the path of the file currently being written; it only reads state, so a shared lock suffices.
+func (w *LogFile) LogPath() string {
+	w.mu.RLock()
+	defer w.mu.RUnlock()
+	return w.f.Name()
+}
+
+// MaxFiles returns the maximum number of log files kept: the current file plus up to maxFiles-1 rotated ones.
+func (w *LogFile) MaxFiles() int {
+	return w.maxFiles
+}
+
+// Close closes the underlying file and marks the LogFile as closed; calling it again after success is a no-op.
+func (w *LogFile) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.closed {
+		return nil
+	}
+	if err := w.f.Close(); err != nil {
+		return err
+	}
+	w.closed = true // only reached when the file closed without error
+	return nil
+}
+
+// ReadLogs decodes entries from the log files and sends them to the passed in watcher, then follows the current file if requested.
+func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
+	w.mu.RLock() // writers (and therefore rotation) take the write lock, so the file set is stable while we open it
+	files, err := w.openRotatedFiles()
+	if err != nil {
+		w.mu.RUnlock()
+		watcher.Err <- err
+		return
+	}
+	defer func() {
+		for _, f := range files {
+			f.Close()
+		}
+	}()
+
+	currentFile, err := os.Open(w.f.Name())
+	if err != nil {
+		w.mu.RUnlock()
+		watcher.Err <- err
+		return
+	}
+	defer currentFile.Close()
+
+	currentChunk, err := newSectionReader(currentFile) // bounded to the size seen while the lock is held
+	w.mu.RUnlock()
+
+	if err != nil {
+		watcher.Err <- err
+		return
+	}
+
+	if config.Tail != 0 { // Tail == 0 skips the existing entries entirely
+		seekers := make([]io.ReadSeeker, 0, len(files)+1)
+		for _, f := range files {
+			seekers = append(seekers, f)
+		}
+		seekers = append(seekers, currentChunk)
+		tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
+	}
+
+	w.mu.RLock()
+	if !config.Follow || w.closed {
+		w.mu.RUnlock()
+		return
+	}
+	w.mu.RUnlock()
+
+	notifyRotate := w.notifyRotate.Subscribe() // receives the event published after each rotation
+	defer w.notifyRotate.Evict(notifyRotate)
+	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
+}
+
+func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
+	defer func() {
+		if err == nil {
+			return
+		}
+		for _, f := range files {
+			f.Close()
+		}
+		files = nil // never hand closed descriptors back to the caller
+	}()
+	// Opens name.(maxFiles-1) down to name.1, skipping missing files; on failure everything opened so far is closed.
+	for i := w.maxFiles; i > 1; i-- {
+		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
+		if err != nil {
+			if !os.IsNotExist(err) {
+				return files, err // not nil: results are assigned before the deferred cleanup runs, and it needs the list
+			}
+			continue
+		}
+		files = append(files, f)
+	}
+	return files, nil
+}
+
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
+	// Seek to the end to learn the current size; the returned section covers bytes [0, size) of f.
+	// The offset is deliberately left at the end of the file: the section reader does not advance f itself.
+	size, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting current file size")
+	}
+	return io.NewSectionReader(f, 0, size), nil
+}
+
+type decodeFunc func() (*logger.Message, error)
+
+func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
+	var rdr io.Reader = f // used as-is when Tail is not positive, i.e. everything in f is decoded
+	if config.Tail > 0 {
+		ls, err := tailfile.TailFile(f, config.Tail)
+		if err != nil {
+			watcher.Err <- err
+			return
+		}
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+	}
+	// Decode entries one by one, apply the Since/Until window, and forward them until EOF or the watcher closes.
+	decodeLogLine := createDecoder(rdr)
+	for {
+		msg, err := decodeLogLine()
+		if err != nil {
+			if err != io.EOF { // EOF is the normal end of the tail and is not reported
+				watcher.Err <- err
+			}
+			return
+		}
+		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
+			continue
+		}
+		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
+			return // stops at the first entry past Until — presumably entries are in timestamp order
+		}
+		select {
+		case <-watcher.WatchClose():
+			return
+		case watcher.Msg <- msg:
+		}
+	}
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
+	decodeLogLine := createDecoder(f)
+
+	name := f.Name()
+	fileWatcher, err := watchFile(name)
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer func() {
+		f.Close()
+		fileWatcher.Remove(name)
+		fileWatcher.Close()
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			cancel()
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	var retries int // consecutive watcher failures; cleared after each successfully decoded message
+	handleRotate := func() error {
+		f.Close()
+		fileWatcher.Remove(name)
+
+		// retry when the file doesn't exist
+		for retries := 0; retries <= 5; retries++ {
+			f, err = os.Open(name)
+			if err == nil || !os.IsNotExist(err) {
+				break
+			}
+		}
+		if err != nil {
+			return err
+		}
+		if err := fileWatcher.Add(name); err != nil {
+			return err
+		}
+		decodeLogLine = createDecoder(f)
+		return nil
+	}
+
+	errRetry := errors.New("retry")
+	errDone := errors.New("done")
+	waitRead := func() error {
+		select {
+		case e := <-fileWatcher.Events():
+			switch e.Op {
+			case fsnotify.Write:
+				decodeLogLine = createDecoder(f) // the file was written to: start a fresh decoder on f
+				return nil
+			case fsnotify.Rename, fsnotify.Remove:
+				select {
+				case <-notifyRotate: // published by the writer once the new file has been opened
+				case <-ctx.Done():
+					return errDone
+				}
+				if err := handleRotate(); err != nil {
+					return err
+				}
+				return nil
+			}
+			return errRetry
+		case err := <-fileWatcher.Errors():
+			logrus.Debugf("logger got error watching file: %v", err)
+			// Something happened, let's try and stay alive and create a new watcher
+			if retries <= 5 {
+				fileWatcher.Close()
+				fileWatcher, err = watchFile(name)
+				if err != nil {
+					return err
+				}
+				retries++
+				return errRetry
+			}
+			return err
+		case <-ctx.Done():
+			return errDone
+		}
+	}
+
+	handleDecodeErr := func(err error) error {
+		if err != io.EOF {
+			return err
+		}
+
+		for {
+			err := waitRead()
+			if err == nil {
+				break
+			}
+			if err == errRetry {
+				continue
+			}
+			return err
+		}
+		return nil
+	}
+
+	// main loop
+	for {
+		msg, err := decodeLogLine()
+		if err != nil {
+			if err := handleDecodeErr(err); err != nil {
+				if err == errDone {
+					return
+				}
+				// we got an unrecoverable error, so return
+				logWatcher.Err <- err
+				return
+			}
+			// ready to try again
+			continue
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		if !until.IsZero() && msg.Timestamp.After(until) {
+			return
+		}
+		select {
+		case logWatcher.Msg <- msg:
+		case <-ctx.Done():
+			logWatcher.Msg <- msg
+			for { // the watcher was closed: keep delivering what can still be decoded, then stop
+				msg, err := decodeLogLine()
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				if !until.IsZero() && msg.Timestamp.After(until) {
+					return
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}
+
+func watchFile(name string) (filenotify.FileWatcher, error) {
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		return nil, err
+	}
+	// Structured fields attached to the messages logged below.
+	logger := logrus.WithFields(logrus.Fields{
+		"module": "logger",
+		"file":   name,
+	})
+
+	if err := fileWatcher.Add(name); err != nil {
+		logger.WithError(err).Warnf("falling back to file poller")
+		fileWatcher.Close()
+		fileWatcher = filenotify.NewPollingWatcher() // second attempt with the polling implementation
+
+		if err := fileWatcher.Add(name); err != nil {
+			fileWatcher.Close()
+			logger.WithError(err).Debugf("error watching log file for modifications")
+			return nil, err
+		}
+	}
+	return fileWatcher, nil
+}
diff --git a/daemon/logger/jsonfilelog/multireader/multireader.go b/daemon/logger/loggerutils/multireader/multireader.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader.go
rename to daemon/logger/loggerutils/multireader/multireader.go
diff --git a/daemon/logger/jsonfilelog/multireader/multireader_test.go b/daemon/logger/loggerutils/multireader/multireader_test.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader_test.go
rename to daemon/logger/loggerutils/multireader/multireader_test.go
diff --git a/daemon/logger/loggerutils/rotatefilewriter.go b/daemon/logger/loggerutils/rotatefilewriter.go
deleted file mode 100644
index 457a39b..0000000
--- a/daemon/logger/loggerutils/rotatefilewriter.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package loggerutils
-
-import (
-	"errors"
-	"os"
-	"strconv"
-	"sync"
-
-	"github.com/docker/docker/pkg/pubsub"
-)
-
-// RotateFileWriter is Logger implementation for default Docker logging.
-type RotateFileWriter struct {
-	f            *os.File // store for closing
-	closed       bool
-	mu           sync.Mutex
-	capacity     int64 //maximum size of each file
-	currentSize  int64 // current size of the latest file
-	maxFiles     int   //maximum number of files
-	notifyRotate *pubsub.Publisher
-}
-
-//NewRotateFileWriter creates new RotateFileWriter
-func NewRotateFileWriter(logPath string, capacity int64, maxFiles int) (*RotateFileWriter, error) {
-	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
-	if err != nil {
-		return nil, err
-	}
-
-	size, err := log.Seek(0, os.SEEK_END)
-	if err != nil {
-		return nil, err
-	}
-
-	return &RotateFileWriter{
-		f:            log,
-		capacity:     capacity,
-		currentSize:  size,
-		maxFiles:     maxFiles,
-		notifyRotate: pubsub.NewPublisher(0, 1),
-	}, nil
-}
-
-//WriteLog write log message to File
-func (w *RotateFileWriter) Write(message []byte) (int, error) {
-	w.mu.Lock()
-	if w.closed {
-		w.mu.Unlock()
-		return -1, errors.New("cannot write because the output file was closed")
-	}
-	if err := w.checkCapacityAndRotate(); err != nil {
-		w.mu.Unlock()
-		return -1, err
-	}
-
-	n, err := w.f.Write(message)
-	if err == nil {
-		w.currentSize += int64(n)
-	}
-	w.mu.Unlock()
-	return n, err
-}
-
-func (w *RotateFileWriter) checkCapacityAndRotate() error {
-	if w.capacity == -1 {
-		return nil
-	}
-
-	if w.currentSize >= w.capacity {
-		name := w.f.Name()
-		if err := w.f.Close(); err != nil {
-			return err
-		}
-		if err := rotate(name, w.maxFiles); err != nil {
-			return err
-		}
-		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
-		if err != nil {
-			return err
-		}
-		w.f = file
-		w.currentSize = 0
-		w.notifyRotate.Publish(struct{}{})
-	}
-
-	return nil
-}
-
-func rotate(name string, maxFiles int) error {
-	if maxFiles < 2 {
-		return nil
-	}
-	for i := maxFiles - 1; i > 1; i-- {
-		toPath := name + "." + strconv.Itoa(i)
-		fromPath := name + "." + strconv.Itoa(i-1)
-		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
-			return err
-		}
-	}
-
-	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
-		return err
-	}
-	return nil
-}
-
-// LogPath returns the location the given writer logs to.
-func (w *RotateFileWriter) LogPath() string {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	return w.f.Name()
-}
-
-// MaxFiles return maximum number of files
-func (w *RotateFileWriter) MaxFiles() int {
-	return w.maxFiles
-}
-
-//NotifyRotate returns the new subscriber
-func (w *RotateFileWriter) NotifyRotate() chan interface{} {
-	return w.notifyRotate.Subscribe()
-}
-
-//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
-func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
-	w.notifyRotate.Evict(sub)
-}
-
-// Close closes underlying file and signals all readers to stop.
-func (w *RotateFileWriter) Close() error {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return nil
-	}
-	if err := w.f.Close(); err != nil {
-		return err
-	}
-	w.closed = true
-	return nil
-}
diff --git a/daemon/logs.go b/daemon/logs.go
index babf07e..131360b 100644
--- a/daemon/logs.go
+++ b/daemon/logs.go
@@ -123,7 +123,7 @@
 				}
 				return
 			case <-ctx.Done():
-				lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
+				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
 				return
 			case msg, ok := <-logs.Msg:
 				// there is some kind of pool or ring buffer in the logger that