| package local |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/binary" |
| "fmt" |
| "io" |
| |
| "github.com/docker/docker/api/types/plugins/logdriver" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/daemon/logger/loggerutils" |
| "github.com/docker/docker/errdefs" |
| "github.com/pkg/errors" |
| ) |
| |
// maxMsgLen is the upper bound, in bytes, on the serialized size of a single
// logger.Message that Decode will accept. The original note states that
// logger.defaultBufSize separately caps the size of the Line field.
const maxMsgLen int = 1000000 // 1e6 bytes (1MB).
| |
| func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { |
| return d.logfile.ReadLogs(config) |
| } |
| |
| func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { |
| size := r.Size() |
| if req < 0 { |
| return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req)) |
| } |
| |
| if size < (encodeBinaryLen*2)+1 { |
| return bytes.NewReader(nil), 0, nil |
| } |
| |
| const encodeBinaryLen64 = int64(encodeBinaryLen) |
| var found int |
| |
| buf := make([]byte, encodeBinaryLen) |
| |
| offset := size |
| for { |
| select { |
| case <-ctx.Done(): |
| return nil, 0, ctx.Err() |
| default: |
| } |
| |
| n, err := r.ReadAt(buf, offset-encodeBinaryLen64) |
| if err != nil && err != io.EOF { |
| return nil, 0, errors.Wrap(err, "error reading log message footer") |
| } |
| |
| if n != encodeBinaryLen { |
| return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer")) |
| } |
| |
| msgLen := binary.BigEndian.Uint32(buf) |
| |
| n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen)) |
| if err != nil && err != io.EOF { |
| return nil, 0, errors.Wrap(err, "error reading log message header") |
| } |
| |
| if n != encodeBinaryLen { |
| return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header")) |
| } |
| |
| if msgLen != binary.BigEndian.Uint32(buf) { |
| return nil, 0, errdefs.DataLoss(errors.New("log message header and footer indicate different message sizes")) |
| } |
| |
| found++ |
| offset -= int64(msgLen) |
| offset -= encodeBinaryLen64 * 2 |
| if found == req { |
| break |
| } |
| if offset <= 0 { |
| break |
| } |
| } |
| |
| return io.NewSectionReader(r, offset, size), found, nil |
| } |
| |
| type decoder struct { |
| rdr io.Reader |
| proto *logdriver.LogEntry |
| // buf keeps bytes from rdr. |
| buf []byte |
| // offset is the position in buf. |
| // If offset > 0, buf[offset:] has bytes which are read but haven't used. |
| offset int |
| // nextMsgLen is the length of the next log message. |
| // If nextMsgLen = 0, a new value must be read from rdr. |
| nextMsgLen int |
| } |
| |
| func (d *decoder) readRecord(size int) error { |
| var err error |
| for i := 0; i < maxDecodeRetry; i++ { |
| var n int |
| n, err = io.ReadFull(d.rdr, d.buf[d.offset:size]) |
| d.offset += n |
| if err != nil { |
| if err != io.ErrUnexpectedEOF { |
| return err |
| } |
| continue |
| } |
| break |
| } |
| if err != nil { |
| return err |
| } |
| d.offset = 0 |
| return nil |
| } |
| |
| func (d *decoder) Decode() (*logger.Message, error) { |
| if d.proto == nil { |
| d.proto = &logdriver.LogEntry{} |
| } else { |
| resetProto(d.proto) |
| } |
| if d.buf == nil { |
| d.buf = make([]byte, initialBufSize) |
| } |
| |
| if d.nextMsgLen == 0 { |
| msgLen, err := d.decodeSizeHeader() |
| if err != nil { |
| return nil, err |
| } |
| |
| if msgLen > maxMsgLen { |
| return nil, fmt.Errorf("log message is too large (%d > %d)", msgLen, maxMsgLen) |
| } |
| |
| if len(d.buf) < msgLen+encodeBinaryLen { |
| d.buf = make([]byte, msgLen+encodeBinaryLen) |
| } else if msgLen <= initialBufSize { |
| d.buf = d.buf[:initialBufSize] |
| } else { |
| d.buf = d.buf[:msgLen+encodeBinaryLen] |
| } |
| |
| d.nextMsgLen = msgLen |
| } |
| return d.decodeLogEntry() |
| } |
| |
| func (d *decoder) Reset(rdr io.Reader) { |
| if d.rdr == rdr { |
| return |
| } |
| |
| d.rdr = rdr |
| if d.proto != nil { |
| resetProto(d.proto) |
| } |
| if d.buf != nil { |
| d.buf = d.buf[:initialBufSize] |
| } |
| d.offset = 0 |
| d.nextMsgLen = 0 |
| } |
| |
| func (d *decoder) Close() { |
| d.buf = d.buf[:0] |
| d.buf = nil |
| if d.proto != nil { |
| resetProto(d.proto) |
| } |
| d.rdr = nil |
| } |
| |
| func decodeFunc(rdr io.Reader) loggerutils.Decoder { |
| return &decoder{rdr: rdr} |
| } |
| |
| func (d *decoder) decodeSizeHeader() (int, error) { |
| err := d.readRecord(encodeBinaryLen) |
| if err != nil { |
| return 0, errors.Wrap(err, "could not read a size header") |
| } |
| |
| msgLen := int(binary.BigEndian.Uint32(d.buf[:encodeBinaryLen])) |
| return msgLen, nil |
| } |
| |
| func (d *decoder) decodeLogEntry() (*logger.Message, error) { |
| msgLen := d.nextMsgLen |
| err := d.readRecord(msgLen + encodeBinaryLen) |
| if err != nil { |
| return nil, errors.Wrapf(err, "could not read a log entry (size=%d+%d)", msgLen, encodeBinaryLen) |
| } |
| d.nextMsgLen = 0 |
| |
| if err := d.proto.Unmarshal(d.buf[:msgLen]); err != nil { |
| return nil, errors.Wrapf(err, "error unmarshalling log entry (size=%d)", msgLen) |
| } |
| |
| msg := protoToMessage(d.proto) |
| if msg.PLogMetaData == nil || msg.PLogMetaData.Last { |
| msg.Line = append(msg.Line, '\n') |
| } |
| |
| return msg, nil |
| } |