blob: dc384c5e21df16a6e29ca967593f861582a192b6 [file]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2026 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import os
import signal
import subprocess
import unittest
from typing import Any
from unittest import mock
import signal_utils
class SignalUtilsTest(unittest.TestCase):
@mock.patch.object(os, "kill")
@mock.patch.object(os, "killpg")
@mock.patch.object(os, "getpgid")
@mock.patch.object(signal, "signal")
def test_wait_and_forward_signals_leader(
self,
mock_signal: mock.Mock,
mock_getpgid: mock.Mock,
mock_killpg: mock.Mock,
mock_kill: mock.Mock,
) -> None:
"""Verify that signals are forwarded to the PGID if the child is a group leader."""
mock_process = mock.Mock(spec=subprocess.Popen)
mock_process.pid = 5678
# Simulate that the process is a group leader (PGID == PID).
# This happens when using os.setpgrp in main_build.py.
mock_getpgid.return_value = 5678
registered_handlers = {}
def fake_signal(sig: int, handler: Any) -> Any:
registered_handlers[sig] = handler
return signal.SIG_DFL
mock_signal.side_effect = fake_signal
def trigger_handler_during_wait() -> int:
# This is called when wait_and_forward_signals calls process.wait().
# We trigger our custom signal handler while it is waiting.
handler = registered_handlers[signal.SIGINT]
handler(signal.SIGINT, None)
return 0
mock_process.wait.side_effect = trigger_handler_during_wait
# Since a SIGINT was received, the function should raise BuildInterruptedError
# after the process has "finished".
with self.assertRaises(signal_utils.BuildInterruptedError) as cm:
signal_utils.wait_and_forward_signals(mock_process)
# Child returned 0, so parent should upgrade to 130 (SIGINT).
self.assertEqual(cm.exception.return_code, 130)
self.assertEqual(cm.exception.signum, signal.SIGINT)
# Because the child was a group leader, we must have called killpg
# to ensure the entire sub-tree (including Bazel's children) is signaled.
mock_killpg.assert_called_once_with(5678, signal.SIGINT)
mock_kill.assert_not_called()
@mock.patch.object(os, "kill")
@mock.patch.object(os, "killpg")
@mock.patch.object(os, "getpgid")
@mock.patch.object(signal, "signal")
def test_wait_and_forward_signals_follower(
self,
mock_signal: mock.Mock,
mock_getpgid: mock.Mock,
mock_killpg: mock.Mock,
mock_kill: mock.Mock,
) -> None:
"""Verify that signals are forwarded to the PID only if the child is not a group leader."""
mock_process = mock.Mock(spec=subprocess.Popen)
mock_process.pid = 5678
# Simulate that the process is NOT a group leader (PGID != PID).
# This happens for simple linear wrapper chains without isolation.
mock_getpgid.return_value = 1234
registered_handlers = {}
def fake_signal(sig: int, handler: Any) -> Any:
registered_handlers[sig] = handler
return signal.SIG_DFL
mock_signal.side_effect = fake_signal
def trigger_handler_during_wait() -> int:
handler = registered_handlers[signal.SIGINT]
handler(signal.SIGINT, None)
return 0
mock_process.wait.side_effect = trigger_handler_during_wait
with self.assertRaises(signal_utils.BuildInterruptedError) as cm:
signal_utils.wait_and_forward_signals(mock_process)
# Child returned 0, so parent should upgrade to 130 (SIGINT).
self.assertEqual(cm.exception.return_code, 130)
self.assertEqual(cm.exception.signum, signal.SIGINT)
# Because the child was just a follower, we must have called kill (PID)
# to avoid "bombarding" the entire process group multiple times.
mock_kill.assert_called_once_with(5678, signal.SIGINT)
mock_killpg.assert_not_called()
@mock.patch.object(os, "kill")
@mock.patch.object(os, "killpg")
@mock.patch.object(os, "getpgid")
@mock.patch.object(signal, "signal")
def test_wait_and_forward_signals_failure_preserved(
self,
mock_signal: mock.Mock,
mock_getpgid: mock.Mock,
mock_killpg: mock.Mock,
mock_kill: mock.Mock,
) -> None:
"""Verify that child failure code is preserved even when interrupted."""
mock_process = mock.Mock(spec=subprocess.Popen)
mock_process.pid = 5678
mock_getpgid.return_value = 5678 # leader
registered_handlers = {}
mock_signal.side_effect = (
lambda s, h: registered_handlers.update({s: h}) or signal.SIG_DFL
)
def trigger_handler_during_wait() -> int:
handler = registered_handlers[signal.SIGINT]
handler(signal.SIGINT, None)
# Child failed during interrupt.
return 1
mock_process.wait.side_effect = trigger_handler_during_wait
with self.assertRaises(signal_utils.BuildInterruptedError) as cm:
signal_utils.wait_and_forward_signals(mock_process)
# Child returned 1, so parent should return 1.
self.assertEqual(cm.exception.return_code, 1)
self.assertEqual(cm.exception.signum, signal.SIGINT)
@mock.patch.object(os, "kill")
@mock.patch.object(os, "killpg")
@mock.patch.object(os, "getpgid")
@mock.patch.object(signal, "signal")
def test_wait_and_forward_signals_negative_rc_conversion(
self,
mock_signal: mock.Mock,
mock_getpgid: mock.Mock,
mock_killpg: mock.Mock,
mock_kill: mock.Mock,
) -> None:
"""Verify that negative signal status is converted to shell-style (128+N)."""
mock_process = mock.Mock(spec=subprocess.Popen)
mock_process.pid = 5678
mock_getpgid.return_value = 5678 # leader
registered_handlers = {}
mock_signal.side_effect = (
lambda s, h: registered_handlers.update({s: h}) or signal.SIG_DFL
)
def trigger_handler_during_wait() -> int:
handler = registered_handlers[signal.SIGINT]
handler(signal.SIGINT, None)
# Child killed by SIGTERM (15). Subprocess returns -15.
return -15
mock_process.wait.side_effect = trigger_handler_during_wait
with self.assertRaises(signal_utils.BuildInterruptedError) as cm:
signal_utils.wait_and_forward_signals(mock_process)
# 128 - (-15) = 143.
self.assertEqual(cm.exception.return_code, 143)
self.assertEqual(cm.exception.signum, signal.SIGINT)
@mock.patch.object(os, "kill")
@mock.patch.object(os, "killpg")
@mock.patch.object(os, "getpgid")
@mock.patch.object(signal, "signal")
def test_wait_and_forward_signals_multiple_succession(
self,
mock_signal: mock.Mock,
mock_getpgid: mock.Mock,
mock_killpg: mock.Mock,
mock_kill: mock.Mock,
) -> None:
"""Verify that multiple different signals are all forwarded correctly."""
mock_process = mock.Mock(spec=subprocess.Popen)
mock_process.pid = 5678
mock_getpgid.return_value = 5678 # leader
registered_handlers = {}
mock_signal.side_effect = (
lambda s, h: registered_handlers.update({s: h}) or signal.SIG_DFL
)
def trigger_handlers_during_wait() -> int:
# Trigger SIGINT
registered_handlers[signal.SIGINT](signal.SIGINT, None)
# Trigger SIGTERM
registered_handlers[signal.SIGTERM](signal.SIGTERM, None)
# Child killed by SIGTERM (15)
return -15
mock_process.wait.side_effect = trigger_handlers_during_wait
with self.assertRaises(signal_utils.BuildInterruptedError) as cm:
signal_utils.wait_and_forward_signals(mock_process)
# Both signals should have been forwarded to the process group.
mock_killpg.assert_has_calls(
[
mock.call(5678, signal.SIGINT),
mock.call(5678, signal.SIGTERM),
]
)
# Since child died of SIGTERM (-15), status should be 143.
self.assertEqual(cm.exception.return_code, 143)
# The last received signal was SIGTERM.
self.assertEqual(cm.exception.signum, signal.SIGTERM)
if __name__ == "__main__":
unittest.main()