# blob: a4db890ba9f390d92eee9a2c11b40ec90a1c150b [file] [log] [blame] [edit]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import multiprocessing
import os
import signal
import stat
import tempfile
import unittest
import async_utils.signals
from async_utils import command
class TestCommand(unittest.IsolatedAsyncioTestCase):
    """Tests for `command.AsyncCommand` that spawn real system programs.

    The tests execute `ls`, `cat`, `sed`, `sleep` and small `bash` scripts,
    so they presumably need a POSIX-like host to run.
    """

    def assertStdout(self, event: command.CommandEvent, line: bytes) -> None:
        """Helper to assert on contents of a StdoutEvent.

        Args:
            event (command.CommandEvent): Event to check and compare.
            line (bytes): Expected value of the event's `text`.
        """
        self.assertIsInstance(event, command.StdoutEvent)
        # Repeated as a plain assert so static type checkers narrow `event`.
        assert isinstance(event, command.StdoutEvent)
        self.assertEqual(event.text, line)

    def assertStderr(self, event: command.CommandEvent, line: bytes) -> None:
        """Helper to assert on contents of a StderrEvent.

        Args:
            event (command.CommandEvent): Event to check and compare.
            line (bytes): Expected value of the event's `text`.
        """
        self.assertIsInstance(event, command.StderrEvent)
        # Repeated as a plain assert so static type checkers narrow `event`.
        assert isinstance(event, command.StderrEvent)
        self.assertEqual(event.text, line)

    def assertTermination(
        self, event: command.CommandEvent, return_code: int
    ) -> None:
        """Helper to assert on contents of a TerminationEvent.

        Args:
            event (command.CommandEvent): Event to check and compare.
            return_code (int): Expected return code.
        """
        self.assertIsInstance(event, command.TerminationEvent)
        # Repeated as a plain assert so static type checkers narrow `event`.
        assert isinstance(event, command.TerminationEvent)
        self.assertEqual(event.return_code, return_code)

    async def test_basic_command(self) -> None:
        """Test running a basic command and getting the output.

        We create a file in a temporary directory and simply assert that `ls`
        prints that file as output.
        """
        with tempfile.TemporaryDirectory() as td:
            with open(os.path.join(td, "temp-file.txt"), "w") as f:
                f.write("hello world")

            cmd = await command.AsyncCommand.create("ls", ".", env={"CWD": td})
            events: list[command.CommandEvent] = []
            complete = await cmd.run_to_completion(events.append)

            self.assertEqual(len(events), 2, f"Events was actually {events}")
            self.assertStdout(events[0], b"temp-file.txt\n")
            self.assertTermination(events[1], 0)
            self.assertEqual(complete.stdout, "temp-file.txt\n")
            self.assertEqual(complete.return_code, 0)

    async def test_command_with_input(self) -> None:
        """Test passing input to a command."""
        cmd = await command.AsyncCommand.create(
            "cat", input_bytes=b"hello\nworld"
        )
        events: list[command.CommandEvent] = []
        complete = await cmd.run_to_completion(events.append)

        # One event per line of output, followed by the termination event.
        self.assertEqual(len(events), 3, f"Events was actually {events}")
        self.assertStdout(events[0], b"hello\n")
        self.assertStdout(events[1], b"world")
        self.assertTermination(events[2], 0)
        self.assertEqual(complete.stdout, "hello\nworld")
        self.assertEqual(complete.return_code, 0)

    async def test_wrapper_with_input(self) -> None:
        """Test passing input to a command that uses a symbolizer"""
        cmd = await command.AsyncCommand.create(
            "cat",
            symbolizer_args=["sed", "s/hello/goodbye/g"],
            input_bytes=b"hello\nworld",
        )
        events: list[command.CommandEvent] = []
        complete = await cmd.run_to_completion(events.append)

        # The observed output is what `sed` produced from `cat`'s output.
        self.assertEqual(len(events), 3, f"Events was actually {events}")
        self.assertStdout(events[0], b"goodbye\n")
        self.assertStdout(events[1], b"world")
        self.assertTermination(events[2], 0)
        self.assertEqual(complete.stdout, "goodbye\nworld")
        self.assertEqual(complete.return_code, 0)

    async def test_long_line_output(self) -> None:
        """Test processing a very large output from a program"""
        one_mib_line = b"a" * 1024 * 1024
        cmd = await command.AsyncCommand.create(
            "cat",
            input_bytes=one_mib_line,
        )
        events: list[command.CommandEvent] = []
        complete = await cmd.run_to_completion(events.append)

        # The whole 1 MiB line must arrive as a single stdout event.
        self.assertEqual(len(events), 2, f"Events was actually {events}")
        self.assertStdout(events[0], one_mib_line)
        self.assertTermination(events[1], 0)
        self.assertEqual(complete.stdout, "a" * 1024 * 1024)
        self.assertEqual(complete.return_code, 0)

    async def test_basic_command_with_long_timeout(self) -> None:
        """Test that a generous timeout does not affect a quick command.

        Same scenario as test_basic_command, but the command is created with
        timeout=3600 and we additionally assert that the result is not
        flagged as having timed out.
        """
        with tempfile.TemporaryDirectory() as td:
            with open(os.path.join(td, "temp-file.txt"), "w") as f:
                f.write("hello world")

            cmd = await command.AsyncCommand.create(
                "ls", ".", env={"CWD": td}, timeout=3600
            )
            events: list[command.CommandEvent] = []
            complete = await cmd.run_to_completion(events.append)

            self.assertEqual(len(events), 2, f"Events was actually {events}")
            self.assertStdout(events[0], b"temp-file.txt\n")
            self.assertTermination(events[1], 0)
            self.assertEqual(complete.stdout, "temp-file.txt\n")
            self.assertEqual(complete.return_code, 0)
            self.assertFalse(complete.was_timeout)

    async def test_with_stderr(self) -> None:
        """Test running a command with stderr output.

        We create a temporary directory and try to `ls` a file we know does not
        exist. `ls` should print to stderr and report an error return code.
        """
        with tempfile.TemporaryDirectory() as td:
            cmd = await command.AsyncCommand.create(
                "ls", os.path.join(td, "does-not-exist")
            )
            complete = await cmd.run_to_completion()

            self.assertEqual(complete.stdout, "")
            self.assertNotEqual(complete.stderr, "")
            self.assertNotEqual(complete.return_code, 0)

    async def test_symbolized_command(self) -> None:
        """Test piping output through another program.

        We run `ls` as in the above tests, but this time we pipe the output
        through `sed` to change the word "temp" to "temporary" and assert on
        the new output.
        """
        with tempfile.TemporaryDirectory() as td:
            with open(os.path.join(td, "temp-file.txt"), "w") as f:
                f.write("hello world")

            cmd = await command.AsyncCommand.create(
                "ls",
                ".",
                env={"CWD": td},
                symbolizer_args=["sed", "s/temp/temporary/g"],
            )
            events: list[command.CommandEvent] = []
            await cmd.run_to_completion(events.append)

            self.assertEqual(len(events), 2, f"Events was actually {events}")
            self.assertStdout(events[0], b"temporary-file.txt\n")
            self.assertTermination(events[1], 0)

    async def test_terminate_and_kill(self) -> None:
        """Test that we can terminate and kill programs.

        We spawn `sleep` to run for over a day, then terminate it. We expect
        the return code to be set by the OS to represent that the program
        was killed: the negated number of the signal that was delivered.
        The same is then repeated with a second long-running `sleep` acting
        as the symbolizer, whose return code must match as well.
        """
        # Plain command, SIGTERM.
        cmd = await command.AsyncCommand.create("sleep", "100000")
        task = asyncio.create_task(cmd.run_to_completion())
        cmd.terminate()
        out: command.CommandOutput = await task
        self.assertEqual(out.return_code, -signal.SIGTERM)

        # Plain command, SIGKILL.
        cmd = await command.AsyncCommand.create("sleep", "100000")
        task = asyncio.create_task(cmd.run_to_completion())
        cmd.kill()
        out = await task
        self.assertEqual(out.return_code, -signal.SIGKILL)

        # Command plus wrapper, SIGTERM.
        cmd = await command.AsyncCommand.create(
            "sleep", "100000", symbolizer_args=["sleep", "100000"]
        )
        task = asyncio.create_task(cmd.run_to_completion())
        cmd.terminate()
        out = await task
        self.assertEqual(out.return_code, -signal.SIGTERM)
        self.assertEqual(out.wrapper_return_code, -signal.SIGTERM)

        # Command plus wrapper, SIGKILL.
        cmd = await command.AsyncCommand.create(
            "sleep", "100000", symbolizer_args=["sleep", "100000"]
        )
        task = asyncio.create_task(cmd.run_to_completion())
        cmd.kill()
        out = await task
        self.assertEqual(out.return_code, -signal.SIGKILL)
        self.assertEqual(out.wrapper_return_code, -signal.SIGKILL)

    async def test_kill_process_groups(self) -> None:
        """Test that terminating a program kills the entire process group."""
        BASH_SHORT = "#!/usr/bin/env bash\nsleep .1\necho 'OK'"
        BASH_LONG = "#!/usr/bin/env bash\nsleep 100000\necho 'OK'"
        executable_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        with tempfile.TemporaryDirectory() as td:
            # Create scripts and make them executable.
            short_path = os.path.join(td, "short.sh")
            long_path = os.path.join(td, "long.sh")
            scripts = (
                (short_path, BASH_SHORT),
                (long_path, BASH_LONG),
            )
            for path, contents in scripts:
                with open(path, "w") as f:
                    f.write(contents)
                os.chmod(path, os.stat(path).st_mode | executable_bits)

            # Make sure we can run the commands to start with, and they do
            # not hang.
            cmd = await command.AsyncCommand.create(
                short_path,
                env={"CWD": td},
                symbolizer_args=[short_path],
            )
            events: list[command.CommandEvent] = []
            await cmd.run_to_completion(events.append)
            self.assertEqual(len(events), 2, f"Events was actually {events}")
            self.assertStdout(events[0], b"OK\n")
            # The termination event's return code is deliberately not
            # asserted to be 0 here: doing so causes spurious failures on
            # Mac for some reason, where the return value is sometimes -13.

            # Run the long-running shell script, and ensure terminating it
            # does not hang.
            cmd = await command.AsyncCommand.create(
                long_path,
                env={"CWD": td},
                symbolizer_args=[long_path],
            )
            await asyncio.sleep(0.001)
            cmd.terminate()
            events = []
            await cmd.run_to_completion(events.append)
            self.assertEqual(len(events), 1, f"Events was actually {events}")
            self.assertTermination(events[0], -signal.SIGTERM)

            # Run again, this time using SIGKILL.
            cmd = await command.AsyncCommand.create(
                long_path,
                env={"CWD": td},
                symbolizer_args=[long_path],
            )
            await asyncio.sleep(0.001)
            cmd.kill()
            events = []
            await cmd.run_to_completion(events.append)
            self.assertEqual(len(events), 1, f"Events was actually {events}")
            self.assertTermination(events[0], -signal.SIGKILL)

    async def test_timeout(self) -> None:
        """Test that commands timeout"""
        cmd = await command.AsyncCommand.create("sleep", "100000", timeout=0.1)
        out: command.CommandOutput = await cmd.run_to_completion()
        self.assertEqual(out.return_code, -signal.SIGTERM)
        self.assertTrue(out.was_timeout)

    def test_invalid_program(self) -> None:
        """Test running a program that doesn't exist, and expect an error."""
        with self.assertRaises(command.AsyncCommandError):
            asyncio.run(command.AsyncCommand.create(".........."))
class TestSignals(unittest.TestCase):
    """Tests for the terminate-signal hook in `async_utils.signals`."""

    def test_async_signal_handler(self) -> None:
        """Test that registered signal handlers work appropriately.

        A forked child process registers a terminate handler, sends SIGTERM
        to itself and appends a line to a shared file from both the handler
        and the resulting cancellation path. The parent then checks that
        both lines were written, in that order.
        """
        # Use a private "fork" context rather than set_start_method(), which
        # would change the start method for everything else running in this
        # process. Forking lets the child run the nested `main` function
        # below directly, without it having to be pickled.
        fork_context = multiprocessing.get_context("fork")

        output_directory = tempfile.TemporaryDirectory()
        self.addCleanup(output_directory.cleanup)
        output_file_name = os.path.join(output_directory.name, "output.txt")

        def main(output_file_name: str) -> None:
            async def internal_main() -> None:
                os.kill(os.getpid(), signal.SIGTERM)
                # Long enough that only cancellation can end this early.
                await asyncio.sleep(120)

            # Install a fresh loop as the current one before registering
            # the handler, and schedule the task on it explicitly.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            fut = loop.create_task(internal_main())

            def write_output() -> None:
                with open(output_file_name, "a") as f:
                    f.write("Handler printed message\n")
                fut.cancel()

            async_utils.signals.register_on_terminate_signal(write_output)

            try:
                loop.run_until_complete(fut)
            except asyncio.CancelledError:
                with open(output_file_name, "a") as f:
                    f.write("Cancelled\n")

        proc = fork_context.Process(target=main, args=(output_file_name,))
        proc.start()
        proc.join()

        with open(output_file_name, "r") as f:
            lines = [line.strip() for line in f]
        self.assertListEqual(lines, ["Handler printed message", "Cancelled"])
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()