blob: d362c9b600e83fd4f139693e684e8b4df8f8ab3d [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import unittest
import mock
from antlion.libs.proc.process import Process, ProcessError
class FakeThread(object):
def __init__(self, target=None):
self.target = target
self.alive = False
def _on_start(self):
pass
def start(self):
self.alive = True
if self._on_start:
self._on_start()
def stop(self):
self.alive = False
def join(self):
pass
class ProcessTest(unittest.TestCase):
"""Tests the antlion.libs.proc.process.Process class."""
def setUp(self):
self._Process__start_process = Process._Process__start_process
def tearDown(self):
Process._Process__start_process = self._Process__start_process
@staticmethod
def patch(imported_name, *args, **kwargs):
return mock.patch(f"antlion.libs.proc.process.{imported_name}", *args, **kwargs)
# set_on_output_callback
def test_set_on_output_callback(self):
"""Tests that set_on_output_callback sets on_output_callback."""
callback = mock.Mock()
process = Process("cmd").set_on_output_callback(callback)
process._on_output_callback()
self.assertTrue(callback.called)
# set_on_terminate_callback
def test_set_on_terminate_callback(self):
"""Tests that set_on_terminate_callback sets _on_terminate_callback."""
callback = mock.Mock()
process = Process("cmd").set_on_terminate_callback(callback)
process._on_terminate_callback()
self.assertTrue(callback.called)
# start
def test_start_raises_if_called_back_to_back(self):
"""Tests that start raises an exception if it has already been called
prior.
This is required to prevent references to processes and threads from
being overwritten, potentially causing ACTS to hang."""
process = Process("cmd")
# Here we need the thread to start the process object.
class FakeThreadImpl(FakeThread):
def _on_start(self):
process._process = mock.Mock()
with self.patch("Thread", FakeThreadImpl):
process.start()
expected_msg = "Process has already started."
with self.assertRaisesRegex(ProcessError, expected_msg):
process.start()
def test_start_starts_listening_thread(self):
"""Tests that start starts the _exec_popen_loop function."""
process = Process("cmd")
# Here we need the thread to start the process object.
class FakeThreadImpl(FakeThread):
def _on_start(self):
process._process = mock.Mock()
with self.patch("Thread", FakeThreadImpl):
process.start()
self.assertTrue(process._listening_thread.alive)
self.assertEqual(process._listening_thread.target, process._exec_loop)
# wait
def test_wait_raises_if_called_back_to_back(self):
"""Tests that wait raises an exception if it has already been called
prior."""
process = Process("cmd")
process._process = mock.Mock()
process.wait(0)
expected_msg = "Process is already being stopped."
with self.assertRaisesRegex(ProcessError, expected_msg):
process.wait(0)
@mock.patch.object(Process, "_kill_process")
def test_wait_kills_after_timeout(self, *_):
"""Tests that if a TimeoutExpired error is thrown during wait, the
process is killed."""
process = Process("cmd")
process._process = mock.Mock()
process._process.wait.side_effect = subprocess.TimeoutExpired("", "")
process.wait(0)
self.assertEqual(process._kill_process.called, True)
@mock.patch("os.getpgid", side_effect=lambda id: id)
@mock.patch("os.killpg")
def test_sends_signal(self, mock_os, *_):
"""Tests that signal is sent to process.."""
process = Process("cmd")
mock_process = mock.Mock()
mock_process.pid = -1
process._process = mock_process
process.signal(51641)
mock_os.assert_called_with(-1, 51641)
def test_signal_raises_error_on_windows(self, *_):
"""Tests that signaling is unsupported in windows with appropriate
error msg."""
process = Process("cmd")
mock_inner_process = mock.Mock()
mock_inner_process.pid = -1
process._process = mock_inner_process
with mock.patch("antlion.libs.proc.process._on_windows", True):
with self.assertRaises(ProcessError):
process.signal(51641)
@mock.patch.object(Process, "_kill_process")
def test_wait_sets_stopped_to_true_before_process_kill(self, *_):
"""Tests that stop() sets the _stopped attribute to True.
This order is required to prevent the _exec_loop from calling
_on_terminate_callback when the user has killed the process.
"""
verifier = mock.Mock()
verifier.passed = False
def test_call_order():
self.assertTrue(process._stopped)
verifier.passed = True
process = Process("cmd")
process._process = mock.Mock()
process._process.poll.return_value = None
process._process.wait.side_effect = subprocess.TimeoutExpired("", "")
process._kill_process = test_call_order
process.wait()
self.assertEqual(verifier.passed, True)
def test_wait_joins_listening_thread_if_it_exists(self):
"""Tests wait() joins _listening_thread if it exists."""
process = Process("cmd")
process._process = mock.Mock()
mocked_thread = mock.Mock()
process._listening_thread = mocked_thread
process.wait(0)
self.assertEqual(mocked_thread.join.called, True)
def test_wait_clears_listening_thread_if_it_exists(self):
"""Tests wait() joins _listening_thread if it exists.
Threads can only be started once, so after wait has been called, we
want to make sure we clear the listening thread.
"""
process = Process("cmd")
process._process = mock.Mock()
process._listening_thread = mock.Mock()
process.wait(0)
self.assertEqual(process._listening_thread, None)
def test_wait_joins_redirection_thread_if_it_exists(self):
"""Tests wait() joins _listening_thread if it exists."""
process = Process("cmd")
process._process = mock.Mock()
mocked_thread = mock.Mock()
process._redirection_thread = mocked_thread
process.wait(0)
self.assertEqual(mocked_thread.join.called, True)
def test_wait_clears_redirection_thread_if_it_exists(self):
"""Tests wait() joins _listening_thread if it exists.
Threads can only be started once, so after wait has been called, we
want to make sure we clear the listening thread.
"""
process = Process("cmd")
process._process = mock.Mock()
process._redirection_thread = mock.Mock()
process.wait(0)
self.assertEqual(process._redirection_thread, None)
# stop
def test_stop_sets_stopped_to_true(self):
"""Tests that stop() sets the _stopped attribute to True."""
process = Process("cmd")
process._process = mock.Mock()
process.stop()
self.assertTrue(process._stopped)
def test_stop_sets_stopped_to_true_before_process_kill(self):
"""Tests that stop() sets the _stopped attribute to True.
This order is required to prevent the _exec_loop from calling
_on_terminate_callback when the user has killed the process.
"""
verifier = mock.Mock()
verifier.passed = False
def test_call_order():
self.assertTrue(process._stopped)
verifier.passed = True
process = Process("cmd")
process._process = mock.Mock()
process._process.poll.return_value = None
process._kill_process = test_call_order
process._process.wait.side_effect = subprocess.TimeoutExpired("", "")
process.stop()
self.assertEqual(verifier.passed, True)
def test_stop_calls_wait(self):
"""Tests that stop() also has the functionality of wait()."""
process = Process("cmd")
process._process = mock.Mock()
process.wait = mock.Mock()
process.stop()
self.assertEqual(process.wait.called, True)
# _redirect_output
def test_redirect_output_feeds_all_lines_to_on_output_callback(self):
"""Tests that _redirect_output loops until all lines are parsed."""
received_list = []
def appender(line):
received_list.append(line)
process = Process("cmd")
process.set_on_output_callback(appender)
process._process = mock.Mock()
process._process.stdout.readline.side_effect = [b"a\n", b"b\n", b""]
process._redirect_output()
self.assertEqual(received_list[0], "a")
self.assertEqual(received_list[1], "b")
self.assertEqual(len(received_list), 2)
# __start_process
def test_start_process_returns_a_popen_object(self):
"""Tests that a Popen object is returned by __start_process."""
with self.patch("subprocess.Popen", return_value="verification"):
self.assertEqual(Process._Process__start_process("cmd"), "verification")
# _exec_loop
def test_exec_loop_redirections_output(self):
"""Tests that the _exec_loop function calls to redirect the output."""
process = Process("cmd")
Process._Process__start_process = mock.Mock()
with self.patch("Thread", FakeThread):
process._exec_loop()
self.assertEqual(process._redirection_thread.target, process._redirect_output)
self.assertEqual(process._redirection_thread.alive, True)
def test_exec_loop_waits_for_process(self):
"""Tests that the _exec_loop waits for the process to complete before
returning."""
process = Process("cmd")
Process._Process__start_process = mock.Mock()
with self.patch("Thread", FakeThread):
process._exec_loop()
self.assertEqual(process._process.wait.called, True)
def test_exec_loop_loops_if_not_stopped(self):
process = Process("1st")
Process._Process__start_process = mock.Mock()
process._on_terminate_callback = mock.Mock(side_effect=[["2nd"], None])
with self.patch("Thread", FakeThread):
process._exec_loop()
self.assertEqual(Process._Process__start_process.call_count, 2)
self.assertEqual(
Process._Process__start_process.call_args_list[0][0], (["1st"],)
)
self.assertEqual(
Process._Process__start_process.call_args_list[1][0], (["2nd"],)
)
def test_exec_loop_does_not_loop_if_stopped(self):
process = Process("1st")
Process._Process__start_process = mock.Mock()
process._on_terminate_callback = mock.Mock(side_effect=["2nd", None])
process._stopped = True
with self.patch("Thread", FakeThread):
process._exec_loop()
self.assertEqual(Process._Process__start_process.call_count, 1)
self.assertEqual(
Process._Process__start_process.call_args_list[0][0], (["1st"],)
)
if __name__ == "__main__":
unittest.main()