| package logger |
| |
| import ( |
| "encoding/binary" |
| "io" |
| "io/ioutil" |
| "os" |
| "testing" |
| "time" |
| |
| "github.com/docker/docker/api/types/plugins/logdriver" |
| protoio "github.com/gogo/protobuf/io" |
| "github.com/stretchr/testify/assert" |
| ) |
| |
| // mockLoggingPlugin implements the loggingPlugin interface for testing purposes |
| // it only supports a single log stream |
| type mockLoggingPlugin struct { |
| inStream io.ReadCloser |
| f *os.File |
| closed chan struct{} |
| t *testing.T |
| } |
| |
| func (l *mockLoggingPlugin) StartLogging(file string, info Info) error { |
| go func() { |
| io.Copy(l.f, l.inStream) |
| close(l.closed) |
| }() |
| return nil |
| } |
| |
| func (l *mockLoggingPlugin) StopLogging(file string) error { |
| l.inStream.Close() |
| l.f.Close() |
| os.Remove(l.f.Name()) |
| return nil |
| } |
| |
| func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) { |
| return Capability{ReadLogs: true}, nil |
| } |
| |
| func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) { |
| r, w := io.Pipe() |
| f, err := os.Open(l.f.Name()) |
| if err != nil { |
| return nil, err |
| } |
| go func() { |
| defer f.Close() |
| dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) |
| enc := logdriver.NewLogEntryEncoder(w) |
| |
| for { |
| select { |
| case <-l.closed: |
| w.Close() |
| return |
| default: |
| } |
| |
| var msg logdriver.LogEntry |
| if err := dec.ReadMsg(&msg); err != nil { |
| if err == io.EOF { |
| if !config.Follow { |
| w.Close() |
| return |
| } |
| dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6) |
| continue |
| } |
| |
| l.t.Fatal(err) |
| continue |
| } |
| |
| if err := enc.Encode(&msg); err != nil { |
| w.CloseWithError(err) |
| return |
| } |
| } |
| }() |
| |
| return r, nil |
| } |
| |
| func newMockPluginAdapter(t *testing.T) Logger { |
| r, w := io.Pipe() |
| f, err := ioutil.TempFile("", "mock-plugin-adapter") |
| assert.NoError(t, err) |
| |
| enc := logdriver.NewLogEntryEncoder(w) |
| a := &pluginAdapterWithRead{ |
| &pluginAdapter{ |
| plugin: &mockLoggingPlugin{ |
| inStream: r, |
| f: f, |
| closed: make(chan struct{}), |
| t: t, |
| }, |
| stream: w, |
| enc: enc, |
| }, |
| } |
| a.plugin.StartLogging("", Info{}) |
| return a |
| } |
| |
| func TestAdapterReadLogs(t *testing.T) { |
| l := newMockPluginAdapter(t) |
| |
| testMsg := []Message{ |
| {Line: []byte("Are you the keymaker?"), Timestamp: time.Now()}, |
| {Line: []byte("Follow the white rabbit"), Timestamp: time.Now()}, |
| } |
| for _, msg := range testMsg { |
| m := msg.copy() |
| assert.NoError(t, l.Log(m)) |
| } |
| |
| lr, ok := l.(LogReader) |
| assert.NotNil(t, ok) |
| |
| lw := lr.ReadLogs(ReadConfig{}) |
| |
| for _, x := range testMsg { |
| select { |
| case msg := <-lw.Msg: |
| testMessageEqual(t, &x, msg) |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout reading logs") |
| } |
| } |
| |
| select { |
| case _, ok := <-lw.Msg: |
| assert.False(t, ok, "expected message channel to be closed") |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout waiting for message channel to close") |
| |
| } |
| lw.Close() |
| |
| lw = lr.ReadLogs(ReadConfig{Follow: true}) |
| for _, x := range testMsg { |
| select { |
| case msg := <-lw.Msg: |
| testMessageEqual(t, &x, msg) |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout reading logs") |
| } |
| } |
| |
| x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()} |
| assert.NoError(t, l.Log(x.copy())) |
| |
| select { |
| case msg, ok := <-lw.Msg: |
| assert.NotNil(t, ok, "message channel unexpectedly closed") |
| testMessageEqual(t, &x, msg) |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout reading logs") |
| } |
| |
| l.Close() |
| select { |
| case msg, ok := <-lw.Msg: |
| assert.False(t, ok, "expected message channel to be closed") |
| assert.Nil(t, msg) |
| case <-time.After(10 * time.Second): |
| t.Fatal("timeout waiting for logger to close") |
| } |
| } |
| |
| func testMessageEqual(t *testing.T, a, b *Message) { |
| assert.Equal(t, a.Line, b.Line) |
| assert.Equal(t, a.Timestamp.UnixNano(), b.Timestamp.UnixNano()) |
| assert.Equal(t, a.Source, b.Source) |
| } |