| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import subprocess |
| import unittest |
| |
| import mock |
| |
| from antlion.libs.proc.process import Process, ProcessError |
| |
| |
| class FakeThread(object): |
| def __init__(self, target=None): |
| self.target = target |
| self.alive = False |
| |
| def _on_start(self): |
| pass |
| |
| def start(self): |
| self.alive = True |
| if self._on_start: |
| self._on_start() |
| |
| def stop(self): |
| self.alive = False |
| |
| def join(self): |
| pass |
| |
| |
| class ProcessTest(unittest.TestCase): |
| """Tests the antlion.libs.proc.process.Process class.""" |
| |
| def setUp(self): |
| self._Process__start_process = Process._Process__start_process |
| |
| def tearDown(self): |
| Process._Process__start_process = self._Process__start_process |
| |
| @staticmethod |
| def patch(imported_name, *args, **kwargs): |
| return mock.patch(f"antlion.libs.proc.process.{imported_name}", *args, **kwargs) |
| |
| # set_on_output_callback |
| |
| def test_set_on_output_callback(self): |
| """Tests that set_on_output_callback sets on_output_callback.""" |
| callback = mock.Mock() |
| |
| process = Process("cmd").set_on_output_callback(callback) |
| process._on_output_callback() |
| |
| self.assertTrue(callback.called) |
| |
| # set_on_terminate_callback |
| |
| def test_set_on_terminate_callback(self): |
| """Tests that set_on_terminate_callback sets _on_terminate_callback.""" |
| callback = mock.Mock() |
| |
| process = Process("cmd").set_on_terminate_callback(callback) |
| process._on_terminate_callback() |
| |
| self.assertTrue(callback.called) |
| |
| # start |
| |
| def test_start_raises_if_called_back_to_back(self): |
| """Tests that start raises an exception if it has already been called |
| prior. |
| |
| This is required to prevent references to processes and threads from |
| being overwritten, potentially causing ACTS to hang.""" |
| process = Process("cmd") |
| |
| # Here we need the thread to start the process object. |
| class FakeThreadImpl(FakeThread): |
| def _on_start(self): |
| process._process = mock.Mock() |
| |
| with self.patch("Thread", FakeThreadImpl): |
| process.start() |
| expected_msg = "Process has already started." |
| with self.assertRaisesRegex(ProcessError, expected_msg): |
| process.start() |
| |
| def test_start_starts_listening_thread(self): |
| """Tests that start starts the _exec_popen_loop function.""" |
| process = Process("cmd") |
| |
| # Here we need the thread to start the process object. |
| class FakeThreadImpl(FakeThread): |
| def _on_start(self): |
| process._process = mock.Mock() |
| |
| with self.patch("Thread", FakeThreadImpl): |
| process.start() |
| |
| self.assertTrue(process._listening_thread.alive) |
| self.assertEqual(process._listening_thread.target, process._exec_loop) |
| |
| # wait |
| |
| def test_wait_raises_if_called_back_to_back(self): |
| """Tests that wait raises an exception if it has already been called |
| prior.""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| |
| process.wait(0) |
| expected_msg = "Process is already being stopped." |
| with self.assertRaisesRegex(ProcessError, expected_msg): |
| process.wait(0) |
| |
| @mock.patch.object(Process, "_kill_process") |
| def test_wait_kills_after_timeout(self, *_): |
| """Tests that if a TimeoutExpired error is thrown during wait, the |
| process is killed.""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process._process.wait.side_effect = subprocess.TimeoutExpired("", "") |
| |
| process.wait(0) |
| |
| self.assertEqual(process._kill_process.called, True) |
| |
| @mock.patch("os.getpgid", side_effect=lambda id: id) |
| @mock.patch("os.killpg") |
| def test_sends_signal(self, mock_os, *_): |
| """Tests that signal is sent to process..""" |
| process = Process("cmd") |
| mock_process = mock.Mock() |
| mock_process.pid = -1 |
| process._process = mock_process |
| |
| process.signal(51641) |
| |
| mock_os.assert_called_with(-1, 51641) |
| |
| def test_signal_raises_error_on_windows(self, *_): |
| """Tests that signaling is unsupported in windows with appropriate |
| error msg.""" |
| process = Process("cmd") |
| mock_inner_process = mock.Mock() |
| mock_inner_process.pid = -1 |
| process._process = mock_inner_process |
| |
| with mock.patch("antlion.libs.proc.process._on_windows", True): |
| with self.assertRaises(ProcessError): |
| process.signal(51641) |
| |
| @mock.patch.object(Process, "_kill_process") |
| def test_wait_sets_stopped_to_true_before_process_kill(self, *_): |
| """Tests that stop() sets the _stopped attribute to True. |
| |
| This order is required to prevent the _exec_loop from calling |
| _on_terminate_callback when the user has killed the process. |
| """ |
| verifier = mock.Mock() |
| verifier.passed = False |
| |
| def test_call_order(): |
| self.assertTrue(process._stopped) |
| verifier.passed = True |
| |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process._process.poll.return_value = None |
| process._process.wait.side_effect = subprocess.TimeoutExpired("", "") |
| process._kill_process = test_call_order |
| |
| process.wait() |
| |
| self.assertEqual(verifier.passed, True) |
| |
| def test_wait_joins_listening_thread_if_it_exists(self): |
| """Tests wait() joins _listening_thread if it exists.""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| mocked_thread = mock.Mock() |
| process._listening_thread = mocked_thread |
| |
| process.wait(0) |
| |
| self.assertEqual(mocked_thread.join.called, True) |
| |
| def test_wait_clears_listening_thread_if_it_exists(self): |
| """Tests wait() joins _listening_thread if it exists. |
| |
| Threads can only be started once, so after wait has been called, we |
| want to make sure we clear the listening thread. |
| """ |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process._listening_thread = mock.Mock() |
| |
| process.wait(0) |
| |
| self.assertEqual(process._listening_thread, None) |
| |
| def test_wait_joins_redirection_thread_if_it_exists(self): |
| """Tests wait() joins _listening_thread if it exists.""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| mocked_thread = mock.Mock() |
| process._redirection_thread = mocked_thread |
| |
| process.wait(0) |
| |
| self.assertEqual(mocked_thread.join.called, True) |
| |
| def test_wait_clears_redirection_thread_if_it_exists(self): |
| """Tests wait() joins _listening_thread if it exists. |
| |
| Threads can only be started once, so after wait has been called, we |
| want to make sure we clear the listening thread. |
| """ |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process._redirection_thread = mock.Mock() |
| |
| process.wait(0) |
| |
| self.assertEqual(process._redirection_thread, None) |
| |
| # stop |
| |
| def test_stop_sets_stopped_to_true(self): |
| """Tests that stop() sets the _stopped attribute to True.""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| |
| process.stop() |
| |
| self.assertTrue(process._stopped) |
| |
| def test_stop_sets_stopped_to_true_before_process_kill(self): |
| """Tests that stop() sets the _stopped attribute to True. |
| |
| This order is required to prevent the _exec_loop from calling |
| _on_terminate_callback when the user has killed the process. |
| """ |
| verifier = mock.Mock() |
| verifier.passed = False |
| |
| def test_call_order(): |
| self.assertTrue(process._stopped) |
| verifier.passed = True |
| |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process._process.poll.return_value = None |
| process._kill_process = test_call_order |
| process._process.wait.side_effect = subprocess.TimeoutExpired("", "") |
| |
| process.stop() |
| |
| self.assertEqual(verifier.passed, True) |
| |
| def test_stop_calls_wait(self): |
| """Tests that stop() also has the functionality of wait().""" |
| process = Process("cmd") |
| process._process = mock.Mock() |
| process.wait = mock.Mock() |
| |
| process.stop() |
| |
| self.assertEqual(process.wait.called, True) |
| |
| # _redirect_output |
| |
| def test_redirect_output_feeds_all_lines_to_on_output_callback(self): |
| """Tests that _redirect_output loops until all lines are parsed.""" |
| received_list = [] |
| |
| def appender(line): |
| received_list.append(line) |
| |
| process = Process("cmd") |
| process.set_on_output_callback(appender) |
| process._process = mock.Mock() |
| process._process.stdout.readline.side_effect = [b"a\n", b"b\n", b""] |
| |
| process._redirect_output() |
| |
| self.assertEqual(received_list[0], "a") |
| self.assertEqual(received_list[1], "b") |
| self.assertEqual(len(received_list), 2) |
| |
| # __start_process |
| |
| def test_start_process_returns_a_popen_object(self): |
| """Tests that a Popen object is returned by __start_process.""" |
| with self.patch("subprocess.Popen", return_value="verification"): |
| self.assertEqual(Process._Process__start_process("cmd"), "verification") |
| |
| # _exec_loop |
| |
| def test_exec_loop_redirections_output(self): |
| """Tests that the _exec_loop function calls to redirect the output.""" |
| process = Process("cmd") |
| Process._Process__start_process = mock.Mock() |
| |
| with self.patch("Thread", FakeThread): |
| process._exec_loop() |
| |
| self.assertEqual(process._redirection_thread.target, process._redirect_output) |
| self.assertEqual(process._redirection_thread.alive, True) |
| |
| def test_exec_loop_waits_for_process(self): |
| """Tests that the _exec_loop waits for the process to complete before |
| returning.""" |
| process = Process("cmd") |
| Process._Process__start_process = mock.Mock() |
| |
| with self.patch("Thread", FakeThread): |
| process._exec_loop() |
| |
| self.assertEqual(process._process.wait.called, True) |
| |
| def test_exec_loop_loops_if_not_stopped(self): |
| process = Process("1st") |
| Process._Process__start_process = mock.Mock() |
| process._on_terminate_callback = mock.Mock(side_effect=[["2nd"], None]) |
| |
| with self.patch("Thread", FakeThread): |
| process._exec_loop() |
| |
| self.assertEqual(Process._Process__start_process.call_count, 2) |
| self.assertEqual( |
| Process._Process__start_process.call_args_list[0][0], (["1st"],) |
| ) |
| self.assertEqual( |
| Process._Process__start_process.call_args_list[1][0], (["2nd"],) |
| ) |
| |
| def test_exec_loop_does_not_loop_if_stopped(self): |
| process = Process("1st") |
| Process._Process__start_process = mock.Mock() |
| process._on_terminate_callback = mock.Mock(side_effect=["2nd", None]) |
| process._stopped = True |
| |
| with self.patch("Thread", FakeThread): |
| process._exec_loop() |
| |
| self.assertEqual(Process._Process__start_process.call_count, 1) |
| self.assertEqual( |
| Process._Process__start_process.call_args_list[0][0], (["1st"],) |
| ) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |