package local

import (
	"bytes"
	"context"
	"encoding/binary"
	"io"

	"github.com/docker/docker/api/types/plugins/logdriver"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/errdefs"
	"github.com/pkg/errors"
)
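
// ReadLogs returns a LogWatcher that streams log messages read from the
// driver's log file according to config. The read itself runs in a separate
// goroutine so callers receive the watcher immediately.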
func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
	logWatcher := logger.NewLogWatcher()
	go d.readLogs(logWatcher, config)
	return logWatcher
}
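
// readLogs registers the watcher with the driver so it is tracked for the
// lifetime of the read, delegates the actual reading to the underlying log
// file, and unregisters the watcher once the read completes.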
func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
	defer close(watcher.Msg)

	d.mu.Lock()
	d.readers[watcher] = struct{}{}
	d.mu.Unlock()

	d.logfile.ReadLogs(config, watcher)

	d.mu.Lock()
	delete(d.readers, watcher)
	d.mu.Unlock()
}
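
// getTailReader scans backwards from the end of r, using the fixed-size
// length footer that trails every encoded message, until it has located the
// last req messages or reached the start of the file. It returns a reader
// positioned at the first of those messages along with the number of
// messages found.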
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
	size := r.Size()
	if req < 0 {
		return nil, 0, errdefs.InvalidParameter(errors.Errorf("invalid number of lines to tail: %d", req))
	}

	// A file smaller than a header/footer pair plus one byte of payload
	// cannot contain a complete message.
	if size < (encodeBinaryLen*2)+1 {
		return bytes.NewReader(nil), 0, nil
	}

	const encodeBinaryLen64 = int64(encodeBinaryLen)
	var found int

	buf := make([]byte, encodeBinaryLen)

	offset := size
	for {
		select {
		case <-ctx.Done():
			return nil, 0, ctx.Err()
		default:
		}

		// Read the footer, which records the size of the message that precedes it.
		n, err := r.ReadAt(buf, offset-encodeBinaryLen64)
		if err != nil && err != io.EOF {
			return nil, 0, errors.Wrap(err, "error reading log message footer")
		}
		if n != encodeBinaryLen {
			return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message footer"))
		}

		msgLen := binary.BigEndian.Uint32(buf)

		// Read the header in front of the message and make sure it agrees with the footer.
		n, err = r.ReadAt(buf, offset-encodeBinaryLen64-encodeBinaryLen64-int64(msgLen))
		if err != nil && err != io.EOF {
			return nil, 0, errors.Wrap(err, "error reading log message header")
		}
		if n != encodeBinaryLen {
			return nil, 0, errdefs.DataLoss(errors.New("unexpected number of bytes read from log message header"))
		}
		if msgLen != binary.BigEndian.Uint32(buf) {
			return nil, 0, errdefs.DataLoss(errors.New("log message header and footer indicate different message sizes"))
		}

		found++
		offset -= int64(msgLen)
		offset -= encodeBinaryLen64 * 2

		if found == req {
			break
		}
		if offset <= 0 {
			break
		}
	}

	return io.NewSectionReader(r, offset, size), found, nil
}
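
// decodeFunc returns a function that decodes one log message per call from
// rdr. Each call reads the fixed-size (encodeBinaryLen bytes) big-endian
// length header, then the protobuf-encoded logdriver.LogEntry followed by its
// matching length footer.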
func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
	proto := &logdriver.LogEntry{}
	buf := make([]byte, initialBufSize)

	return func() (*logger.Message, error) {
		var (
			read int
			err  error
		)

		resetProto(proto)

		// Read the length header, retrying short reads up to maxDecodeRetry times.
		for i := 0; i < maxDecodeRetry; i++ {
			var n int
			n, err = io.ReadFull(rdr, buf[read:encodeBinaryLen])
			if err != nil {
				if err != io.ErrUnexpectedEOF {
					return nil, errors.Wrap(err, "error reading log message length")
				}
				read += n
				continue
			}
			read += n
			break
		}
		if err != nil {
			return nil, errors.Wrapf(err, "could not read log message length: read: %d, expected: %d", read, encodeBinaryLen)
		}

		msgLen := int(binary.BigEndian.Uint32(buf[:read]))

		// Make sure the buffer can hold the message body plus its trailing
		// length footer, growing it only when a larger message arrives.
		if len(buf) < msgLen+encodeBinaryLen {
			buf = make([]byte, msgLen+encodeBinaryLen)
		} else {
			if msgLen <= initialBufSize {
				buf = buf[:initialBufSize]
			} else {
				buf = buf[:msgLen+encodeBinaryLen]
			}
		}

		return decodeLogEntry(rdr, proto, buf, msgLen)
	}
}
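
// decodeLogEntry reads the message body and its trailing length footer from
// rdr into buf, unmarshals the protobuf entry, and converts it to a
// logger.Message. Short reads are retried up to maxDecodeRetry times, which
// tolerates reading a file that is still being appended to.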
func decodeLogEntry(rdr io.Reader, proto *logdriver.LogEntry, buf []byte, msgLen int) (*logger.Message, error) {
	var (
		read int
		err  error
	)

	// Read the message body plus the trailing length footer, retrying short reads.
	for i := 0; i < maxDecodeRetry; i++ {
		var n int
		n, err = io.ReadFull(rdr, buf[read:msgLen+encodeBinaryLen])
		if err != nil {
			if err != io.ErrUnexpectedEOF {
				return nil, errors.Wrap(err, "could not decode log entry")
			}
			read += n
			continue
		}
		break
	}
	if err != nil {
		return nil, errors.Wrapf(err, "could not decode entry: read %d, expected: %d", read, msgLen)
	}

	if err := proto.Unmarshal(buf[:msgLen]); err != nil {
		return nil, errors.Wrap(err, "error unmarshalling log entry")
	}

	msg := protoToMessage(proto)
	if msg.PLogMetaData == nil {
		// Complete (non-partial) messages get a trailing newline appended.
		msg.Line = append(msg.Line, '\n')
	}
	return msg, nil
}