blob: 8a4ed2be13473369bb2a0f1c64b5defe367fdf3f [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syslog
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"testing"
"go.fuchsia.dev/fuchsia/tools/lib/logger"
"go.fuchsia.dev/fuchsia/tools/net/sshutil"
"golang.org/x/crypto/ssh"
)
type fakeSSHRunner struct {
pr *io.PipeReader
closed bool
reconnected bool
errs chan error
reconnectErrs chan error
lastCmd []string
}
func (r *fakeSSHRunner) Run(_ context.Context, cmd []string, stdout, _ io.Writer) error {
r.lastCmd = cmd
copyErrs := make(chan error)
go func() {
// We won't "stream" large amounts of data in these tests.
buf := make([]byte, 64)
n, err := r.pr.Read(buf)
if err != nil {
copyErrs <- err
return
}
_, err = stdout.Write(buf[:n])
copyErrs <- err
}()
select {
case err := <-r.errs:
return err
case err := <-copyErrs:
return err
}
}
func (r *fakeSSHRunner) Reconnect(_ context.Context) (*ssh.Client, error) {
select {
case err := <-r.reconnectErrs:
if err != nil {
return nil, err
}
default:
// If the channel is empty, we consider reconnection to be successful.
}
r.reconnected = true
return nil, nil
}
func (r *fakeSSHRunner) Close() error {
r.closed = true
return nil
}
func TestStream(t *testing.T) {
// NoLogLevel may be changed for verbosity while debugging.
l := logger.NewLogger(logger.NoLogLevel, nil, nil, nil, "")
ctx := logger.WithLogger(context.Background(), l)
pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()
r := &fakeSSHRunner{
pr: pr,
errs: make(chan error),
reconnectErrs: make(chan error, 10),
}
// Use a reconnectInterval of zero to avoid sleeping.
syslogger := Syslogger{r: r, reconnectInterval: 0}
defer func() {
syslogger.Close()
if !r.closed {
panic("runner was not closed")
}
}()
var buf bytes.Buffer
streamErrs := make(chan error)
go func() {
for {
streamErrs <- syslogger.Stream(ctx, &buf)
}
}()
// The streaming will block until we stub in an error or bytes to stream.
// First let's test our abstraction to verify that bytes are indeed streamed.
t.Run("streams if no errors are encoutered", func(t *testing.T) {
io.WriteString(pw, "ABCDE")
err := <-streamErrs
if err != nil {
t.Fatalf("unexpected streaming error: %v", err)
}
b, _ := ioutil.ReadAll(&buf)
if string(b) != "ABCDE" {
t.Errorf("unexpected bytes:\nexpected: ABCDE\nactual: %s\n", b)
} else if r.reconnected {
t.Errorf("runner unexpectedly reconnected")
}
})
// Now we check that errors not of type *sshutil.ConnectionError will not
// effect a reconnection.
t.Run("non-connection error interrupts the stream", func(t *testing.T) {
r.errs <- fmt.Errorf("error unrelated to connection")
err := <-streamErrs
if err == nil {
t.Fatalf("expected a streaming failure")
}
b, _ := ioutil.ReadAll(&buf)
if len(b) > 0 {
t.Errorf("unexpected bytes streamed: %q", b)
} else if r.reconnected {
t.Errorf("runner unexpectedly reconnected")
}
})
// If we come across a connection error we should reconnect.
t.Run("stream should recover from a connection error", func(t *testing.T) {
// After the first connection error, we should attempt to stream again,
// at which point we have a stubbed nil error waiting to be consumed.
// The streaming error returned should too be nil, by which point we should
// have reconnected.
r.errs <- sshutil.ConnectionError
r.errs <- nil
err := <-streamErrs
if err != nil {
t.Errorf("unexpected streaming error: %v", err)
} else if !r.reconnected {
t.Errorf("runner failed to reconnect")
}
})
// If reconnection fails, we should keep trying to reconnect.
t.Run("stream should retry reconnection", func(t *testing.T) {
// After a connection error, we should keep trying to reconnect until we
// succeed.
reconnectErr := errors.New("failed to reconnect")
// Send N reconnection errors into the channel so that they get picked
// up when the syslogger tries to reconnect after being interrupted by
// the ConnectionError. This will cause Reconnect() to fail N times and
// succeed the N+1th time.
r.reconnectErrs <- reconnectErr
r.reconnectErrs <- reconnectErr
r.reconnectErrs <- reconnectErr
r.errs <- sshutil.ConnectionError
// Reconnection failures should not have caused `Stream()` to return, so
// the streamErrs channel should be empty.
select {
case err := <-streamErrs:
if err != nil {
t.Errorf("stream didn't keep trying to reconnect: %v", err)
} else {
t.Errorf("unexpected completion of streaming after reconnection failure")
}
default:
}
})
}