blob: e53adfa866d6cac119d519cc22320764af5465b0 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package syslog
import (
// fakeSSHRunner is the fake runner handed to Syslogger in these tests. Tests
// feed it bytes through a pipe and stub results through a channel, then
// inspect the recorded state to see how the runner was used.
type fakeSSHRunner struct {
	// pr is the read end of the pipe whose contents Run copies to stdout.
	pr *io.PipeReader
	// closed is set once Close has been called.
	closed bool
	// reconnected is set once Reconnect has been called.
	reconnected bool
	// errs supplies stubbed results: Run returns the first value it receives
	// from this channel, including nil.
	errs chan error
	// lastCmd is the command passed to the most recent call to Run.
	lastCmd []string
}

// Run records cmd and then blocks until either a stubbed result arrives on
// r.errs or a single chunk read from r.pr has been written to stdout,
// returning whichever outcome is available first. The context and the second
// writer are ignored.
func (r *fakeSSHRunner) Run(_ context.Context, cmd []string, stdout, _ io.Writer) error {
	r.lastCmd = cmd

	// Buffered so the copying goroutine can always deliver its one result and
	// exit, even if Run has already returned because of a stubbed error.
	copyErrs := make(chan error, 1)
	go func() {
		// We won't "stream" large amounts of data in these tests.
		buf := make([]byte, 64)
		n, err := r.pr.Read(buf)
		if err != nil {
			// Report the read failure exactly once; there is nothing to write.
			copyErrs <- err
			return
		}
		_, err = stdout.Write(buf[:n])
		copyErrs <- err
	}()

	select {
	case err := <-r.errs:
		return err
	case err := <-copyErrs:
		return err
	}
}

// Reconnect records that a reconnection was requested and always succeeds.
func (r *fakeSSHRunner) Reconnect(_ context.Context) error {
	r.reconnected = true
	return nil
}

// Close records that the runner was closed and always succeeds.
func (r *fakeSSHRunner) Close() error {
	r.closed = true
	return nil
}
// incrementalStream classifies a command recorded by the fake runner. It
// returns false for the bare "/bin/log_listener" invocation and true for the
// invocation carrying "--since_now yes". Any other command is a test bug and
// causes a panic.
func incrementalStream(cmd []string) bool {
	switch {
	case reflect.DeepEqual(cmd, []string{"/bin/log_listener"}):
		return false
	case reflect.DeepEqual(cmd, []string{"/bin/log_listener", "--since_now", "yes"}):
		return true
	default:
		panic(fmt.Sprintf("unrecognized command %v", cmd))
	}
}
// TestStream drives Syslogger.Stream against a fakeSSHRunner. Bytes written to
// the pipe and results stubbed through r.errs steer three subtests that share
// one runner and one background streaming loop, so the subtests depend on
// running in the order written (e.g. r.reconnected must still be false when
// the second subtest runs).
//
// NOTE(review): the closing braces and the `}()` / `})` terminators of this
// function are not present in this copy of the file — confirm against the
// upstream source before relying on it.
func TestStream(t *testing.T) {
// NoLogLevel may be changed for verbosity while debugging.
l := logger.NewLogger(logger.NoLogLevel, nil, nil, nil, "")
ctx := logger.WithLogger(context.Background(), l)
// pw is the write end the subtests use to inject bytes; pr is what the fake
// runner reads from.
pr, pw := io.Pipe()
r := &fakeSSHRunner{
pr: pr,
errs: make(chan error),
defer pr.Close()
defer pw.Close()
syslogger := Syslogger{r: r}
// On exit, insist that the runner was closed at some point during the test.
// NOTE(review): nothing in this function calls Close directly; presumably
// Stream closes the runner — confirm.
defer func() {
if !r.closed {
panic("runner was not closed")
var buf bytes.Buffer
streamErrs := make(chan error)
// Call Stream over and over, publishing each call's result on streamErrs.
// The channel is unbuffered, so the next Stream call does not begin until a
// subtest has received the previous result.
go func() {
for {
streamErrs <- syslogger.Stream(ctx, &buf, false)
// The streaming will block until we stub in an error or bytes to stream.
// First let's test our abstraction to verify that bytes are indeed streamed.
t.Run("streams if no errors are encoutered", func(t *testing.T) {
io.WriteString(pw, "ABCDE")
err := <-streamErrs
if err != nil {
t.Fatalf("unexpected streaming error: %v", err)
// Reading everything out of buf also empties it, so the next subtest only
// sees bytes streamed after this point.
b, _ := ioutil.ReadAll(&buf)
if string(b) != "ABCDE" {
t.Errorf("unexpected bytes:\nexpected: ABCDE\nactual: %s\n", b)
} else if r.reconnected {
t.Errorf("runner unexpectedly reconnected")
} else if incrementalStream(r.lastCmd) {
t.Errorf("streaming should only be incremental after reconnection")
// Now we check that errors not of type *sshutil.ConnectionError will not
// effect a reconnection.
t.Run("non-connection error interrupts the stream", func(t *testing.T) {
r.errs <- fmt.Errorf("error unrelated to connection")
err := <-streamErrs
if err == nil {
t.Fatalf("expected a streaming failure")
// No bytes were written to the pipe in this subtest, so buf must be empty.
b, _ := ioutil.ReadAll(&buf)
if len(b) > 0 {
t.Errorf("unexpected bytes streamed: %q", b)
} else if r.reconnected {
t.Errorf("runner unexpectedly reconnected")
} else if incrementalStream(r.lastCmd) {
t.Errorf("streaming should only be incremental after reconnection")
// Finally, if we come across a connection error we should reconnect.
t.Run("stream should recover from a connection error", func(t *testing.T) {
// After the first connection error, Stream should attempt to run the
// command again, at which point the stubbed nil result below is waiting to
// be consumed. The streaming error returned should then also be nil, and by
// that point the runner should have been reconnected.
r.errs <- &sshutil.ConnectionError{fmt.Errorf("connection error")}
r.errs <- nil
err := <-streamErrs
if err != nil {
t.Errorf("unexpected streaming error: %v", err)
} else if !r.reconnected {
t.Errorf("runner failed to reconnect")
} else if !incrementalStream(r.lastCmd) {
t.Errorf("streaming should be incremental after reconnection")