blob: a29188d95303691a8f45633cd5332d787a33defb [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package testrunner
import (
botanistconstants ""
sshutilconstants ""
const (
// TODO( Fix this path.
// The output data directory for component v2 tests.
dataOutputDirV2 = "/tmp/test_manager:0/data/debug"
// The output data directory for early boot coverage.
dataOutputDirEarlyBoot = "/tmp/test_manager:0/data/kernel_debug/debugdata"
// Various tools for running tests.
runtestsName = "runtests"
runTestSuiteName = "run-test-suite"
// Returned by run-test-suite to indicate the test timed out.
timeoutExitCode = 21
// Printed to the serial console when ready to accept user input.
serialConsoleCursor = "\n$"
// Number of times to try running a test command over serial before giving
// up. This value was somewhat arbitrarily chosen and can be adjusted higher
// or lower if deemed appropriate.
startSerialCommandMaxAttempts = 3
llvmProfileEnvKey = "LLVM_PROFILE_FILE"
llvmProfileExtension = ".profraw"
llvmProfileSinkType = "llvm-profile"
// This needs to be long enough to allow for significant serial RTT during
// startup shortly after booting. We've seen a serial RTT ~8s, so maybe 15s
// will be enough margin above ~8s.
testStartedTimeout = 15 * time.Second
// The name of the test to associate early boot data sinks with.
earlyBootSinksTestName = "early_boot_sinks"
// The max number of times to try reconnecting to the target.
maxReconnectAttempts = 3
// Tester describes the interface for all different types of testers.
type Tester interface {
Test(context.Context, testsharder.Test, io.Writer, io.Writer, string) (*TestResult, error)
ProcessResult(context.Context, testsharder.Test, string, *TestResult, error) (*TestResult, error)
Close() error
EnsureSinks(context.Context, []runtests.DataSinkReference, *TestOutputs) error
RunSnapshot(context.Context, string) error
Reconnect(context.Context) error
// For testability
type cmdRunner interface {
Run(ctx context.Context, command []string, options subprocess.RunOptions) error
// For testability
var newRunner = func(dir string, env []string) cmdRunner {
return &subprocess.Runner{Dir: dir, Env: env}
// For testability.
var newTempDir = func(dir, pattern string) (string, error) {
return os.MkdirTemp(dir, pattern)
// For testability
type sshClient interface {
DisconnectionListener() <-chan struct{}
ReconnectWithBackoff(ctx context.Context, backoff retry.Backoff) error
Run(ctx context.Context, command []string, stdout, stderr io.Writer) error
// For testability
type dataSinkCopier interface {
GetAllDataSinks(remoteDir string) ([]runtests.DataSink, error)
GetReferences(remoteDir string) (map[string]runtests.DataSinkReference, error)
Copy(sinks []runtests.DataSinkReference, localDir string) (runtests.DataSinkMap, error)
RemoveAll(remoteDir string) error
Reconnect() error
Close() error
// For testability
type serialClient interface {
runDiagnostics(ctx context.Context) error
// BaseTestResultFromTest returns a TestResult for a Tester.Test() to modify
// and return with some pre-filled values and a starting failure result which
// should be changed as needed within the tester's Test() method.
func BaseTestResultFromTest(test testsharder.Test) *TestResult {
return &TestResult{
Name: test.Name,
GNLabel: test.Label,
Result: runtests.TestFailure,
DataSinks: runtests.DataSinkReference{},
Tags: test.Tags,
// SubprocessTester executes tests in local subprocesses.
type SubprocessTester struct {
env []string
dir string
localOutputDir string
sProps *sandboxingProps
testRuns map[string]string
type sandboxingProps struct {
nsjailPath string
nsjailRoot string
mountQEMU bool
mountUserHome bool
cwd string
// NewSubprocessTester returns a SubprocessTester that can execute tests
// locally with a given working directory and environment.
func NewSubprocessTester(dir string, env []string, localOutputDir, nsjailPath, nsjailRoot string) (Tester, error) {
s := &SubprocessTester{
dir: dir,
env: env,
localOutputDir: localOutputDir,
testRuns: make(map[string]string),
// If the caller provided a path to NsJail, then intialize sandboxing properties.
if nsjailPath != "" {
s.sProps = &sandboxingProps{
nsjailPath: nsjailPath,
nsjailRoot: nsjailRoot,
// TODO(rudymathu): Remove this once ssh/ssh-keygen usage is removed.
mountUserHome: true,
if _, err := os.Stat("/sys/class/net/qemu/"); err == nil {
s.sProps.mountQEMU = true
} else if !errors.Is(err, os.ErrNotExist) {
return &SubprocessTester{}, nil
cwd, err := os.Getwd()
if err != nil {
return &SubprocessTester{}, err
s.sProps.cwd = cwd
return s, nil
func (t *SubprocessTester) setTestRun(test testsharder.Test, profileRelDir string) {
t.testRuns[test.Path] = profileRelDir
func (t *SubprocessTester) getTestRun(test testsharder.Test) string {
profileRelDir, ok := t.testRuns[test.Path]
if !ok {
return ""
return profileRelDir
func (t *SubprocessTester) Test(ctx context.Context, test testsharder.Test, stdout io.Writer, stderr io.Writer, outDir string) (*TestResult, error) {
testResult := BaseTestResultFromTest(test)
if test.Path == "" {
testResult.FailReason = fmt.Sprintf("test %q has no `path` set", test.Name)
return testResult, nil
// Some tests read TestOutDirEnvKey so ensure they get their own output dir.
if err := os.MkdirAll(outDir, 0o770); err != nil {
testResult.FailReason = err.Error()
return testResult, nil
// Might as well emit any profiles directly to the output directory.
// We'll set
// LLVM_PROFILE_FILE=<output dir>/<test-specific namsepace>/%m.profraw
// and then record any .profraw file written to that directory as an
// emitted profile.
profileRelDir := filepath.Join(llvmProfileSinkType, test.Path)
profileAbsDir := filepath.Join(t.localOutputDir, profileRelDir)
os.MkdirAll(profileAbsDir, os.ModePerm)
r := newRunner(t.dir, append(
fmt.Sprintf("%s=%s", constants.TestOutDirEnvKey, outDir),
// When host-side tests are instrumented for profiling, executing
// them will write a profile to the location under this environment variable.
fmt.Sprintf("%s=%s", llvmProfileEnvKey, filepath.Join(profileAbsDir, "%m"+llvmProfileExtension)),
if test.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, test.Timeout)
defer cancel()
// './' is a package-level construct in os/exec whose use is recommended
// when the provided invocation executes things relative to the local
// working directory. In particular, unless it is in $PATH, executables in
// the immediate working directory require this to be referenced
// relatively as of Go v1.19.
testCmd := []string{"./" + test.Path}
if t.sProps != nil {
testCmdBuilder := &NsJailCmdBuilder{
Bin: t.sProps.nsjailPath,
// TODO(rudymathu): Eventually, this should be a more fine grained
// property that disables network isolation only on tests that explicitly
// request it.
IsolateNetwork: false,
MountPoints: []*MountPt{
Src: t.localOutputDir,
Writable: true,
Src: outDir,
Writable: true,
// The fx_script_tests utilize this file.
Src: "/usr/share/misc/magic.mgc",
Symlinks: map[string]string{
"/proc/self/fd": "/dev/fd",
if isolateDir, ok := os.LookupEnv(ffxutil.FFXIsolateDirEnvKey); ok {
testCmdBuilder.MountPoints = append(
Src: isolateDir,
Writable: true,
// Mount the QEMU tun_flags if the qemu interface exists. This is used
// by VDL to ascertain that the interface exists.
if t.sProps.mountQEMU {
testCmdBuilder.MountPoints = append(
Src: "/sys/class/net/qemu/",
// Some tests invoke the `ssh` command line tool, which always creates
// a .ssh file in the home directory. Unfortunately, it prefers to read
// the home directory from the /etc/passwd file, and only reads $HOME
// if this doesn't work. Because we need to mount /etc/passwd for
// ssh-keygen, we need to create the same home directory in
// /etc/passwd. This is really quite a big hack, and we should remove
// it ASAP.
if t.sProps.mountUserHome {
currentUser, err := user.Current()
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
pwdFile, err := os.Open("/etc/passwd")
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
defer pwdFile.Close()
pwdScanner := bufio.NewScanner(pwdFile)
for pwdScanner.Scan() {
elems := strings.Split(pwdScanner.Text(), ":")
if elems[0] == currentUser.Username {
testCmdBuilder.MountPoints = append(
Dst: elems[5],
UseTmpfs: true,
if pwdScanner.Err() != nil {
testResult.FailReason = pwdScanner.Err().Error()
return testResult, nil
// Mount /tmp. Ideally, we would use a tmpfs mount, but we write quite a
// lot of data to it, so we instead create a temp dir and mount it
// instead.
tmpDir, err := newTempDir("", "")
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
defer os.RemoveAll(tmpDir)
testCmdBuilder.MountPoints = append(
Src: tmpDir,
Dst: "/tmp",
Writable: true,
// Construct the sandbox's environment by forwarding the current env
// but overriding the TempDirEnvVars with /tmp.
// Also override FUCHSIA_TEST_OUTDIR with the outdir specific to this
// test.
envOverrides := map[string]string{
"TMPDIR": "/tmp",
constants.TestOutDirEnvKey: outDir,
llvmProfileEnvKey: filepath.Join(profileAbsDir, "%m"+llvmProfileExtension),
for _, key := range environment.TempDirEnvVars() {
envOverrides[key] = "/tmp"
// Set the root of the NsJail and the working directory.
// The working directory is expected to be a subdirectory of the root.
if t.sProps.nsjailRoot != "" {
absRoot, err := filepath.Abs(t.sProps.nsjailRoot)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
testCmdBuilder.MountPoints = append(
&MountPt{Src: absRoot, Writable: true},
testCmdBuilder.Cwd = t.sProps.cwd
// Mount the testbed config and any serial sockets.
testbedConfigPath := os.Getenv(botanistconstants.TestbedConfigEnvKey)
if testbedConfigPath != "" {
// Mount the actual config.
testCmdBuilder.MountPoints = append(testCmdBuilder.MountPoints, &MountPt{Src: testbedConfigPath})
// Mount the SSH keys and serial sockets for each target in the testbed.
type targetInfo struct {
SerialSocket string `json:"serial_socket"`
SSHKey string `json:"ssh_key"`
b, err := os.ReadFile(testbedConfigPath)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
var testbedConfig []targetInfo
if err := json.Unmarshal(b, &testbedConfig); err != nil {
testResult.FailReason = err.Error()
return testResult, nil
serialSockets := make(map[string]struct{})
sshKeys := make(map[string]struct{})
for _, config := range testbedConfig {
if config.SSHKey != "" {
sshKeys[config.SSHKey] = struct{}{}
if config.SerialSocket != "" {
serialSockets[config.SerialSocket] = struct{}{}
for socket := range serialSockets {
absSocketPath, err := filepath.Abs(socket)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
testCmdBuilder.MountPoints = append(testCmdBuilder.MountPoints, &MountPt{
Src: absSocketPath,
Writable: true,
for key := range sshKeys {
absKeyPath, err := filepath.Abs(key)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
testCmdBuilder.MountPoints = append(testCmdBuilder.MountPoints, &MountPt{
Src: absKeyPath,
testCmd, err = testCmdBuilder.Build(testCmd)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
err := r.Run(ctx, testCmd, subprocess.RunOptions{Stdout: stdout, Stderr: stderr})
t.setTestRun(test, profileRelDir)
if err == nil {
testResult.Result = runtests.TestSuccess
} else if errors.Is(err, context.DeadlineExceeded) {
testResult.Result = runtests.TestAborted
} else {
testResult.FailReason = err.Error()
return testResult, nil
func (t *SubprocessTester) ProcessResult(ctx context.Context, test testsharder.Test, outDir string, testResult *TestResult, err error) (*TestResult, error) {
profileRelDir := t.getTestRun(test)
if profileRelDir == "" {
return testResult, err
profileAbsDir := filepath.Join(t.localOutputDir, profileRelDir)
var sinks []runtests.DataSink
profileErr := filepath.WalkDir(profileAbsDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
if !d.IsDir() {
profileRel, err := filepath.Rel(profileAbsDir, path)
if err != nil {
return err
sinks = append(sinks, runtests.DataSink{
Name: filepath.Base(path),
File: filepath.Join(profileRelDir, profileRel),
return nil
if profileErr != nil {
logger.Errorf(ctx, "unable to determine whether profiles were emitted: %s", profileErr)
if len(sinks) > 0 {
testResult.DataSinks.Sinks = runtests.DataSinkMap{
llvmProfileSinkType: sinks,
return testResult, err
func (t *SubprocessTester) EnsureSinks(ctx context.Context, sinkRefs []runtests.DataSinkReference, _ *TestOutputs) error {
// Nothing to actually copy; if any profiles were emitted, they would have
// been written directly to the output directory. We verify here that all
// recorded data sinks are actually present.
numSinks := 0
for _, ref := range sinkRefs {
for _, sinks := range ref.Sinks {
for _, sink := range sinks {
abs := filepath.Join(t.localOutputDir, sink.File)
exists, err := osmisc.FileExists(abs)
if err != nil {
return fmt.Errorf("unable to determine if local data sink %q exists: %w", sink.File, err)
} else if !exists {
return fmt.Errorf("expected a local data sink %q, but no such file exists", sink.File)
if numSinks > 0 {
logger.Debugf(ctx, "local data sinks present: %d", numSinks)
return nil
func (t *SubprocessTester) RunSnapshot(_ context.Context, _ string) error {
return nil
func (t *SubprocessTester) Reconnect(_ context.Context) error {
return nil
func (t *SubprocessTester) Close() error {
return nil
type serialSocket struct {
socketPath string
func (s *serialSocket) runDiagnostics(ctx context.Context) error {
if s.socketPath == "" {
return fmt.Errorf("serialSocketPath not set")
socket, err := serial.NewSocket(ctx, s.socketPath)
if err != nil {
return fmt.Errorf("newSerialSocket failed: %w", err)
defer socket.Close()
return serial.RunDiagnostics(ctx, socket)
// for testability
type FFXInstance interface {
RunWithTarget(ctx context.Context, args ...string) error
RunWithTargetAndTimeout(ctx context.Context, timeout time.Duration, args ...string) error
Stdout() io.Writer
Stderr() io.Writer
SetStdoutStderr(stdout, stderr io.Writer)
Test(ctx context.Context, tests build.TestList, outDir string, args ...string) (*ffxutil.TestRunResult, error)
Snapshot(ctx context.Context, outDir string, snapshotFilename string) error
Stop() error
// FFXTester uses ffx to run tests and other enabled features.
type FFXTester struct {
ffx FFXInstance
experimentLevel int
// It will temporarily use an sshTester for functions where ffx has not been
// enabled to run yet.
// TODO(ihuh): Remove once all v1 tests are migrated to v2 and data sinks are
// available as an output artifact of `ffx test`.
sshTester Tester
localOutputDir string
// The test output dirs from all the calls to Test().
testOutDirs []string
// A map of the test PackageURL to data needed for processing the test results.
testRuns map[string]ffxTestRun
type ffxTestRun struct {
result *ffxutil.TestRunResult
output string
totalDuration time.Duration
// NewFFXTester returns an FFXTester.
func NewFFXTester(ctx context.Context, ffx FFXInstance, sshTester Tester, localOutputDir string, experimentLevel int) (*FFXTester, error) {
if err := retry.Retry(ctx, retry.WithMaxAttempts(retry.NewConstantBackoff(time.Second), maxReconnectAttempts), func() error {
return ffx.RunWithTarget(ctx, "target", "wait", "-t", "10")
}, nil); err != nil {
return nil, err
return &FFXTester{
ffx: ffx,
sshTester: sshTester,
localOutputDir: localOutputDir,
experimentLevel: experimentLevel,
testRuns: make(map[string]ffxTestRun),
}, nil
func (t *FFXTester) EnabledForTesting() bool {
return t.experimentLevel >= 2
func (t *FFXTester) Test(ctx context.Context, test testsharder.Test, stdout, stderr io.Writer, outDir string) (*TestResult, error) {
if t.EnabledForTesting() {
return BaseTestResultFromTest(test), t.testWithFile(ctx, test, stdout, stderr, outDir)
return t.sshTester.Test(ctx, test, stdout, stderr, outDir)
func (t *FFXTester) Reconnect(ctx context.Context) error {
if t.EnabledForTesting() {
return retry.Retry(ctx, retry.WithMaxDuration(retry.NewConstantBackoff(time.Second), 30*time.Second), func() error {
return t.ffx.RunWithTarget(ctx, "target", "wait", "-t", "10")
}, nil)
return t.sshTester.Reconnect(ctx)
// testWithFile runs `ffx test` with -test-file and returns the test result.
func (t *FFXTester) testWithFile(ctx context.Context, test testsharder.Test, stdout, stderr io.Writer, outDir string) error {
testDef := []build.TestListEntry{{
Name: test.PackageURL,
Labels: []string{test.Label},
Execution: build.ExecutionDef{
Type: "fuchsia_component",
ComponentURL: test.PackageURL,
TimeoutSeconds: int(test.Timeout.Seconds()),
Parallel: test.Parallel,
MaxSeverityLogs: test.LogSettings.MaxSeverity,
MinSeverityLogs: test.LogSettings.MinSeverity,
Realm: test.Realm,
Tags: test.Tags,
origStdout := t.ffx.Stdout()
origStderr := t.ffx.Stderr()
var buf bytes.Buffer
stdout = io.MultiWriter(stdout, &buf)
stderr = io.MultiWriter(stderr, &buf)
t.ffx.SetStdoutStderr(stdout, stderr)
defer t.ffx.SetStdoutStderr(origStdout, origStderr)
extraArgs := []string{"--filter-ansi"}
if t.experimentLevel == 3 {
extraArgs = append(extraArgs, "--experimental-parallel-execution", "8")
startTime := clock.Now(ctx)
runResult, err := t.ffx.Test(ctx, build.TestList{Data: testDef, SchemaID: build.TestListSchemaIDExperimental}, outDir, extraArgs...)
if runResult == nil && err == nil {
err = fmt.Errorf("no test result was found")
t.testRuns[test.PackageURL] = ffxTestRun{runResult, buf.String(), clock.Now(ctx).Sub(startTime)}
return err
func (t *FFXTester) ProcessResult(ctx context.Context, test testsharder.Test, outDir string, testResult *TestResult, err error) (*TestResult, error) {
if !t.EnabledForTesting() {
return t.sshTester.ProcessResult(ctx, test, outDir, testResult, err)
finalTestResult := testResult
testRun := t.testRuns[test.PackageURL]
if testRun.result != nil {
testOutDir := testRun.result.GetTestOutputDir()
t.testOutDirs = append(t.testOutDirs, testOutDir)
testResult, err = processTestResult(testRun.result, test, testRun.totalDuration, false)
if err != nil {
finalTestResult.FailReason = err.Error()
} else if testResult == nil {
finalTestResult.FailReason = "expected 1 test result, got none"
} else {
finalTestResult = testResult
ffxTag := build.TestTag{Key: "use_ffx", Value: "true"}
finalTestResult.Tags = append(finalTestResult.Tags, ffxTag)
for i, testCase := range finalTestResult.Cases {
finalTestResult.Cases[i].Tags = append(testCase.Tags, ffxTag)
if finalTestResult.Result != runtests.TestSuccess && strings.Contains(testRun.output, sshutilconstants.ProcessTerminatedMsg) {
if err == nil {
err = errors.New(sshutilconstants.ProcessTerminatedMsg)
return finalTestResult, connectionError{err}
return finalTestResult, nil
func processTestResult(runResult *ffxutil.TestRunResult, test testsharder.Test, totalDuration time.Duration, removeProfiles bool) (*TestResult, error) {
testOutDir := runResult.GetTestOutputDir()
suiteResults, err := runResult.GetSuiteResults()
if err != nil {
return nil, err
if len(suiteResults) != 1 {
return nil, fmt.Errorf("expected 1 test result, got %d", len(suiteResults))
suiteResult := suiteResults[0]
testResult := BaseTestResultFromTest(test)
switch suiteResult.Outcome {
case ffxutil.TestPassed:
testResult.Result = runtests.TestSuccess
case ffxutil.TestTimedOut:
testResult.Result = runtests.TestAborted
case ffxutil.TestNotStarted:
testResult.Result = runtests.TestSkipped
testResult.Result = runtests.TestFailure
testResult.Tags = append(testResult.Tags, build.TestTag{Key: "test_outcome", Value: suiteResult.Outcome})
var suiteArtifacts []string
var stdioPath string
suiteArtifactDir := filepath.Join(testOutDir, suiteResult.ArtifactDir)
for artifact, metadata := range suiteResult.Artifacts {
if _, err := os.Stat(filepath.Join(suiteArtifactDir, artifact)); os.IsNotExist(err) {
// Don't record artifacts that don't exist.
if metadata.ArtifactType == ffxutil.ReportType {
// Copy the report log into the filename expected by infra.
// TODO( Remove dependencies on this filename.
absPath := filepath.Join(suiteArtifactDir, artifact)
stdioPath = filepath.Join(suiteArtifactDir, runtests.TestOutputFilename)
if err := os.Rename(absPath, stdioPath); err != nil {
return testResult, err
suiteArtifacts = append(suiteArtifacts, runtests.TestOutputFilename)
} else {
suiteArtifacts = append(suiteArtifacts, artifact)
testResult.OutputFiles = suiteArtifacts
testResult.OutputDir = suiteArtifactDir
var cases []runtests.TestCaseResult
for _, testCase := range suiteResult.Cases {
var status runtests.TestResult
switch testCase.Outcome {
case ffxutil.TestPassed:
status = runtests.TestSuccess
case ffxutil.TestSkipped:
status = runtests.TestSkipped
status = runtests.TestFailure
var artifacts []string
var failReason string
testCaseArtifactDir := filepath.Join(testOutDir, testCase.ArtifactDir)
for artifact, metadata := range testCase.Artifacts {
if _, err := os.Stat(filepath.Join(testCaseArtifactDir, artifact)); os.IsNotExist(err) {
// Don't record artifacts that don't exist.
// Get the failReason from the stderr log.
// TODO(ihuh): The stderr log may contain unsymbolized logs.
// Consider symbolizing them within ffx or testrunner.
if metadata.ArtifactType == ffxutil.StderrType {
stderrBytes, err := os.ReadFile(filepath.Join(testCaseArtifactDir, artifact))
if err != nil {
failReason = fmt.Sprintf("failed to read stderr for test case %s: %s", testCase.Name, err)
} else {
failReason = string(stderrBytes)
artifacts = append(artifacts, artifact)
cases = append(cases, runtests.TestCaseResult{
DisplayName: testCase.Name,
CaseName: testCase.Name,
Status: status,
FailReason: failReason,
Format: "FTF",
OutputFiles: artifacts,
OutputDir: testCaseArtifactDir,
Tags: []build.TestTag{{Key: "test_outcome", Value: testCase.Outcome}},
testResult.Cases = cases
testResult.StartTime = time.UnixMilli(suiteResult.StartTime)
testResult.EndTime = time.UnixMilli(suiteResult.StartTime + suiteResult.DurationMilliseconds)
// Calculate overhead from running ffx test and add it to the recorded
// test duration to more accurately capture the total duration of the test.
overhead := totalDuration - testResult.Duration()
testResult.EndTime = testResult.EndTime.Add(overhead)
// The runResult's artifacts should contain a directory with the profiles from
// component v2 tests along with a summary.json that lists the data sinks per test.
// It should also contain a second directory with early boot data sinks.
// TODO( Merge profiles on host when using ffx test. When using
// run-test-suite, we can just remove the entire artifact directory because we'll
// scp the profiles off the target at the end of the task instead.
if removeProfiles {
runArtifactDir := filepath.Join(testOutDir, runResult.ArtifactDir)
if err := os.RemoveAll(runArtifactDir); err != nil {
return testResult, err
return testResult, nil
// UpdateOutputDir updates the output dir with the oldDir substring with the newDir.
// This should be called if the outputs are moved.
func (t *FFXTester) UpdateOutputDir(oldDir, newDir string) {
for i, outDir := range t.testOutDirs {
if strings.Contains(outDir, oldDir) {
t.testOutDirs[i] = strings.ReplaceAll(outDir, oldDir, newDir)
// RemoveAllEmptyOutputDirs cleans up the output dirs by removing all empty
// directories. This leaves the run_summary and suite_summaries for debugging.
func (t *FFXTester) RemoveAllEmptyOutputDirs() error {
var errs []string
for _, outDir := range t.testOutDirs {
err := filepath.WalkDir(outDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
if d.IsDir() {
files, err := os.ReadDir(path)
if err != nil {
return err
if len(files) == 0 {
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("failed to remove %s: %s", path, err)
return filepath.SkipDir
return nil
if err != nil {
errs = append(errs, fmt.Sprintf("%v", err))
return fmt.Errorf(strings.Join(errs, "; "))
func (t *FFXTester) Close() error {
return t.sshTester.Close()
func (t *FFXTester) EnsureSinks(ctx context.Context, sinks []runtests.DataSinkReference, outputs *TestOutputs) error {
useFFXEarlyBoot := t.experimentLevel >= 1
if !t.EnabledForTesting() {
if useFFXEarlyBoot {
if sshTester, ok := t.sshTester.(*FuchsiaSSHTester); ok {
if err := t.sshTester.EnsureSinks(ctx, sinks, outputs); err != nil {
return err
if !useFFXEarlyBoot {
return nil
sinksPerTest := make(map[string]runtests.DataSinkReference)
for _, testOutDir := range t.testOutDirs {
if err := t.getSinks(ctx, testOutDir, sinksPerTest, useFFXEarlyBoot); err != nil {
return err
if useFFXEarlyBoot {
if err := t.getEarlyBootProfiles(ctx, sinksPerTest); err != nil {
// OTA tests cause this command to fail, but aren't used for collecting
// coverage anyway. If this fails, just log the error and continue
// processing the rest of the data sinks.
logger.Debugf(ctx, "failed to determine early boot data sinks: %s", err)
// If there were early boot sinks, record the "early_boot_sinks" test in the outputs
// so that the test result can be updated with the early boot sinks.
if _, ok := sinksPerTest[earlyBootSinksTestName]; ok {
earlyBootSinksTest := &TestResult{
Name: earlyBootSinksTestName,
Result: runtests.TestSuccess,
outputs.Record(ctx, *earlyBootSinksTest)
if len(sinksPerTest) > 0 {
outputs.updateDataSinks(sinksPerTest, "v2")
// Copy v1 sinks using the sshTester if ffx was used for testing.
if sshTester, ok := t.sshTester.(*FuchsiaSSHTester); ok && t.EnabledForTesting() {
return sshTester.copySinks(ctx, sinks, t.localOutputDir)
return nil
func (t *FFXTester) getEarlyBootProfiles(ctx context.Context, sinksPerTest map[string]runtests.DataSinkReference) error {
testOutDir := filepath.Join(t.localOutputDir, "early-boot-profiles")
if err := os.MkdirAll(testOutDir, os.ModePerm); err != nil {
return err
if err := t.ffx.RunWithTargetAndTimeout(ctx, 0, "test", "early-boot-profile", "--output-directory", testOutDir); err != nil {
return err
return t.getSinks(ctx, testOutDir, sinksPerTest, false)
func (t *FFXTester) getSinks(ctx context.Context, testOutDir string, sinksPerTest map[string]runtests.DataSinkReference, ignoreEarlyBoot bool) error {
runResult, err := ffxutil.GetRunResult(testOutDir)
if err != nil {
return err
runArtifactDir := filepath.Join(testOutDir, runResult.ArtifactDir)
seen := make(map[string]struct{})
startTime := clock.Now(ctx)
// The runResult's artifacts should contain a directory with the profiles from
// component v2 tests along with a summary.json that lists the data sinks per test.
// It should also contain a second directory with early boot data sinks.
for artifact := range runResult.Artifacts {
artifactPath := filepath.Join(runArtifactDir, artifact)
if err := t.getSinksFromArtifactDir(ctx, artifactPath, sinksPerTest, seen, ignoreEarlyBoot); err != nil {
return err
copyDuration := clock.Now(ctx).Sub(startTime)
if len(seen) > 0 {
logger.Debugf(ctx, "copied %d data sinks in %s", len(seen), copyDuration)
return nil
func (t *FFXTester) getSinksFromArtifactDir(ctx context.Context, artifactDir string, sinksPerTest map[string]runtests.DataSinkReference, seen map[string]struct{}, ignoreEarlyBoot bool) error {
summaryPath := filepath.Join(artifactDir, runtests.TestSummaryFilename)
f, err := os.Open(summaryPath)
if os.IsNotExist(err) {
if ignoreEarlyBoot {
return nil
return t.getEarlyBootSinks(ctx, artifactDir, sinksPerTest, seen)
if err != nil {
return err
defer f.Close()
var summary runtests.TestSummary
if err = json.NewDecoder(f).Decode(&summary); err != nil {
return fmt.Errorf("failed to read test summary from %q: %w", summaryPath, err)
return t.getSinksPerTest(artifactDir, summary, sinksPerTest, seen)
// getSinksPerTest moves sinks from sinkDir to the localOutputDir and records
// the sinks in sinksPerTest.
func (t *FFXTester) getSinksPerTest(sinkDir string, summary runtests.TestSummary, sinksPerTest map[string]runtests.DataSinkReference, seen map[string]struct{}) error {
for _, details := range summary.Tests {
for _, sinks := range details.DataSinks {
for _, sink := range sinks {
if _, ok := seen[sink.File]; !ok {
newPath := filepath.Join(t.localOutputDir, "v2", sink.File)
if err := os.MkdirAll(filepath.Dir(newPath), os.ModePerm); err != nil {
return err
if err := os.Rename(filepath.Join(sinkDir, sink.File), newPath); err != nil {
return err
seen[sink.File] = struct{}{}
sinksPerTest[details.Name] = runtests.DataSinkReference{Sinks: details.DataSinks}
return nil
// getEarlyBootSinks moves the early boot sinks to the localOutputDir and records it with
// an "early_boot_sinks" test in sinksPerTest.
func (t *FFXTester) getEarlyBootSinks(ctx context.Context, sinkDir string, sinksPerTest map[string]runtests.DataSinkReference, seen map[string]struct{}) error {
return filepath.WalkDir(sinkDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
// TODO( Remove hardcoded check for logs once they
// are moved to a separate location.
if d.IsDir() || filepath.Ext(path) == ".log" {
return nil
// Record the file as an early boot profile.
sinkFile, err := filepath.Rel(sinkDir, path)
if err != nil {
return err
if _, ok := seen[path]; !ok {
newPath := filepath.Join(t.localOutputDir, "v2", sinkFile)
if err := os.MkdirAll(filepath.Dir(newPath), os.ModePerm); err != nil {
return err
if err := os.Rename(path, newPath); err != nil {
return err
seen[path] = struct{}{}
earlyBootSinks, ok := sinksPerTest[earlyBootSinksTestName]
if !ok {
earlyBootSinks = runtests.DataSinkReference{Sinks: runtests.DataSinkMap{}}
// The directory under sinkDir is named after the type of sinks it contains.
sinkType := strings.Split(filepath.ToSlash(sinkFile), "/")[0]
if _, ok := earlyBootSinks.Sinks[sinkType]; !ok {
earlyBootSinks.Sinks[sinkType] = []runtests.DataSink{}
earlyBootSinks.Sinks[sinkType] = append(earlyBootSinks.Sinks[sinkType], runtests.DataSink{Name: sinkFile, File: sinkFile})
sinksPerTest[earlyBootSinksTestName] = earlyBootSinks
return nil
func (t *FFXTester) RunSnapshot(ctx context.Context, snapshotFile string) error {
if snapshotFile == "" {
return nil
startTime := clock.Now(ctx)
err := retry.Retry(ctx, retry.WithMaxAttempts(retry.NewConstantBackoff(time.Second), maxReconnectAttempts), func() error {
return t.ffx.Snapshot(ctx, t.localOutputDir, snapshotFile)
}, nil)
if err != nil {
logger.Errorf(ctx, "%s: %s", constants.FailedToRunSnapshotMsg, err)
logger.Debugf(ctx, "ran snapshot in %s", clock.Now(ctx).Sub(startTime))
return err
func sshToTarget(ctx context.Context, addr net.IPAddr, sshKeyFile, connName string) (*sshutil.Client, error) {
key, err := os.ReadFile(sshKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to read SSH key file: %w", err)
config, err := sshutil.DefaultSSHConfig(key)
if err != nil {
return nil, fmt.Errorf("failed to create an SSH client config: %w", err)
return sshutil.NewNamedClient(
Addr: &net.TCPAddr{
IP: addr.IP,
Port: sshutil.SSHPort,
Zone: addr.Zone,
// FuchsiaSSHTester executes fuchsia tests over an SSH connection.
type FuchsiaSSHTester struct {
client sshClient
copier dataSinkCopier
localOutputDir string
serialSocket serialClient
ignoreEarlyBoot bool
testRuns map[string]sshTestRun
// NewFuchsiaSSHTester returns a FuchsiaSSHTester associated to a fuchsia
// instance of the given nodename and the private key paired with an authorized
// one.
func NewFuchsiaSSHTester(ctx context.Context, addr net.IPAddr, sshKeyFile, localOutputDir, serialSocketPath string) (Tester, error) {
client, err := sshToTarget(ctx, addr, sshKeyFile, "test")
if err != nil {
return nil, fmt.Errorf("failed to establish an SSH connection: %w", err)
copier, err := runtests.NewDataSinkCopier(client)
if err != nil {
return nil, err
return &FuchsiaSSHTester{
client: client,
copier: copier,
localOutputDir: localOutputDir,
serialSocket: &serialSocket{serialSocketPath},
testRuns: make(map[string]sshTestRun),
}, nil
func (t *FuchsiaSSHTester) reconnect(ctx context.Context) error {
if err := t.client.ReconnectWithBackoff(ctx, retry.WithMaxDuration(retry.NewConstantBackoff(time.Second), 10*time.Second)); err != nil {
if err := t.serialSocket.runDiagnostics(ctx); err != nil {
logger.Warningf(ctx, "failed to run serial diagnostics: %s", err)
return fmt.Errorf("failed to reestablish SSH connection: %w", err)
if err := t.copier.Reconnect(); err != nil {
return fmt.Errorf("failed to reconnect data sink copier: %w", err)
return nil
// sshExitError is an interface that ssh.ExitError conforms to. We use this for
// testability instead of unwrapping an error as an ssh.ExitError, because it's
// not possible to construct an ssh.ExitError in-memory in a test due to private
// field constraints.
type sshExitError interface {
ExitStatus() int
// Statically assert that ssh.ExitError implements the sshExitError interface.
var _ sshExitError = &ssh.ExitError{}
func (t *FuchsiaSSHTester) isTimeoutError(test testsharder.Test, err error) bool {
if test.Timeout <= 0 {
return false
var exitErr sshExitError
if errors.As(err, &exitErr) {
return exitErr.ExitStatus() == timeoutExitCode
return false
func (t *FuchsiaSSHTester) runSSHCommand(ctx context.Context, command []string, stdout, stderr io.Writer) error {
select {
case <-t.client.DisconnectionListener():
if err := t.Reconnect(ctx); err != nil {
return connectionError{err}
// The client is still connected, so continue to run the command.
cmdErr := t.client.Run(ctx, command, stdout, stderr)
if sshutil.IsConnectionError(cmdErr) {
return connectionError{cmdErr}
return cmdErr
func (t *FuchsiaSSHTester) Reconnect(ctx context.Context) error {
if err := t.reconnect(ctx); err != nil {
return fmt.Errorf("%s: %s", constants.FailedToReconnectMsg, err)
return nil
type sshTestRun struct {
targetOutDir string
totalDuration time.Duration
func (t *FuchsiaSSHTester) setTestRun(test testsharder.Test, targetOutDir string, duration time.Duration) {
t.testRuns[test.PackageURL] = sshTestRun{targetOutDir, duration}
func (t *FuchsiaSSHTester) getTestRun(test testsharder.Test) (string, time.Duration) {
run, ok := t.testRuns[test.PackageURL]
if !ok {
return "", 0
return run.targetOutDir, run.totalDuration
// Test runs a test over SSH.
func (t *FuchsiaSSHTester) Test(ctx context.Context, test testsharder.Test, stdout io.Writer, stderr io.Writer, outDir string) (*TestResult, error) {
testResult := BaseTestResultFromTest(test)
command, err := commandForTest(&test, false, test.Timeout)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
// Set the output directory to retrieve test outputs.
targetOutDir := fmt.Sprintf("/tmp/%s", strings.ReplaceAll(test.Name, "/", "_"))
// Setting an output directory adds latency to test execution for coverage builds
// which produce a lot of output files. Since we will collect the profiles at
// the end anyway, don't use --deprecated-output-directory for coverage builders.
setOutputDir := !strings.Contains(os.Getenv("BUILDER_NAME"), "coverage")
if setOutputDir {
command = append(command, "--deprecated-output-directory", targetOutDir)
startTime := clock.Now(ctx)
testErr := t.runSSHCommand(ctx, command, stdout, stderr)
if setOutputDir {
t.setTestRun(test, targetOutDir, clock.Now(ctx).Sub(startTime))
if t.isTimeoutError(test, testErr) {
testResult.Result = runtests.TestAborted
} else if testErr != nil {
testResult.FailReason = testErr.Error()
} else {
testResult.Result = runtests.TestSuccess
return testResult, testErr
func (t *FuchsiaSSHTester) ProcessResult(ctx context.Context, test testsharder.Test, outDir string, testResult *TestResult, testErr error) (*TestResult, error) {
targetOutDir, totalDuration := t.getTestRun(test)
// Collect test outputs. This is a best effort. If any of the following steps
// to process the outputs fail, just log the failure but don't fail the test.
if targetOutDir != "" {
outputs, err := t.copier.GetAllDataSinks(targetOutDir)
if err != nil {
logger.Debugf(ctx, "failed to get test outputs: %s", err)
if len(outputs) > 0 && outDir != "" {
// If there were test outputs, copy them to the outDir to be recorded as OutputFiles.
// As this code will be deprecated once we migrate to ffx, we reuse the existing
// sink structure as a temporary hack even though these are just arbitrary
// output files, so the sink type does not matter.
sinkMap := runtests.DataSinkMap{"outputfile": []runtests.DataSink{}}
for _, output := range outputs {
sinkMap["outputfile"] = append(sinkMap["outputfile"], output)
sinkRef := runtests.DataSinkReference{Sinks: sinkMap, RemoteDir: targetOutDir}
if err := t.copySinks(ctx, []runtests.DataSinkReference{sinkRef}, outDir); err != nil {
logger.Debugf(ctx, "failed to copy test outputs to host: %s", err)
if err := t.copier.RemoveAll(targetOutDir); err != nil {
logger.Debugf(ctx, "failed to remove test outputs: %s", err)
// Using --deprecated-output-directory should produce outputs following the same
// schema as `ffx test` outputs, so we can process them like ffx test outputs.
runResult, err := ffxutil.GetRunResult(outDir)
if err != nil {
logger.Debugf(ctx, "failed to get run result: %s", err)
} else if runResult != nil {
if result, err := processTestResult(runResult, test, totalDuration, true); err != nil {
// Log the error and continue to construct the test result
// without the run_summary.json in the outputs.
logger.Debugf(ctx, "failed to process run result: %s", err)
} else {
// If there was no processing error, return the test result
// constructed from the run_summary.json in the outputs.
testResult = result
if isConnectionError(testErr) {
return testResult, testErr
return testResult, nil
// IgnoreEarlyBoot should be called from the FFXTester if it uses
// the FuchsiaSSHTester to run tests but wants to collect early
// boot profiles separately using ffx instead of getting it using
// the FuchsiaSSHTester's EnsureSinks() method.
func (t *FuchsiaSSHTester) IgnoreEarlyBoot() {
t.ignoreEarlyBoot = true
func (t *FuchsiaSSHTester) EnsureSinks(ctx context.Context, sinkRefs []runtests.DataSinkReference, outputs *TestOutputs) error {
// Collect v2 references.
var v2Sinks map[string]runtests.DataSinkReference
var err error
if err := t.runCopierWithRetry(ctx, func() error {
v2Sinks, err = t.copier.GetReferences(dataOutputDirV2)
return err
}); err != nil {
// If we fail to get v2 sinks, just log the error but continue to copy v1 sinks.
logger.Debugf(ctx, "failed to determine data sinks for v2 tests: %s", err)
var v2SinkRefs []runtests.DataSinkReference
for _, ref := range v2Sinks {
v2SinkRefs = append(v2SinkRefs, ref)
if len(v2SinkRefs) > 0 {
if err := t.copySinks(ctx, v2SinkRefs, filepath.Join(t.localOutputDir, "v2")); err != nil {
return err
outputs.updateDataSinks(v2Sinks, "v2")
if t.ignoreEarlyBoot {
return t.copySinks(ctx, sinkRefs, t.localOutputDir)
// Collect early boot coverage.
var earlyBootSinks []runtests.DataSink
if err := t.runCopierWithRetry(ctx, func() error {
earlyBootSinks, err = t.copier.GetAllDataSinks(dataOutputDirEarlyBoot)
return err
}); err != nil {
logger.Debugf(ctx, "failed to determine early boot data sinks: %s", err)
if len(earlyBootSinks) > 0 {
// If there were early boot sinks, record the "early_boot_sinks" test in the outputs
// so that the test result can be updated with the early boot sinks.
earlyBootSinksTest := &TestResult{
Name: earlyBootSinksTestName,
Result: runtests.TestSuccess,
outputs.Record(ctx, *earlyBootSinksTest)
// The directory under dataOutputDirEarlyBoot is named after the type of sinks it contains.
sinkMap := runtests.DataSinkMap{}
for _, sink := range earlyBootSinks {
sinkType := strings.Split(filepath.ToSlash(sink.File), "/")[0]
if _, ok := sinkMap[sinkType]; !ok {
sinkMap[sinkType] = []runtests.DataSink{}
sinkMap[sinkType] = append(sinkMap[sinkType], sink)
earlyBootSinkRef := runtests.DataSinkReference{Sinks: sinkMap, RemoteDir: dataOutputDirEarlyBoot}
if err := t.copySinks(ctx, []runtests.DataSinkReference{earlyBootSinkRef}, filepath.Join(t.localOutputDir, "early-boot")); err != nil {
return err
outputs.updateDataSinks(map[string]runtests.DataSinkReference{earlyBootSinksTestName: earlyBootSinkRef}, "early-boot")
return t.copySinks(ctx, sinkRefs, t.localOutputDir)
func (t *FuchsiaSSHTester) runCopierWithRetry(ctx context.Context, copierFunc func() error) error {
return retryOnConnectionFailure(ctx, t, func() error {
err := copierFunc()
if errors.Is(err, sftp.ErrSSHFxConnectionLost) {
logger.Warningf(ctx, "connection lost while getting data sinks: %s", err)
return connectionError{err}
return err
func (t *FuchsiaSSHTester) copySinks(ctx context.Context, sinkRefs []runtests.DataSinkReference, localOutputDir string) error {
if err := t.runCopierWithRetry(ctx, func() error {
startTime := clock.Now(ctx)
// Copy() is assumed to be idempotent and thus safe to retry, which is
// the case for the SFTP-based data sink copier.
sinkMap, err := t.copier.Copy(sinkRefs, localOutputDir)
if err != nil {
return err
copyDuration := clock.Now(ctx).Sub(startTime)
sinkRef := runtests.DataSinkReference{Sinks: sinkMap}
numSinks := sinkRef.Size()
if numSinks > 0 {
logger.Debugf(ctx, "copied %d data sinks in %s", numSinks, copyDuration)
return nil
}); err != nil {
return fmt.Errorf("failed to copy data sinks off target: %w", err)
return nil
// RunSnapshot runs `snapshot` on the device.
func (t *FuchsiaSSHTester) RunSnapshot(ctx context.Context, snapshotFile string) error {
if snapshotFile == "" {
return nil
startTime := clock.Now(ctx)
snapshotOutFile, err := osmisc.CreateFile(filepath.Join(t.localOutputDir, snapshotFile))
if err != nil {
return fmt.Errorf("failed to create snapshot output file: %w", err)
defer snapshotOutFile.Close()
if err := retryOnConnectionFailure(ctx, t, func() error {
return t.runSSHCommand(ctx, []string{"/bin/snapshot"}, snapshotOutFile, os.Stderr)
}); err != nil {
logger.Errorf(ctx, "%s: %s", constants.FailedToRunSnapshotMsg, err)
return err
logger.Debugf(ctx, "ran snapshot in %s", clock.Now(ctx).Sub(startTime))
return nil
// Close terminates the underlying SSH connection. The object is no longer
// usable after calling this method.
func (t *FuchsiaSSHTester) Close() error {
defer t.client.Close()
return t.copier.Close()
// for testability
type socketConn interface {
SetIOTimeout(timeout time.Duration)
// FuchsiaSerialTester executes fuchsia tests over serial.
type FuchsiaSerialTester struct {
socket socketConn
// NewFuchsiaSerialTester creates a tester that runs tests over serial.
func NewFuchsiaSerialTester(ctx context.Context, serialSocketPath string) (Tester, error) {
socket, err := serial.NewSocket(ctx, serialSocketPath)
if err != nil {
return nil, err
return &FuchsiaSerialTester{socket: socket}, nil
// Exposed for testability.
var newTestStartedContext = func(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(ctx, testStartedTimeout)
// lastWriteSaver is an io.Writer that saves the bytes written in the last Write().
type lastWriteSaver struct {
buf []byte
func (w *lastWriteSaver) Write(p []byte) (int, error) {
w.buf = make([]byte, len(p))
copy(w.buf, p)
return len(p), nil
// parseOutKernelReader is an io.Reader that reads from the underlying reader
// everything not pertaining to a kernel log. A kernel log is distinguished by
// a line that starts with the timestamp represented as a float inside brackets.
type parseOutKernelReader struct {
ctx context.Context
reader io.Reader
// unprocessed stores the last characters read from a Read() but not returned
// by it. This could happen if we read more than necessary to try to complete
// a possible kernel log and cannot return all of the bytes. This will be
// read in the next call to Read().
unprocessed []byte
// kernelLineStart stores the last characters read from a Read() block if it
// ended with a truncated line and possibly contains a kernel log. This will
// be prepended to the next Read() block.
kernelLineStart []byte
reachedEOF bool
func (r *parseOutKernelReader) Read(buf []byte) (int, error) {
// If the underlying reader already reached EOF, that means kernelLineStart is
// not the start of a kernel log, so append it to unprocessed to be read normally.
if r.reachedEOF {
r.unprocessed = append(r.unprocessed, r.kernelLineStart...)
r.kernelLineStart = []byte{}
// If there are any unprocessed bytes, read them first instead of calling the
// underlying reader's Read() again.
if len(r.unprocessed) > 0 {
bytesToRead := int(math.Min(float64(len(buf)), float64(len(r.unprocessed))))
copy(buf, r.unprocessed[:bytesToRead])
r.unprocessed = r.unprocessed[bytesToRead:]
return bytesToRead, nil
} else if r.reachedEOF {
// r.unprocessed was empty so we can just return EOF.
return 0, io.EOF
if r.ctx.Err() != nil {
return 0, r.ctx.Err()
b := make([]byte, len(buf))
type readResult struct {
n int
err error
ch := make(chan readResult, 1)
// Call the underlying reader's Read() in a goroutine so that we can
// break out if the context is canceled.
go func() {
readN, readErr := r.reader.Read(b)
ch <- readResult{readN, readErr}
var n int
var err error
select {
case res := <-ch:
n = res.n
err = res.err
case <-r.ctx.Done():
err = r.ctx.Err()
if err != nil && err != io.EOF {
return n, err
// readBlock contains everything stored in kernelLineStart (bytes last read
// from the underlying reader in the previous Read() call that possibly contain
// a truncated kernel log that has not been processed by this reader yet) along
// with the new bytes just read. Because readBlock contains unprocessed bytes,
// its length will likely be greater than len(buf).
// However, it is necessary to read more bytes in the case that the unprocessed
// bytes contain a long truncated kernel log and we need to keep reading more
// bytes until we get to the end of the line so we can discard it.
readBlock := append(r.kernelLineStart, b[:n]...)
r.kernelLineStart = []byte{}
lines := bytes.Split(readBlock, []byte("\n"))
var bytesRead, bytesLeftToRead int
for i, line := range lines {
bytesLeftToRead = len(buf) - bytesRead
isTruncated := i == len(lines)-1
line = r.lineWithoutKernelLog(line, isTruncated)
if bytesLeftToRead == 0 {
// If there are no more bytes left to read, store the rest of the lines
// into r.unprocessed to be read at the next call to Read().
r.unprocessed = append(r.unprocessed, line...)
if len(line) > bytesLeftToRead {
// If the line is longer than bytesLeftToRead, read as much as possible
// and store the rest in r.unprocessed.
copy(buf[bytesRead:], line[:bytesLeftToRead])
r.unprocessed = line[bytesLeftToRead:]
bytesRead += bytesLeftToRead
} else {
copy(buf[bytesRead:bytesRead+len(line)], line)
bytesRead += len(line)
if err == io.EOF {
r.reachedEOF = true
if len(r.unprocessed)+len(r.kernelLineStart) > 0 {
err = nil
return bytesRead, err
func (r *parseOutKernelReader) lineWithoutKernelLog(line []byte, isTruncated bool) []byte {
containsKernelLog := false
re := regexp.MustCompile(`\[[0-9]+\.?[0-9]+\]`)
match := re.FindIndex(line)
if match != nil {
if isTruncated {
r.kernelLineStart = line[match[0]:]
// The new line to add to bytes read contains everything in the line up to
// the bracket indicating the kernel log.
line = line[:match[0]]
containsKernelLog = true
} else if isTruncated {
// Match the beginning of a possible kernel log timestamp.
// i.e. `[`, `[123` `[123.4`
re = regexp.MustCompile(`\[[0-9]*\.?[0-9]*$`)
match = re.FindIndex(line)
if match != nil {
r.kernelLineStart = line[match[0]:]
line = line[:match[0]]
if !containsKernelLog && !isTruncated {
line = append(line, '\n')
return line
func (t *FuchsiaSerialTester) Test(ctx context.Context, test testsharder.Test, stdout, _ io.Writer, _ string) (*TestResult, error) {
testResult := BaseTestResultFromTest(test)
command, err := commandForTest(&test, true, test.Timeout)
if err != nil {
testResult.FailReason = err.Error()
return testResult, nil
logger.Debugf(ctx, "starting: %s", command)
// TODO( Currently, serial output is coming out jumbled,
// so the started string sometimes comes after the completed string, resulting
// in a timeout because we fail to read the completed string after the
// started string. Uncomment below to use the lastWriteSaver once the bug is
// fixed.
var lastWrite bytes.Buffer
// If a single read from the socket includes both the bytes that indicate the test started and the bytes
// that indicate the test completed, then the startedReader will consume the bytes needed for detecting
// completion. Thus we save the last read from the socket and replay it when searching for completion.
// lastWrite := &lastWriteSaver{}
reader := io.TeeReader(t.socket, &lastWrite)
commandStarted := false
var readErr error
for i := 0; i < startSerialCommandMaxAttempts; i++ {
if err := serial.RunCommands(ctx, t.socket, []serial.Command{{Cmd: command}}); err != nil {
return nil, fmt.Errorf("failed to write to serial socket: %w", err)
startedCtx, cancel := newTestStartedContext(ctx)
startedStr := runtests.StartedSignature + test.Name
_, readErr = iomisc.ReadUntilMatchString(startedCtx, reader, startedStr)
if readErr == nil {
commandStarted = true
} else if errors.Is(readErr, startedCtx.Err()) {
logger.Warningf(ctx, "test not started after timeout")
} else {
logger.Errorf(ctx, "unexpected error checking for test start signature: %s", readErr)
if !commandStarted {
err = fmt.Errorf("%s within %d attempts: %w",
constants.FailedToStartSerialTestMsg, startSerialCommandMaxAttempts, readErr)
// In practice, repeated failure to run a test means that the device has
// become unresponsive and we won't have any luck running later tests.
return nil, err
t.socket.SetIOTimeout(test.Timeout + 30*time.Second)
testOutputReader := io.TeeReader(
// See comment above lastWrite declaration.
&parseOutKernelReader{ctx: ctx, reader: io.MultiReader(&lastWrite, t.socket)},
// Writes to stdout as it reads from the above reader.
if success, err := runtests.TestPassed(ctx, testOutputReader, test.Name); err != nil {
testResult.FailReason = err.Error()
return testResult, nil
} else if !success {
if errors.Is(err, io.EOF) {
// EOF indicates that serial has become disconnected. That is
// unlikely to be caused by this test and we're unlikely to be able
// to keep running tests.
return nil, err
testResult.FailReason = "test failed"
return testResult, nil
testResult.Result = runtests.TestSuccess
return testResult, nil
func (t *FuchsiaSerialTester) ProcessResult(ctx context.Context, test testsharder.Test, outDir string, testResult *TestResult, err error) (*TestResult, error) {
return testResult, err
func (t *FuchsiaSerialTester) EnsureSinks(_ context.Context, _ []runtests.DataSinkReference, _ *TestOutputs) error {
return nil
func (t *FuchsiaSerialTester) RunSnapshot(_ context.Context, _ string) error {
return nil
func (t *FuchsiaSerialTester) Reconnect(_ context.Context) error {
return nil
// Close terminates the underlying Serial socket connection. The object is no
// longer usable after calling this method.
func (t *FuchsiaSerialTester) Close() error {
return t.socket.Close()
func commandForTest(test *testsharder.Test, useSerial bool, timeout time.Duration) ([]string, error) {
command := []string{}
if useSerial {
// `runtests` is used to run tests over serial.
command = []string{runtestsName}
if timeout > 0 {
command = append(command, "-i", fmt.Sprintf("%d", int64(timeout.Seconds())))
if test.Path != "" {
command = append(command, test.Path)
} else {
return nil, fmt.Errorf("Path is not set for %q", test.Name)
} else if test.PackageURL != "" {
if test.IsComponentV2() {
command = []string{runTestSuiteName, "--filter-ansi"}
if test.Realm != "" {
command = append(command, "--realm", fmt.Sprintf("%s", test.Realm))
if test.LogSettings.MaxSeverity != "" {
command = append(command, "--max-severity-logs", fmt.Sprintf("%s", test.LogSettings.MaxSeverity))
if test.LogSettings.MinSeverity != "" {
command = append(command, "--min-severity-logs", fmt.Sprintf("%s", test.LogSettings.MinSeverity))
if test.Parallel != 0 {
command = append(command, "--parallel", fmt.Sprintf("%d", test.Parallel))
// TODO( Once fixed, combine timeout flag setting for v1 and v2.
if timeout > 0 {
command = append(command, "--timeout", fmt.Sprintf("%d", int64(timeout.Seconds())))
} else {
return nil, fmt.Errorf("CFv1 tests are no longer supported: %q", test.PackageURL)
command = append(command, test.PackageURL)
} else {
return nil, fmt.Errorf("PackageURL is not set and useSerial is false for %q", test.Name)
return command, nil