| #!/usr/bin/env python3 |
| |
| """Runs ./ninja in persistent mode and checks if the output is correct. |
| |
| In order to simulate a smart terminal it uses the 'script' command. |
| """ |
| import os |
| import platform |
| import subprocess |
| import sys |
| import tempfile |
| import unittest |
| |
| default_env = dict(os.environ) |
| for varname in ("NINJA_STATUS", "NINJA_STATUS_MAX_COMMANDS", "CLICOLOR_FORCE"): |
| if varname in default_env: |
| del default_env[varname] |
| |
| default_env["TERM"] = "" |
| default_env["NINJA_PERSISTENT_MODE"] = "1" |
| default_env["DEBUG_PERSISTENT_SERVICE_LOG_FILE"] = "/tmp/DLOG" |
| default_env["NINJA_PERSISTENT_LOG_FILE"] = "/tmp/ELOG" |
| NINJA_PATH = os.path.abspath("./ninja") |
| |
| |
| class NinjaPersistentInstance(object): |
| def __init__(self, build_ninja: str): |
| """Initialize instance. |
| |
| This creates a temporary directory, cd to it, then writes a build.ninja |
| plan in it that will be used by future run() calls. |
| |
| Args: |
| build_ninja: The content of a "build.ninja" file |
| """ |
| self._server_running = False |
| self._tempdir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) |
| self._dir = self._tempdir.name |
| self.write_build_ninja(build_ninja) |
| |
| def write_build_ninja(self, build_ninja): |
| """Rewrite the build.ninja file with new content.""" |
| self.write_file("build.ninja", build_ninja) |
| |
| def write_file(self, path, content): |
| with open(os.path.join(self._dir, path), "w") as f: |
| f.write(content) |
| f.flush() |
| |
| def close(self): |
| """Close the instance, ensuring the persistent server is stopped.""" |
| if self._server_running: |
| self.run(flags="-t server stop") |
| self._server_running = False |
| |
| if self._tempdir: |
| self._tempdir.cleanup() |
| self._tempdir = None |
| |
| def server_status(self): |
| return self.run(flags="-t server status") |
| |
| def has_server(self): |
| status = self.server_status() |
| return status == f"server is running for {self._dir}\n" |
| |
| def server_pid(self): |
| """Return PID of server process, or -1 if there is none.""" |
| return int(self.run(flags="-t server pid").strip()) |
| |
| def run(self, flags="", pipe=False, env=default_env): |
| """Run a Ninja command.""" |
| self._server_running = True |
| ninja_cmd = "{} {}".format(NINJA_PATH, flags).strip() |
| try: |
| if pipe: |
| output = subprocess.check_output( |
| ninja_cmd, shell=True, cwd=self._dir, env=env |
| ) |
| elif platform.system() == "Darwin": |
| output = subprocess.check_output( |
| ["script", "-q", "/dev/null", "bash", "-c", ninja_cmd], |
| cwd=self._dir, |
| env=env, |
| ) |
| else: |
| output = subprocess.check_output( |
| ["script", "-qfec", ninja_cmd, "/dev/null"], cwd=self._dir, env=env |
| ) |
| except subprocess.CalledProcessError as err: |
| sys.stdout.buffer.write(err.output) |
| raise err |
| |
| final_output = "" |
| for line in output.decode("utf-8").splitlines(True): |
| if len(line) > 0 and line[-1] == "\r": |
| continue |
| final_output += line.replace("\r", "") |
| return final_output |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.close() |
| return False |
| |
| |
| @unittest.skipIf( |
| platform.system() == "Windows", "These test methods do not work on Windows" |
| ) |
| class Output(unittest.TestCase): |
| BUILD_SIMPLE_ECHO = "\n".join( |
| ( |
| "rule echo", |
| ' command = printf "do thing"', |
| " description = echo $out", |
| "", |
| "build a: echo", |
| "", |
| ) |
| ) |
| |
| def test_start_persistent_server(self): |
| with NinjaPersistentInstance(Output.BUILD_SIMPLE_ECHO) as ninja: |
| self.assertFalse(ninja.has_server()) |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| server_pid = ninja.server_pid() |
| self.assertNotEqual(server_pid, -1) |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| self.assertEqual(server_pid, ninja.server_pid()) |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| self.assertEqual(server_pid, ninja.server_pid()) |
| |
| def test_stop_persistent_server(self): |
| with NinjaPersistentInstance(Output.BUILD_SIMPLE_ECHO) as ninja: |
| self.assertFalse(ninja.has_server()) |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| |
| server_pid = ninja.server_pid() |
| self.assertNotEqual(server_pid, -1) |
| |
| ninja.run(flags="-t server stop") |
| self.assertFalse(ninja.has_server()) |
| self.assertEqual(ninja.server_pid(), -1) |
| |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| new_server_pid = ninja.server_pid() |
| self.assertNotEqual(new_server_pid, -1) |
| self.assertNotEqual(new_server_pid, server_pid) |
| |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| self.assertEqual(new_server_pid, ninja.server_pid()) |
| |
| def test_ninja_status(self): |
| env = default_env.copy() |
| env["NINJA_STATUS"] = "STATUS [%f/%t] " |
| with NinjaPersistentInstance(Output.BUILD_SIMPLE_ECHO) as ninja: |
| self.assertFalse(ninja.has_server()) |
| non_persistent_env = env.copy() |
| non_persistent_env["NINJA_PERSISTENT_MODE"] = "0" |
| self.assertEqual( |
| ninja.run(env=non_persistent_env), |
| "STATUS [1/1] echo a\x1b[K\ndo thing\n", |
| ) |
| self.assertFalse(ninja.has_server()) |
| |
| env["NINJA_PERSISTENT_STATUS"] = "1" |
| self.assertEqual( |
| ninja.run(env=env), "STATUS [1/1] echo a\x1b[K\ndo thing\n" |
| ) |
| self.assertTrue(ninja.has_server()) |
| |
| env["NINJA_STATUS"] = "NOPE" |
| self.assertEqual(ninja.run(env=env), "NOPEecho a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| |
| def test_environment_variables(self): |
| build_ninja = """ |
| rule print_var |
| command = printf "%s=[%s]" $varname $$$varname |
| description = print $varname |
| |
| build foo: print_var |
| varname = foo |
| """ |
| env = default_env.copy() |
| if "foo" in env: |
| del env["foo"] |
| |
| with NinjaPersistentInstance(build_ninja) as ninja: |
| self.assertFalse(ninja.has_server()) |
| self.assertEqual(ninja.run(env=env), "[1/1] print foo\x1b[K\nfoo=[]\n") |
| self.assertTrue(ninja.has_server()) |
| self.assertEqual(ninja.run(env=env), "[1/1] print foo\x1b[K\nfoo=[]\n") |
| self.assertTrue(ninja.has_server()) |
| env["foo"] = "FOO" |
| self.assertEqual(ninja.run(env=env), "[1/1] print foo\x1b[K\nfoo=[FOO]\n") |
| self.assertTrue(ninja.has_server()) |
| del env["foo"] |
| self.assertEqual(ninja.run(env=env), "[1/1] print foo\x1b[K\nfoo=[]\n") |
| self.assertTrue(ninja.has_server()) |
| |
| def test_server_restart_on_input_file_change(self): |
| with NinjaPersistentInstance(Output.BUILD_SIMPLE_ECHO) as ninja: |
| self.assertFalse(ninja.has_server()) |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| |
| ninja.write_build_ninja(Output.BUILD_SIMPLE_ECHO) |
| server_pid = ninja.server_pid() |
| |
| self.assertEqual(ninja.run(), "[1/1] echo a\x1b[K\ndo thing\n") |
| self.assertTrue(ninja.has_server()) |
| self.assertNotEqual(server_pid, ninja.server_pid()) |
| |
| def test_server_restart_on_generator_run(self): |
| build_ninja = """ |
| rule echo |
| command = printf "do thing" |
| description = echo $out |
| |
| build a: echo |
| |
| rule regen |
| command = cp input.depfile $out.d && touch build.ninja |
| depfile = $out.d |
| generator = 1 |
| |
| build build.ninja: regen |
| """ |
| with NinjaPersistentInstance(build_ninja) as ninja: |
| # Launch the server. |
| ninja.write_file("CMakeLists.txt", "# Fake original config file") |
| ninja.write_file("input.depfile", "build.ninja: CMakeLists.txt") |
| self.assertFalse(ninja.has_server()) |
| ninja.run(flags="a") |
| self.assertTrue(ninja.has_server()) |
| server_pid = ninja.server_pid() |
| |
| ninja.write_file("CMakeLists.txt", "# Fake update to config file") |
| ninja.run(flags="a") |
| self.assertTrue(ninja.has_server()) |
| self.assertNotEqual(ninja.server_pid(), server_pid) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |