blob: a62fc1791395fb63f8f8f53f84280a94de5f5fe5 [file] [log] [blame]
#!/usr/bin/env python2.7
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import sys
import unittest
import test_env
import lib.args
from factory_fake import FakeFactory
from process_fake import FakeProcess
from StringIO import StringIO
class TestCaseWithIO(unittest.TestCase):
# Unit test "constructor" and "destructor"
def setUp(self):
sys.stdin = StringIO()
self._stdout = StringIO()
self._stderr = StringIO()
def tearDown(self):
sys.stdin = sys.__stdin__
# Unit test utilities
def set_input(self, *lines):
sys.stdin.truncate(0)
for line in lines:
sys.stdin.write(line)
sys.stdin.write('\n')
sys.stdin.flush()
sys.stdin.seek(0)
# Unit test assertions
def _dump_io(self, io):
io.seek(0)
data = io.read()
io.truncate(0)
return data
def _assert_io_equals(self, io, lines, n=-1):
self.assertEqual(self._dump_io(io).split('\n')[:n], list(lines))
def assertOut(self, lines, n=-1):
"""Checks that 'n' lines of stdout match 'lines'.
Calling this method resets stdout. If 'n' is omitted all lines are
checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
resets stdout.
"""
self._assert_io_equals(self._stdout, lines, n)
def assertErr(self, lines, n=-1):
"""Checks that 'n' lines of stderr match 'lines'.
Calling this method resets stdout. If 'n' is omitted all lines are
checked. If 'n' is 0 and 'lines' is [], it passes trivially, but still
resets stderr.
"""
self._assert_io_equals(self._stderr, lines, n)
def assertOutContains(self, *strings):
"""Check that all 'strings' are contained in the stdout. Resets stdout."""
output = self._dump_io(self._stdout)
for s in strings:
self.assertIn(s, output)
class TestCaseWithFactory(TestCaseWithIO):
"""TestCase that provides common test context, utilities, and assertions."""
# Unit test "constructor"
def setUp(self):
super(TestCaseWithFactory, self).setUp()
self._factory = None
# Unit test context, as aliases to the Factory.
@property
def factory(self):
"""The associated FakeFactory object."""
if not self._factory:
self._factory = FakeFactory()
self.host.fd_out = self._stdout
self.host.fd_err = self._stderr
return self._factory
@property
def host(self):
"""The associated Host object."""
return self.factory.host
@property
def parser(self):
"""The associated ArgParser object."""
return self.factory.parser
@property
def buildenv(self):
"""The associated BuildEnv object."""
return self.factory.buildenv
@property
def device(self):
"""The associated Device object."""
return self.factory.device
# Unit test utilities
def _ssh_cmd(self, args):
"""Returns the command line arguments for an SSH commaned."""
return ['ssh'] + self.device.ssh_opts() + [self.device.addr] + args
def _scp_cmd(self, args):
return ['scp'] + self.device.ssh_opts() + args
def get_process(self, args, ssh=False):
cmd = self._ssh_cmd(args) if ssh else args
return self.host.create_process(cmd)
def parse_args(self, *args):
return self.parser.parse_args(args)
def set_outputs(
self,
args,
outputs,
start=None,
end=None,
returncode=None,
reset=True,
ssh=False):
"""Sets what will be returned from the stdout and return code of a fake
process.
Providing a start and/or end will schedule the output to be added and/or
removed, respectively, at a later time; see FakeProcess.schedule.
Setting reset to True will replace any existing output for the command.
Setting ssh to true will automatically add the necessary SSH arguments.
"""
process = self.get_process(args, ssh=ssh)
if reset:
process.clear()
process.schedule(
'\n'.join(outputs), start=start, end=end, returncode=returncode)
def set_running(self, url, refresh=True, duration=None):
"""Marks a packaged executable as running on device.
If refresh is True, this will cause the device to refresh its URLs.
If a duration is provided, the component will stop running
after the given duration.
"""
cmd = ['cs info']
output = 'URL: {}'.format(url)
end = None if not duration else self.host.elapsed + duration
self.set_outputs(cmd, [output], end=end, reset=False, ssh=True)
if refresh:
self.device.has_cs_info(url, refresh)
def touch_on_device(
self, pathname, start=None, end=None, reset=False, size=1000):
"""Prepares the 'ls' response for a file and its parent directory."""
parts = pathname.split('/')
dirname = '/'.join(parts[:-1])
output = '-rw-r--r-- 1 0 0 {} Dec 25 12:34 {}'.format(size, parts[-1])
self.set_outputs(
['ls', '-l', dirname], [output],
start=start,
end=end,
reset=reset,
ssh=True)
self.set_outputs(
['ls', '-l', pathname], [output],
start=start,
end=end,
reset=reset,
ssh=True)
def symbolize_cmd(self):
cmd = [
self.buildenv.symbolizer_exec, '-llvm-symbolizer',
self.buildenv.llvm_symbolizer
]
for build_id_dir in self.buildenv.build_id_dirs:
cmd += ['-build-id-dir', build_id_dir]
return cmd
def infra_testrunner_cmd(self, out_dir, test_file):
cmd = [os.path.join(self.buildenv.build_dir, 'host_x64', 'testrunner')] \
+ ['-out-dir', out_dir] \
+ ['-use-runtests', '-per-test-timeout', '600s'] \
+ [test_file]
return cmd
# Unit test assertions
def assertLogged(self, *logs):
"""Asserts logs were generated by calls to host.echo or host.error."""
self.assertOut(logs)
def assertError(self, expr, *logs):
assert logs, 'Missing error message.'
logs = ['ERROR: {}'.format(logs[0])
] + [' {}'.format(log) for log in logs[1:]]
with self.assertRaises(SystemExit):
expr()
self.assertErr(logs)
def assertRan(self, *args):
"""Asserts a previous call was made to host.create_process."""
self.assertIn(' '.join(args), self.host.processes.keys())
def assertScpTo(self, *args):
"""Asserts a previous call was made to device.scp with args."""
args = list(args)[:-1] + [self.device.scp_rpath(args[-1])]
cmd = self._scp_cmd(args)
self.assertRan(*cmd)
def assertScpFrom(self, *args):
"""Asserts a previous call was made to device.scp with args."""
args = [self.device.scp_rpath(arg) for arg in args[:-1]] + [args[-1]]
cmd = self._scp_cmd(args)
self.assertRan(*cmd)
def assertSsh(self, *args):
"""Asserts a previous call was made to device.ssh with cmd."""
cmd = self._ssh_cmd(list(args))
self.assertRan(*cmd)
class TestCaseWithFuzzer(TestCaseWithFactory):
# Unit test "constructor"
def setUp(self):
super(TestCaseWithFuzzer, self).setUp()
self.create_fuzzer('check', 'fake-package1/fake-target1')
# Unit test context.
@property
def fuzzer(self):
"""The most recently created Fuzzer object."""
assert self._fuzzer, 'No fuzzer created.'
return self._fuzzer
@property
def corpus(self):
return self.fuzzer.corpus
@property
def dictionary(self):
return self.fuzzer.dictionary
@property
def ns(self):
return self.fuzzer.ns
# Unit test utilities
def create_fuzzer(self, *args, **kwargs):
resolve = kwargs.pop('resolve', True)
include_tests = kwargs.pop('include_tests', False)
assert not kwargs, 'Unexpected keyword argument(s): {}'.format(kwargs)
args = self.parse_args(*args)
self._fuzzer = self.factory.create_fuzzer(
args, include_tests=include_tests)
if resolve:
self.resolve_fuzzer()
else:
base_cmx = self.ns.base_abspath(
'meta/{}.cmx'.format(self.fuzzer.executable))
cmd = ['test', '-f', base_cmx]
process = self.get_process(cmd, ssh=True)
process.succeeds = False
self.create_log()
return self.fuzzer
def resolve_fuzzer(self):
cmd = ['pkgctl', 'pkg-status', self.fuzzer.package_url]
merkle = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
package_path = '/pkgfs/versions/{}'.format(merkle)
self.set_outputs(
cmd, [
'Package in registered TUF repo: yes (merkle={})'.format(
merkle),
'Package on disk: yes (path={})'.format(package_path)
],
ssh=True)
def create_log(self, start=None, end=None):
self.touch_on_device(
self.ns.data_abspath('fuzz-[0-9].log'),
start=start,
end=end,
reset=True)