Move json log reading into log file object

This allows much of the read logic to be shared for other things,
especially for the new log driver proposed in
https://github.com/moby/moby/issues/33475

The only logic for reads in the json logger is around decoding log
messages, which gets passed into the log file object.

This also helps with implementing compression as it allows us to
simplify locking strategies.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go
index 65f19e7..7aa92f3 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog.go
@@ -23,9 +23,9 @@
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	mu      sync.RWMutex
+	mu      sync.Mutex
 	closed  bool
-	writer  *loggerutils.RotateFileWriter
+	writer  *loggerutils.LogFile
 	readers map[*logger.LogWatcher]struct{} // stores the active log followers
 }
 
@@ -83,7 +83,8 @@
 		buf.Reset()
 		return b, nil
 	}
-	writer, err := loggerutils.NewRotateFileWriter(info.LogPath, capval, maxFiles, marshalFunc)
+
+	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, marshalFunc, decodeFunc)
 	if err != nil {
 		return nil, err
 	}
diff --git a/daemon/logger/jsonfilelog/jsonfilelog_test.go b/daemon/logger/jsonfilelog/jsonfilelog_test.go
index 2b2b2b5..893c054 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog_test.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog_test.go
@@ -82,7 +82,7 @@
 	}
 
 	buf := bytes.NewBuffer(nil)
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+	require.NoError(b, marshalMessage(msg, nil, buf))
 	b.SetBytes(int64(buf.Len()))
 
 	b.ResetTimer()
diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
index 09eaaf0..f190e01 100644
--- a/daemon/logger/jsonfilelog/read.go
+++ b/daemon/logger/jsonfilelog/read.go
@@ -1,33 +1,45 @@
 package jsonfilelog
 
 import (
-	"bytes"
 	"encoding/json"
-	"fmt"
 	"io"
-	"os"
-	"time"
-
-	"github.com/fsnotify/fsnotify"
-	"golang.org/x/net/context"
 
 	"github.com/docker/docker/api/types/backend"
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
-	"github.com/docker/docker/daemon/logger/jsonfilelog/multireader"
-	"github.com/docker/docker/pkg/filenotify"
-	"github.com/docker/docker/pkg/tailfile"
-	"github.com/pkg/errors"
-	"github.com/sirupsen/logrus"
 )
 
 const maxJSONDecodeRetry = 20000
 
+// ReadLogs implements the logger's LogReader interface for the logs
+// created by this driver.
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+	logWatcher := logger.NewLogWatcher()
+
+	go l.readLogs(logWatcher, config)
+	return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
+	defer close(watcher.Msg)
+
+	l.mu.Lock()
+	l.readers[watcher] = struct{}{}
+	l.mu.Unlock()
+
+	l.writer.ReadLogs(config, watcher)
+
+	l.mu.Lock()
+	delete(l.readers, watcher)
+	l.mu.Unlock()
+}
+
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 	l.Reset()
 	if err := dec.Decode(l); err != nil {
 		return nil, err
 	}
+
 	var attrs []backend.LogAttr
 	if len(l.Attrs) != 0 {
 		attrs = make([]backend.LogAttr, 0, len(l.Attrs))
@@ -44,318 +56,34 @@
 	return msg, nil
 }
 
-// ReadLogs implements the logger's LogReader interface for the logs
-// created by this driver.
-func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
-	logWatcher := logger.NewLogWatcher()
-
-	go l.readLogs(logWatcher, config)
-	return logWatcher
-}
-
-func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
-	defer close(logWatcher.Msg)
-
-	// lock so the read stream doesn't get corrupted due to rotations or other log data written while we open these files
-	// This will block writes!!!
-	l.mu.RLock()
-
-	// TODO it would be nice to move a lot of this reader implementation to the rotate logger object
-	pth := l.writer.LogPath()
-	var files []io.ReadSeeker
-	for i := l.writer.MaxFiles(); i > 1; i-- {
-		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
-		if err != nil {
-			if !os.IsNotExist(err) {
-				logWatcher.Err <- err
-				l.mu.RUnlock()
-				return
-			}
-			continue
-		}
-		defer f.Close()
-		files = append(files, f)
-	}
-
-	latestFile, err := os.Open(pth)
-	if err != nil {
-		logWatcher.Err <- errors.Wrap(err, "error opening latest log file")
-		l.mu.RUnlock()
-		return
-	}
-	defer latestFile.Close()
-
-	latestChunk, err := newSectionReader(latestFile)
-
-	// Now we have the reader sectioned, all fd's opened, we can unlock.
-	// New writes/rotates will not affect seeking through these files
-	l.mu.RUnlock()
-
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-
-	if config.Tail != 0 {
-		tailer := multireader.MultiReadSeeker(append(files, latestChunk)...)
-		tailFile(tailer, logWatcher, config.Tail, config.Since, config.Until)
-	}
-
-	// close all the rotated files
-	for _, f := range files {
-		if err := f.(io.Closer).Close(); err != nil {
-			logrus.WithField("logger", "json-file").Warnf("error closing tailed log file: %v", err)
-		}
-	}
-
-	if !config.Follow || l.closed {
-		return
-	}
-
-	notifyRotate := l.writer.NotifyRotate()
-	defer l.writer.NotifyRotateEvict(notifyRotate)
-
-	l.mu.Lock()
-	l.readers[logWatcher] = struct{}{}
-	l.mu.Unlock()
-
-	followLogs(latestFile, logWatcher, notifyRotate, config)
-
-	l.mu.Lock()
-	delete(l.readers, logWatcher)
-	l.mu.Unlock()
-}
-
-func newSectionReader(f *os.File) (*io.SectionReader, error) {
-	// seek to the end to get the size
-	// we'll leave this at the end of the file since section reader does not advance the reader
-	size, err := f.Seek(0, os.SEEK_END)
-	if err != nil {
-		return nil, errors.Wrap(err, "error getting current file size")
-	}
-	return io.NewSectionReader(f, 0, size), nil
-}
-
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since, until time.Time) {
-	rdr := io.Reader(f)
-	if tail > 0 {
-		ls, err := tailfile.TailFile(f, tail)
-		if err != nil {
-			logWatcher.Err <- err
-			return
-		}
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
-	}
-	dec := json.NewDecoder(rdr)
-	for {
-		msg, err := decodeLogLine(dec, &jsonlog.JSONLog{})
-		if err != nil {
-			if err != io.EOF {
-				logWatcher.Err <- err
-			}
-			return
-		}
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		if !until.IsZero() && msg.Timestamp.After(until) {
-			return
-		}
-		select {
-		case <-logWatcher.WatchClose():
-			return
-		case logWatcher.Msg <- msg:
-		}
-	}
-}
-
-func watchFile(name string) (filenotify.FileWatcher, error) {
-	fileWatcher, err := filenotify.New()
-	if err != nil {
-		return nil, err
-	}
-
-	if err := fileWatcher.Add(name); err != nil {
-		logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
-		fileWatcher.Close()
-		fileWatcher = filenotify.NewPollingWatcher()
-
-		if err := fileWatcher.Add(name); err != nil {
-			fileWatcher.Close()
-			logrus.Debugf("error watching log file for modifications: %v", err)
-			return nil, err
-		}
-	}
-	return fileWatcher, nil
-}
-
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, config logger.ReadConfig) {
-	dec := json.NewDecoder(f)
+// decodeFunc is used to create a decoder for the log file reader
+func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
 	l := &jsonlog.JSONLog{}
-
-	name := f.Name()
-	fileWatcher, err := watchFile(name)
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-	defer func() {
-		f.Close()
-		fileWatcher.Remove(name)
-		fileWatcher.Close()
-	}()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go func() {
-		select {
-		case <-logWatcher.WatchClose():
-			fileWatcher.Remove(name)
-			cancel()
-		case <-ctx.Done():
-			return
-		}
-	}()
-
-	var retries int
-	handleRotate := func() error {
-		f.Close()
-		fileWatcher.Remove(name)
-
-		// retry when the file doesn't exist
-		for retries := 0; retries <= 5; retries++ {
-			f, err = os.Open(name)
-			if err == nil || !os.IsNotExist(err) {
+	dec := json.NewDecoder(rdr)
+	return func() (msg *logger.Message, err error) {
+		for retries := 0; retries < maxJSONDecodeRetry; retries++ {
+			msg, err = decodeLogLine(dec, l)
+			if err == nil {
 				break
 			}
-		}
-		if err != nil {
-			return err
-		}
-		if err := fileWatcher.Add(name); err != nil {
-			return err
-		}
-		dec = json.NewDecoder(f)
-		return nil
-	}
 
-	errRetry := errors.New("retry")
-	errDone := errors.New("done")
-	waitRead := func() error {
-		select {
-		case e := <-fileWatcher.Events():
-			switch e.Op {
-			case fsnotify.Write:
-				dec = json.NewDecoder(f)
-				return nil
-			case fsnotify.Rename, fsnotify.Remove:
-				select {
-				case <-notifyRotate:
-				case <-ctx.Done():
-					return errDone
-				}
-				if err := handleRotate(); err != nil {
-					return err
-				}
-				return nil
-			}
-			return errRetry
-		case err := <-fileWatcher.Errors():
-			logrus.Debug("logger got error watching file: %v", err)
-			// Something happened, let's try and stay alive and create a new watcher
-			if retries <= 5 {
-				fileWatcher.Close()
-				fileWatcher, err = watchFile(name)
-				if err != nil {
-					return err
-				}
+			// try again, could be due to a an incomplete json object as we read
+			if _, ok := err.(*json.SyntaxError); ok {
+				dec = json.NewDecoder(rdr)
 				retries++
-				return errRetry
+				continue
 			}
-			return err
-		case <-ctx.Done():
-			return errDone
-		}
-	}
 
-	handleDecodeErr := func(err error) error {
-		if err == io.EOF {
-			for {
-				err := waitRead()
-				if err == nil {
-					break
-				}
-				if err == errRetry {
-					continue
-				}
-				return err
-			}
-			return nil
-		}
-		// try again because this shouldn't happen
-		if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-			dec = json.NewDecoder(f)
-			retries++
-			return nil
-		}
-		// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-		// remaining data in the parser's buffer while an io.EOF occurs.
-		// If the json logger writes a partial json log entry to the disk
-		// while at the same time the decoder tries to decode it, the race condition happens.
-		if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-			reader := io.MultiReader(dec.Buffered(), f)
-			dec = json.NewDecoder(reader)
-			retries++
-			return nil
-		}
-		return err
-	}
-
-	// main loop
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err := handleDecodeErr(err); err != nil {
-				if err == errDone {
-					return
-				}
-				// we got an unrecoverable error, so return
-				logWatcher.Err <- err
-				return
-			}
-			// ready to try again
-			continue
-		}
-
-		since := config.Since
-		until := config.Until
-
-		retries = 0 // reset retries since we've succeeded
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		if !until.IsZero() && msg.Timestamp.After(until) {
-			return
-		}
-		select {
-		case logWatcher.Msg <- msg:
-		case <-ctx.Done():
-			logWatcher.Msg <- msg
-			// This for loop is used when the logger is closed (ie, container
-			// stopped) but the consumer is still waiting for logs.
-			for {
-				msg, err := decodeLogLine(dec, l)
-				if err != nil {
-					return
-				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
-				}
-				if !until.IsZero() && msg.Timestamp.After(until) {
-					return
-				}
-				logWatcher.Msg <- msg
+			// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+			// remaining data in the parser's buffer while an io.EOF occurs.
+			// If the json logger writes a partial json log entry to the disk
+			// while at the same time the decoder tries to decode it, the race condition happens.
+			if err == io.ErrUnexpectedEOF {
+				reader := io.MultiReader(dec.Buffered(), rdr)
+				dec = json.NewDecoder(reader)
+				retries++
 			}
 		}
+		return msg, err
 	}
 }
diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go
index 01a05c4..599fdf9 100644
--- a/daemon/logger/jsonfilelog/read_test.go
+++ b/daemon/logger/jsonfilelog/read_test.go
@@ -35,7 +35,7 @@
 	}
 
 	buf := bytes.NewBuffer(nil)
-	require.NoError(b, marshalMessage(msg, jsonlogger.(*JSONFileLogger).extra, buf))
+	require.NoError(b, marshalMessage(msg, nil, buf))
 	b.SetBytes(int64(buf.Len()))
 
 	b.ResetTimer()
diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go
new file mode 100644
index 0000000..6a7a689
--- /dev/null
+++ b/daemon/logger/loggerutils/logfile.go
@@ -0,0 +1,454 @@
+package loggerutils
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/daemon/logger/loggerutils/multireader"
+	"github.com/docker/docker/pkg/filenotify"
+	"github.com/docker/docker/pkg/pubsub"
+	"github.com/docker/docker/pkg/tailfile"
+	"github.com/fsnotify/fsnotify"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// LogFile is Logger implementation for default Docker logging.
+type LogFile struct {
+	f             *os.File // store for closing
+	closed        bool
+	mu            sync.RWMutex
+	capacity      int64 //maximum size of each file
+	currentSize   int64 // current size of the latest file
+	maxFiles      int   //maximum number of files
+	notifyRotate  *pubsub.Publisher
+	marshal       logger.MarshalFunc
+	createDecoder makeDecoderFunc
+}
+
+type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)
+
+//NewLogFile creates new LogFile
+func NewLogFile(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc) (*LogFile, error) {
+	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
+	if err != nil {
+		return nil, err
+	}
+
+	size, err := log.Seek(0, os.SEEK_END)
+	if err != nil {
+		return nil, err
+	}
+
+	return &LogFile{
+		f:             log,
+		capacity:      capacity,
+		currentSize:   size,
+		maxFiles:      maxFiles,
+		notifyRotate:  pubsub.NewPublisher(0, 1),
+		marshal:       marshaller,
+		createDecoder: decodeFunc,
+	}, nil
+}
+
+// WriteLogEntry writes the provided log message to the current log file.
+// This may trigger a rotation event if the max file/capacity limits are hit.
+func (w *LogFile) WriteLogEntry(msg *logger.Message) error {
+	b, err := w.marshal(msg)
+	if err != nil {
+		return errors.Wrap(err, "error marshalling log message")
+	}
+
+	logger.PutMessage(msg)
+
+	w.mu.Lock()
+	if w.closed {
+		w.mu.Unlock()
+		return errors.New("cannot write because the output file was closed")
+	}
+
+	if err := w.checkCapacityAndRotate(); err != nil {
+		w.mu.Unlock()
+		return err
+	}
+
+	n, err := w.f.Write(b)
+	if err == nil {
+		w.currentSize += int64(n)
+	}
+	w.mu.Unlock()
+	return err
+}
+
+func (w *LogFile) checkCapacityAndRotate() error {
+	if w.capacity == -1 {
+		return nil
+	}
+
+	if w.currentSize >= w.capacity {
+		name := w.f.Name()
+		if err := w.f.Close(); err != nil {
+			return errors.Wrap(err, "error closing file")
+		}
+		if err := rotate(name, w.maxFiles); err != nil {
+			return err
+		}
+		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
+		if err != nil {
+			return err
+		}
+		w.f = file
+		w.currentSize = 0
+		w.notifyRotate.Publish(struct{}{})
+	}
+
+	return nil
+}
+
+func rotate(name string, maxFiles int) error {
+	if maxFiles < 2 {
+		return nil
+	}
+	for i := maxFiles - 1; i > 1; i-- {
+		toPath := name + "." + strconv.Itoa(i)
+		fromPath := name + "." + strconv.Itoa(i-1)
+		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
+			return errors.Wrap(err, "error rotating old log entries")
+		}
+	}
+
+	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
+		return errors.Wrap(err, "error rotating current log")
+	}
+	return nil
+}
+
+// LogPath returns the location the given writer logs to.
+func (w *LogFile) LogPath() string {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	return w.f.Name()
+}
+
+// MaxFiles return maximum number of files
+func (w *LogFile) MaxFiles() int {
+	return w.maxFiles
+}
+
+// Close closes underlying file and signals all readers to stop.
+func (w *LogFile) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if w.closed {
+		return nil
+	}
+	if err := w.f.Close(); err != nil {
+		return err
+	}
+	w.closed = true
+	return nil
+}
+
+// ReadLogs decodes entries from log files and sends them the passed in watcher
+func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
+	w.mu.RLock()
+	files, err := w.openRotatedFiles()
+	if err != nil {
+		w.mu.RUnlock()
+		watcher.Err <- err
+		return
+	}
+	defer func() {
+		for _, f := range files {
+			f.Close()
+		}
+	}()
+
+	currentFile, err := os.Open(w.f.Name())
+	if err != nil {
+		w.mu.RUnlock()
+		watcher.Err <- err
+		return
+	}
+	defer currentFile.Close()
+
+	currentChunk, err := newSectionReader(currentFile)
+	w.mu.RUnlock()
+
+	if err != nil {
+		watcher.Err <- err
+		return
+	}
+
+	if config.Tail != 0 {
+		seekers := make([]io.ReadSeeker, 0, len(files)+1)
+		for _, f := range files {
+			seekers = append(seekers, f)
+		}
+		seekers = append(seekers, currentChunk)
+		tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
+	}
+
+	w.mu.RLock()
+	if !config.Follow || w.closed {
+		w.mu.RUnlock()
+		return
+	}
+	w.mu.RUnlock()
+
+	notifyRotate := w.notifyRotate.Subscribe()
+	defer w.notifyRotate.Evict(notifyRotate)
+	followLogs(currentFile, watcher, notifyRotate, w.createDecoder, config.Since, config.Until)
+}
+
+func (w *LogFile) openRotatedFiles() (files []*os.File, err error) {
+	defer func() {
+		if err == nil {
+			return
+		}
+		for _, f := range files {
+			f.Close()
+		}
+	}()
+
+	for i := w.maxFiles; i > 1; i-- {
+		f, err := os.Open(fmt.Sprintf("%s.%d", w.f.Name(), i-1))
+		if err != nil {
+			if !os.IsNotExist(err) {
+				return nil, err
+			}
+			continue
+		}
+		files = append(files, f)
+	}
+
+	return files, nil
+}
+
+func newSectionReader(f *os.File) (*io.SectionReader, error) {
+	// seek to the end to get the size
+	// we'll leave this at the end of the file since section reader does not advance the reader
+	size, err := f.Seek(0, os.SEEK_END)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting current file size")
+	}
+	return io.NewSectionReader(f, 0, size), nil
+}
+
+type decodeFunc func() (*logger.Message, error)
+
+func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
+	var rdr io.Reader = f
+	if config.Tail > 0 {
+		ls, err := tailfile.TailFile(f, config.Tail)
+		if err != nil {
+			watcher.Err <- err
+			return
+		}
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+	}
+
+	decodeLogLine := createDecoder(rdr)
+	for {
+		msg, err := decodeLogLine()
+		if err != nil {
+			if err != io.EOF {
+				watcher.Err <- err
+			}
+			return
+		}
+		if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
+			continue
+		}
+		if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
+			return
+		}
+		select {
+		case <-watcher.WatchClose():
+			return
+		case watcher.Msg <- msg:
+		}
+	}
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, createDecoder makeDecoderFunc, since, until time.Time) {
+	decodeLogLine := createDecoder(f)
+
+	name := f.Name()
+	fileWatcher, err := watchFile(name)
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer func() {
+		f.Close()
+		fileWatcher.Remove(name)
+		fileWatcher.Close()
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-logWatcher.WatchClose():
+			fileWatcher.Remove(name)
+			cancel()
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	var retries int
+	handleRotate := func() error {
+		f.Close()
+		fileWatcher.Remove(name)
+
+		// retry when the file doesn't exist
+		for retries := 0; retries <= 5; retries++ {
+			f, err = os.Open(name)
+			if err == nil || !os.IsNotExist(err) {
+				break
+			}
+		}
+		if err != nil {
+			return err
+		}
+		if err := fileWatcher.Add(name); err != nil {
+			return err
+		}
+		decodeLogLine = createDecoder(f)
+		return nil
+	}
+
+	errRetry := errors.New("retry")
+	errDone := errors.New("done")
+	waitRead := func() error {
+		select {
+		case e := <-fileWatcher.Events():
+			switch e.Op {
+			case fsnotify.Write:
+				decodeLogLine = createDecoder(f)
+				return nil
+			case fsnotify.Rename, fsnotify.Remove:
+				select {
+				case <-notifyRotate:
+				case <-ctx.Done():
+					return errDone
+				}
+				if err := handleRotate(); err != nil {
+					return err
+				}
+				return nil
+			}
+			return errRetry
+		case err := <-fileWatcher.Errors():
+			logrus.Debug("logger got error watching file: %v", err)
+			// Something happened, let's try and stay alive and create a new watcher
+			if retries <= 5 {
+				fileWatcher.Close()
+				fileWatcher, err = watchFile(name)
+				if err != nil {
+					return err
+				}
+				retries++
+				return errRetry
+			}
+			return err
+		case <-ctx.Done():
+			return errDone
+		}
+	}
+
+	handleDecodeErr := func(err error) error {
+		if err != io.EOF {
+			return err
+		}
+
+		for {
+			err := waitRead()
+			if err == nil {
+				break
+			}
+			if err == errRetry {
+				continue
+			}
+			return err
+		}
+		return nil
+	}
+
+	// main loop
+	for {
+		msg, err := decodeLogLine()
+		if err != nil {
+			if err := handleDecodeErr(err); err != nil {
+				if err == errDone {
+					return
+				}
+				// we got an unrecoverable error, so return
+				logWatcher.Err <- err
+				return
+			}
+			// ready to try again
+			continue
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		if !until.IsZero() && msg.Timestamp.After(until) {
+			return
+		}
+		select {
+		case logWatcher.Msg <- msg:
+		case <-ctx.Done():
+			logWatcher.Msg <- msg
+			for {
+				msg, err := decodeLogLine()
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				if !until.IsZero() && msg.Timestamp.After(until) {
+					return
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}
+
+func watchFile(name string) (filenotify.FileWatcher, error) {
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		return nil, err
+	}
+
+	logger := logrus.WithFields(logrus.Fields{
+		"module": "logger",
+		"fille":  name,
+	})
+
+	if err := fileWatcher.Add(name); err != nil {
+		logger.WithError(err).Warnf("falling back to file poller")
+		fileWatcher.Close()
+		fileWatcher = filenotify.NewPollingWatcher()
+
+		if err := fileWatcher.Add(name); err != nil {
+			fileWatcher.Close()
+			logger.WithError(err).Debugf("error watching log file for modifications")
+			return nil, err
+		}
+	}
+	return fileWatcher, nil
+}
diff --git a/daemon/logger/jsonfilelog/multireader/multireader.go b/daemon/logger/loggerutils/multireader/multireader.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader.go
rename to daemon/logger/loggerutils/multireader/multireader.go
diff --git a/daemon/logger/jsonfilelog/multireader/multireader_test.go b/daemon/logger/loggerutils/multireader/multireader_test.go
similarity index 100%
rename from daemon/logger/jsonfilelog/multireader/multireader_test.go
rename to daemon/logger/loggerutils/multireader/multireader_test.go
diff --git a/daemon/logger/loggerutils/rotatefilewriter.go b/daemon/logger/loggerutils/rotatefilewriter.go
deleted file mode 100644
index 54c5688..0000000
--- a/daemon/logger/loggerutils/rotatefilewriter.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package loggerutils
-
-import (
-	"os"
-	"strconv"
-	"sync"
-
-	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/pubsub"
-	"github.com/pkg/errors"
-)
-
-// RotateFileWriter is Logger implementation for default Docker logging.
-type RotateFileWriter struct {
-	f            *os.File // store for closing
-	closed       bool
-	mu           sync.Mutex
-	capacity     int64 //maximum size of each file
-	currentSize  int64 // current size of the latest file
-	maxFiles     int   //maximum number of files
-	notifyRotate *pubsub.Publisher
-	marshal      logger.MarshalFunc
-}
-
-//NewRotateFileWriter creates new RotateFileWriter
-func NewRotateFileWriter(logPath string, capacity int64, maxFiles int, marshaller logger.MarshalFunc) (*RotateFileWriter, error) {
-	log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
-	if err != nil {
-		return nil, err
-	}
-
-	size, err := log.Seek(0, os.SEEK_END)
-	if err != nil {
-		return nil, err
-	}
-
-	return &RotateFileWriter{
-		f:            log,
-		capacity:     capacity,
-		currentSize:  size,
-		maxFiles:     maxFiles,
-		notifyRotate: pubsub.NewPublisher(0, 1),
-		marshal:      marshaller,
-	}, nil
-}
-
-// WriteLogEntry writes the provided log message to the current log file.
-// This may trigger a rotation event if the max file/capacity limits are hit.
-func (w *RotateFileWriter) WriteLogEntry(msg *logger.Message) error {
-	b, err := w.marshal(msg)
-	if err != nil {
-		return errors.Wrap(err, "error marshalling log message")
-	}
-
-	logger.PutMessage(msg)
-
-	w.mu.Lock()
-	if w.closed {
-		w.mu.Unlock()
-		return errors.New("cannot write because the output file was closed")
-	}
-
-	if err := w.checkCapacityAndRotate(); err != nil {
-		w.mu.Unlock()
-		return err
-	}
-
-	n, err := w.f.Write(b)
-	if err == nil {
-		w.currentSize += int64(n)
-	}
-	w.mu.Unlock()
-	return err
-}
-
-func (w *RotateFileWriter) checkCapacityAndRotate() error {
-	if w.capacity == -1 {
-		return nil
-	}
-
-	if w.currentSize >= w.capacity {
-		name := w.f.Name()
-		if err := w.f.Close(); err != nil {
-			return errors.Wrap(err, "error closing file")
-		}
-		if err := rotate(name, w.maxFiles); err != nil {
-			return err
-		}
-		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
-		if err != nil {
-			return err
-		}
-		w.f = file
-		w.currentSize = 0
-		w.notifyRotate.Publish(struct{}{})
-	}
-
-	return nil
-}
-
-func rotate(name string, maxFiles int) error {
-	if maxFiles < 2 {
-		return nil
-	}
-	for i := maxFiles - 1; i > 1; i-- {
-		toPath := name + "." + strconv.Itoa(i)
-		fromPath := name + "." + strconv.Itoa(i-1)
-		if err := os.Rename(fromPath, toPath); err != nil && !os.IsNotExist(err) {
-			return errors.Wrap(err, "error rotating old log entries")
-		}
-	}
-
-	if err := os.Rename(name, name+".1"); err != nil && !os.IsNotExist(err) {
-		return errors.Wrap(err, "error rotating current log")
-	}
-	return nil
-}
-
-// LogPath returns the location the given writer logs to.
-func (w *RotateFileWriter) LogPath() string {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	return w.f.Name()
-}
-
-// MaxFiles return maximum number of files
-func (w *RotateFileWriter) MaxFiles() int {
-	return w.maxFiles
-}
-
-//NotifyRotate returns the new subscriber
-func (w *RotateFileWriter) NotifyRotate() chan interface{} {
-	return w.notifyRotate.Subscribe()
-}
-
-//NotifyRotateEvict removes the specified subscriber from receiving any more messages.
-func (w *RotateFileWriter) NotifyRotateEvict(sub chan interface{}) {
-	w.notifyRotate.Evict(sub)
-}
-
-// Close closes underlying file and signals all readers to stop.
-func (w *RotateFileWriter) Close() error {
-	w.mu.Lock()
-	defer w.mu.Unlock()
-	if w.closed {
-		return nil
-	}
-	if err := w.f.Close(); err != nil {
-		return err
-	}
-	w.closed = true
-	return nil
-}
diff --git a/daemon/logs.go b/daemon/logs.go
index babf07e..131360b 100644
--- a/daemon/logs.go
+++ b/daemon/logs.go
@@ -123,7 +123,7 @@
 				}
 				return
 			case <-ctx.Done():
-				lg.Debug("logs: end stream, ctx is done: %v", ctx.Err())
+				lg.Debugf("logs: end stream, ctx is done: %v", ctx.Err())
 				return
 			case msg, ok := <-logs.Msg:
 				// there is some kind of pool or ring buffer in the logger that