# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import contextlib
import gzip
import io
import json
import os
import re
import shutil
import signal
import tempfile
import typing
import unittest
import unittest.mock as mock

import async_utils.command as command
from parameterized import parameterized

import args
import environment
import event
import log
import main
import test_list_file

WARNING_LEVEL = event.MessageLevel.WARNING


class TestMainIntegration(unittest.IsolatedAsyncioTestCase):
"""Integration tests for the main entrypoint.
These tests encapsulate several real-world invocations of fx test,
with mocked dependencies.
"""
DEVICE_TESTS_IN_INPUT = 1
HOST_TESTS_IN_INPUT = 3
E2E_TESTS_IN_INPUT = 1
TOTAL_TESTS_IN_INPUT = DEVICE_TESTS_IN_INPUT + HOST_TESTS_IN_INPUT
TOTAL_NON_E2E_TESTS_IN_INPUT = TOTAL_TESTS_IN_INPUT - E2E_TESTS_IN_INPUT

    def setUp(self) -> None:
        # Set up a fake Fuchsia directory.
self.fuchsia_dir = tempfile.TemporaryDirectory()
self.addCleanup(self.fuchsia_dir.cleanup)
# Set up mocks
self.mocks = []
        # Retain the real FUCHSIA_DIR, if one exists; it is needed below to
        # locate a prebuilt dldist.
real_fuchsia_dir = os.getenv("FUCHSIA_DIR")
        # Patch the environment so FUCHSIA_DIR points at the mock directory.
self.mocks.append(
mock.patch(
"os.environ",
{"FUCHSIA_DIR": self.fuchsia_dir.name},
)
)
for m in self.mocks:
m.start()
self.addCleanup(m.stop)
        # Correct for the location of the test data files, which differs
        # between the coverage.py script and in-tree test runs.
cur_path = os.path.dirname(__file__)
while not os.path.isdir(cur_path):
cur_path = os.path.split(cur_path)[0]
        # We use an external program called "dldist" to handle fuzzy
        # matching. Put the program in the correct location so that the
        # main script can find it.
os.makedirs(os.path.join(self.fuchsia_dir.name, "bin"))
dldist_path = os.path.join(self.fuchsia_dir.name, "bin", "dldist")
if os.path.exists(os.path.join(cur_path, "bin", "dldist")):
# This path is used when executing the python_host_test target.
print("Using the local dldist for matching")
shutil.copy(os.path.join(cur_path, "bin", "dldist"), dldist_path)
else:
# This path is used when running coverage.py.
print("Trying to use FUCHSIA_DIR dldist for the test")
assert real_fuchsia_dir is not None
build_dir: str
with open(os.path.join(real_fuchsia_dir, ".fx-build-dir")) as f:
build_dir = os.path.join(real_fuchsia_dir, f.read().strip())
print(build_dir)
assert os.path.isdir(build_dir)
shutil.copy(
os.path.join(build_dir, "host-tools", "dldist"), dldist_path
)
self.test_data_path = os.path.join(cur_path, "test_data/build_output")
self.assertTrue(
os.path.isfile(os.path.join(self.test_data_path, "tests.json")),
f"path was {self.test_data_path} for {__file__}",
)
self.assertTrue(
os.path.isfile(os.path.join(self.test_data_path, "test-list.json")),
f"path was {self.test_data_path} for {__file__}",
)
self.test_list_input = os.path.join(
self.test_data_path, "test-list.json"
)
self.assertTrue(
os.path.isfile(
os.path.join(self.test_data_path, "package-repositories.json")
),
f"path was {self.test_data_path} for {__file__}",
)
disabled_tests_source_file = os.path.join(
self.test_data_path, "disabled_tests.json"
)
self.assertTrue(
os.path.isfile(disabled_tests_source_file),
f"path was {self.test_data_path} for {__file__}",
)
with open(
os.path.join(self.fuchsia_dir.name, ".fx-build-dir"), "w"
) as f:
f.write("out/default")
self.out_dir = os.path.join(self.fuchsia_dir.name, "out/default")
os.makedirs(self.out_dir)
for name in [
"tests.json",
"test-list.json",
"package-repositories.json",
"package-targets.json",
"all_package_manifests.list",
]:
shutil.copy(
os.path.join(self.test_data_path, name),
os.path.join(self.out_dir, name),
)
self.package_target_file_path = os.path.join(
self.out_dir, "package-targets.json"
)
# disabled_tests.json must be in place for e2e tests to pass.
disabled_tests_dest_path = os.path.join(
self.fuchsia_dir.name, "sdk", "ctf"
)
os.makedirs(disabled_tests_dest_path)
shutil.copy(
disabled_tests_source_file,
os.path.join(disabled_tests_dest_path, "disabled_tests.json"),
)
# Simulate the generated package metadata to test merging.
gen_dir = os.path.join(
self.out_dir, "gen", "build", "images", "updates"
)
os.makedirs(gen_dir)
with open(
os.path.join(
gen_dir, "package_manifests_from_metadata.list.package_metadata"
),
"w",
) as f:
            f.writelines(
                [
                    "obj/foo/package_manifest.json\n",
                    "obj/bar/package_manifest.json\n",
                    "obj/baz/package_manifest.json\n",
                ]
            )
self._mock_get_device_environment(
environment.DeviceEnvironment(
"localhost", "8080", "foo", "/foo.key"
)
)
# Provide hard-coded predictable test-list.json content rather than
# actually running the generate_test_list program.
self._mock_generate_test_list()
return super().setUp()

    def _mock_run_commands_in_parallel(
self, stdout: str, return_code: int = 0
) -> mock.MagicMock:
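        """Patch main.run_commands_in_parallel to return one canned result.

        Returns:
            The mock, so tests can inspect the recorded calls.
        """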
m = mock.AsyncMock(
return_value=[
mock.MagicMock(stdout=stdout, return_code=return_code)
]
)
patch = mock.patch("main.run_commands_in_parallel", m)
patch.start()
self.addCleanup(patch.stop)
return m

    def _mock_run_command(
self,
return_code: int,
async_handler: (
typing.Callable[[typing.Any, typing.Any], typing.Awaitable[None]]
| None
) = None,
) -> mock.MagicMock:
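        """Patch main.execution.run_command to return a canned result.

        If async_handler is given, it is awaited with the call's arguments
        before the canned result is returned, letting tests block or
        observe in-flight commands.
        """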
async def handler(
*args: typing.Any, **kwargs: typing.Any
) -> typing.Any:
if async_handler is not None:
await async_handler(*args, **kwargs)
return mock.MagicMock(
return_code=return_code, stdout="", stderr="", was_timeout=False
)
m = mock.AsyncMock(side_effect=handler)
patch = mock.patch("main.execution.run_command", m)
patch.start()
self.addCleanup(patch.stop)
return m

    def _mock_generate_test_list(self) -> mock.MagicMock:
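        """Patch test list generation to return entries parsed from the
        test data's test-list.json instead of running generate_test_list."""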
test_list_entries = test_list_file.TestListFile.entries_from_file(
self.test_list_input
)
m = mock.AsyncMock(return_value=test_list_entries)
patch = mock.patch("main.AsyncMain._generate_test_list", m)
patch.start()
self.addCleanup(patch.stop)
return m

    def _mock_subprocess_call(self, value: int) -> mock.MagicMock:
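        """Patch main.subprocess.call to return the given value."""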
m = mock.MagicMock(return_value=value)
patch = mock.patch("main.subprocess.call", m)
patch.start()
self.addCleanup(patch.stop)
return m

    def _mock_has_package_server_connected_to_device(self, value: bool) -> None:
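        """Patch the package server connectivity check to report the given value."""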
m = mock.AsyncMock(return_value=value)
patch = mock.patch("main.has_package_server_connected_to_device", m)
patch.start()
self.addCleanup(patch.stop)

    def _mock_has_tests_in_base(self, test_packages: list[str]) -> None:
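        """Write a base_packages.list declaring the given packages as in base."""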
with open(os.path.join(self.out_dir, "base_packages.list"), "w") as f:
json.dump(
{
"content": {
"names": test_packages,
}
},
f,
)

    def _make_call_args_prefix_set(
self, call_list: mock._CallList
) -> set[tuple[str, ...]]:
"""Given a list of mock calls, turn them into a set of prefixes for comparison.
For instance, if the mock call is ("fx", "run", "command") the output
is: {
('fx',),
('fx', 'run'),
('fx', 'run', 'command'),
}
This can be used to check containment.
Args:
call_list (mock._CallList): Calls to process.
Returns:
set[list[typing.Any]]: Set of prefixes to calls.
"""
ret: set[tuple[str, ...]] = set()
for call in call_list:
args, _ = call
cur = []
if args and isinstance(args[0], list):
# Correct for subprocess.call using lists and not *args.
args = args[0]
for a in args:
cur.append(a)
ret.add(tuple(cur))
return ret

    def _assert_ffx_test_has_args(
self, call_list: mock._CallList, desired_args: list[str]
) -> None:
"""Verifies args were passed to "ffx test".
Given a list of mock calls, verifies it includes a call to
"ffx test run" which includes the given sequence of args."""
for call in call_list:
try:
call_args = list(call.args)
ffx_pos = call_args.index("ffx")
            except ValueError:  # "ffx" was not among the call args.
continue
if call_args[ffx_pos : ffx_pos + 3] != ["ffx", "test", "run"]:
continue
args_after_ffx_run = call_args[ffx_pos + 3 :]
            for index in range(len(args_after_ffx_run)):
if (
desired_args
== args_after_ffx_run[index : index + len(desired_args)]
):
return
self.fail(f"{desired_args} not found in {call_list}")

    def _mock_get_device_environment(
self, env: environment.DeviceEnvironment
) -> mock.MagicMock:
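        """Patch device environment discovery to return the given environment."""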
m = mock.AsyncMock(return_value=env)
patch = mock.patch(
"main.execution.get_device_environment_from_exec_env", m
)
patch.start()
self.addCleanup(patch.stop)
return m

    def assertIsSubset(
self, subset: set[typing.Any], full: set[typing.Any]
) -> None:
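        """Assert that every element of subset is also present in full."""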
inter = full.intersection(subset)
self.assertEqual(
inter, subset, f"Full set was\n {self.prettyFormatPrefixes(full)}"
)

    def prettyFormatPrefixes(self, vals: set[typing.Any]) -> str:
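        """Format a set of call prefixes as one space-joined entry per line."""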
return "\n ".join(map(lambda x: " ".join(x), sorted(vals)))

    async def test_dry_run(self) -> None:
"""Test a basic dry run of the command."""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry"]), recorder=recorder
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(
len(selection_event.selected), self.TOTAL_TESTS_IN_INPUT
)

    async def test_fuzzy_dry_run(self) -> None:
"""Test a dry run of the command for fuzzy matching"""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry", "--fuzzy=1", "foo_test"]),
recorder=recorder,
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(len(selection_event.selected), 1)

    async def test_cancel_before_tests_run(self) -> None:
"""Test that SIGINT before tests start running immediately stops execution"""
self._mock_run_command(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ready_to_kill = asyncio.Event()
# Make builds hang for a long time, signalling that we should
# trigger a SIGINT at the point the build starts.
async def build_handler(_: typing.Any) -> bool:
ready_to_kill.set()
await asyncio.sleep(3600)
return False
build_patch = mock.patch(
"main.AsyncMain._do_build",
mock.AsyncMock(side_effect=build_handler),
)
build_patch.start()
self.addCleanup(build_patch.stop)
recorder = event.EventRecorder()
main_task = asyncio.Task(
main.async_main_wrapper(
args.parse_args(["--simple"]), recorder=recorder
)
)
await ready_to_kill.wait()
os.kill(os.getpid(), signal.SIGINT)
ret = await main_task
self.assertEqual(ret, 1)
errors = {e.error async for e in recorder.iter() if e.error is not None}
self.assertIsSubset({"Terminated due to interrupt"}, errors)

    async def test_cancel_tests_wraps_up(self) -> None:
"""Test that SIGINT while tests are running allows them to wrap up and prints output"""
ready_to_kill = asyncio.Event()
async def command_handler(
*args: typing.Any, **kwargs: typing.Any
) -> None:
event: asyncio.Event = kwargs.get("abort_signal") # type: ignore
assert event is not None
ready_to_kill.set()
await event.wait()
_command_mock = self._mock_run_command(
15, async_handler=command_handler
)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
recorder = event.EventRecorder()
main_task = asyncio.Task(
main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"]), recorder=recorder
)
)
await ready_to_kill.wait()
os.kill(os.getpid(), signal.SIGINT)
ret = await main_task
self.assertEqual(ret, 1)
errors = {e.error async for e in recorder.iter() if e.error is not None}
self.assertIsSubset({"Failed to run all tests"}, errors)
aborted_cases = {
(payload_event.status, payload_event.message)
async for e in recorder.iter()
if (payload := e.payload) is not None
and (payload_event := payload.test_suite_ended) is not None
}
self.assertSetEqual(
aborted_cases,
{
(
event.TestSuiteStatus.ABORTED,
"Test suite aborted due to user interrupt.",
)
},
)

    async def test_double_sigint_cancels_everything(self) -> None:
"""Test that sending SIGINT twice cancels all tasks, no matter how long running"""
ready_to_kill = asyncio.Event()
async def command_handler(
*args: typing.Any, **kwargs: typing.Any
) -> None:
ready_to_kill.set()
await asyncio.sleep(3600)
_command_mock = self._mock_run_command(
15, async_handler=command_handler
)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
recorder = event.EventRecorder()
main_task = asyncio.Task(
main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"]), recorder=recorder
)
)
await ready_to_kill.wait()
os.kill(os.getpid(), signal.SIGINT)
await asyncio.sleep(0.5)
os.kill(os.getpid(), signal.SIGINT)
ret = await main_task
self.assertEqual(ret, 1)

    @parameterized.expand(
[
(["--host"], HOST_TESTS_IN_INPUT - E2E_TESTS_IN_INPUT),
(["--device"], DEVICE_TESTS_IN_INPUT),
(["--only-e2e"], E2E_TESTS_IN_INPUT),
# TODO(https://fxbug.dev/338667899): Enable when we determine how to handle opt-in e2e.
# ([], TOTAL_NON_E2E_TESTS_IN_INPUT),
(["--e2e"], TOTAL_TESTS_IN_INPUT),
]
)
async def test_selection_flags(
self, extra_flags: list[str], expected_count: int
) -> None:
"""Test that the correct --device, --host, or --e2e tests are selected"""
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--dry"] + extra_flags),
recorder=recorder,
)
self.assertEqual(ret, 0)
selection_events: list[event.TestSelectionPayload] = [
e.payload.test_selections
async for e in recorder.iter()
if e.payload is not None and e.payload.test_selections is not None
]
self.assertEqual(len(selection_events), 1)
selection_event = selection_events[0]
self.assertEqual(len(selection_event.selected), expected_count)

    @parameterized.expand(
[
("--use-package-hash", DEVICE_TESTS_IN_INPUT),
("--no-use-package-hash", 0),
]
)
async def test_use_package_hash(
self, flag_name: str, expected_hash_matches: int
) -> None:
"""Test ?hash= is used only when --use-package-hash is set"""
command_mock = self._mock_run_command(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"] + [flag_name])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertIsSubset(
{
(
"fx",
"--dir",
self.out_dir,
"ffx",
"test",
"run",
),
},
call_prefixes,
)
hash_params_found: int = 0
for prefix_list in call_prefixes:
entry: str
for entry in prefix_list:
if "?hash=" in entry:
hash_params_found += 1
self.assertEqual(
hash_params_found,
expected_hash_matches,
f"Prefixes were\n{self.prettyFormatPrefixes(call_prefixes)}",
)

    @parameterized.expand(
[
("default suggestions", [], 6),
("custom suggestion count", ["--suggestion-count=10"], 10),
("suppress suggestions", ["--no-show-suggestions"], 0),
]
)
async def test_suggestions(
self,
_unused_name: str,
extra_flags: list[str],
expected_suggestion_count: int,
) -> None:
"""Test that targets are suggested when there are no test matches."""
mocked_commands = self._mock_run_commands_in_parallel("No matches")
ret = await main.async_main_wrapper(
args.parse_args(
["--simple", "non_existent_test_does_not_match"] + extra_flags
)
)
self.assertEqual(ret, 1)
if expected_suggestion_count > 0:
self.assertListEqual(
mocked_commands.call_args[0][0],
[
[
"fx",
"--dir",
self.out_dir,
"search-tests",
f"--max-results={expected_suggestion_count}",
"--no-color",
"non_existent_test_does_not_match",
]
],
)
else:
self.assertListEqual(mocked_commands.call_args_list, [])
# TODO(b/295340412): Test that suggestions are suppressed.

    @parameterized.expand(
[
("default package server behavior", [], True, True),
(
"override no temporary package server",
["--no-allow-temporary-package-server"],
False,
False,
),
(
"override allow temporary package server",
["--allow-temporary-package-server"],
True,
True,
),
]
)
async def test_missing_package_server(
self,
_unused_name: str,
extra_flags: list[str],
expect_pass: bool,
expect_to_serve: bool,
) -> None:
"""Test different behaviors when a package server is missing"""
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(False)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple"] + extra_flags)
)
if expect_pass:
self.assertEqual(ret, 0)
else:
self.assertNotEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
if expect_to_serve:
self.assertIsSubset(
{("fx", "--dir", self.out_dir, "serve")},
call_prefixes,
)
else:
self.assertNotIn(
("fx", "--dir", self.out_dir, "serve"), call_prefixes
)

    async def test_full_success(self) -> None:
"""Test that we can run all tests and report success"""
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--allow-temporary-package-server"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
# Make sure we built, published, and ran the device test.
self.assertIsSubset(
{
(
"fx",
"--dir",
self.out_dir,
"build",
"--default",
"//src/sys:foo_test_package",
"--toolchain=//build/toolchain/host:x64",
"//src/sys:bar_test",
"//src/sys:baz_test",
"//src/tests/end_to_end:example_e2e_test",
"--default",
"//build/images/updates",
),
(
"fx",
"--dir",
self.out_dir,
"ffx",
"repository",
"publish",
),
(
"fx",
"--dir",
self.out_dir,
"ffx",
"test",
"run",
),
},
call_prefixes,
)
self.assertNotIn(("fx", "--dir", self.out_dir, "serve"), call_prefixes)
# Make sure we properly exclude the "broken_case" and "bad_case"
# and count an empty test case set as passing.
self._assert_ffx_test_has_args(
command_mock.call_args_list, ["--test-filter", "-broken_case"]
)
self._assert_ffx_test_has_args(
command_mock.call_args_list, ["--test-filter", "-bad_case"]
)
self._assert_ffx_test_has_args(
command_mock.call_args_list, ["--no-cases-equals-success"]
)
# Make sure we ran the host tests.
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))
# Try running again, but this time replay the previous execution.
output = mock.MagicMock(wraps=io.StringIO())
output.fileno = lambda: -1
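        # StringIO has no real file descriptor; stub out fileno so callers
        # that probe the stream do not raise.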
with contextlib.redirect_stdout(output):
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"-q",
"--previous",
"replay",
"--replay-speed",
"5",
]
),
replay_mode=True,
)
self.assertEqual(0, ret)
contents = list(map(str.strip, output.getvalue().split("\n")))
contents_for_printing = "\n ".join(contents)
self.assertTrue(
{
"PASSED: host_x64/bar_test",
"PASSED: fuchsia-pkg://fuchsia.com/foo-test#meta/foo_test.cm",
"PASSED: host_x64/baz_test",
"SKIPPED: host_x64/example_e2e_test",
}.issubset(set(contents)),
f"Contents were:\n {contents_for_printing}",
)

    async def test_build_e2e(self) -> None:
"""Test that we build an updates package for e2e tests"""
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--only-e2e"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
# Make sure we built, published, and ran the device test.
self.assertIsSubset(
{
(
"fx",
"--dir",
                    self.out_dir,
"build",
"--toolchain=//build/toolchain/host:x64",
"//src/tests/end_to_end:example_e2e_test",
"--default",
"//build/images/updates",
),
(
"fx",
"--dir",
self.out_dir,
"ffx",
"repository",
"publish",
),
},
call_prefixes,
)
# Make sure we ran the host tests.
self.assertTrue(
any(["example_e2e_test" in v[0] for v in call_prefixes])
)

    async def test_build_device_package_lists_only_when_no_merkle(self) -> None:
"""Test that we only build package lists for device tests that are missing a merkle hash"""
with open(self.package_target_file_path, "w") as f:
# Clear the target files so that this test does not have a merkle hash listed.
# This will trigger rebuilding package lists.
f.write('{"signed": {"targets": {}}}')
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-e2e", "--device"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
# Make sure we built, published, and ran the device test.
self.assertIsSubset(
{
(
"fx",
"--dir",
                    self.out_dir,
"build",
"--default",
"//src/sys:foo_test_package",
"--default",
"//build/images/updates:package_lists",
),
(
"fx",
"--dir",
self.out_dir,
"ffx",
"repository",
"publish",
),
},
call_prefixes,
)

    async def test_no_build_package_lists_if_not_needed(self) -> None:
"""Test that we do not build package lists if all packages are present in the repository"""
command_mock = self._mock_run_command(0)
subprocess_mock = self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-e2e", "--device"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
call_prefixes.update(
self._make_call_args_prefix_set(subprocess_mock.call_args_list)
)
self.assertIsSubset(
{
(
"fx",
"--dir",
                    self.out_dir,
"build",
"--default",
"//src/sys:foo_test_package",
),
(
"fx",
"--dir",
self.out_dir,
"ffx",
"repository",
"publish",
),
},
call_prefixes,
)
for prefix_list in call_prefixes:
self.assertNotIn(
"//build/images/updates:package_lists", prefix_list
)

    async def test_no_build(self) -> None:
        """Test that --no-build skips the build and publish steps"""
command_mock = self._mock_run_command(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build"])
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertFalse(
("fx", "--dir", self.out_dir, "build") in call_prefixes
)
self.assertFalse(
("fx", "--dir", self.out_dir, "ffx", "repository", "publish")
in call_prefixes
)
self.assertIsSubset(
{
("fx", "--dir", self.out_dir, "ffx", "test", "run"),
},
call_prefixes,
)
        # Make sure we ran the host tests.
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))

    async def test_first_failure(self) -> None:
"""Test that one failing test aborts the rest with --fail"""
command_mock = self._mock_run_command(1)
command_mock.side_effect = [
command.CommandOutput("out", "err", 1, 10, None),
command.CommandOutput("out", "err", 1, 10, None),
command.CommandOutput("out", "err", 1, 10, None),
]
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build", "--fail"])
)
        # bar_test and baz_test are not hermetic, so they cannot run at the
        # same time. One of them runs before the other, which means --fail
        # prevents the second from starting; we expect to see only bar_test
        # (since baz_test is defined later in the file).
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertEqual(ret, 1)
self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
self.assertFalse(any(["baz_test" in v[0] for v in call_prefixes]))

    async def test_count(self) -> None:
"""Test that we can re-run a test multiple times with --count"""
command_mock = self._mock_run_command(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
        # Run each test 3 times, with no parallelism, to better match the
        # behavior of the failure-case test.
ret = await main.async_main_wrapper(
args.parse_args(
["--simple", "--no-build", "--count=3", "--parallel=1"]
)
)
self.assertEqual(ret, 0)
self.assertEqual(
3,
sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(
[
"foo-test?hash=" in " ".join(v[0])
for v in command_mock.call_args_list
]
),
command_mock.call_args_list,
)

    async def test_count_with_timeout(self) -> None:
"""Test that we abort running the rest of the tests in a --count group if a timeout occurs."""
command_mock = self._mock_run_command(1)
command_mock.return_value = command.CommandOutput(
"", "", 1, 10, None, was_timeout=True
)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
        # Run each test 3 times, with no parallelism, to better match the
        # behavior of the failure-case test.
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"--no-build",
"--count=3",
"--parallel=1",
]
)
)
self.assertEqual(ret, 1)
self.assertEqual(
1,
sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
1,
sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
1,
sum(
[
"foo-test?hash=" in " ".join(v[0])
for v in command_mock.call_args_list
]
),
command_mock.call_args_list,
)

    async def test_no_fail_by_group(self) -> None:
"""Test that we continue running the rest of the tests in a --count group if one fails and --no-fail-by-group is passed."""
command_mock = self._mock_run_command(1)
command_mock.return_value = command.CommandOutput(
"", "", 1, 10, None, was_timeout=True
)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
        # Run each test 3 times, with no parallelism, to better match the
        # behavior of the failure-case test.
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"--no-build",
"--count=3",
"--parallel=1",
"--no-fail-by-group",
]
)
)
self.assertEqual(ret, 1)
self.assertEqual(
3,
sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
command_mock.call_args_list,
)
self.assertEqual(
3,
sum(
[
"foo-test?hash=" in " ".join(v[0])
for v in command_mock.call_args_list
]
),
command_mock.call_args_list,
)

    @parameterized.expand(
[
("existing package server running", True),
("no existing package server running", False),
]
)
async def test_list_command(
self, _unused_name: str, existing_package_server: bool
) -> None:
"""Test that we can list test cases using --list"""
command_mock = self._mock_run_command(0)
command_parallel_mock = self._mock_run_commands_in_parallel(
"foo::test\nbar::test",
)
self._mock_has_package_server_connected_to_device(
existing_package_server
)
self._mock_has_tests_in_base([])
recorder = event.EventRecorder()
# This only works if the first test is a device test.
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build", "--list", "--limit=1"]),
recorder=recorder,
)
self.assertEqual(ret, 0)
self.assertEqual(command_parallel_mock.call_count, 1)
events = [
e.payload.enumerate_test_cases
async for e in recorder.iter()
if e.payload is not None
and e.payload.enumerate_test_cases is not None
]
self.assertEqual(len(events), 1)
self.assertEqual(
events[0].test_case_names,
[
"foo::test",
"bar::test",
],
)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
if existing_package_server:
self.assertNotIn(
("fx", "--dir", self.out_dir, "serve"), call_prefixes
)
else:
self.assertIsSubset(
{("fx", "--dir", self.out_dir, "serve")},
call_prefixes,
)

    async def test_list_failing_command(self) -> None:
"""Test that failing to list test cases using --list results in a nonzero exit code"""
command_mock = self._mock_run_commands_in_parallel(
"Failed to create remote control proxy: Timeout attempting to reach target foo.",
100,
)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--no-build", "--list", "--limit=1"]),
recorder=recorder,
)
self.assertEqual(ret, 1)
self.assertEqual(command_mock.call_count, 1)
events = [
e.payload.enumerate_test_cases
async for e in recorder.iter()
if e.payload is not None
and e.payload.enumerate_test_cases is not None
]
self.assertEqual(len(events), 0)
@mock.patch("main.run_build_with_suspended_output", side_effect=[0])
async def test_updateifinbase(self, _build_mock: mock.AsyncMock) -> None:
"""Test that we appropriately update tests in base"""
command_mock = self._mock_run_command(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base(["foo-test"])
ret = await main.async_main_wrapper(
args.parse_args(
[
"--simple",
"--no-build",
"--updateifinbase",
"--parallel",
"1",
]
)
)
self.assertEqual(ret, 0)
call_prefixes = self._make_call_args_prefix_set(
command_mock.call_args_list
)
self.assertIsSubset(
{
(
"fx",
"--dir",
self.out_dir,
"ota",
"--no-build",
)
},
call_prefixes,
)

    async def test_print_logs_success(self) -> None:
"""Test that print_logs searches for logs, can be given a log,
        and handles invalid data.
"""
env = environment.ExecutionEnvironment.initialize_from_args(
args.parse_args([])
)
assert env.log_file
# Create a sample log with 3 tests running
recorder = event.EventRecorder()
recorder.emit_init()
# Simulate one test suite
test_id = recorder.emit_test_suite_started("foo", hermetic=False)
program_id = recorder.emit_program_start(
"bar", ["abcd"], parent=test_id
)
recorder.emit_program_output(
program_id, "Data", event.ProgramOutputStream.STDOUT
)
recorder.emit_program_termination(program_id, 0)
recorder.emit_test_suite_ended(
test_id,
event.TestSuiteStatus.PASSED,
message=None,
)
test_2 = recorder.emit_test_suite_started(
"//other:test2", hermetic=True
)
test_3 = recorder.emit_test_suite_started(
"//other:test3", hermetic=True
)
program_2 = recorder.emit_program_start(
"test", ["arg", "1"], parent=test_2
)
program_3 = recorder.emit_program_start(
"test", ["arg", "2"], parent=test_3
)
recorder.emit_program_output(
program_3,
"line for test 3",
stream=event.ProgramOutputStream.STDOUT,
)
recorder.emit_program_output(
program_2,
"line for test 2",
stream=event.ProgramOutputStream.STDOUT,
)
recorder.emit_program_termination(program_2, 0)
recorder.emit_program_termination(program_3, 0)
recorder.emit_test_suite_ended(
test_2, event.TestSuiteStatus.FAILED, message=None
)
recorder.emit_test_suite_ended(
test_3, event.TestSuiteStatus.PASSED, message=None
)
recorder.emit_end()
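        # Persist the recorded events as gzipped JSON lines, the on-disk
        # format the log-reading commands consume.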
with gzip.open(env.log_file, "wt") as out_file:
async for e in recorder.iter():
json.dump(e.to_dict(), out_file) # type:ignore[attr-defined]
print("", file=out_file)
def assert_print_logs_output(return_code: int, output: str) -> None:
self.assertEqual(return_code, 0, f"Content was:\n{output}")
self.assertIsNotNone(
re.search(r"3 tests were run", output, re.MULTILINE),
f"Did not find substring, content was:\n{output}",
)
# Test finding most recent log file.
output = io.StringIO()
with contextlib.redirect_stdout(output):
return_code = main.do_print_logs(args.parse_args([]))
assert_print_logs_output(return_code, output.getvalue())
# Test finding specific log file.
output = io.StringIO()
new_file_path = os.path.join(env.out_dir, "other-file.json.gz")
shutil.move(env.log_file, new_file_path)
with contextlib.redirect_stdout(output):
return_code = main.do_print_logs(
args.parse_args(["--logpath", new_file_path])
)
        assert_print_logs_output(return_code, output.getvalue())

    async def test_print_logs_failure(self) -> None:
"""Test that --print-logs prints an error and exits if the log cannot be found"""
# Default search location
output = io.StringIO()
with contextlib.redirect_stderr(output):
self.assertEqual(main.do_print_logs(args.parse_args([])), 1)
self.assertIsNotNone(
re.search(r"No log files found", output.getvalue()),
f"Did not find substring, output was:\n{output.getvalue()}",
)
# Specific missing file
output = io.StringIO()
with contextlib.redirect_stderr(output):
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "does-not-exist")
self.assertEqual(
main.do_print_logs(args.parse_args(["--logpath", path])),
1,
)
self.assertIsNotNone(
re.search(r"No log files found", output.getvalue()),
f"Did not find substring, output was:\n{output.getvalue()}",
)
# Specific file is not a gzip file
output = io.StringIO()
with contextlib.redirect_stderr(output):
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "does-not-exist")
with open(path, "w") as f:
f.writelines(["hello world"])
self.assertEqual(
main.do_print_logs(args.parse_args(["--logpath", path])),
1,
)
self.assertIsNotNone(
re.search(
r"File does not appear to be a gzip file",
output.getvalue(),
),
f"Did not find substring, output was:\n{output.getvalue()}",
)

    async def test_print_failed_tests(self) -> None:
"""Test that failed-tests prints out the failed tests"""
env = environment.ExecutionEnvironment.initialize_from_args(
args.parse_args([])
)
assert env.log_file
# Create a sample log with 2 failed tests and one passing test
recorder = event.EventRecorder()
recorder.emit_init()
# Simulate one test suite
test_id = recorder.emit_test_suite_started("foo", hermetic=False)
recorder.emit_test_suite_ended(
test_id,
event.TestSuiteStatus.FAILED,
message=None,
)
test_2 = recorder.emit_test_suite_started(
"//other:test2", hermetic=True
)
test_3 = recorder.emit_test_suite_started(
"//other:test3", hermetic=True
)
recorder.emit_test_suite_ended(
test_2, event.TestSuiteStatus.PASSED, message=None
)
recorder.emit_test_suite_ended(
test_3, event.TestSuiteStatus.FAILED_TO_START, message=None
)
recorder.emit_end()
with gzip.open(env.log_file, "wt") as out_file:
async for e in recorder.iter():
json.dump(e.to_dict(), out_file) # type:ignore[attr-defined]
print("", file=out_file)
def assert_print_failed_tests_output(
return_code: int, output: str
) -> None:
self.assertEqual(return_code, 0, f"Content was:\n{output}")
self.assertIsNotNone(
re.search(
r"The following tests failed in the previous run:",
output,
re.MULTILINE,
),
f"Did not find header substring, content was:\n{output}",
)
self.assertIsNotNone(
re.search(r"\* fx test foo", output, re.MULTILINE),
f"Did not find test 1 substring, content was:\n{output}",
)
self.assertIsNotNone(
re.search(r"\* fx test //other:test3", output, re.MULTILINE),
f"Did not find test 3 substring, content was:\n{output}",
)
# Test finding most recent log file.
output = io.StringIO()
with contextlib.redirect_stdout(output):
return_code = main.do_print_failed(args.parse_args([]))
assert_print_failed_tests_output(return_code, output.getvalue())

    async def test_print_failed_tests_no_failures(self) -> None:
"""Test that failed-tests prints no failed tests"""
env = environment.ExecutionEnvironment.initialize_from_args(
args.parse_args([])
)
assert env.log_file
        # Create a sample log with a single passing test.
recorder = event.EventRecorder()
recorder.emit_init()
# Simulate one test suite
test_id = recorder.emit_test_suite_started("foo", hermetic=False)
recorder.emit_test_suite_ended(
test_id,
event.TestSuiteStatus.PASSED,
message=None,
)
recorder.emit_end()
with gzip.open(env.log_file, "wt") as out_file:
async for e in recorder.iter():
json.dump(e.to_dict(), out_file) # type:ignore[attr-defined]
print("", file=out_file)
def assert_print_failed_tests_output(
return_code: int, output: str
) -> None:
self.assertEqual(return_code, 0, f"Content was:\n{output}")
self.assertIsNotNone(
re.search(
r"The previous run had no failed tests",
output,
re.MULTILINE,
),
f"Did not find substring, content was:\n{output}",
)
# Test finding most recent log file.
output = io.StringIO()
with contextlib.redirect_stdout(output):
return_code = main.do_print_failed(args.parse_args([]))
assert_print_failed_tests_output(return_code, output.getvalue())
@mock.patch("main.termout.is_valid", return_value=False)
async def test_log_to_stdout(self, _termout_mock: mock.Mock) -> None:
"""Test that we can log everything to stdout, and it parses as JSON lines"""
self._mock_run_command(0)
self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
output = io.StringIO()
with contextlib.redirect_stdout(output):
ret = await main.async_main_wrapper(
args.parse_args(["--logpath", "-"])
)
self.assertEqual(ret, 0)
for line in output.getvalue().splitlines():
if not line:
continue
try:
json.loads(line)
except json.JSONDecodeError as e:
self.fail(
f"Failed to parse line as JSON.\nLine: {line}\nError: {e}"
)

    async def test_artifact_options(self) -> None:
"""Test that we handle artifact output directories and can query their value"""
self._mock_run_command(0)
self._mock_subprocess_call(0)
self._mock_has_package_server_connected_to_device(True)
self._mock_has_tests_in_base([])
with self.subTest("no artifact path still produces empty event"):
with tempfile.TemporaryDirectory() as td:
logpath = os.path.join(td, "log.json.gz")
flags = args.parse_args(["--simple", "--logpath", logpath])
ret = await main.async_main_wrapper(flags)
self.assertEqual(ret, 0)
env = environment.ExecutionEnvironment.initialize_from_args(
flags, create_log_file=False
)
artifact_path: str | None = None
for log_entry in log.LogSource.from_env(env).read_log():
if (event := log_entry.log_event) is not None:
if (
event.payload is not None
and (path := event.payload.artifact_directory_path)
is not None
):
artifact_path = path
self.assertIsNone(log_entry.error)
self.assertIsNone(log_entry.warning)
self.assertEqual(artifact_path, "")
                # Using the output log file, requesting the artifact path
                # should produce an error.
stderr = io.StringIO()
with contextlib.redirect_stderr(stderr):
ret = main.do_process_previous(
args.parse_args(
[
"-pr",
"artifact-path",
"--logpath",
logpath,
]
)
)
self.assertNotEqual(ret, 0)
lines = stderr.getvalue().splitlines()
self.assertEqual(
lines,
[
"ERROR: The previous run did not specify --artifact-output-directory. Run again with that flag set to get the path."
],
)
with self.subTest(
"artifact path by default goes to the top level directory"
):
with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join(td, "log.json.gz")
artifact_root = os.path.join(td, "artifacts")
flags = args.parse_args(
[
"--simple",
"--logpath",
logpath,
"--outdir",
artifact_root,
]
)
ret = await main.async_main_wrapper(flags)
self.assertEqual(ret, 0)
env = environment.ExecutionEnvironment.initialize_from_args(
flags, create_log_file=False
)
artifact_path = None
for log_entry in log.LogSource.from_env(env).read_log():
if (event := log_entry.log_event) is not None:
if (
event.payload is not None
and (path := event.payload.artifact_directory_path)
is not None
):
artifact_path = path
self.assertIsNone(log_entry.error)
self.assertIsNone(log_entry.warning)
self.assertEqual(artifact_path, artifact_root)
# Path gets created automatically.
self.assertTrue(os.path.isdir(artifact_root))
# Delete the artifact directory, checking what happens when it does not exist.
shutil.rmtree(artifact_root)
# Using the output log file, we should see an error getting the path because the directory will not be present.
stderr = io.StringIO()
with contextlib.redirect_stderr(stderr):
ret = main.do_process_previous(
args.parse_args(
[
"-pr",
"artifact-path",
"--logpath",
logpath,
]
)
)
self.assertNotEqual(ret, 0)
lines = stderr.getvalue().splitlines()
self.assertEqual(
lines,
[
"ERROR: The artifact directory is missing, it may have been deleted."
],
)
# Create the artifact directory. Listing artifact path should work now.
os.makedirs(artifact_root)
stdout = mock.MagicMock(wraps=io.StringIO())
stdout.fileno = lambda: -1
with contextlib.redirect_stdout(stdout):
ret = main.do_process_previous(
args.parse_args(
[
"-pr",
"artifact-path",
"--logpath",
logpath,
]
)
)
self.assertEqual(ret, 0)
lines = stdout.getvalue().splitlines()
self.assertEqual(
lines,
[artifact_root],
)
with self.subTest(
"--timestamp-artifacts causes artifacts to go to subdir"
):
with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join(td, "log.json.gz")
artifact_root = os.path.join(td, "artifacts")
flags = args.parse_args(
[
"--simple",
"--logpath",
logpath,
"--outdir",
artifact_root,
"--timestamp-artifacts",
]
)
ret = await main.async_main_wrapper(flags)
self.assertEqual(ret, 0)
env = environment.ExecutionEnvironment.initialize_from_args(
flags, create_log_file=False
)
artifact_path = None
for log_entry in log.LogSource.from_env(env).read_log():
if (event := log_entry.log_event) is not None:
if (
event.payload is not None
and (path := event.payload.artifact_directory_path)
is not None
):
artifact_path = path
self.assertIsNone(log_entry.error)
self.assertIsNone(log_entry.warning)
self.assertNotEqual(artifact_path, artifact_root)
assert artifact_path is not None
self.assertEqual(
os.path.commonprefix([artifact_path, artifact_root]),
artifact_root,
)
with self.subTest(
"it is an error to output to an existing, non-empty directory"
):
with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join(td, "log.json.gz")
artifact_root = os.path.join(td, "artifacts")
os.mkdir(artifact_root)
with open(os.path.join(artifact_root, "some_file"), "w") as f:
f.write("Demo data")
flags = args.parse_args(
[
"--simple",
"--logpath",
logpath,
"--outdir",
artifact_root,
"--no-timestamp-artifacts",
]
)
ret = await main.async_main_wrapper(flags)
self.assertEqual(ret, 1)
env = environment.ExecutionEnvironment.initialize_from_args(
flags, create_log_file=False
)
artifact_path = None
found_error = False
for log_entry in log.LogSource.from_env(env).read_log():
if (event := log_entry.log_event) is not None:
if (error_message := event.error) is not None:
if (
"Your output directory already exists"
in error_message
):
found_error = True
break
self.assertTrue(
found_error,
"Expected to find an error about output directory existing",
)

    async def test_list_runtime_deps_success(self) -> None:
"""Tests the successful listing of runtime dependencies."""
deps_path = "path/to/my_deps.json"
full_deps_path = os.path.join(self.out_dir, deps_path)
os.makedirs(os.path.dirname(full_deps_path), exist_ok=True)
with open(full_deps_path, "w") as f:
json.dump(["dep1", "dep2"], f)
empty_deps_path = "path/to/empty_deps.json"
full_empty_deps_path = os.path.join(self.out_dir, empty_deps_path)
os.makedirs(os.path.dirname(full_empty_deps_path), exist_ok=True)
with open(full_empty_deps_path, "w") as f:
json.dump([], f)
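        # Build mock tests covering three cases: a deps file with entries,
        # an empty deps file, and no deps file at all.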
mock_test_with_deps = mock.MagicMock()
mock_test_with_deps.name.return_value = "test_with_deps"
mock_test_with_deps.build.test.runtime_deps = deps_path
mock_test_with_empty_deps = mock.MagicMock()
mock_test_with_empty_deps.name.return_value = "test_with_empty_deps"
mock_test_with_empty_deps.build.test.runtime_deps = empty_deps_path
mock_test_without_deps = mock.MagicMock()
mock_test_without_deps.name.return_value = "test_without_deps"
mock_test_without_deps.build.test.runtime_deps = None
mock_selections = mock.MagicMock()
mock_selections.selected = [
mock_test_with_deps,
mock_test_with_empty_deps,
mock_test_without_deps,
]
mock_selections.selected_but_not_run = []
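        # Bypass real test selection and validation so the mock selections
        # above are used directly.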
selection_patch = mock.patch(
"main.selection.select_tests",
mock.AsyncMock(return_value=mock_selections),
)
selection_patch.start()
self.addCleanup(selection_patch.stop)
validate_patch = mock.patch(
"main.AsyncMain._validate_test_selections",
mock.AsyncMock(return_value=None),
)
validate_patch.start()
self.addCleanup(validate_patch.stop)
recorder = event.EventRecorder()
ret = await main.async_main_wrapper(
args.parse_args(["--simple", "--list-runtime-deps", "--no-build"]),
recorder=recorder,
)
self.assertEqual(ret, 0)
payloads = [
e.payload.user_message.value
async for e in recorder.iter()
if e.payload
and e.payload.user_message
and e.payload.user_message.value
]
        self.assertIn("test_with_deps:", payloads)
        start_index = payloads.index("test_with_deps:")
deps_full_output = payloads[start_index:]
expected_output = [
"test_with_deps:",
f" Runtime deps file at: {full_deps_path}",
" dep1",
" dep2",
"test_with_empty_deps:",
f" Runtime deps file at: {full_empty_deps_path}",
" File is empty",
"test_without_deps:",
" No runtime deps found for this test",
]
self.assertListEqual(deps_full_output, expected_output)

    async def test_list_runtime_deps_file_not_found(self) -> None:
"""Tests that a missing runtime_deps file raises an exception."""
missing_deps_path = "path/to/non_existent_deps.json"
mock_test = mock.MagicMock()
mock_test.name.return_value = "test_with_missing_deps"
mock_test.build.test.runtime_deps = missing_deps_path
mock_selections = mock.MagicMock()
mock_selections.selected = [mock_test]
mock_selections.selected_but_not_run = []
selection_patch = mock.patch(
"main.selection.select_tests",
mock.AsyncMock(return_value=mock_selections),
)
selection_patch.start()
self.addCleanup(selection_patch.stop)
validate_patch = mock.patch(
"main.AsyncMain._validate_test_selections",
mock.AsyncMock(return_value=None),
)
validate_patch.start()
self.addCleanup(validate_patch.stop)
with self.assertRaises(FileNotFoundError):
await main.async_main_wrapper(
args.parse_args(
["--simple", "--list-runtime-deps", "--no-build"]
)
)