# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import asyncio
import contextlib
import gzip
import io
import json
import os
import re
import shutil
import signal
import tempfile
import typing
import unittest
import unittest.mock as mock

import async_utils.command as command
from parameterized import parameterized

import args
import environment
import event
import log
import main
import test_list_file

WARNING_LEVEL = event.MessageLevel.WARNING


class TestMainIntegration(unittest.IsolatedAsyncioTestCase):
    """Integration tests for the main entrypoint.

    These tests encapsulate several real-world invocations of fx test,
    with mocked dependencies.
    """

    DEVICE_TESTS_IN_INPUT = 1
    HOST_TESTS_IN_INPUT = 3
    E2E_TESTS_IN_INPUT = 1
    TOTAL_TESTS_IN_INPUT = DEVICE_TESTS_IN_INPUT + HOST_TESTS_IN_INPUT
    TOTAL_NON_E2E_TESTS_IN_INPUT = TOTAL_TESTS_IN_INPUT - E2E_TESTS_IN_INPUT

    def setUp(self) -> None:
        # Set up a Fake fuchsia directory.
        self.fuchsia_dir = tempfile.TemporaryDirectory()
        self.addCleanup(self.fuchsia_dir.cleanup)

        # Set up mocks
        self.mocks = []

        # Retain the real build dir, if one exists.
        real_fuchsia_dir = os.getenv("FUCHSIA_DIR")

        # Intercept environment and instantiate a new mock FUCHSIA_DIR.
        self.mocks.append(
            mock.patch(
                "os.environ",
                {"FUCHSIA_DIR": self.fuchsia_dir.name},
            )
        )
        for m in self.mocks:
            m.start()
            self.addCleanup(m.stop)

        # Correct for location of the test data files between coverage.py
        # script and how tests are run in-tree.
        cur_path = os.path.dirname(__file__)
        while not os.path.isdir(cur_path):
            cur_path = os.path.split(cur_path)[0]

        # We use an external program to handle fuzzy matching called "dldist".
        # Put the program in the correct location so that the main script can
        # find it.
        os.makedirs(os.path.join(self.fuchsia_dir.name, "bin"))
        dldist_path = os.path.join(self.fuchsia_dir.name, "bin", "dldist")
        if os.path.exists(os.path.join(cur_path, "bin", "dldist")):
            # This path is used when executing the python_host_test target.
            print("Using the local dldist for matching")
            shutil.copy(os.path.join(cur_path, "bin", "dldist"), dldist_path)
        else:
            # This path is used when running coverage.py.
            print("Trying to use FUCHSIA_DIR dldist for the test")
            assert real_fuchsia_dir is not None
            build_dir: str
            with open(os.path.join(real_fuchsia_dir, ".fx-build-dir")) as f:
                build_dir = os.path.join(real_fuchsia_dir, f.read().strip())
            print(build_dir)
            assert os.path.isdir(build_dir)
            shutil.copy(
                os.path.join(build_dir, "host-tools", "dldist"), dldist_path
            )

        self.test_data_path = os.path.join(cur_path, "test_data/build_output")

        self.assertTrue(
            os.path.isfile(os.path.join(self.test_data_path, "tests.json")),
            f"path was {self.test_data_path} for {__file__}",
        )
        self.assertTrue(
            os.path.isfile(os.path.join(self.test_data_path, "test-list.json")),
            f"path was {self.test_data_path} for {__file__}",
        )
        self.test_list_input = os.path.join(
            self.test_data_path, "test-list.json"
        )
        self.assertTrue(
            os.path.isfile(
                os.path.join(self.test_data_path, "package-repositories.json")
            ),
            f"path was {self.test_data_path} for {__file__}",
        )

        disabled_tests_source_file = os.path.join(
            self.test_data_path, "disabled_tests.json"
        )
        self.assertTrue(
            os.path.isfile(disabled_tests_source_file),
            f"path was {self.test_data_path} for {__file__}",
        )

        with open(
            os.path.join(self.fuchsia_dir.name, ".fx-build-dir"), "w"
        ) as f:
            f.write("out/default")

        self.out_dir = os.path.join(self.fuchsia_dir.name, "out/default")
        os.makedirs(self.out_dir)

        for name in [
            "tests.json",
            "test-list.json",
            "package-repositories.json",
            "package-targets.json",
            "all_package_manifests.list",
        ]:
            shutil.copy(
                os.path.join(self.test_data_path, name),
                os.path.join(self.out_dir, name),
            )
        self.package_target_file_path = os.path.join(
            self.out_dir, "package-targets.json"
        )

        # disabled_tests.json must be in place for e2e tests to pass.
        disabled_tests_dest_path = os.path.join(
            self.fuchsia_dir.name, "sdk", "ctf"
        )
        os.makedirs(disabled_tests_dest_path)
        shutil.copy(
            disabled_tests_source_file,
            os.path.join(disabled_tests_dest_path, "disabled_tests.json"),
        )

        # Simulate the generated package metadata to test merging.
        gen_dir = os.path.join(
            self.out_dir, "gen", "build", "images", "updates"
        )
        os.makedirs(gen_dir)
        with open(
            os.path.join(
                gen_dir, "package_manifests_from_metadata.list.package_metadata"
            ),
            "w",
        ) as f:
            f.writelines(
                [
                    "obj/foo/package_manifest.json",
                    "obj/bar/package_manifest.json",
                    "obj/baz/package_manifest.json",
                ]
            )

        self._mock_get_device_environment(
            environment.DeviceEnvironment(
                "localhost", "8080", "foo", "/foo.key"
            )
        )

        # Provide hard-coded predictable test-list.json content rather than
        # actually running the generate_test_list program.
        self._mock_generate_test_list()

        return super().setUp()

    def _mock_run_commands_in_parallel(
        self, stdout: str, return_code: int = 0
    ) -> mock.MagicMock:
        m = mock.AsyncMock(
            return_value=[
                mock.MagicMock(stdout=stdout, return_code=return_code)
            ]
        )
        patch = mock.patch("main.run_commands_in_parallel", m)
        patch.start()
        self.addCleanup(patch.stop)
        return m

    def _mock_run_command(
        self,
        return_code: int,
        async_handler: (
            typing.Callable[[typing.Any, typing.Any], typing.Awaitable[None]]
            | None
        ) = None,
    ) -> mock.MagicMock:
        async def handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> typing.Any:
            if async_handler is not None:
                await async_handler(*args, **kwargs)
            return mock.MagicMock(
                return_code=return_code, stdout="", stderr="", was_timeout=False
            )

        m = mock.AsyncMock(side_effect=handler)
        patch = mock.patch("main.execution.run_command", m)
        patch.start()
        self.addCleanup(patch.stop)
        return m

    def _mock_generate_test_list(self) -> mock.MagicMock:
        test_list_entries = test_list_file.TestListFile.entries_from_file(
            self.test_list_input
        )
        m = mock.AsyncMock(return_value=test_list_entries)
        patch = mock.patch("main.AsyncMain._generate_test_list", m)
        patch.start()
        self.addCleanup(patch.stop)
        return m

    def _mock_subprocess_call(self, value: int) -> mock.MagicMock:
        m = mock.MagicMock(return_value=value)
        patch = mock.patch("main.subprocess.call", m)
        patch.start()
        self.addCleanup(patch.stop)
        return m

    def _mock_has_package_server_connected_to_device(self, value: bool) -> None:
        m = mock.AsyncMock(return_value=value)
        patch = mock.patch("main.has_package_server_connected_to_device", m)
        patch.start()
        self.addCleanup(patch.stop)

    def _mock_has_tests_in_base(self, test_packages: list[str]) -> None:
        with open(os.path.join(self.out_dir, "base_packages.list"), "w") as f:
            json.dump(
                {
                    "content": {
                        "names": test_packages,
                    }
                },
                f,
            )

    def _make_call_args_prefix_set(
        self, call_list: mock._CallList
    ) -> set[tuple[str, ...]]:
        """Given a list of mock calls, turn them into a set of prefixes for comparison.

        For instance, if the mock call is ("fx", "run", "command") the output
        is: {
            ('fx',),
            ('fx', 'run'),
            ('fx', 'run', 'command'),
        }

        This can be used to check containment.

        Args:
            call_list (mock._CallList): Calls to process.

        Returns:
            set[list[typing.Any]]: Set of prefixes to calls.
        """
        ret: set[tuple[str, ...]] = set()
        for call in call_list:
            args, _ = call
            cur = []
            if args and isinstance(args[0], list):
                # Correct for subprocess.call using lists and not *args.
                args = args[0]
            for a in args:
                cur.append(a)
                ret.add(tuple(cur))

        return ret

    def _assert_ffx_test_has_args(
        self, call_list: mock._CallList, desired_args: list[str]
    ) -> None:
        """Verifies args were passed to "ffx test".

        Given a list of mock calls, verifies it includes a call to
        "ffx test run" which includes the given sequence of args."""

        for call in call_list:
            try:
                call_args = list(call.args)
                ffx_pos = call_args.index("ffx")
            except:
                continue
            if call_args[ffx_pos : ffx_pos + 3] != ["ffx", "test", "run"]:
                continue
            args_after_ffx_run = call_args[ffx_pos + 3 :]
            for index, item in enumerate(args_after_ffx_run):
                if (
                    desired_args
                    == args_after_ffx_run[index : index + len(desired_args)]
                ):
                    return
            self.fail(f"{desired_args} not found in {call_list}")

    def _mock_get_device_environment(
        self, env: environment.DeviceEnvironment
    ) -> mock.MagicMock:
        m = mock.AsyncMock(return_value=env)
        patch = mock.patch(
            "main.execution.get_device_environment_from_exec_env", m
        )
        patch.start()
        self.addCleanup(patch.stop)
        return m

    def assertIsSubset(
        self, subset: set[typing.Any], full: set[typing.Any]
    ) -> None:
        inter = full.intersection(subset)
        self.assertEqual(
            inter, subset, f"Full set was\n {self.prettyFormatPrefixes(full)}"
        )

    def prettyFormatPrefixes(self, vals: set[typing.Any]) -> str:
        return "\n ".join(map(lambda x: " ".join(x), sorted(vals)))

    async def test_dry_run(self) -> None:
        """Test a basic dry run of the command."""
        recorder = event.EventRecorder()
        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--dry"]), recorder=recorder
        )
        self.assertEqual(ret, 0)

        selection_events: list[event.TestSelectionPayload] = [
            e.payload.test_selections
            async for e in recorder.iter()
            if e.payload is not None and e.payload.test_selections is not None
        ]

        self.assertEqual(len(selection_events), 1)
        selection_event = selection_events[0]
        self.assertEqual(
            len(selection_event.selected), self.TOTAL_TESTS_IN_INPUT
        )

    async def test_fuzzy_dry_run(self) -> None:
        """Test a dry run of the command for fuzzy matching"""
        recorder = event.EventRecorder()
        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--dry", "--fuzzy=1", "foo_test"]),
            recorder=recorder,
        )
        self.assertEqual(ret, 0)

        selection_events: list[event.TestSelectionPayload] = [
            e.payload.test_selections
            async for e in recorder.iter()
            if e.payload is not None and e.payload.test_selections is not None
        ]

        self.assertEqual(len(selection_events), 1)
        selection_event = selection_events[0]
        self.assertEqual(len(selection_event.selected), 1)

    async def test_cancel_before_tests_run(self) -> None:
        """Test that SIGINT before tests start running immediately stops execution"""

        self._mock_run_command(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ready_to_kill = asyncio.Event()

        # Make builds hang for a long time, signalling that we should
        # trigger a SIGINT at the point the build starts.
        async def build_handler(_: typing.Any) -> bool:
            ready_to_kill.set()
            await asyncio.sleep(3600)
            return False

        build_patch = mock.patch(
            "main.AsyncMain._do_build",
            mock.AsyncMock(side_effect=build_handler),
        )
        build_patch.start()
        self.addCleanup(build_patch.stop)

        recorder = event.EventRecorder()
        main_task = asyncio.Task(
            main.async_main_wrapper(
                args.parse_args(["--simple"]), recorder=recorder
            )
        )

        await ready_to_kill.wait()
        os.kill(os.getpid(), signal.SIGINT)

        ret = await main_task
        self.assertEqual(ret, 1)
        errors = {e.error async for e in recorder.iter() if e.error is not None}
        self.assertIsSubset({"Terminated due to interrupt"}, errors)

    async def test_cancel_tests_wraps_up(self) -> None:
        """Test that SIGINT while tests are running allows them to wrap up and prints output"""

        ready_to_kill = asyncio.Event()

        async def command_handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            event: asyncio.Event = kwargs.get("abort_signal")  # type: ignore
            assert event is not None
            ready_to_kill.set()
            await event.wait()

        _command_mock = self._mock_run_command(
            15, async_handler=command_handler
        )
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        recorder = event.EventRecorder()
        main_task = asyncio.Task(
            main.async_main_wrapper(
                args.parse_args(["--simple", "--no-build"]), recorder=recorder
            )
        )

        await ready_to_kill.wait()
        os.kill(os.getpid(), signal.SIGINT)

        ret = await main_task
        self.assertEqual(ret, 1)
        errors = {e.error async for e in recorder.iter() if e.error is not None}
        self.assertIsSubset({"Failed to run all tests"}, errors)

        aborted_cases = {
            (payload_event.status, payload_event.message)
            async for e in recorder.iter()
            if (payload := e.payload) is not None
            and (payload_event := payload.test_suite_ended) is not None
        }
        self.assertSetEqual(
            aborted_cases,
            {
                (
                    event.TestSuiteStatus.ABORTED,
                    "Test suite aborted due to user interrupt.",
                )
            },
        )

    async def test_double_sigint_cancels_everything(self) -> None:
        """Test that sending SIGINT twice cancels all tasks, no matter how long running"""

        ready_to_kill = asyncio.Event()

        async def command_handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            ready_to_kill.set()
            await asyncio.sleep(3600)

        _command_mock = self._mock_run_command(
            15, async_handler=command_handler
        )
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        recorder = event.EventRecorder()
        main_task = asyncio.Task(
            main.async_main_wrapper(
                args.parse_args(["--simple", "--no-build"]), recorder=recorder
            )
        )

        await ready_to_kill.wait()
        os.kill(os.getpid(), signal.SIGINT)
        await asyncio.sleep(0.5)
        os.kill(os.getpid(), signal.SIGINT)

        ret = await main_task
        self.assertEqual(ret, 1)

    @parameterized.expand(
        [
            (["--host"], HOST_TESTS_IN_INPUT - E2E_TESTS_IN_INPUT),
            (["--device"], DEVICE_TESTS_IN_INPUT),
            (["--only-e2e"], E2E_TESTS_IN_INPUT),
            # TODO(https://fxbug.dev/338667899): Enable when we determine how to handle opt-in e2e.
            # ([], TOTAL_NON_E2E_TESTS_IN_INPUT),
            (["--e2e"], TOTAL_TESTS_IN_INPUT),
        ]
    )
    async def test_selection_flags(
        self, extra_flags: list[str], expected_count: int
    ) -> None:
        """Test that the correct --device, --host, or --e2e tests are selected"""

        recorder = event.EventRecorder()
        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--dry"] + extra_flags),
            recorder=recorder,
        )
        self.assertEqual(ret, 0)

        selection_events: list[event.TestSelectionPayload] = [
            e.payload.test_selections
            async for e in recorder.iter()
            if e.payload is not None and e.payload.test_selections is not None
        ]

        self.assertEqual(len(selection_events), 1)
        selection_event = selection_events[0]
        self.assertEqual(len(selection_event.selected), expected_count)

    @parameterized.expand(
        [
            ("--use-package-hash", DEVICE_TESTS_IN_INPUT),
            ("--no-use-package-hash", 0),
        ]
    )
    async def test_use_package_hash(
        self, flag_name: str, expected_hash_matches: int
    ) -> None:
        """Test ?hash= is used only when --use-package-hash is set"""

        command_mock = self._mock_run_command(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build"] + [flag_name])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "test",
                    "run",
                ),
            },
            call_prefixes,
        )

        hash_params_found: int = 0
        for prefix_list in call_prefixes:
            entry: str
            for entry in prefix_list:
                if "?hash=" in entry:
                    hash_params_found += 1

        self.assertEqual(
            hash_params_found,
            expected_hash_matches,
            f"Prefixes were\n{self.prettyFormatPrefixes(call_prefixes)}",
        )

    @parameterized.expand(
        [
            ("default suggestions", [], 6),
            ("custom suggestion count", ["--suggestion-count=10"], 10),
            ("suppress suggestions", ["--no-show-suggestions"], 0),
        ]
    )
    async def test_suggestions(
        self,
        _unused_name: str,
        extra_flags: list[str],
        expected_suggestion_count: int,
    ) -> None:
        """Test that targets are suggested when there are no test matches."""
        mocked_commands = self._mock_run_commands_in_parallel("No matches")
        ret = await main.async_main_wrapper(
            args.parse_args(
                ["--simple", "non_existent_test_does_not_match"] + extra_flags
            )
        )
        self.assertEqual(ret, 1)
        if expected_suggestion_count > 0:
            self.assertListEqual(
                mocked_commands.call_args[0][0],
                [
                    [
                        "fx",
                        "--dir",
                        self.out_dir,
                        "search-tests",
                        f"--max-results={expected_suggestion_count}",
                        "--no-color",
                        "non_existent_test_does_not_match",
                    ]
                ],
            )
        else:
            self.assertListEqual(mocked_commands.call_args_list, [])

        # TODO(b/295340412): Test that suggestions are suppressed.

    @parameterized.expand(
        [
            ("default package server behavior", [], True, True),
            (
                "override no temporary package server",
                ["--no-allow-temporary-package-server"],
                False,
                False,
            ),
            (
                "override allow temporary package server",
                ["--allow-temporary-package-server"],
                True,
                True,
            ),
        ]
    )
    async def test_missing_package_server(
        self,
        _unused_name: str,
        extra_flags: list[str],
        expect_pass: bool,
        expect_to_serve: bool,
    ) -> None:
        """Test different behaviors when a package server is missing"""
        serve_abort_signal: asyncio.Event | None = None

        async def command_handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            nonlocal serve_abort_signal
            if "serve" in args:
                serve_abort_signal = kwargs.get("abort_signal")

        command_mock = self._mock_run_command(0, async_handler=command_handler)
        subprocess_mock = self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(False)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple"] + extra_flags)
        )
        if expect_pass:
            self.assertEqual(ret, 0)
        else:
            self.assertNotEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        call_prefixes.update(
            self._make_call_args_prefix_set(subprocess_mock.call_args_list)
        )

        if expect_to_serve:
            self.assertIsSubset(
                {("fx", "--dir", self.out_dir, "serve")},
                call_prefixes,
            )
            self.assertIsNotNone(serve_abort_signal)
            self.assertTrue(serve_abort_signal.is_set())  # type: ignore
        else:
            self.assertNotIn(
                ("fx", "--dir", self.out_dir, "serve"), call_prefixes
            )

    async def test_list_command_starts_and_terminates_package_server(
        self,
    ) -> None:
        """Test that we start and terminate a package server for the list command"""

        serve_abort_signal: asyncio.Event | None = None

        async def command_handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            nonlocal serve_abort_signal
            if "serve" in args:
                serve_abort_signal = kwargs.get("abort_signal")

        command_mock = self._mock_run_command(0, async_handler=command_handler)
        self._mock_run_commands_in_parallel("foo::test")
        self._mock_has_package_server_connected_to_device(False)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build", "--list"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        self.assertIsSubset(
            {("fx", "--dir", self.out_dir, "serve")},
            call_prefixes,
        )
        self.assertIsNotNone(serve_abort_signal)
        self.assertTrue(serve_abort_signal.is_set())  # type: ignore

    async def test_package_server_termination_on_generation_error(self) -> None:
        """Test that we terminate the package server if generation fails"""

        serve_abort_signal: asyncio.Event | None = None

        async def command_handler(
            *args: typing.Any, **kwargs: typing.Any
        ) -> None:
            nonlocal serve_abort_signal
            if "serve" in args:
                serve_abort_signal = kwargs.get("abort_signal")

        command_mock = self._mock_run_command(0, async_handler=command_handler)
        self._mock_has_package_server_connected_to_device(False)
        self._mock_has_tests_in_base([])

        with mock.patch(
            "main.AsyncMain._generate_test_list",
            side_effect=ValueError("Generation failed"),
        ):
            ret = await main.async_main_wrapper(
                args.parse_args(["--simple", "--no-build"])
            )
            self.assertEqual(ret, 1)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        self.assertIsSubset(
            {("fx", "--dir", self.out_dir, "serve")},
            call_prefixes,
        )
        self.assertIsNotNone(serve_abort_signal)
        self.assertTrue(serve_abort_signal.is_set())  # type: ignore

    async def test_full_success(self) -> None:
        """Test that we can run all tests and report success"""

        command_mock = self._mock_run_command(0)
        subprocess_mock = self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--allow-temporary-package-server"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        call_prefixes.update(
            self._make_call_args_prefix_set(subprocess_mock.call_args_list)
        )

        # Make sure we built, published, and ran the device test.
        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "build",
                    "--default",
                    "//src/sys:foo_test_package",
                    "--toolchain=//build/toolchain/host:x64",
                    "//src/sys:bar_test",
                    "//src/sys:baz_test",
                    "//src/tests/end_to_end:example_e2e_test",
                    "--default",
                    "//build/images/updates",
                ),
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "repository",
                    "publish",
                ),
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "test",
                    "run",
                ),
            },
            call_prefixes,
        )

        self.assertNotIn(("fx", "--dir", self.out_dir, "serve"), call_prefixes)

        # Make sure we properly exclude the "broken_case" and "bad_case"
        # and count an empty test case set as passing.
        self._assert_ffx_test_has_args(
            command_mock.call_args_list, ["--test-filter", "-broken_case"]
        )
        self._assert_ffx_test_has_args(
            command_mock.call_args_list, ["--test-filter", "-bad_case"]
        )
        self._assert_ffx_test_has_args(
            command_mock.call_args_list, ["--no-cases-equals-success"]
        )

        # Make sure we ran the host tests.
        self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
        self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))

        # Try running again, but this time replay the previous execution.
        output = mock.MagicMock(wraps=io.StringIO())
        output.fileno = lambda: -1
        with contextlib.redirect_stdout(output):
            ret = await main.async_main_wrapper(
                args.parse_args(
                    [
                        "--simple",
                        "-q",
                        "--previous",
                        "replay",
                        "--replay-speed",
                        "5",
                    ]
                ),
                replay_mode=True,
            )
            self.assertEqual(0, ret)

        contents = list(map(str.strip, output.getvalue().split("\n")))
        contents_for_printing = "\n ".join(contents)
        self.assertTrue(
            {
                "PASSED: host_x64/bar_test",
                "PASSED: fuchsia-pkg://fuchsia.com/foo-test#meta/foo_test.cm",
                "PASSED: host_x64/baz_test",
                "SKIPPED: host_x64/example_e2e_test",
            }.issubset(set(contents)),
            f"Contents were:\n {contents_for_printing}",
        )

    async def test_build_e2e(self) -> None:
        """Test that we build an updates package for e2e tests"""

        command_mock = self._mock_run_command(0)
        subprocess_mock = self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--only-e2e"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )
        call_prefixes.update(
            self._make_call_args_prefix_set(subprocess_mock.call_args_list)
        )

        # Make sure we built, published, and ran the device test.
        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    os.path.join(self.fuchsia_dir.name, "out/default"),
                    "build",
                    "--toolchain=//build/toolchain/host:x64",
                    "//src/tests/end_to_end:example_e2e_test",
                    "--default",
                    "//build/images/updates",
                ),
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "repository",
                    "publish",
                ),
            },
            call_prefixes,
        )

        # Make sure we ran the host tests.
        self.assertTrue(
            any(["example_e2e_test" in v[0] for v in call_prefixes])
        )

    async def test_build_device_package_lists_only_when_no_merkle(self) -> None:
        """Test that we only build package lists for device tests that are missing a merkle hash"""

        with open(self.package_target_file_path, "w") as f:
            # Clear the target files so that this test does not have a merkle hash listed.
            # This will trigger rebuilding package lists.
            f.write('{"signed": {"targets": {}}}')

        command_mock = self._mock_run_command(0)
        subprocess_mock = self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-e2e", "--device"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )
        call_prefixes.update(
            self._make_call_args_prefix_set(subprocess_mock.call_args_list)
        )

        # Make sure we built, published, and ran the device test.
        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    os.path.join(self.fuchsia_dir.name, "out/default"),
                    "build",
                    "--default",
                    "//src/sys:foo_test_package",
                    "--default",
                    "//build/images/updates:package_lists",
                ),
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "repository",
                    "publish",
                ),
            },
            call_prefixes,
        )

    async def test_no_build_package_lists_if_not_needed(self) -> None:
        """Test that we do not build package lists if all packages are present in the repository"""

        command_mock = self._mock_run_command(0)
        subprocess_mock = self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-e2e", "--device"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )
        call_prefixes.update(
            self._make_call_args_prefix_set(subprocess_mock.call_args_list)
        )

        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    os.path.join(self.fuchsia_dir.name, "out/default"),
                    "build",
                    "--default",
                    "//src/sys:foo_test_package",
                ),
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ffx",
                    "repository",
                    "publish",
                ),
            },
            call_prefixes,
        )

        for prefix_list in call_prefixes:
            self.assertNotIn(
                "//build/images/updates:package_lists", prefix_list
            )

    async def test_no_build(self) -> None:
        """Test that we can run all tests and report success"""

        command_mock = self._mock_run_command(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build"])
        )
        self.assertEqual(ret, 0)

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        self.assertFalse(
            ("fx", "--dir", self.out_dir, "build") in call_prefixes
        )
        self.assertFalse(
            ("fx", "--dir", self.out_dir, "ffx", "repository", "publish")
            in call_prefixes
        )

        self.assertIsSubset(
            {
                ("fx", "--dir", self.out_dir, "ffx", "test", "run"),
            },
            call_prefixes,
        )

        # Make sure we ran the host test.
        self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
        self.assertTrue(any(["baz_test" in v[0] for v in call_prefixes]))

    async def test_first_failure(self) -> None:
        """Test that one failing test aborts the rest with --fail"""

        command_mock = self._mock_run_command(1)
        command_mock.side_effect = [
            command.CommandOutput("out", "err", 1, 10, None),
            command.CommandOutput("out", "err", 1, 10, None),
            command.CommandOutput("out", "err", 1, 10, None),
        ]

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build", "--fail"])
        )

        # bar_test and baz_test are not hermetic, so cannot run at the same time.
        # One of them will run before the other, which means --fail
        # prevents one of them from starting, and we expect to see
        # only bar_test (since baz_test is defined later in the file)
        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )
        self.assertEqual(ret, 1)

        self.assertTrue(any(["bar_test" in v[0] for v in call_prefixes]))
        self.assertFalse(any(["baz_test" in v[0] for v in call_prefixes]))

    async def test_count(self) -> None:
        """Test that we can re-run a test multiple times with --count"""

        command_mock = self._mock_run_command(0)

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        # Run each test 3 times, no parallel to better match behavior of failure case test.
        ret = await main.async_main_wrapper(
            args.parse_args(
                ["--simple", "--no-build", "--count=3", "--parallel=1"]
            )
        )
        self.assertEqual(ret, 0)

        self.assertEqual(
            3,
            sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            3,
            sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            3,
            sum(
                [
                    "foo-test?hash=" in " ".join(v[0])
                    for v in command_mock.call_args_list
                ]
            ),
            command_mock.call_args_list,
        )

    async def test_count_with_timeout(self) -> None:
        """Test that we abort running the rest of the tests in a --count group if a timeout occurs."""

        command_mock = self._mock_run_command(1)
        command_mock.return_value = command.CommandOutput(
            "", "", 1, 10, None, was_timeout=True
        )

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        # Run each test 3 times, no parallel to better match behavior of failure case test.
        ret = await main.async_main_wrapper(
            args.parse_args(
                [
                    "--simple",
                    "--no-build",
                    "--count=3",
                    "--parallel=1",
                ]
            )
        )
        self.assertEqual(ret, 1)

        self.assertEqual(
            1,
            sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            1,
            sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            1,
            sum(
                [
                    "foo-test?hash=" in " ".join(v[0])
                    for v in command_mock.call_args_list
                ]
            ),
            command_mock.call_args_list,
        )

    async def test_no_fail_by_group(self) -> None:
        """Test that we continue running the rest of the tests in a --count group if one fails and --no-fail-by-group is passed."""

        command_mock = self._mock_run_command(1)
        command_mock.return_value = command.CommandOutput(
            "", "", 1, 10, None, was_timeout=True
        )

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        # Run each test 3 times, no parallel to better match behavior of failure case test.
        ret = await main.async_main_wrapper(
            args.parse_args(
                [
                    "--simple",
                    "--no-build",
                    "--count=3",
                    "--parallel=1",
                    "--no-fail-by-group",
                ]
            )
        )
        self.assertEqual(ret, 1)

        self.assertEqual(
            3,
            sum(["bar_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            3,
            sum(["baz_test" in v[0][0] for v in command_mock.call_args_list]),
            command_mock.call_args_list,
        )
        self.assertEqual(
            3,
            sum(
                [
                    "foo-test?hash=" in " ".join(v[0])
                    for v in command_mock.call_args_list
                ]
            ),
            command_mock.call_args_list,
        )

    @parameterized.expand(
        [
            ("existing package server running", True),
            ("no existing package server running", False),
        ]
    )
    async def test_list_command(
        self, _unused_name: str, existing_package_server: bool
    ) -> None:
        """Test that we can list test cases using --list"""

        command_mock = self._mock_run_command(0)
        command_parallel_mock = self._mock_run_commands_in_parallel(
            "foo::test\nbar::test",
        )

        self._mock_has_package_server_connected_to_device(
            existing_package_server
        )
        self._mock_has_tests_in_base([])

        recorder = event.EventRecorder()

        # This only works if the first test is a device test.
        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build", "--list", "--limit=1"]),
            recorder=recorder,
        )
        self.assertEqual(ret, 0)
        self.assertEqual(command_parallel_mock.call_count, 1)

        events = [
            e.payload.enumerate_test_cases
            async for e in recorder.iter()
            if e.payload is not None
            and e.payload.enumerate_test_cases is not None
        ]
        self.assertEqual(len(events), 1)

        self.assertEqual(
            events[0].test_case_names,
            [
                "foo::test",
                "bar::test",
            ],
        )

        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )

        if existing_package_server:
            self.assertNotIn(
                ("fx", "--dir", self.out_dir, "serve"), call_prefixes
            )
        else:
            self.assertIsSubset(
                {("fx", "--dir", self.out_dir, "serve")},
                call_prefixes,
            )

    async def test_list_failing_command(self) -> None:
        """Test that failing to list test cases using --list results in a nonzero exit code"""

        command_mock = self._mock_run_commands_in_parallel(
            "Failed to create remote control proxy: Timeout attempting to reach target foo.",
            100,
        )

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        recorder = event.EventRecorder()

        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--no-build", "--list", "--limit=1"]),
            recorder=recorder,
        )
        self.assertEqual(ret, 1)
        self.assertEqual(command_mock.call_count, 1)

        events = [
            e.payload.enumerate_test_cases
            async for e in recorder.iter()
            if e.payload is not None
            and e.payload.enumerate_test_cases is not None
        ]
        self.assertEqual(len(events), 0)

    @mock.patch("main.run_build_with_suspended_output", side_effect=[0])
    async def test_updateifinbase(self, _build_mock: mock.AsyncMock) -> None:
        """Test that we appropriately update tests in base"""

        command_mock = self._mock_run_command(0)

        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base(["foo-test"])

        ret = await main.async_main_wrapper(
            args.parse_args(
                [
                    "--simple",
                    "--no-build",
                    "--updateifinbase",
                    "--parallel",
                    "1",
                ]
            )
        )
        self.assertEqual(ret, 0)
        call_prefixes = self._make_call_args_prefix_set(
            command_mock.call_args_list
        )
        self.assertIsSubset(
            {
                (
                    "fx",
                    "--dir",
                    self.out_dir,
                    "ota",
                    "--no-build",
                )
            },
            call_prefixes,
        )

    async def test_print_logs_success(self) -> None:
        """Test that print_logs searches for logs, can be given a log,
        and handles invalid data
        """
        env = environment.ExecutionEnvironment.initialize_from_args(
            args.parse_args([])
        )
        assert env.log_file
        # Create a sample log with 3 tests running
        recorder = event.EventRecorder()
        recorder.emit_init()

        # Simulate one test suite
        test_id = recorder.emit_test_suite_started("foo", hermetic=False)
        program_id = recorder.emit_program_start(
            "bar", ["abcd"], parent=test_id
        )
        recorder.emit_program_output(
            program_id, "Data", event.ProgramOutputStream.STDOUT
        )
        recorder.emit_program_termination(program_id, 0)
        recorder.emit_test_suite_ended(
            test_id,
            event.TestSuiteStatus.PASSED,
            message=None,
        )

        test_2 = recorder.emit_test_suite_started(
            "//other:test2", hermetic=True
        )
        test_3 = recorder.emit_test_suite_started(
            "//other:test3", hermetic=True
        )
        program_2 = recorder.emit_program_start(
            "test", ["arg", "1"], parent=test_2
        )
        program_3 = recorder.emit_program_start(
            "test", ["arg", "2"], parent=test_3
        )
        recorder.emit_program_output(
            program_3,
            "line for test 3",
            stream=event.ProgramOutputStream.STDOUT,
        )
        recorder.emit_program_output(
            program_2,
            "line for test 2",
            stream=event.ProgramOutputStream.STDOUT,
        )
        recorder.emit_program_termination(program_2, 0)
        recorder.emit_program_termination(program_3, 0)
        recorder.emit_test_suite_ended(
            test_2, event.TestSuiteStatus.FAILED, message=None
        )
        recorder.emit_test_suite_ended(
            test_3, event.TestSuiteStatus.PASSED, message=None
        )
        recorder.emit_end()

        with gzip.open(env.log_file, "wt") as out_file:
            async for e in recorder.iter():
                json.dump(e.to_dict(), out_file)  # type:ignore[attr-defined]
                print("", file=out_file)

        def assert_print_logs_output(return_code: int, output: str) -> None:
            self.assertEqual(return_code, 0, f"Content was:\n{output}")
            self.assertIsNotNone(
                re.search(r"3 tests were run", output, re.MULTILINE),
                f"Did not find substring, content was:\n{output}",
            )

        # Test finding most recent log file.
        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            return_code = main.do_print_logs(args.parse_args([]))
            assert_print_logs_output(return_code, output.getvalue())

        # Test finding specific log file.
        output = io.StringIO()
        new_file_path = os.path.join(env.out_dir, "other-file.json.gz")
        shutil.move(env.log_file, new_file_path)

        with contextlib.redirect_stdout(output):
            return_code = main.do_print_logs(
                args.parse_args(["--logpath", new_file_path])
            )
            self.assertEqual(
                return_code, 0, f"Content was:\n{output.getvalue()}"
            )
            self.assertIsNotNone(
                re.search(r"3 tests were run", output.getvalue(), re.MULTILINE),
                f"Did not find substring, content was:\n{output.getvalue()}",
            )

    async def test_print_logs_failure(self) -> None:
        """Test that --print-logs prints an error and exits if the log cannot be found"""

        # Default search location
        output = io.StringIO()
        with contextlib.redirect_stderr(output):
            self.assertEqual(main.do_print_logs(args.parse_args([])), 1)
            self.assertIsNotNone(
                re.search(r"No log files found", output.getvalue()),
                f"Did not find substring, output was:\n{output.getvalue()}",
            )

        # Specific missing file
        output = io.StringIO()
        with contextlib.redirect_stderr(output):
            with tempfile.TemporaryDirectory() as td:
                path = os.path.join(td, "does-not-exist")
                self.assertEqual(
                    main.do_print_logs(args.parse_args(["--logpath", path])),
                    1,
                )
                self.assertIsNotNone(
                    re.search(r"No log files found", output.getvalue()),
                    f"Did not find substring, output was:\n{output.getvalue()}",
                )

        # Specific file is not a gzip file
        output = io.StringIO()
        with contextlib.redirect_stderr(output):
            with tempfile.TemporaryDirectory() as td:
                path = os.path.join(td, "does-not-exist")
                with open(path, "w") as f:
                    f.writelines(["hello world"])
                self.assertEqual(
                    main.do_print_logs(args.parse_args(["--logpath", path])),
                    1,
                )
                self.assertIsNotNone(
                    re.search(
                        r"File does not appear to be a gzip file",
                        output.getvalue(),
                    ),
                    f"Did not find substring, output was:\n{output.getvalue()}",
                )

    async def test_print_failed_tests(self) -> None:
        """Test that failed-tests prints out the failed tests"""
        env = environment.ExecutionEnvironment.initialize_from_args(
            args.parse_args([])
        )
        assert env.log_file
        # Create a sample log with 2 failed tests and one passing test
        recorder = event.EventRecorder()
        recorder.emit_init()

        # Simulate one test suite
        test_id = recorder.emit_test_suite_started("foo", hermetic=False)
        recorder.emit_test_suite_ended(
            test_id,
            event.TestSuiteStatus.FAILED,
            message=None,
        )

        test_2 = recorder.emit_test_suite_started(
            "//other:test2", hermetic=True
        )
        test_3 = recorder.emit_test_suite_started(
            "//other:test3", hermetic=True
        )

        recorder.emit_test_suite_ended(
            test_2, event.TestSuiteStatus.PASSED, message=None
        )
        recorder.emit_test_suite_ended(
            test_3, event.TestSuiteStatus.FAILED_TO_START, message=None
        )
        recorder.emit_end()

        with gzip.open(env.log_file, "wt") as out_file:
            async for e in recorder.iter():
                json.dump(e.to_dict(), out_file)  # type:ignore[attr-defined]
                print("", file=out_file)

        def assert_print_failed_tests_output(
            return_code: int, output: str
        ) -> None:
            self.assertEqual(return_code, 0, f"Content was:\n{output}")
            self.assertIsNotNone(
                re.search(
                    r"The following tests failed in the previous run:",
                    output,
                    re.MULTILINE,
                ),
                f"Did not find header substring, content was:\n{output}",
            )
            self.assertIsNotNone(
                re.search(r"\* fx test foo", output, re.MULTILINE),
                f"Did not find test 1 substring, content was:\n{output}",
            )
            self.assertIsNotNone(
                re.search(r"\* fx test //other:test3", output, re.MULTILINE),
                f"Did not find test 3 substring, content was:\n{output}",
            )

        # Test finding most recent log file.
        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            return_code = main.do_print_failed(args.parse_args([]))
            assert_print_failed_tests_output(return_code, output.getvalue())

    async def test_print_failed_tests_no_failures(self) -> None:
        """Test that failed-tests prints no failed tests"""

        env = environment.ExecutionEnvironment.initialize_from_args(
            args.parse_args([])
        )
        assert env.log_file
        # Create a sample log with 2 failed tests and one passing test
        recorder = event.EventRecorder()
        recorder.emit_init()

        # Simulate one test suite
        test_id = recorder.emit_test_suite_started("foo", hermetic=False)
        recorder.emit_test_suite_ended(
            test_id,
            event.TestSuiteStatus.PASSED,
            message=None,
        )
        recorder.emit_end()

        with gzip.open(env.log_file, "wt") as out_file:
            async for e in recorder.iter():
                json.dump(e.to_dict(), out_file)  # type:ignore[attr-defined]
                print("", file=out_file)

        def assert_print_failed_tests_output(
            return_code: int, output: str
        ) -> None:
            self.assertEqual(return_code, 0, f"Content was:\n{output}")
            self.assertIsNotNone(
                re.search(
                    r"The previous run had no failed tests",
                    output,
                    re.MULTILINE,
                ),
                f"Did not find substring, content was:\n{output}",
            )

        # Test finding most recent log file.
        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            return_code = main.do_print_failed(args.parse_args([]))
            assert_print_failed_tests_output(return_code, output.getvalue())

    @mock.patch("main.termout.is_valid", return_value=False)
    async def test_log_to_stdout(self, _termout_mock: mock.Mock) -> None:
        """Test that we can log everything to stdout, and it parses as JSON lines"""

        self._mock_run_command(0)
        self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            ret = await main.async_main_wrapper(
                args.parse_args(["--logpath", "-"])
            )
            self.assertEqual(ret, 0)
        for line in output.getvalue().splitlines():
            if not line:
                continue
            try:
                json.loads(line)
            except json.JSONDecodeError as e:
                self.fail(
                    f"Failed to parse line as JSON.\nLine: {line}\nError: {e}"
                )

    async def test_artifact_options(self) -> None:
        """Test that we handle artifact output directories and can query their value"""

        self._mock_run_command(0)
        self._mock_subprocess_call(0)
        self._mock_has_package_server_connected_to_device(True)
        self._mock_has_tests_in_base([])

        with self.subTest("no artifact path still produces empty event"):
            with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join(td, "log.json.gz")
                flags = args.parse_args(["--simple", "--logpath", logpath])
                ret = await main.async_main_wrapper(flags)
                self.assertEqual(ret, 0)

                env = environment.ExecutionEnvironment.initialize_from_args(
                    flags, create_log_file=False
                )

                artifact_path: str | None = None
                for log_entry in log.LogSource.from_env(env).read_log():
                    if (event := log_entry.log_event) is not None:
                        if (
                            event.payload is not None
                            and (path := event.payload.artifact_directory_path)
                            is not None
                        ):
                            artifact_path = path
                    self.assertIsNone(log_entry.error)
                    self.assertIsNone(log_entry.warning)
                self.assertEqual(artifact_path, "")

                # Using the output log file, we should get an error requesting the artifact output.
                stderr = io.StringIO()
                with contextlib.redirect_stderr(stderr):
                    ret = main.do_process_previous(
                        args.parse_args(
                            [
                                "-pr",
                                "artifact-path",
                                "--logpath",
                                logpath,
                            ]
                        )
                    )
                    self.assertNotEqual(ret, 0)

                lines = stderr.getvalue().splitlines()
                self.assertEqual(
                    lines,
                    [
                        "ERROR: The previous run did not specify --artifact-output-directory. Run again with that flag set to get the path."
                    ],
                )

        with self.subTest(
            "artifact path by default goes to the top level directory"
        ):
            with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join("log.json.gz")
                artifact_root = os.path.join(td, "artifacts")
                flags = args.parse_args(
                    [
                        "--simple",
                        "--logpath",
                        logpath,
                        "--outdir",
                        artifact_root,
                    ]
                )
                ret = await main.async_main_wrapper(flags)
                self.assertEqual(ret, 0)

                env = environment.ExecutionEnvironment.initialize_from_args(
                    flags, create_log_file=False
                )

                artifact_path = None
                for log_entry in log.LogSource.from_env(env).read_log():
                    if (event := log_entry.log_event) is not None:
                        if (
                            event.payload is not None
                            and (path := event.payload.artifact_directory_path)
                            is not None
                        ):
                            artifact_path = path
                    self.assertIsNone(log_entry.error)
                    self.assertIsNone(log_entry.warning)
                self.assertEqual(artifact_path, artifact_root)

                # Path gets created automatically.
                self.assertTrue(os.path.isdir(artifact_root))

                # Delete the artifact directory, checking what happens when it does not exist.
                shutil.rmtree(artifact_root)

                # Using the output log file, we should see an error getting the path because the directory will not be present.
                stderr = io.StringIO()
                with contextlib.redirect_stderr(stderr):
                    ret = main.do_process_previous(
                        args.parse_args(
                            [
                                "-pr",
                                "artifact-path",
                                "--logpath",
                                logpath,
                            ]
                        )
                    )
                    self.assertNotEqual(ret, 0)

                lines = stderr.getvalue().splitlines()
                self.assertEqual(
                    lines,
                    [
                        "ERROR: The artifact directory is missing, it may have been deleted."
                    ],
                )

                # Create the artifact directory. Listing artifact path should work now.
                os.makedirs(artifact_root)
                stdout = mock.MagicMock(wraps=io.StringIO())
                stdout.fileno = lambda: -1
                with contextlib.redirect_stdout(stdout):
                    ret = main.do_process_previous(
                        args.parse_args(
                            [
                                "-pr",
                                "artifact-path",
                                "--logpath",
                                logpath,
                            ]
                        )
                    )
                    self.assertEqual(ret, 0)

                lines = stdout.getvalue().splitlines()
                self.assertEqual(
                    lines,
                    [artifact_root],
                )

        with self.subTest(
            "--timestamp-artifacts causes artifacts to go to subdir"
        ):
            with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join("log.json.gz")
                artifact_root = os.path.join(td, "artifacts")
                flags = args.parse_args(
                    [
                        "--simple",
                        "--logpath",
                        logpath,
                        "--outdir",
                        artifact_root,
                        "--timestamp-artifacts",
                    ]
                )
                ret = await main.async_main_wrapper(flags)
                self.assertEqual(ret, 0)

                env = environment.ExecutionEnvironment.initialize_from_args(
                    flags, create_log_file=False
                )

                artifact_path = None
                for log_entry in log.LogSource.from_env(env).read_log():
                    if (event := log_entry.log_event) is not None:
                        if (
                            event.payload is not None
                            and (path := event.payload.artifact_directory_path)
                            is not None
                        ):
                            artifact_path = path
                    self.assertIsNone(log_entry.error)
                    self.assertIsNone(log_entry.warning)
                self.assertNotEqual(artifact_path, artifact_root)
                assert artifact_path is not None
                self.assertEqual(
                    os.path.commonprefix([artifact_path, artifact_root]),
                    artifact_root,
                )

        with self.subTest(
            "it is an error to output to an existing, non-empty directory"
        ):
            with tempfile.TemporaryDirectory() as td:
                logpath = os.path.join("log.json.gz")
                artifact_root = os.path.join(td, "artifacts")
                os.mkdir(artifact_root)
                with open(os.path.join(artifact_root, "some_file"), "w") as f:
                    f.write("Demo data")
                flags = args.parse_args(
                    [
                        "--simple",
                        "--logpath",
                        logpath,
                        "--outdir",
                        artifact_root,
                        "--no-timestamp-artifacts",
                    ]
                )
                ret = await main.async_main_wrapper(flags)
                self.assertEqual(ret, 1)

                env = environment.ExecutionEnvironment.initialize_from_args(
                    flags, create_log_file=False
                )

                artifact_path = None
                found_error = False
                for log_entry in log.LogSource.from_env(env).read_log():
                    if (event := log_entry.log_event) is not None:
                        if (error_message := event.error) is not None:
                            if (
                                "Your output directory already exists"
                                in error_message
                            ):
                                found_error = True
                                break

                self.assertTrue(
                    found_error,
                    "Expected to find an error about output directory existing",
                )

    async def test_list_runtime_deps_success(self) -> None:
        """Tests the successful listing of runtime dependencies."""
        deps_path = "path/to/my_deps.json"
        full_deps_path = os.path.join(self.out_dir, deps_path)
        os.makedirs(os.path.dirname(full_deps_path), exist_ok=True)
        with open(full_deps_path, "w") as f:
            json.dump(["dep1", "dep2"], f)

        empty_deps_path = "path/to/empty_deps.json"
        full_empty_deps_path = os.path.join(self.out_dir, empty_deps_path)
        os.makedirs(os.path.dirname(full_empty_deps_path), exist_ok=True)
        with open(full_empty_deps_path, "w") as f:
            json.dump([], f)

        mock_test_with_deps = mock.MagicMock()
        mock_test_with_deps.name.return_value = "test_with_deps"
        mock_test_with_deps.build.test.runtime_deps = deps_path

        mock_test_with_empty_deps = mock.MagicMock()
        mock_test_with_empty_deps.name.return_value = "test_with_empty_deps"
        mock_test_with_empty_deps.build.test.runtime_deps = empty_deps_path

        mock_test_without_deps = mock.MagicMock()
        mock_test_without_deps.name.return_value = "test_without_deps"
        mock_test_without_deps.build.test.runtime_deps = None

        mock_selections = mock.MagicMock()
        mock_selections.selected = [
            mock_test_with_deps,
            mock_test_with_empty_deps,
            mock_test_without_deps,
        ]
        mock_selections.selected_but_not_run = []

        selection_patch = mock.patch(
            "main.selection.select_tests",
            mock.AsyncMock(return_value=mock_selections),
        )
        selection_patch.start()
        self.addCleanup(selection_patch.stop)

        validate_patch = mock.patch(
            "main.AsyncMain._validate_test_selections",
            mock.AsyncMock(return_value=None),
        )
        validate_patch.start()
        self.addCleanup(validate_patch.stop)

        recorder = event.EventRecorder()
        ret = await main.async_main_wrapper(
            args.parse_args(["--simple", "--list-runtime-deps", "--no-build"]),
            recorder=recorder,
        )
        self.assertEqual(ret, 0)
        payloads = [
            e.payload.user_message.value
            async for e in recorder.iter()
            if e.payload
            and e.payload.user_message
            and e.payload.user_message.value
        ]
        start_index = payloads.index("test_with_deps:")
        self.assertNotEqual(start_index, -1)
        deps_full_output = payloads[start_index:]
        expected_output = [
            "test_with_deps:",
            f"  Runtime deps file at: {full_deps_path}",
            "  dep1",
            "  dep2",
            "test_with_empty_deps:",
            f"  Runtime deps file at: {full_empty_deps_path}",
            "  File is empty",
            "test_without_deps:",
            "  No runtime deps found for this test",
        ]
        self.assertListEqual(deps_full_output, expected_output)

    async def test_list_runtime_deps_file_not_found(self) -> None:
        """Tests that a missing runtime_deps file raises an exception."""
        missing_deps_path = "path/to/non_existent_deps.json"
        mock_test = mock.MagicMock()
        mock_test.name.return_value = "test_with_missing_deps"
        mock_test.build.test.runtime_deps = missing_deps_path

        mock_selections = mock.MagicMock()
        mock_selections.selected = [mock_test]
        mock_selections.selected_but_not_run = []

        selection_patch = mock.patch(
            "main.selection.select_tests",
            mock.AsyncMock(return_value=mock_selections),
        )
        selection_patch.start()
        self.addCleanup(selection_patch.stop)

        validate_patch = mock.patch(
            "main.AsyncMain._validate_test_selections",
            mock.AsyncMock(return_value=None),
        )
        validate_patch.start()
        self.addCleanup(validate_patch.stop)

        with self.assertRaises(FileNotFoundError):
            await main.async_main_wrapper(
                args.parse_args(
                    ["--simple", "--list-runtime-deps", "--no-build"]
                )
            )
