blob: fb4781b9283a267c39b54508848eca6fc0068015 [file] [log] [blame]
#!/usr/bin/env python3.8
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import os
import sys
import unittest
from io import StringIO
import test_env
from lib import args
from factory_fake import FakeFactory
from process_fake import FakeProcess
class TestCaseWithIO(unittest.TestCase):
# Unit test "constructor" and "destructor"
def setUp(self):
sys.stdin = StringIO()
self._stdout = StringIO()
self._stderr = StringIO()
def tearDown(self):
sys.stdin = sys.__stdin__
# Unit test utilities
def set_input(self, *lines):
sys.stdin.truncate(0)
sys.stdin.seek(0)
for line in lines:
sys.stdin.write(line)
sys.stdin.write('\n')
sys.stdin.flush()
sys.stdin.seek(0)
# Unit test assertions
def _dump_io(self, io):
data = io.getvalue()
io.truncate(0)
io.seek(0)
return data
def _assert_io_equals(self, io, lines, n=-1):
self.assertEqual(self._dump_io(io).split('\n')[:n], list(lines))
def assertOut(self, lines, n=-1):
"""Checks that 'n' lines of stdout match 'lines'.
Calling this method resets stdout. If 'n' is omitted all lines are
checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
resets stdout.
"""
self._assert_io_equals(self._stdout, lines, n)
def assertErr(self, lines, n=-1):
"""Checks that 'n' lines of stderr match 'lines'.
Calling this method resets stdout. If 'n' is omitted all lines are
checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
resets stderr.
"""
self._assert_io_equals(self._stderr, lines, n)
def assertOutContains(self, *strings):
"""Check that all 'strings' are contained in the stdout. Resets stdout."""
output = self._dump_io(self._stdout)
for s in strings:
self.assertIn(s, output)
class TestCaseWithFactory(TestCaseWithIO):
"""TestCase that provides common test context, utilities, and assertions."""
# Unit test "constructor"
def setUp(self):
super(TestCaseWithFactory, self).setUp()
self._factory = None
# Unit test context, as aliases to the Factory.
@property
def factory(self):
"""The associated FakeFactory object."""
if not self._factory:
self._factory = FakeFactory()
self.host.fd_out = self._stdout
self.host.fd_err = self._stderr
return self._factory
@property
def host(self):
"""The associated Host object."""
return self.factory.host
@property
def parser(self):
"""The associated ArgParser object."""
return self.factory.parser
@property
def buildenv(self):
"""The associated BuildEnv object."""
return self.factory.buildenv
@property
def device(self):
"""The associated Device object."""
return self.factory.device
# Unit test utilities
def _ssh_cmd(self, args):
"""Returns the command line arguments for an SSH commaned."""
return ['ssh'] + self.device.ssh_opts() + [self.device.addr] + args
def _scp_cmd(self, args):
return ['scp'] + self.device.ssh_opts() + args
def get_process(self, args, ssh=False):
cmd = self._ssh_cmd(args) if ssh else args
return self.host.create_process(cmd)
def parse_args(self, *args):
return self.parser.parse_args(args)
def set_outputs(
self,
args,
outputs,
start=None,
end=None,
returncode=None,
reset=True,
raw=False,
ssh=False):
"""Sets what will be returned from the stdout and return code of a fake
process.
Providing a start and/or end will schedule the output to be added and/or
removed, respectively, at a later time; see FakeProcess.schedule.
Setting reset to True will replace any existing output for the command.
Setting ssh to True will automatically add the necessary SSH arguments.
If raw is True, the output will be passed as-is (useful for processes
that output JSON).
"""
process = self.get_process(args, ssh=ssh)
if reset:
process.clear()
if not raw:
outputs = '\n'.join(outputs)
process.schedule(outputs, start=start, end=end, returncode=returncode)
def set_running(self, url, refresh=True, duration=None):
"""Marks a packaged executable as running on device.
If refresh is True, this will cause the device to refresh its URLs.
If a duration is provided, the component will stop running
after the given duration.
"""
cmd = [
'/fuchsia_dir/.jiri_root/bin/fx', 'ffx', '--machine', 'json',
'component', 'show', '/core/appmgr/sys'
]
output = {
"url": url,
"resolved": {
"started": {
"elf_runtime": {
"job_id": 123
}
}
}
}
end = None if not duration else self.host.elapsed + duration
self.set_outputs(cmd, output, end=end, reset=False, raw=True)
if refresh:
self.device.v1_component_is_running(url, refresh)
def touch_on_device(
self, pathname, start=None, end=None, reset=False, size=1000):
"""Prepares the 'ls' response for a file and its parent directory."""
parts = pathname.split('/')
dirname = '/'.join(parts[:-1])
output = '-rw-r--r-- 1 0 0 {} Dec 25 12:34 {}'.format(size, parts[-1])
self.set_outputs(
['ls', '-l', dirname], [output],
start=start,
end=end,
reset=reset,
ssh=True)
self.set_outputs(
['ls', '-l', pathname], [output],
start=start,
end=end,
reset=reset,
ssh=True)
def symbolize_cmd(self):
cmd = [self.buildenv.symbolizer_exec]
for build_id_dir in self.buildenv.build_id_dirs:
cmd += ['--build-id-dir', build_id_dir]
return cmd
def infra_testrunner_cmd(self, out_dir, test_file):
cmd = [os.path.join(self.buildenv.build_dir, 'host_x64', 'testrunner')] \
+ ['-out-dir', out_dir] \
+ ['-use-runtests', '-per-test-timeout', '600s'] \
+ [test_file]
return cmd
# Unit test assertions
def assertLogged(self, *logs):
"""Asserts logs were generated by calls to host.echo or host.error."""
self.assertOut(logs)
def assertError(self, expr, *logs):
assert logs, 'Missing error message.'
logs = ['ERROR: {}'.format(logs[0])
] + [' {}'.format(log) for log in logs[1:]]
with self.assertRaises(SystemExit):
expr()
self.assertErr(logs)
def assertRan(self, *args):
"""Asserts a previous call was made to host.create_process."""
self.assertIn(' '.join(args), self.host.processes.keys())
def assertScpTo(self, *args):
"""Asserts a previous call was made to device.scp with args."""
args = list(args)[:-1] + [self.device.scp_rpath(args[-1])]
cmd = self._scp_cmd(args)
self.assertRan(*cmd)
def assertScpFrom(self, *args):
"""Asserts a previous call was made to device.scp with args."""
args = [self.device.scp_rpath(arg) for arg in args[:-1]] + [args[-1]]
cmd = self._scp_cmd(args)
self.assertRan(*cmd)
def assertSsh(self, *args):
"""Asserts a previous call was made to device.ssh with cmd."""
cmd = self._ssh_cmd(list(args))
self.assertRan(*cmd)
class TestCaseWithFuzzer(TestCaseWithFactory):
# Unit test "constructor"
def setUp(self):
super(TestCaseWithFuzzer, self).setUp()
self.create_fuzzer('check', 'fake-package1/fake-target1')
def tearDown(self):
super(TestCaseWithFuzzer, self).tearDown()
self._fuzzer = None
# Unit test context.
@property
def fuzzer(self):
"""The most recently created Fuzzer object."""
assert self._fuzzer, 'No fuzzer created.'
return self._fuzzer
@property
def corpus(self):
return self.fuzzer.corpus
@property
def dictionary(self):
return self.fuzzer.dictionary
@property
def ns(self):
return self.fuzzer.ns
# Unit test utilities
def create_fuzzer(self, *args, **kwargs):
resolve = kwargs.pop('resolve', True)
include_tests = kwargs.pop('include_tests', False)
assert not kwargs, 'Unexpected keyword argument(s): {}'.format(kwargs)
args = self.parse_args(*args)
self._fuzzer = self.factory.create_fuzzer(
args, include_tests=include_tests)
if resolve:
self.resolve_fuzzer()
else:
base_cmx = self.ns.base_abspath(
'meta/{}.cmx'.format(self.fuzzer.executable))
cmd = ['test', '-f', base_cmx]
process = self.get_process(cmd, ssh=True)
process.succeeds = False
self.create_log()
return self.fuzzer
def resolve_fuzzer(self):
cmd = ['pkgctl', 'pkg-status', self.fuzzer.package_url]
merkle = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
package_path = '/pkgfs/versions/{}'.format(merkle)
self.set_outputs(
cmd, [
'Package in registered TUF repo: yes (merkle={})'.format(
merkle),
'Package on disk: yes (path={})'.format(package_path)
],
ssh=True)
def create_log(self, start=None, end=None):
self.touch_on_device(
self.ns.data_abspath('fuzz-[0-9].log'),
start=start,
end=end,
reset=True)