blob: c89897cfdef620891651a5dfd00fb8796af35435 [file] [log] [blame]
package logger
import (
"errors"
"sync"
"sync/atomic"
"github.com/sirupsen/logrus"
)
// defaultRingMaxSize is the ring buffer byte limit that NewRingLogger applies
// when the caller passes a negative maximum size (1e6 bytes, i.e. 1MB).
const defaultRingMaxSize = 1e6
// RingLogger wraps another Logger with a size-bounded in-memory message
// queue. Log only enqueues the message; a background goroutine started at
// construction forwards queued messages to the wrapped logger.
// Messages may be dropped when the queue is full, so this is used when lossy
// logging is OK.
type RingLogger struct {
	// buffer holds messages that have not yet been forwarded.
	buffer *messageRing
	// l is the wrapped logger that ultimately receives the messages.
	l Logger
	// logInfo is the Info supplied at construction; its ContainerID is
	// attached to error log entries.
	logInfo Info
	// closeFlag is set to 1 once Close has been called; it is accessed
	// through sync/atomic only.
	closeFlag int32
}
// ringWithReader is a RingLogger that additionally exposes ReadLogs.
// NewRingLogger returns it when the wrapped driver implements LogReader.
type ringWithReader struct {
	*RingLogger
}
// ReadLogs delegates to the wrapped logger's ReadLogs implementation.
// It panics if the wrapped logger does not implement LogReader.
func (r *ringWithReader) ReadLogs(cfg ReadConfig) *LogWatcher {
	if reader, isReader := r.l.(LogReader); isReader {
		return reader.ReadLogs(cfg)
	}
	// something is wrong if we get here
	panic("expected log reader")
}
// newRingLogger builds a RingLogger around driver with a ring buffer limited
// to maxSize bytes, and starts the goroutine that forwards queued messages to
// driver.
func newRingLogger(driver Logger, logInfo Info, maxSize int64) *RingLogger {
	ring := new(RingLogger)
	ring.buffer = newRing(maxSize)
	ring.l = driver
	ring.logInfo = logInfo

	// The forwarding goroutine exits once the logger or its buffer is closed.
	go ring.run()
	return ring
}
// NewRingLogger creates a new Logger that is implemented as a ring buffer
// wrapping the passed in logger.
//
// A negative maxSize selects defaultRingMaxSize. When driver also implements
// LogReader, the returned Logger exposes ReadLogs as well.
func NewRingLogger(driver Logger, logInfo Info, maxSize int64) Logger {
	size := maxSize
	if size < 0 {
		size = defaultRingMaxSize
	}

	ring := newRingLogger(driver, logInfo, size)

	_, canRead := driver.(LogReader)
	if !canRead {
		return ring
	}
	return &ringWithReader{ring}
}
// Log places msg on the ring buffer for later delivery to the wrapped logger.
// It returns errClosed once the logger has been closed; otherwise it returns
// whatever the buffer's Enqueue returns.
func (r *RingLogger) Log(msg *Message) error {
	if !r.closed() {
		return r.buffer.Enqueue(msg)
	}
	return errClosed
}
// Name reports the name of the wrapped logger.
func (r *RingLogger) Name() string {
	driver := r.l
	return driver.Name()
}
// closed reports whether setClosed has been called, reading the flag
// atomically.
func (r *RingLogger) closed() bool {
	flag := atomic.LoadInt32(&r.closeFlag)
	return flag == 1
}
// setClosed atomically marks the logger as closed.
func (r *RingLogger) setClosed() {
	flag := &r.closeFlag
	atomic.StoreInt32(flag, 1)
}
// Close marks the logger closed, closes the ring buffer, and flushes any
// messages still queued to the underlying driver before closing that driver.
//
// If the driver fails to log one of the remaining messages, the error is
// reported once and every later message is handed to PutMessage instead of
// being sent to the driver. The returned error is the result of closing the
// underlying driver.
func (r *RingLogger) Close() error {
	r.setClosed()
	r.buffer.Close()

	// empty out the queue
	var logErr bool
	for _, msg := range r.buffer.Drain() {
		if logErr {
			// some error logging a previous message, so re-insert to message pool
			// and assume log driver is hosed
			PutMessage(msg)
			continue
		}

		if err := r.l.Log(msg); err != nil {
			// Report the error returned by the driver; formatting r.l here
			// would print the logger value rather than the failure.
			logrus.WithField("driver", r.l.Name()).
				WithField("container", r.logInfo.ContainerID).
				Errorf("Error writing log message: %v", err)
			logErr = true
		}
	}
	return r.l.Close()
}
// run consumes messages from the ring buffer and forwards them to the
// underlying logger. It returns once the logger has been marked closed or
// Dequeue reports an error.
// This is run in a goroutine when the RingLogger is created.
func (r *RingLogger) run() {
	for !r.closed() {
		msg, err := r.buffer.Dequeue()
		if err != nil {
			// buffer is closed
			return
		}

		if err := r.l.Log(msg); err != nil {
			// Report the error returned by the driver; formatting r.l here
			// would print the logger value rather than the failure.
			logrus.WithField("driver", r.l.Name()).
				WithField("container", r.logInfo.ContainerID).
				Errorf("Error writing log message: %v", err)
		}
	}
}
// messageRing is a mutex-guarded FIFO queue of messages bounded by the total
// byte length of the queued message lines.
type messageRing struct {
	// mu guards every field below.
	mu sync.Mutex
	// wait signals callers of `Dequeue` to wake up either on `Close` or when
	// a new `Message` is added.
	wait *sync.Cond

	// sizeBytes is the summed length of the Line of every queued message.
	sizeBytes int64
	// maxBytes is the limit that sizeBytes is checked against on Enqueue.
	maxBytes int64

	// queue holds pending messages, oldest first.
	queue []*Message
	// closed is set by Close; once set, Enqueue and Dequeue return errClosed.
	closed bool
}
// newRing returns an empty, open messageRing limited to maxBytes bytes of
// queued message lines.
func newRing(maxBytes int64) *messageRing {
	capacity := 1000
	switch maxBytes {
	case 0, 1:
		// With a limit of 0 or 1 bytes the queue is expected to hold about
		// one message at a time, so a large initial capacity is not needed.
		capacity = 1
	}

	ring := &messageRing{
		maxBytes: maxBytes,
		queue:    make([]*Message, 0, capacity),
	}
	ring.wait = sync.NewCond(&ring.mu)
	return ring
}
// Enqueue adds a message to the buffer queue.
//
// If adding the message would push the queued byte total above maxBytes and
// the queue already holds at least one message, the incoming message is
// discarded and nil is returned. If the queue is empty the message is added
// regardless of its size. One waiting Dequeue caller is signalled in either
// case.
//
// It returns errClosed if the buffer has been closed.
func (r *messageRing) Enqueue(m *Message) error {
	lineLen := int64(len(m.Line))

	r.mu.Lock()
	defer r.mu.Unlock()

	if r.closed {
		return errClosed
	}

	overLimit := lineLen+r.sizeBytes > r.maxBytes
	if !overLimit || len(r.queue) == 0 {
		r.queue = append(r.queue, m)
		r.sizeBytes += lineLen
	}

	// Wake a consumer whether or not the message was accepted.
	r.wait.Signal()
	return nil
}
// Dequeue removes and returns the oldest message in the queue.
// If the queue is empty it blocks until a message is added or the buffer is
// closed.
// Once the buffer is closed it returns errClosed, even if messages are still
// queued; those can be retrieved with Drain.
func (r *messageRing) Dequeue() (*Message, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Wait releases the mutex while blocked and re-acquires it on wake-up,
	// so the condition has to be re-checked every time.
	for !r.closed && len(r.queue) == 0 {
		r.wait.Wait()
	}

	if r.closed {
		return nil, errClosed
	}

	head := r.queue[0]
	r.queue = r.queue[1:]
	r.sizeBytes -= int64(len(head.Line))
	return head, nil
}
var (
	// errClosed is returned by operations attempted on a closed ring
	// logger or ring buffer.
	errClosed = errors.New("closed")
)
// Close closes the buffer ensuring no new messages can be added.
// Any callers waiting to dequeue a message will be woken up.
// Calling Close on an already closed buffer does nothing.
func (r *messageRing) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if !r.closed {
		r.closed = true
		r.wait.Broadcast()
	}
}
// Drain removes every message from the queue and returns them, oldest first,
// in a freshly allocated slice. The queued byte count is reset to zero.
// This can be used after `Close()` to get any remaining messages that were in queue.
func (r *messageRing) Drain() []*Message {
	r.mu.Lock()
	defer r.mu.Unlock()

	drained := make([]*Message, len(r.queue))
	copy(drained, r.queue)

	r.queue = r.queue[:0]
	r.sizeBytes = 0
	return drained
}