| package local // import "github.com/docker/docker/daemon/logger/local" |
| |
| import ( |
| "encoding/binary" |
| "io" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/docker/docker/api/types/backend" |
| "github.com/docker/docker/api/types/plugins/logdriver" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/daemon/logger/loggerutils" |
| "github.com/docker/docker/errdefs" |
| "github.com/docker/go-units" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| ) |
| |
| const ( |
| // Name is the name of the driver |
| Name = "local" |
| |
| encodeBinaryLen = 4 |
| initialBufSize = 2048 |
| maxDecodeRetry = 20000 |
| |
| defaultMaxFileSize int64 = 20 * 1024 * 1024 |
| defaultMaxFileCount = 5 |
| defaultCompressLogs = true |
| ) |
| |
| // LogOptKeys are the keys names used for log opts passed in to initialize the driver. |
| var LogOptKeys = map[string]bool{ |
| "max-file": true, |
| "max-size": true, |
| "compress": true, |
| } |
| |
| // ValidateLogOpt looks for log driver specific options. |
| func ValidateLogOpt(cfg map[string]string) error { |
| for key := range cfg { |
| if !LogOptKeys[key] { |
| return errors.Errorf("unknown log opt '%s' for log driver %s", key, Name) |
| } |
| } |
| return nil |
| } |
| |
| func init() { |
| if err := logger.RegisterLogDriver(Name, New); err != nil { |
| logrus.Fatal(err) |
| } |
| if err := logger.RegisterLogOptValidator(Name, ValidateLogOpt); err != nil { |
| logrus.Fatal(err) |
| } |
| } |
| |
| type driver struct { |
| mu sync.Mutex |
| closed bool |
| logfile *loggerutils.LogFile |
| readers map[*logger.LogWatcher]struct{} // stores the active log followers |
| } |
| |
| // New creates a new local logger |
| // You must provide the `LogPath` in the passed in info argument, this is the file path that logs are written to. |
| func New(info logger.Info) (logger.Logger, error) { |
| if info.LogPath == "" { |
| return nil, errdefs.System(errors.New("log path is missing -- this is a bug and should not happen")) |
| } |
| |
| cfg := newDefaultConfig() |
| if capacity, ok := info.Config["max-size"]; ok { |
| var err error |
| cfg.MaxFileSize, err = units.FromHumanSize(capacity) |
| if err != nil { |
| return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-size: %s", capacity)) |
| } |
| } |
| |
| if userMaxFileCount, ok := info.Config["max-file"]; ok { |
| var err error |
| cfg.MaxFileCount, err = strconv.Atoi(userMaxFileCount) |
| if err != nil { |
| return nil, errdefs.InvalidParameter(errors.Wrapf(err, "invalid value for max-file: %s", userMaxFileCount)) |
| } |
| } |
| |
| if userCompress, ok := info.Config["compress"]; ok { |
| compressLogs, err := strconv.ParseBool(userCompress) |
| if err != nil { |
| return nil, errdefs.InvalidParameter(errors.Wrap(err, "error reading compress log option")) |
| } |
| cfg.DisableCompression = !compressLogs |
| } |
| return newDriver(info.LogPath, cfg) |
| } |
| |
| func makeMarshaller() func(m *logger.Message) ([]byte, error) { |
| buf := make([]byte, initialBufSize) |
| |
| // allocate the partial log entry separately, which allows for easier re-use |
| proto := &logdriver.LogEntry{} |
| md := &logdriver.PartialLogEntryMetadata{} |
| |
| return func(m *logger.Message) ([]byte, error) { |
| resetProto(proto) |
| |
| messageToProto(m, proto, md) |
| protoSize := proto.Size() |
| writeLen := protoSize + (2 * encodeBinaryLen) //+ len(messageDelimiter) |
| |
| if writeLen > len(buf) { |
| buf = make([]byte, writeLen) |
| } else { |
| // shrink the buffer back down |
| if writeLen <= initialBufSize { |
| buf = buf[:initialBufSize] |
| } else { |
| buf = buf[:writeLen] |
| } |
| } |
| |
| binary.BigEndian.PutUint32(buf[:encodeBinaryLen], uint32(protoSize)) |
| n, err := proto.MarshalTo(buf[encodeBinaryLen:writeLen]) |
| if err != nil { |
| return nil, errors.Wrap(err, "error marshaling log entry") |
| } |
| if n+(encodeBinaryLen*2) != writeLen { |
| return nil, io.ErrShortWrite |
| } |
| binary.BigEndian.PutUint32(buf[writeLen-encodeBinaryLen:writeLen], uint32(protoSize)) |
| return buf[:writeLen], nil |
| } |
| } |
| |
| func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) { |
| if err := validateConfig(cfg); err != nil { |
| return nil, errdefs.InvalidParameter(err) |
| } |
| |
| lf, err := loggerutils.NewLogFile(logPath, cfg.MaxFileSize, cfg.MaxFileCount, !cfg.DisableCompression, makeMarshaller(), decodeFunc, 0640, getTailReader) |
| if err != nil { |
| return nil, err |
| } |
| return &driver{ |
| logfile: lf, |
| readers: make(map[*logger.LogWatcher]struct{}), |
| }, nil |
| } |
| |
| func (d *driver) Name() string { |
| return Name |
| } |
| |
| func (d *driver) Log(msg *logger.Message) error { |
| d.mu.Lock() |
| err := d.logfile.WriteLogEntry(msg) |
| d.mu.Unlock() |
| return err |
| } |
| |
| func (d *driver) Close() error { |
| d.mu.Lock() |
| d.closed = true |
| err := d.logfile.Close() |
| for r := range d.readers { |
| r.ProducerGone() |
| delete(d.readers, r) |
| } |
| d.mu.Unlock() |
| return err |
| } |
| |
| func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) { |
| proto.Source = msg.Source |
| proto.TimeNano = msg.Timestamp.UnixNano() |
| proto.Line = append(proto.Line[:0], msg.Line...) |
| proto.Partial = msg.PLogMetaData != nil |
| if proto.Partial { |
| partial.Ordinal = int32(msg.PLogMetaData.Ordinal) |
| partial.Last = msg.PLogMetaData.Last |
| partial.Id = msg.PLogMetaData.ID |
| proto.PartialLogMetadata = partial |
| } else { |
| proto.PartialLogMetadata = nil |
| } |
| } |
| |
| func protoToMessage(proto *logdriver.LogEntry) *logger.Message { |
| msg := &logger.Message{ |
| Source: proto.Source, |
| Timestamp: time.Unix(0, proto.TimeNano), |
| } |
| if proto.Partial { |
| var md backend.PartialLogMetaData |
| md.Last = proto.GetPartialLogMetadata().GetLast() |
| md.ID = proto.GetPartialLogMetadata().GetId() |
| md.Ordinal = int(proto.GetPartialLogMetadata().GetOrdinal()) |
| msg.PLogMetaData = &md |
| } |
| msg.Line = append(msg.Line[:0], proto.Line...) |
| return msg |
| } |
| |
| func resetProto(proto *logdriver.LogEntry) { |
| proto.Source = "" |
| proto.Line = proto.Line[:0] |
| proto.TimeNano = 0 |
| proto.Partial = false |
| if proto.PartialLogMetadata != nil { |
| proto.PartialLogMetadata.Id = "" |
| proto.PartialLogMetadata.Last = false |
| proto.PartialLogMetadata.Ordinal = 0 |
| } |
| proto.PartialLogMetadata = nil |
| } |