blob: 086a060c43f0009512a93c3e304c47c0aa1f6c27 [file] [log] [blame]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import gzip
import io
import json
import os
import re
import shutil
import tempfile
import typing
import unittest
import unittest.mock as mock
from parameterized import parameterized
import args
import environment
import event
import main
import util.command
class TestMainIntegration(unittest.IsolatedAsyncioTestCase):
"""Integration tests for the main entrypoint.
These tests encapsulate several real-world invocations of fx test,
with mocked dependencies.
"""
DEVICE_TESTS_IN_INPUT = 1
HOST_TESTS_IN_INPUT = 3
E2E_TESTS_IN_INPUT = 1
TOTAL_TESTS_IN_INPUT = DEVICE_TESTS_IN_INPUT + HOST_TESTS_IN_INPUT
TOTAL_NON_E2E_TESTS_IN_INPUT = TOTAL_TESTS_IN_INPUT - E2E_TESTS_IN_INPUT
def setUp(self) -> None:
# Set up a Fake fuchsia directory.
self.fuchsia_dir = tempfile.TemporaryDirectory()
self.addCleanup(self.fuchsia_dir.cleanup)
# Set up mocks
self.mocks = []
# Retain the real build dir, if one exists.
real_fuchsia_dir = os.getenv("FUCHSIA_DIR")
# Intercept environment and instantiate a new mock FUCHSIA_DIR.
self.mocks.append(
mock.patch(
"os.environ",
{"FUCHSIA_DIR": self.fuchsia_dir.name},
)
)
for m in self.mocks:
m.start()
self.addCleanup(m.stop)
# Correct for location of the test data files between coverage.py
# script and how tests are run in-tree.
cur_path = os.path.dirname(__file__)
while not os.path.isdir(cur_path):
cur_path = os.path.split(cur_path)[0]
# We use an external program to handle fuzzy matching called "dldist".
# Put the program in the correct location so that the main script can
# find it.
os.makedirs(os.path.join(self.fuchsia_dir.name, "bin"))
dldist_path = os.path.join(self.fuchsia_dir.name, "bin", "dldist")
if os.path.exists(os.path.join(cur_path, "bin", "dldist")):
# This path is used when executing the python_host_test target.
print("Using the local dldist for matching")
shutil.copy(os.path.join(cur_path, "bin", "dldist"), dldist_path)
else:
# This path is used when running coverage.py.
print("Trying to use FUCHSIA_DIR dldist for the test")
assert real_fuchsia_dir is not None
build_dir: str
with open(os.path.join(real_fuchsia_dir, ".fx-build-dir")) as f:
build_dir = os.path.join(real_fuchsia_dir, f.read().strip())
print(build_dir)
assert os.path.isdir(build_dir)
shutil.copy(
os.path.join(build_dir, "host-tools", "dldist"), dldist_path
)
self.test_data_path = os.path.join(cur_path, "test_data/build_output")
self.assertTrue(
os.path.isfile(os.path.join(self.test_data_path, "tests.json")),
f"path was {self.test_data_path} for {__file__}",
)
self.assertTrue(
os.path.isfile(os.path.join(self.test_data_path, "test-list.json")),
f"path was {self.test_data_path} for {__file__}",
)
self.assertTrue(
os.path.isfile(
os.path.join(self.test_data_path, "package-repositories.json")
),
f"path was {self.test_data_path} for {__file__}",
)
with open(
os.path.join(self.fuchsia_dir.name, ".fx-build-dir"), "w"
) as f:
f.write("out/default")
self.out_dir = os.path.join(self.fuchsia_dir.name, "out/default")
os.makedirs(self.out_dir)
for name in [
"tests.json",
"test-list.json",
"package-repositories.json",
"package-targets.json",
]:
shutil.copy(
os.path.join(self.test_data_path, name),
os.path.join(self.out_dir, name),
)
self._mock_get_device_environment(
environment.DeviceEnvironment(
"localhost", "8080", "foo", "/foo.key"
)
)
return super().setUp()
def _mock_run_commands_in_parallel(self, stdout: str) -> mock.MagicMock:
m = mock.AsyncMock(
return_value=[mock.MagicMock(stdout=stdout, return_code=0)]
)
patch = mock.patch("main.run_commands_in_parallel", m)
patch.start()
self.addCleanup(patch.stop)
return m
def _mock_run_command(self, return_code: int) -> mock.MagicMock:
m = mock.AsyncMock(
return_value=mock.MagicMock(
return_code=return_code, stdout="", stderr="", was_timeout=False
)
)
patch = mock.patch("main.execution.run_command", m)
patch.start()
self.addCleanup(patch.stop)
return m
def _mock_subprocess_call(self, value: int) -> mock.MagicMock:
m = mock.MagicMock(return_value=value)
patch = mock.patch("main.subprocess.call", m)
patch.start()
self.addCleanup(patch.stop)
return m
def _mock_has_device_connected(self, value: bool) -> None:
m = mock.AsyncMock(return_value=value)
patch = mock.patch("main.has_device_connected", m)
patch.start()
self.addCleanup(patch.stop)
def _mock_has_tests_in_base(self, test_packages: list[str]) -> None:
with open(os.path.join(self.out_dir, "base_packages.list"), "w") as f:
json.dump(
{
"content": {
"manifests": test_packages,
}
},
f,
)
def _make_call_args_prefix_set(
self, call_list: mock._CallList
) -> set[tuple[str, ...]]:
"""Given a list of mock calls, turn them into a set of prefixes for comparison.
For instance, if the mock call is ("fx", "run", "command") the output
is: {
('fx',),
('fx', 'run'),
('fx', 'run', 'command'),
}
This can be used to check containment.
Args:
call_list (mock._CallList): Calls to process.
Returns:
set[list[typing.Any]]: Set of prefixes to calls.
"""
ret: set[tuple[str, ...]] = set()
for call in call_list:
args, _ = call
cur = []
if args and isinstance(args[0], list):
# Correct for subprocess.call using lists and not *args.
args = args[0]
for a in args:
cur.append(a)
ret.add(tuple(cur))
return ret
def _mock_get_device_environment(
self, env: environment.DeviceEnvironment
) -> mock.MagicMock:
m = mock.AsyncMock(return_value=env)
patch = mock.patch(
"main.execution.get_device_environment_from_exec_env", m
)
patch.start()
self.addCleanup(patch.stop)
return m
def assertIsSubset(
self, subset: set[typing.Any], full: set[typing.Any]
) -> None:
inter = full.intersection(subset)
self.assertEqual(
inter, subset, f"Full set was\n {self.prettyFormatPrefixes(full)}"
)
def prettyFormatPrefixes(self, vals: set[typing.Any]) -> str:
return "\n ".join(map(lambda x: " ".join(x), sorted(vals)))
async def test_dry_run(self) -> None:
"""Test a basic dry run of the command."""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry"]), recorder=recorder
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(
len(selection_event.selected), self.TOTAL_TESTS_IN_INPUT
)
async def test_fuzzy_dry_run(self) -> None:
"""Test a dry run of the command for fuzzy matching"""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry", "--fuzzy=1", "foo_test"]),
recorder=recorder,
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(len(selection_event.selected), 1)
@parameterized.expand( # type: ignore[misc]
[("--host", HOST_TESTS_IN_INPUT), ("--device", DEVICE_TESTS_IN_INPUT)]
)
async def test_selection_flags(
self, flag_name: str, expected_count: int
) -> None:
"""Test that the correct --device or --host tests are selected"""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry"] + [flag_name]),
recorder=recorder,
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(len(selection_event.selected), expected_count)
@parameterized.expand( # type: ignore[misc]
[
("--use-package-hash", DEVICE_TESTS_IN_INPUT),
("--no-use-package-hash", 0),
]
)
async def test_use_package_hash(
self, flag_name: str, expected_hash_matches: int
) -> None:
"""Test ?hash= is used only when --use-package-hash is set"""
command_mock = self._mock_run_command(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"] + [flag_name])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertIsSubset(
{
("fx", "ffx", "test", "run"),
},
call_prefixes,
)
hash_params_found: int = 0
for prefix_list in call_prefixes:
entry: str
for entry in prefix_list:
if "?hash=" in entry:
hash_params_found += 1
self.assertEqual(
hash_params_found,
expected_hash_matches,
f"Prefixes were\n{self.prettyFormatPrefixes(call_prefixes)}",
)
@parameterized.expand( # type: ignore[misc]
[
("default suggestions", [], 6),
("custom suggestion count", ["--suggestion-count=10"], 10),
("suppress suggestions", ["--no-show-suggestions"], 0),
]
)
async def test_suggestions(
self,
_unused_name: str,
extra_flags: list[str],
expected_suggestion_count: int,
) -> None:
"""Test that targets are suggested when there are no test matches."""
mocked_commands = self._mock_run_commands_in_parallel("No matches")
ret = await main.async_main_wrapper(
args.parse_args(
["--simple", "non_existent_test_does_not_match"] + extra_flags
)
)
self.assertEqual(ret, 1)
if expected_suggestion_count > 0:
self.assertListEqual(
mocked_commands.call_args[0][0],
[
[
"fx",
"search-tests",
f"--max-results={expected_suggestion_count}",
"--no-color",
"non_existent_test_does_not_match",
]
],
)
else:
self.assertListEqual(mocked_commands.call_args_list, [])
# TODO(b/295340412): Test that suggestions are suppressed.
async def test_full_success(self) -> None:
"""Test that we can run all tests and report success"""
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(args.parse_args(["--simple"]))
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
# Make sure we built, published, and ran the device test.
self.assertIsSubset(
{
(
"fx",
"build",
"src/sys:foo_test_package",
"--host",
"//src/sys:bar_test",
),
("fx", "ffx", "repository", "publish"),
("fx", "ffx", "test", "run"),
},
call_prefixes,
)
# Make sure we ran the host tests.
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))
async def test_no_build(self) -> None:
"""Test that we can run all tests and report success"""
command_mock = self._mock_run_command(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertFalse(("fx", "build") in call_prefixes)
self.assertFalse(
("fx", "ffx", "repository", "publish") in call_prefixes
)
self.assertIsSubset(
{
("fx", "ffx", "test", "run"),
},
call_prefixes,
)
# Make sure we ran the host test.
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))
async def test_first_failure(self) -> None:
"""Test that one failing test aborts the rest with --fail"""
command_mock = self._mock_run_command(1)
command_mock.side_effect = [
util.command.CommandOutput("out", "err", 1, 10, None),
util.command.CommandOutput("out", "err", 1, 10, None),
util.command.CommandOutput("out", "err", 1, 10, None),
]
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build", "--fail"])
)
# bar_test and baz_test are not hermetic, so cannot run at the same time.
# One of them will run before the other, which means --fail
# prevents one of them from starting, and we expect to see
# only bar_test (since baz_test is defined later in the file)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertEqual(ret, 1)
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertFalse(any(["baz_test" in v[0] for v in call_prefixes]))
async def test_count(self) -> None:
"""Test that we can re-run a test multiple times with --count"""
command_mock = self._mock_run_command(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
# Run each test 3 times, no parallel to better match behavior of failure case test.
ret = await main.async_main_wrapper(
args.parse_args(
["--simple", "--no-build", "--count=3", "--parallel=1"]
)
)
self.assertEqual(ret, 0)
self.assertEqual(
3,
sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(
[
"foo-test?hash=" in " ".join(v[0])
for v in command_mock.call_args_list
]
),
command_mock.call_args_list,
)
async def test_count_with_timeout(self) -> None:
"""Test that we abort running the rest of the tests in a --count group if a timeout occurs."""
command_mock = self._mock_run_command(1)
command_mock.return_value = util.command.CommandOutput(
"", "", 1, 10, None, was_timeout=True
)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
# Run each test 3 times, no parallel to better match behavior of failure case test.
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"--no-build",
"--count=3",
"--parallel=1",
]
)
)
self.assertEqual(ret, 1)
self.assertEqual(
1,
sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
1,
sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
1,
sum(
[
"foo-test?hash=" in " ".join(v[0])
for v in command_mock.call_args_list
]
),
command_mock.call_args_list,
)
async def test_list_command(self) -> None:
"""Test that we can list test cases using --list"""
command_mock = self._mock_run_commands_in_parallel(
"foo::test\nbar::test",
)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
recorder = event.EventRecorder()
# This only works if the first test is a device test.
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build", "--list", "--limit=1"]),
recorder=recorder,
)
self.assertEqual(ret, 0)
self.assertEqual(command_mock.call_count, 1)
events = [
e.payload.enumerate_test_cases
async for e in recorder.iter()
if e.payload is not None
and e.payload.enumerate_test_cases is not None
]
self.assertEqual(len(events), 1)
self.assertEqual(
events[0].test_case_names,
[
"foo::test",
"bar::test",
],
)
@mock.patch("main.run_build_with_suspended_output", side_effect=[0])
async def test_updateifinbase(self, _build_mock: mock.AsyncMock) -> None:
"""Test that we appropriately update tests in base"""
command_mock = self._mock_run_command(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base(["foo-test"])
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"--no-build",
"--updateifinbase",
"--parallel",
"1",
]
)
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertIsSubset({("fx", "ota", "--no-build")}, call_prefixes)
async def test_print_logs_success(self) -> None:
"""Test that --print-logs searches for logs, can be given a log,
and handles invalid data
"""
env = environment.ExecutionEnvironment.initialize_from_args(
args.parse_args([])
)
assert env.log_file
# Create a sample log with 3 tests running
recorder = event.EventRecorder()
recorder.emit_init()
# Simulate one test suite
test_id = recorder.emit_test_suite_started("foo", hermetic=False)
program_id = recorder.emit_program_start(
"bar", ["abcd"], parent=test_id
)
recorder.emit_program_output(
program_id, "Data", event.ProgramOutputStream.STDOUT
)
recorder.emit_program_termination(program_id, 0)
recorder.emit_test_suite_ended(
test_id,
event.TestSuiteStatus.PASSED,
message=None,
)
test_2 = recorder.emit_test_suite_started(
"//other:test2", hermetic=True
)
test_3 = recorder.emit_test_suite_started(
"//other:test3", hermetic=True
)
program_2 = recorder.emit_program_start(
"test", ["arg", "1"], parent=test_2
)
program_3 = recorder.emit_program_start(
"test", ["arg", "2"], parent=test_3
)
recorder.emit_program_output(
program_3,
"line for test 3",
stream=event.ProgramOutputStream.STDOUT,
)
recorder.emit_program_output(
program_2,
"line for test 2",
stream=event.ProgramOutputStream.STDOUT,
)
recorder.emit_program_termination(program_2, 0)
recorder.emit_program_termination(program_3, 0)
recorder.emit_test_suite_ended(
test_2, event.TestSuiteStatus.FAILED, message=None
)
recorder.emit_test_suite_ended(
test_3, event.TestSuiteStatus.PASSED, message=None
)
recorder.emit_end()
with gzip.open(env.log_file, "wt") as out_file:
async for e in recorder.iter():
json.dump(e.to_dict(), out_file) # type:ignore[attr-defined]
print("", file=out_file)
def assert_print_logs_output(return_code: int, output: str) -> None:
self.assertEqual(return_code, 0, f"Content was:\n{output}")
self.assertIsNotNone(
re.search(r"3 tests were run", output, re.MULTILINE),
f"Did not find substring, content was:\n{output}",
)
# Test finding most recent log file.
output = io.StringIO()
with contextlib.redirect_stdout(output):
return_code = main.do_print_logs(args.parse_args([]))
assert_print_logs_output(return_code, output.getvalue())
# Test finding specific log file.
output = io.StringIO()
new_file_path = os.path.join(env.out_dir, "other-file.json.gz")
shutil.move(env.log_file, new_file_path)
with contextlib.redirect_stdout(output):
return_code = main.do_print_logs(
args.parse_args(["--logpath", new_file_path])
)
self.assertEqual(
return_code, 0, f"Content was:\n{output.getvalue()}"
)
self.assertIsNotNone(
re.search(r"3 tests were run", output.getvalue(), re.MULTILINE),
f"Did not find substring, content was:\n{output.getvalue()}",
)
# Corrupt the data a bit, the output should still work.
with open(new_file_path, "r+b") as f:
# Overwrite last 20 bytes of the file with 0
f.seek(-20, 2)
f.write(b"\0" * 80)
output = io.StringIO()
with contextlib.redirect_stdout(output):
return_code = main.do_print_logs(
args.parse_args(["--logpath", new_file_path])
)
self.assertEqual(
return_code, 0, f"Content was:\n{output.getvalue()}"
)
self.assertIsNotNone(
re.search(r"3 tests were run", output.getvalue(), re.MULTILINE),
f"Did not find substring, content was:\n{output.getvalue()}",
)
async def test_print_logs_failure(self) -> None:
"""Test that --print-logs prints an error and exits if the log cannot be found"""
# Default search location
output = io.StringIO()
with contextlib.redirect_stderr(output):
self.assertEqual(main.do_print_logs(args.parse_args([])), 1)
self.assertIsNotNone(
re.search(r"No log files found", output.getvalue()),
f"Did not find substring, output was:\n{output.getvalue()}",
)
# Specific missing file
output = io.StringIO()
with contextlib.redirect_stderr(output):
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "does-not-exist")
self.assertEqual(
main.do_print_logs(args.parse_args(["--logpath", path])),
1,
)
self.assertIsNotNone(
re.search(r"No log files found", output.getvalue()),
f"Did not find substring, output was:\n{output.getvalue()}",
)
# Specific file is not a gzip file
output = io.StringIO()
with contextlib.redirect_stderr(output):
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "does-not-exist")
with open(path, "w") as f:
f.writelines(["hello world"])
self.assertEqual(
main.do_print_logs(args.parse_args(["--logpath", path])),
1,
)
self.assertIsNotNone(
re.search(
r"File does not appear to be a gzip file",
output.getvalue(),
),
f"Did not find substring, output was:\n{output.getvalue()}",
)
@mock.patch("main.termout.is_valid", return_value=False)
async def test_log_to_stdout(self, _termout_mock: mock.Mock) -> None:
"""Test that we can log everything to stdout, and it parses as JSON lines"""
_command_mock = self._mock_run_command(0)
_subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_device_connected(True)
self._mock_has_tests_in_base([])
output = io.StringIO()
with contextlib.redirect_stdout(output):
ret = await main.async_main_wrapper(
args.parse_args(["--logpath", "-"])
)
self.assertEqual(ret, 0)
for line in output.getvalue().splitlines():
if not line:
continue
try:
json.loads(line)
except json.JSONDecodeError as e:
self.fail(
f"Failed to parse line as JSON.\nLine: {line}\nError: {e}"
)