| package loggerutils |
| |
import (
	"bufio"
	"context"
	"io"
	"io/ioutil"
	"os"
	"strings"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/pkg/tailfile"
	"gotest.tools/assert"
)
| |
| func TestTailFiles(t *testing.T) { |
| s1 := strings.NewReader("Hello.\nMy name is Inigo Montoya.\n") |
| s2 := strings.NewReader("I'm serious.\nDon't call me Shirley!\n") |
| s3 := strings.NewReader("Roads?\nWhere we're going we don't need roads.\n") |
| |
| files := []SizeReaderAt{s1, s2, s3} |
| watcher := logger.NewLogWatcher() |
| createDecoder := func(r io.Reader) func() (*logger.Message, error) { |
| scanner := bufio.NewScanner(r) |
| return func() (*logger.Message, error) { |
| if !scanner.Scan() { |
| return nil, scanner.Err() |
| } |
| // some comment |
| return &logger.Message{Line: scanner.Bytes(), Timestamp: time.Now()}, nil |
| } |
| } |
| |
| tailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) { |
| return tailfile.NewTailReader(ctx, r, lines) |
| } |
| |
| for desc, config := range map[string]logger.ReadConfig{} { |
| t.Run(desc, func(t *testing.T) { |
| started := make(chan struct{}) |
| go func() { |
| close(started) |
| tailFiles(files, watcher, createDecoder, tailReader, config) |
| }() |
| <-started |
| }) |
| } |
| |
| config := logger.ReadConfig{Tail: 2} |
| started := make(chan struct{}) |
| go func() { |
| close(started) |
| tailFiles(files, watcher, createDecoder, tailReader, config) |
| }() |
| <-started |
| |
| select { |
| case <-time.After(60 * time.Second): |
| t.Fatal("timeout waiting for tail line") |
| case err := <-watcher.Err: |
| assert.NilError(t, err) |
| case msg := <-watcher.Msg: |
| assert.Assert(t, msg != nil) |
| assert.Assert(t, string(msg.Line) == "Roads?", string(msg.Line)) |
| } |
| |
| select { |
| case <-time.After(60 * time.Second): |
| t.Fatal("timeout waiting for tail line") |
| case err := <-watcher.Err: |
| assert.NilError(t, err) |
| case msg := <-watcher.Msg: |
| assert.Assert(t, msg != nil) |
| assert.Assert(t, string(msg.Line) == "Where we're going we don't need roads.", string(msg.Line)) |
| } |
| } |
| |
| func TestFollowLogsConsumerGone(t *testing.T) { |
| lw := logger.NewLogWatcher() |
| |
| f, err := ioutil.TempFile("", t.Name()) |
| assert.NilError(t, err) |
| defer func() { |
| f.Close() |
| os.Remove(f.Name()) |
| }() |
| |
| makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { |
| return func() (*logger.Message, error) { |
| return &logger.Message{}, nil |
| } |
| } |
| |
| followLogsDone := make(chan struct{}) |
| var since, until time.Time |
| go func() { |
| followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) |
| close(followLogsDone) |
| }() |
| |
| select { |
| case <-lw.Msg: |
| case err := <-lw.Err: |
| assert.NilError(t, err) |
| case <-followLogsDone: |
| t.Fatal("follow logs finished unexpectedly") |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout waiting for log message") |
| } |
| |
| lw.ConsumerGone() |
| select { |
| case <-followLogsDone: |
| case <-time.After(20 * time.Second): |
| t.Fatal("timeout waiting for followLogs() to finish") |
| } |
| } |
| |
| func TestFollowLogsProducerGone(t *testing.T) { |
| lw := logger.NewLogWatcher() |
| |
| f, err := ioutil.TempFile("", t.Name()) |
| assert.NilError(t, err) |
| defer os.Remove(f.Name()) |
| |
| var sent, received, closed int |
| makeDecoder := func(rdr io.Reader) func() (*logger.Message, error) { |
| return func() (*logger.Message, error) { |
| if closed == 1 { |
| closed++ |
| t.Logf("logDecode() closed after sending %d messages\n", sent) |
| return nil, io.EOF |
| } else if closed > 1 { |
| t.Fatal("logDecode() called after closing!") |
| return nil, io.EOF |
| } |
| sent++ |
| return &logger.Message{}, nil |
| } |
| } |
| var since, until time.Time |
| |
| followLogsDone := make(chan struct{}) |
| go func() { |
| followLogs(f, lw, make(chan interface{}), makeDecoder, since, until) |
| close(followLogsDone) |
| }() |
| |
| // read 1 message |
| select { |
| case <-lw.Msg: |
| received++ |
| case err := <-lw.Err: |
| assert.NilError(t, err) |
| case <-followLogsDone: |
| t.Fatal("followLogs() finished unexpectedly") |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout waiting for log message") |
| } |
| |
| // "stop" the "container" |
| closed = 1 |
| lw.ProducerGone() |
| |
| // should receive all the messages sent |
| readDone := make(chan struct{}) |
| go func() { |
| defer close(readDone) |
| for { |
| select { |
| case <-lw.Msg: |
| received++ |
| if received == sent { |
| return |
| } |
| case err := <-lw.Err: |
| assert.NilError(t, err) |
| } |
| } |
| }() |
| select { |
| case <-readDone: |
| case <-time.After(30 * time.Second): |
| t.Fatalf("timeout waiting for log messages to be read (sent: %d, received: %d", sent, received) |
| } |
| |
| t.Logf("messages sent: %d, received: %d", sent, received) |
| |
| // followLogs() should be done by now |
| select { |
| case <-followLogsDone: |
| case <-time.After(30 * time.Second): |
| t.Fatal("timeout waiting for followLogs() to finish") |
| } |
| |
| select { |
| case <-lw.WatchConsumerGone(): |
| t.Fatal("consumer should not have exited") |
| default: |
| } |
| } |