| package local |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "io" |
| |
| "bytes" |
| |
| "github.com/docker/docker/api/types/plugins/logdriver" |
| "github.com/docker/docker/daemon/logger" |
| "github.com/docker/docker/daemon/logger/loggerutils" |
| "github.com/docker/docker/errdefs" |
| "github.com/pkg/errors" |
| ) |
| |
| func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher { |
| logWatcher := logger.NewLogWatcher() |
| |
| go d.readLogs(logWatcher, config) |
| return logWatcher |
| } |
| |
| func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) { |
| defer close(watcher.Msg) |
| |
| d.mu.Lock() |
| d.readers[watcher] = struct{}{} |
| d.mu.Unlock() |
| |
| d.logfile.ReadLogs(config, watcher) |
| |
| d.mu.Lock() |
| delete(d.readers, watcher) |
| d.mu.Unlock() |
| } |
| |
| func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) { |
| size := r.Size() |
| if req < 0 { |
| return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req)) |
| } |
| |
| if size < (encodeBinaryLen*2)+1 { |
| return bytes.NewReader(nil), 0, nil |
| } |
| |
| const encodeBinaryLen64 = int64(encodeBinaryLen) |
| var found int |
| |
| buf := make([]byte, encodeBinaryLen) |
| |
| offset := size |
| for { |
| select { |
| case <-ctx.Done(): |
| return nil, 0, ctx.Err() |
| default: |
| } |
| |
| n, err := r.ReadAt(buf, offset-encodeBinaryLen64) |
| if err != nil && err != io.EOF { |
| return nil, 0, errors.Wrap(err, "error reading log message footer") |
| } |
| |
| if n != encodeBinaryLen { |
| return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer")) |
| } |
| |
| msgLen := binary.BigEndian.Uint32(buf) |
| |
| n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen)) |
| if err != nil && err != io.EOF { |
| return nil, 0, errors.Wrap(err, "error reading log message header") |
| } |
| |
| if n != encodeBinaryLen { |
| return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header")) |
| } |
| |
| if msgLen != binary.BigEndian.Uint32(buf) { |
| return nil, 0, errdefs.DataLoss(errors.Wrap(err, "log message header and footer indicate different message sizes")) |
| } |
| |
| found++ |
| offset -= int64(msgLen) |
| offset -= encodeBinaryLen64 * 2 |
| if found == req { |
| break |
| } |
| if offset <= 0 { |
| break |
| } |
| } |
| |
| return io.NewSectionReader(r, offset, size), found, nil |
| } |
| |
| func decodeFunc(rdr io.Reader) func() (*logger.Message, error) { |
| proto := &logdriver.LogEntry{} |
| buf := make([]byte, initialBufSize) |
| |
| return func() (*logger.Message, error) { |
| var ( |
| read int |
| err error |
| ) |
| |
| resetProto(proto) |
| |
| for i := 0; i < maxDecodeRetry; i++ { |
| var n int |
| n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen]) |
| if err != nil { |
| if err != io.ErrUnexpectedEOF { |
| return nil, errors.Wrap(err, "error reading log message length") |
| } |
| read += n |
| continue |
| } |
| read += n |
| break |
| } |
| if err != nil { |
| return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen) |
| } |
| |
| msgLen := int(binary.BigEndian.Uint32(buf[:read])) |
| |
| if len(buf) < msgLen+encodeBinaryLen { |
| buf = make([]byte, msgLen+encodeBinaryLen) |
| } else { |
| if msgLen <= initialBufSize { |
| buf = buf[:initialBufSize] |
| } else { |
| buf = buf[:msgLen+encodeBinaryLen] |
| } |
| } |
| |
| return decodeLogEntry(rdr, proto, buf, msgLen) |
| } |
| } |
| |
| func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) { |
| var ( |
| read int |
| err error |
| ) |
| for i := 0; i < maxDecodeRetry; i++ { |
| var n int |
| n, err = io.ReadFull(rdr, buf[read:msgLen+encodeBinaryLen]) |
| if err != nil { |
| if err != io.ErrUnexpectedEOF { |
| return nil, errors.Wrap(err, "could not decode log entry") |
| } |
| read += n |
| continue |
| } |
| break |
| } |
| if err != nil { |
| return nil, errors.Wrapf(err, "could not decode entry: read %d, expected: %d", read, msgLen) |
| } |
| |
| if err := proto.Unmarshal(buf[:msgLen]); err != nil { |
| return nil, errors.Wrap(err, "error unmarshalling log entry") |
| } |
| |
| msg := protoToMessage(proto) |
| if msg.PLogMetaData == nil { |
| msg.Line = append(msg.Line, '\n') |
| } |
| return msg, nil |
| } |