blob: 4703f64b99e5d51cb6e05d724397535823b8d829 [file] [log] [blame]
package jsonfilelog
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"sync"
"time"
"gopkg.in/fsnotify.v1"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile"
"github.com/docker/docker/pkg/timeutils"
"github.com/docker/docker/pkg/units"
)
const (
// Name is the name under which this log driver and its option validator
// are registered with the logger package.
Name = "json-file"
// maxJSONDecodeRetry caps the consecutive JSON syntax-error retries that
// followLogs performs before reporting the error to the watcher.
maxJSONDecodeRetry = 10
)
// JSONFileLogger is the Logger implementation for default docker logging:
// every message is serialized as a JSON object and written to a file.
type JSONFileLogger struct {
buf *bytes.Buffer // scratch buffer a message is serialized into before it is written to f
f *os.File // current log file; stored for writing and closing
mu sync.Mutex // held by Log and Close, and around updates to readers
capacity int64 // maximum size of each file; -1 means no size limit (no rotation)
n int // maximum number of files
ctx logger.Context // context the logger was created with; supplies LogPath
readers map[*logger.LogWatcher]struct{} // stores the active log followers
notifyRotate *pubsub.Publisher // published to each time the log file has been rotated
}
// init registers the json-file driver constructor and its option validator
// with the logger package. A registration failure is reported through
// logrus.Fatal.
func init() {
	err := logger.RegisterLogDriver(Name, New)
	if err != nil {
		logrus.Fatal(err)
	}

	err = logger.RegisterLogOptValidator(Name, ValidateLogOpt)
	if err != nil {
		logrus.Fatal(err)
	}
}
// New creates a JSONFileLogger that appends to the file at ctx.LogPath,
// creating that file (mode 0600) when it does not exist yet.
//
// Two optional settings are read from ctx.Config:
//   - "max-size": parsed with units.FromHumanSize; when absent the capacity
//     is -1, which disables rotation.
//   - "max-file": an integer that must be at least 1; defaults to 1.
//
// An error is returned when the file cannot be opened or when either option
// is invalid. In the latter case the already opened file is closed again so
// that no descriptor is leaked.
func New(ctx logger.Context) (logger.Logger, error) {
	log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}

	capval := int64(-1)
	if capacity, ok := ctx.Config["max-size"]; ok {
		capval, err = units.FromHumanSize(capacity)
		if err != nil {
			log.Close()
			return nil, err
		}
	}

	maxFiles := 1
	if maxFileString, ok := ctx.Config["max-file"]; ok {
		maxFiles, err = strconv.Atoi(maxFileString)
		if err != nil {
			log.Close()
			return nil, err
		}
		if maxFiles < 1 {
			log.Close()
			return nil, fmt.Errorf("max-file cannot be less than 1")
		}
	}

	return &JSONFileLogger{
		f:            log,
		buf:          bytes.NewBuffer(nil),
		ctx:          ctx,
		capacity:     capval,
		n:            maxFiles,
		readers:      make(map[*logger.LogWatcher]struct{}),
		notifyRotate: pubsub.NewPublisher(0, 1),
	}, nil
}
// Log serializes msg as a jsonlog.JSONLogBytes entry into the logger's
// buffer, terminates the entry with a newline and hands the buffer to
// writeLog. The logger's mutex is held for the whole call.
func (l *JSONFileLogger) Log(msg *logger.Message) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	created, err := timeutils.FastMarshalJSON(msg.Timestamp)
	if err != nil {
		return err
	}

	entry := jsonlog.JSONLogBytes{
		Log:     append(msg.Line, '\n'),
		Stream:  msg.Source,
		Created: created,
	}
	if err := entry.MarshalJSONBuf(l.buf); err != nil {
		return err
	}
	l.buf.WriteByte('\n')

	if _, err := writeLog(l); err != nil {
		return err
	}
	return nil
}
// writeLog flushes the logger's buffer to the current log file, rotating the
// file first when a size limit is configured and has been reached.
//
// With capacity == -1 the buffer is written straight away. Otherwise the
// current file is stat'ed; once its size is at least capacity the file is
// closed, rotate shifts the numbered backups, a fresh truncated file is
// opened under the same name and the rotation is published on notifyRotate.
//
// It returns the number of bytes written by writeToBuf, or -1 together with
// the error when stat, close, rotate or reopen fails.
func writeLog(l *JSONFileLogger) (int64, error) {
	if l.capacity == -1 {
		return writeToBuf(l)
	}

	meta, err := l.f.Stat()
	if err != nil {
		return -1, err
	}

	if meta.Size() >= l.capacity {
		name := l.f.Name()
		if err := l.f.Close(); err != nil {
			return -1, err
		}
		if err := rotate(name, l.n); err != nil {
			return -1, err
		}
		// Re-create the file with the same 0600 mode New uses, so that a
		// rotated log is not more widely readable than the initial one.
		file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600)
		if err != nil {
			return -1, err
		}
		l.f = file
		l.notifyRotate.Publish(struct{}{})
	}

	return writeToBuf(l)
}
// writeToBuf drains the logger's buffer into the current log file and
// reports how many bytes were written. When the write fails, the buffer is
// replaced by a fresh empty one, dropping whatever was left unwritten.
func writeToBuf(l *JSONFileLogger) (int64, error) {
	written, err := l.buf.WriteTo(l.f)
	if err == nil {
		return written, nil
	}
	l.buf = bytes.NewBuffer(nil)
	return written, err
}
// rotate shifts the numbered backups of name up by one slot and then moves
// name itself to name.1. With n backups configured, name.(n-2) becomes
// name.(n-1), and so on down to name.1 becoming name.2. Nothing is done
// when n is below 2.
func rotate(name string, n int) error {
	if n < 2 {
		return nil
	}

	for slot := n - 1; slot >= 2; slot-- {
		dst := name + "." + strconv.Itoa(slot)
		src := name + "." + strconv.Itoa(slot-1)
		if err := backup(dst, src); err != nil {
			return err
		}
	}

	return backup(name+".1", name)
}
// backup moves the file curr to the path old.
//
// Unless stat reports that old does not exist, old is removed first. If curr
// does not exist, an empty file is created in its place so that the rename
// always has a source. The result of the final rename is returned.
func backup(old, curr string) error {
	_, statErr := os.Stat(old)
	if !os.IsNotExist(statErr) {
		if rmErr := os.Remove(old); rmErr != nil {
			return rmErr
		}
	}

	_, statErr = os.Stat(curr)
	if os.IsNotExist(statErr) {
		placeholder, createErr := os.Create(curr)
		if createErr != nil {
			return createErr
		}
		placeholder.Close()
	}

	return os.Rename(curr, old)
}
// ValidateLogOpt checks that cfg contains only options known to the
// json-file driver ("max-file" and "max-size"). It returns an error naming
// an unknown key if one is present, and nil otherwise. Option values are
// not inspected.
func ValidateLogOpt(cfg map[string]string) error {
	for opt := range cfg {
		if opt == "max-file" || opt == "max-size" {
			continue
		}
		return fmt.Errorf("unknown log opt '%s' for json-file log driver", opt)
	}
	return nil
}
// LogPath returns the log file path stored in the context this logger was
// created with.
func (l *JSONFileLogger) LogPath() string {
return l.ctx.LogPath
}
// Close closes the underlying log file, then closes and unregisters every
// active reader so that followers stop. The whole operation runs under the
// logger's mutex. The returned error is the one from closing the file.
func (l *JSONFileLogger) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	closeErr := l.f.Close()
	for watcher := range l.readers {
		watcher.Close()
		delete(l.readers, watcher)
	}
	return closeErr
}
// Name returns the name of this logger's driver, i.e. the package-level
// Name constant.
func (l *JSONFileLogger) Name() string {
return Name
}
// decodeLogLine resets l, decodes the next JSON value from dec into it and
// converts the result into a freshly allocated logger.Message. l serves as
// reusable scratch space across calls. A decoding error is returned
// unchanged, with a nil message.
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
	l.Reset()

	err := dec.Decode(l)
	if err != nil {
		return nil, err
	}

	return &logger.Message{
		Source:    l.Stream,
		Timestamp: l.Created,
		Line:      []byte(l.Log),
	}, nil
}
// ReadLogs creates a new log watcher, starts a goroutine that reads the log
// files into it according to config, and returns the watcher immediately.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	watcher := logger.NewLogWatcher()
	go l.readLogs(watcher, config)
	return watcher
}
// readLogs feeds logWatcher from the log files according to config and
// closes logWatcher.Msg when it returns.
//
// It opens the rotated files from LogPath.(n-1) down to LogPath.1, skipping
// those that do not exist, followed by the current file, and combines them
// in that order with ioutils.MultiReadSeeker. If config.Tail is non-zero the
// combined reader is passed to tailFile. If config.Follow is set, logWatcher
// is then registered in l.readers, a rotation subscription is taken out and
// followLogs runs on the current file; once it returns, the registration
// and the subscription are removed again.
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(logWatcher.Msg)
pth := l.ctx.LogPath
var files []io.ReadSeeker
for i := l.n; i > 1; i-- {
f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
if err != nil {
if !os.IsNotExist(err) {
// NOTE(review): the error is reported and the loop stops, but reading
// still continues below with the files opened so far — confirm this
// is intended rather than a missing return.
logWatcher.Err <- err
break
}
// A missing rotated file is not an error; just skip it.
continue
}
// Deferred inside the loop: every rotated file stays open until
// readLogs returns.
defer f.Close()
files = append(files, f)
}
latestFile, err := os.Open(pth)
if err != nil {
logWatcher.Err <- err
return
}
defer latestFile.Close()
files = append(files, latestFile)
tailer := ioutils.MultiReadSeeker(files...)
if config.Tail != 0 {
tailFile(tailer, logWatcher, config.Tail, config.Since)
}
if !config.Follow {
return
}
if config.Tail >= 0 {
// Reposition the current file at its end before following; the Seek
// result is ignored. Presumably this makes following start with
// newly written data only — TODO confirm.
latestFile.Seek(0, os.SEEK_END)
}
// Register as an active reader so that Close can stop this follower.
l.mu.Lock()
l.readers[logWatcher] = struct{}{}
l.mu.Unlock()
notifyRotate := l.notifyRotate.Subscribe()
followLogs(latestFile, logWatcher, notifyRotate, config.Since)
l.mu.Lock()
delete(l.readers, logWatcher)
l.mu.Unlock()
l.notifyRotate.Evict(notifyRotate)
}
// tailFile decodes log entries from f and sends them on logWatcher.Msg.
//
// With tail > 0 only the last tail lines, as returned by tailfile.TailFile,
// are decoded; otherwise f is decoded from its current position. Entries
// with a timestamp before since are dropped unless since is the zero time.
// Decoding stops silently at io.EOF; any other error is sent on
// logWatcher.Err before returning.
func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
	var src io.Reader = f
	if tail > 0 {
		lines, err := tailfile.TailFile(f, tail)
		if err != nil {
			logWatcher.Err <- err
			return
		}
		src = bytes.NewBuffer(bytes.Join(lines, []byte("\n")))
	}

	var (
		decoder     = json.NewDecoder(src)
		entry       = &jsonlog.JSONLog{}
		filterSince = !since.IsZero()
	)
	for {
		msg, err := decodeLogLine(decoder, entry)
		switch {
		case err == io.EOF:
			return
		case err != nil:
			logWatcher.Err <- err
			return
		}
		if filterSince && msg.Timestamp.Before(since) {
			continue
		}
		logWatcher.Msg <- msg
	}
}
// followLogs streams log entries appended to f to logWatcher until the
// watcher is closed or an unrecoverable error occurs.
//
// Entries are decoded from f one at a time. Whenever the decoder reaches
// io.EOF the function waits for one of:
//   - a file system event on the log file: decoding resumes;
//   - an error from the file watcher: that error is sent on logWatcher.Err
//     and the function returns;
//   - logWatcher being closed: the function returns;
//   - a rotation notification on notifyRotate: the log file is reopened by
//     name, the watch is re-established and decoding resumes on the new file.
//
// A JSON syntax error is retried with a fresh decoder, bounded by
// maxJSONDecodeRetry; any other decoding error is sent on logWatcher.Err.
// Entries with a timestamp before since are dropped unless since is the
// zero time.
//
// The f passed in remains owned by the caller and is not closed here. Files
// this function opens itself after a rotation are closed when they are
// superseded or when the function returns.
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
	dec := json.NewDecoder(f)
	l := &jsonlog.JSONLog{}

	fileWatcher, err := fsnotify.NewWatcher()
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer fileWatcher.Close()

	// The path never changes: after a rotation the file is reopened under
	// the same name.
	name := f.Name()
	if err := fileWatcher.Add(name); err != nil {
		logWatcher.Err <- err
		return
	}

	// reopened is the handle opened by this function after the most recent
	// rotation, or nil while still reading the caller's file.
	var reopened *os.File
	defer func() {
		if reopened != nil {
			reopened.Close()
		}
	}()

	var retries int
	for {
		msg, err := decodeLogLine(dec, l)
		if err != nil {
			if err != io.EOF {
				// try again because this shouldn't happen
				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
					dec = json.NewDecoder(f)
					retries++
					continue
				}
				logWatcher.Err <- err
				return
			}

			select {
			case <-fileWatcher.Events:
				dec = json.NewDecoder(f)
				continue
			case watchErr := <-fileWatcher.Errors:
				// Report the watcher's own error, not the io.EOF that
				// brought us into this select.
				logWatcher.Err <- watchErr
				return
			case <-logWatcher.WatchClose():
				return
			case <-notifyRotate:
				fileWatcher.Remove(name)

				next, err := os.Open(name)
				if err != nil {
					logWatcher.Err <- err
					return
				}
				// Release the handle from the previous rotation; the
				// caller's original file is left for the caller to close.
				if reopened != nil {
					reopened.Close()
				}
				reopened, f = next, next

				// A failed re-watch is reported but following continues: a
				// later rotation notification re-adds the watch.
				if err := fileWatcher.Add(name); err != nil {
					logWatcher.Err <- err
				}
				dec = json.NewDecoder(f)
				continue
			}
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}

		select {
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchClose():
			// The watcher was closed while a message was pending: deliver
			// it and everything still decodable, then stop at the first
			// decoding error (including io.EOF).
			logWatcher.Msg <- msg
			for {
				msg, err := decodeLogLine(dec, l)
				if err != nil {
					return
				}
				if !since.IsZero() && msg.Timestamp.Before(since) {
					continue
				}
				logWatcher.Msg <- msg
			}
		}
	}
}