| package jsonfilelog |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "io" |
| "os" |
| "strconv" |
| "sync" |
| "time" |
| |
| "gopkg.in/fsnotify.v1" |
| |
| "github.com/Sirupsen/logrus" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/pkg/ioutils" |
| "github.com/docker/docker/pkg/jsonlog" |
| "github.com/docker/docker/pkg/pubsub" |
| "github.com/docker/docker/pkg/tailfile" |
| "github.com/docker/docker/pkg/timeutils" |
| "github.com/docker/docker/pkg/units" |
| ) |
| |
const (
	// Name is the name of the json-file log driver as registered with the
	// logger registry and matched by ValidateLogOpt.
	Name = "json-file"
	// maxJSONDecodeRetry is how many times followLogs re-creates its JSON
	// decoder after a syntax error before giving up and reporting it.
	maxJSONDecodeRetry = 10
)
| |
// JSONFileLogger is Logger implementation for default docker logging:
// JSON objects to file
type JSONFileLogger struct {
	buf          *bytes.Buffer                   // pending serialized log entries, flushed by writeLog
	f            *os.File                        // current log file; store for closing and rotation
	mu           sync.Mutex                      // protects buf, f, and readers
	capacity     int64                           // maximum size of each file; -1 means unlimited (no rotation)
	n            int                             // maximum number of files kept (current + rotated backups)
	ctx          logger.Context                  // creation context; ctx.LogPath is the file being written
	readers      map[*logger.LogWatcher]struct{} // stores the active log followers
	notifyRotate *pubsub.Publisher               // broadcasts to followers when the file is rotated
}
| |
// init registers the json-file driver and its option validator with the
// logger registry. Registration failure is fatal: the daemon cannot run
// without its default log driver.
func init() {
	if err := logger.RegisterLogDriver(Name, New); err != nil {
		logrus.Fatal(err)
	}
	if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil {
		logrus.Fatal(err)
	}
}
| |
| // New creates new JSONFileLogger which writes to filename |
| func New(ctx logger.Context) (logger.Logger, error) { |
| log, err := os.OpenFile(ctx.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) |
| if err != nil { |
| return nil, err |
| } |
| var capval int64 = -1 |
| if capacity, ok := ctx.Config["max-size"]; ok { |
| var err error |
| capval, err = units.FromHumanSize(capacity) |
| if err != nil { |
| return nil, err |
| } |
| } |
| var maxFiles int = 1 |
| if maxFileString, ok := ctx.Config["max-file"]; ok { |
| maxFiles, err = strconv.Atoi(maxFileString) |
| if err != nil { |
| return nil, err |
| } |
| if maxFiles < 1 { |
| return nil, fmt.Errorf("max-files cannot be less than 1.") |
| } |
| } |
| return &JSONFileLogger{ |
| f: log, |
| buf: bytes.NewBuffer(nil), |
| ctx: ctx, |
| capacity: capval, |
| n: maxFiles, |
| readers: make(map[*logger.LogWatcher]struct{}), |
| notifyRotate: pubsub.NewPublisher(0, 1), |
| }, nil |
| } |
| |
| // Log converts logger.Message to jsonlog.JSONLog and serializes it to file |
| func (l *JSONFileLogger) Log(msg *logger.Message) error { |
| l.mu.Lock() |
| defer l.mu.Unlock() |
| |
| timestamp, err := timeutils.FastMarshalJSON(msg.Timestamp) |
| if err != nil { |
| return err |
| } |
| err = (&jsonlog.JSONLogBytes{Log: append(msg.Line, '\n'), Stream: msg.Source, Created: timestamp}).MarshalJSONBuf(l.buf) |
| if err != nil { |
| return err |
| } |
| l.buf.WriteByte('\n') |
| _, err = writeLog(l) |
| return err |
| } |
| |
| func writeLog(l *JSONFileLogger) (int64, error) { |
| if l.capacity == -1 { |
| return writeToBuf(l) |
| } |
| meta, err := l.f.Stat() |
| if err != nil { |
| return -1, err |
| } |
| if meta.Size() >= l.capacity { |
| name := l.f.Name() |
| if err := l.f.Close(); err != nil { |
| return -1, err |
| } |
| if err := rotate(name, l.n); err != nil { |
| return -1, err |
| } |
| file, err := os.OpenFile(name, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666) |
| if err != nil { |
| return -1, err |
| } |
| l.f = file |
| l.notifyRotate.Publish(struct{}{}) |
| } |
| return writeToBuf(l) |
| } |
| |
// writeToBuf drains the logger's buffer into the current log file and
// returns the number of bytes written.
func writeToBuf(l *JSONFileLogger) (int64, error) {
	i, err := l.buf.WriteTo(l.f)
	if err != nil {
		// On a write error the buffer may hold a partially-written entry;
		// drop it rather than risk emitting corrupt JSON on the next flush.
		l.buf = bytes.NewBuffer(nil)
	}
	return i, err
}
| |
| func rotate(name string, n int) error { |
| if n < 2 { |
| return nil |
| } |
| for i := n - 1; i > 1; i-- { |
| oldFile := name + "." + strconv.Itoa(i) |
| replacingFile := name + "." + strconv.Itoa(i-1) |
| if err := backup(oldFile, replacingFile); err != nil { |
| return err |
| } |
| } |
| if err := backup(name+".1", name); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func backup(old, curr string) error { |
| if _, err := os.Stat(old); !os.IsNotExist(err) { |
| err := os.Remove(old) |
| if err != nil { |
| return err |
| } |
| } |
| if _, err := os.Stat(curr); os.IsNotExist(err) { |
| f, err := os.Create(curr) |
| if err != nil { |
| return err |
| } |
| f.Close() |
| } |
| return os.Rename(curr, old) |
| } |
| |
// ValidateLogOpt checks that every key in cfg is a log option supported
// by the json-file driver ("max-file" or "max-size") and rejects any
// other key with an error.
func ValidateLogOpt(cfg map[string]string) error {
	for key := range cfg {
		switch key {
		case "max-file", "max-size":
			// supported; values are validated later in New
		default:
			return fmt.Errorf("unknown log opt '%s' for json-file log driver", key)
		}
	}
	return nil
}
| |
// LogPath returns the path of the file this logger writes to.
func (l *JSONFileLogger) LogPath() string {
	return l.ctx.LogPath
}
| |
| // Close closes underlying file and signals all readers to stop |
| func (l *JSONFileLogger) Close() error { |
| l.mu.Lock() |
| err := l.f.Close() |
| for r := range l.readers { |
| r.Close() |
| delete(l.readers, r) |
| } |
| l.mu.Unlock() |
| return err |
| } |
| |
// Name returns the name of this log driver ("json-file").
func (l *JSONFileLogger) Name() string {
	return Name
}
| |
| func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) { |
| l.Reset() |
| if err := dec.Decode(l); err != nil { |
| return nil, err |
| } |
| msg := &logger.Message{ |
| Source: l.Stream, |
| Timestamp: l.Created, |
| Line: []byte(l.Log), |
| } |
| return msg, nil |
| } |
| |
// ReadLogs reads messages from the log file according to config (tail,
// since, follow) and streams them through the returned LogWatcher. The
// actual reading happens asynchronously in readLogs.
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	logWatcher := logger.NewLogWatcher()

	go l.readLogs(logWatcher, config)
	return logWatcher
}
| |
// readLogs streams log entries to logWatcher: first the rotated backup
// files (oldest first) and the live file, honoring config.Tail and
// config.Since, then optionally follows the live file for new entries.
func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
	// Closing Msg signals the consumer that no more messages will arrive.
	defer close(logWatcher.Msg)

	pth := l.ctx.LogPath
	var files []io.ReadSeeker
	// Open rotated files pth.(n-1) down to pth.1, appending oldest first
	// so the combined reader yields entries in chronological order.
	for i := l.n; i > 1; i-- {
		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
		if err != nil {
			if !os.IsNotExist(err) {
				logWatcher.Err <- err
				break
			}
			// A missing rotated file is normal — rotation may simply not
			// have happened that many times yet.
			continue
		}
		// defer in a loop is deliberate: the files must stay open while
		// tailFile reads them below; all are closed when we return.
		defer f.Close()
		files = append(files, f)
	}

	latestFile, err := os.Open(pth)
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer latestFile.Close()

	files = append(files, latestFile)
	tailer := ioutils.MultiReadSeeker(files...)

	// Tail == 0 means no historical lines were requested.
	if config.Tail != 0 {
		tailFile(tailer, logWatcher, config.Tail, config.Since)
	}

	if !config.Follow {
		return
	}

	// After emitting a finite tail, follow from the end of the live file
	// so lines already sent are not repeated. (Tail < 0 appears to mean
	// "all lines", in which case the offset is already at EOF — confirm
	// against logger.ReadConfig's documentation.)
	if config.Tail >= 0 {
		latestFile.Seek(0, os.SEEK_END)
	}

	// Register as an active reader so Close() can stop this watcher.
	l.mu.Lock()
	l.readers[logWatcher] = struct{}{}
	l.mu.Unlock()

	notifyRotate := l.notifyRotate.Subscribe()
	followLogs(latestFile, logWatcher, notifyRotate, config.Since)

	l.mu.Lock()
	delete(l.readers, logWatcher)
	l.mu.Unlock()

	l.notifyRotate.Evict(notifyRotate)
}
| |
| func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) { |
| var rdr io.Reader = f |
| if tail > 0 { |
| ls, err := tailfile.TailFile(f, tail) |
| if err != nil { |
| logWatcher.Err <- err |
| return |
| } |
| rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n"))) |
| } |
| dec := json.NewDecoder(rdr) |
| l := &jsonlog.JSONLog{} |
| for { |
| msg, err := decodeLogLine(dec, l) |
| if err != nil { |
| if err != io.EOF { |
| logWatcher.Err <- err |
| } |
| return |
| } |
| if !since.IsZero() && msg.Timestamp.Before(since) { |
| continue |
| } |
| logWatcher.Msg <- msg |
| } |
| } |
| |
// followLogs tails f indefinitely, decoding JSON log entries and sending
// them to logWatcher until the watcher is closed or an unrecoverable
// error occurs. fsnotify wakes it when the file grows; notifyRotate
// tells it the writer rotated the file so it must re-open the path.
func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
	dec := json.NewDecoder(f)
	l := &jsonlog.JSONLog{}
	fileWatcher, err := fsnotify.NewWatcher()
	if err != nil {
		logWatcher.Err <- err
		return
	}
	defer fileWatcher.Close()
	if err := fileWatcher.Add(f.Name()); err != nil {
		logWatcher.Err <- err
		return
	}

	var retries int
	for {
		msg, err := decodeLogLine(dec, l)
		if err != nil {
			if err != io.EOF {
				// try again because this shouldn't happen: a partially
				// written line can produce a transient syntax error, so
				// rebuild the decoder at the current offset a bounded
				// number of times before reporting it.
				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
					dec = json.NewDecoder(f)
					retries += 1
					continue
				}
				logWatcher.Err <- err
				return
			}

			// At EOF: block until new data, a rotation, an error, or close.
			select {
			case <-fileWatcher.Events:
				// File changed; resume decoding from the current offset.
				dec = json.NewDecoder(f)
				continue
			case <-fileWatcher.Errors:
				logWatcher.Err <- err
				return
			case <-logWatcher.WatchClose():
				return
			case <-notifyRotate:
				// Writer rotated the log: re-open the (new) file at the
				// same path and decode it from the beginning. Remove/Add
				// errors from the watcher are surfaced but not fatal here.
				fileWatcher.Remove(f.Name())

				f, err = os.Open(f.Name())
				if err != nil {
					logWatcher.Err <- err
					return
				}
				if err := fileWatcher.Add(f.Name()); err != nil {
					logWatcher.Err <- err
				}
				dec = json.NewDecoder(f)
				continue
			}
		}

		retries = 0 // reset retries since we've succeeded
		if !since.IsZero() && msg.Timestamp.Before(since) {
			continue
		}
		select {
		case logWatcher.Msg <- msg:
		case <-logWatcher.WatchClose():
			// Watcher is closing: deliver the message in hand, then drain
			// whatever remains in the file before returning.
			logWatcher.Msg <- msg
			for {
				msg, err := decodeLogLine(dec, l)
				if err != nil {
					return
				}
				if !since.IsZero() && msg.Timestamp.Before(since) {
					continue
				}
				logWatcher.Msg <- msg
			}
		}
	}
}