// blob: ae3a74ced768fa2a2de46e2b69a3af82ed95b4e3 [file] [log] [blame]

package loggerutils
import (
	"bufio"
	"context"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/tailfile"
	"gotest.tools/assert"
)
// TestTailFiles runs tailFiles over three in-memory log files with Tail set
// to 2 and checks that the first two messages delivered on the watcher are
// the last two lines of the last file, in order.
func TestTailFiles(t *testing.T) {
	s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n")
	s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n")
	s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n")

	files := []SizeReaderAt{s1, s2, s3}
	watcher := logger.NewLogWatcher()

	// createDecoder returns a decoder that yields one message per input line.
	createDecoder := func(r io.Reader) func() (*logger.Message, error) {
		scanner := bufio.NewScanner(r)
		return func() (*logger.Message, error) {
			if !scanner.Scan() {
				return nil, scanner.Err()
			}
			// scanner.Bytes() aliases the scanner's internal buffer, which a
			// subsequent Scan may overwrite. The message is handed off to
			// another goroutine, so give it a private copy of the line.
			line := append([]byte(nil), scanner.Bytes()...)
			return &logger.Message{Line: line, Timestamp: time.Now()}, nil
		}
	}

	tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
		return tailfile.NewTailReader(ctx, r, lines)
	}

	config := logger.ReadConfig{Tail: 2}
	started := make(chan struct{})
	go func() {
		close(started)
		tailFiles(files, watcher, createDecoder, tailReader, config)
	}()
	<-started

	// Each expected line gets its own timeout window.
	for _, want := range []string{"Roads?", "Where we're going we don't need roads."} {
		select {
		case <-time.After(60 * time.Second):
			t.Fatal("timeout waiting for tail line")
		case err := <-watcher.Err:
			assert.NilError(t, err)
		case msg := <-watcher.Msg:
			assert.Assert(t, msg != nil)
			assert.Assert(t, string(msg.Line) == want, string(msg.Line))
		}
	}
}
// TestFollowLogsConsumerGone starts followLogs against an empty temp file with
// a decoder that always produces a message, waits for the first message, then
// signals that the consumer is gone and expects followLogs to return.
func TestFollowLogsConsumerGone(t *testing.T) {
	watcher := logger.NewLogWatcher()

	tmp, err := ioutil.TempFile("", t.Name())
	assert.NilError(t, err)
	defer func() {
		tmp.Close()
		os.Remove(tmp.Name())
	}()

	// The decoder ignores its reader and never runs dry: every call yields a
	// fresh, empty message.
	decode := func() (*logger.Message, error) {
		return &logger.Message{}, nil
	}
	newDecoder := func(io.Reader) func() (*logger.Message, error) {
		return decode
	}

	var (
		since, until time.Time
		finished     = make(chan struct{})
	)
	go func() {
		followLogs(tmp, watcher, make(chan interface{}), newDecoder, since, until)
		close(finished)
	}()

	// Wait until at least one message has made it through.
	select {
	case <-time.After(10 * time.Second):
		t.Fatal("timeout waiting for log message")
	case <-finished:
		t.Fatal("follow logs finished unexpectedly")
	case watchErr := <-watcher.Err:
		assert.NilError(t, watchErr)
	case <-watcher.Msg:
	}

	watcher.ConsumerGone()

	// With the consumer gone, followLogs is expected to return promptly.
	select {
	case <-time.After(20 * time.Second):
		t.Fatal("timeout waiting for followLogs() to finish")
	case <-finished:
	}
}
// TestFollowLogsProducerGone starts followLogs with a decoder that produces
// messages until the test marks the producer as stopped, then checks that
// every message the decoder produced is received, that followLogs returns,
// and that the consumer has not been marked as gone.
func TestFollowLogsProducerGone(t *testing.T) {
	lw := logger.NewLogWatcher()

	f, err := ioutil.TempFile("", t.Name())
	assert.NilError(t, err)
	defer func() {
		// Close before removing, matching TestFollowLogsConsumerGone.
		f.Close()
		os.Remove(f.Name())
	}()

	// These counters are touched by the test goroutine, the followLogs
	// goroutine (through the decoder) and the reader goroutine below, so all
	// access goes through sync/atomic.
	//
	// closed: 0 = producing, 1 = stop requested, 2 = EOF already returned.
	var sent, received, closed int32
	makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) {
		return func() (*logger.Message, error) {
			switch atomic.LoadInt32(&closed) {
			case 0:
				atomic.AddInt32(&sent, 1)
				return &logger.Message{}, nil
			case 1:
				atomic.AddInt32(&closed, 1)
				t.Logf("logDecode() closed after sending %d messages\n", atomic.LoadInt32(&sent))
				return nil, io.EOF
			default:
				// The decoder runs outside the test goroutine, where
				// t.Fatal/FailNow must not be called; record the failure
				// with t.Error instead.
				t.Error("logDecode() called after closing!")
				return nil, io.EOF
			}
		}
	}

	var since, until time.Time
	followLogsDone := make(chan struct{})
	go func() {
		followLogs(f, lw, make(chan interface{}), makeDecoder, since, until)
		close(followLogsDone)
	}()

	// read 1 message
	select {
	case <-lw.Msg:
		atomic.AddInt32(&received, 1)
	case err := <-lw.Err:
		assert.NilError(t, err)
	case <-followLogsDone:
		t.Fatal("followLogs() finished unexpectedly")
	case <-time.After(10 * time.Second):
		t.Fatal("timeout waiting for log message")
	}

	// "stop" the "container"
	atomic.StoreInt32(&closed, 1)
	lw.ProducerGone()

	// should receive all the messages sent
	readDone := make(chan struct{})
	go func() {
		defer close(readDone)
		for {
			select {
			case <-lw.Msg:
				if atomic.AddInt32(&received, 1) == atomic.LoadInt32(&sent) {
					return
				}
			case err := <-lw.Err:
				if err != nil {
					// Not the test goroutine: report without FailNow and
					// stop reading.
					t.Errorf("unexpected error from log watcher: %v", err)
					return
				}
			}
		}
	}()
	select {
	case <-readDone:
	case <-time.After(30 * time.Second):
		t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", atomic.LoadInt32(&sent), atomic.LoadInt32(&received))
	}

	t.Logf("messages sent: %d, received: %d", atomic.LoadInt32(&sent), atomic.LoadInt32(&received))

	// followLogs() should be done by now
	select {
	case <-followLogsDone:
	case <-time.After(30 * time.Second):
		t.Fatal("timeout waiting for followLogs() to finish")
	}

	// Only the producer was stopped; the consumer must not be reported gone.
	select {
	case <-lw.WatchConsumerGone():
		t.Fatal("consumer should not have exited")
	default:
	}
}