blob: e7c8719b6bd510a52ae71d84550b9f637ce2b8fd [file] [log] [blame] [edit]
#!/usr/bin/env python3
# Copyright 2024 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setup a GNU Make Jobserver token pool then launch a command with it.
On Windows, this only supports the semaphore-based scheme.
On Posix, this uses a FIFO by default, except if `--pipe` is used.
NOTE: This is a basic implementation that doesn't support broken
protocol clients that release more tokens than they acquired
to the pool. Using these in your build will result in extra job
slots being created, severely degrading overall performance
over time.
See --help-usage for usage examples.
"""
import argparse
import os
import platform
import subprocess
import sys
import typing as T
_DEFAULT_NAME = "jobserver_tokens"
_IS_WINDOWS = sys.platform in ("win32", "cygwin")
if _IS_WINDOWS:
try:
# This requires pywin32 to be installed.
import win32event
import win32api
import winerror
except ModuleNotFoundError as e:
print(
"\nERROR: Could not import Win32 API, please install pywin32, e.g. `python -m pip install pywin32`.\n",
file=sys.stderr,
)
raise e
def create_sem(
sem_name: str, token_count: int
) -> T.Tuple[win32event.PyHandle, T.Dict[str, str]]:
"""Create and initialize Win32 semaphore."""
assert token_count > 0, f"Token count must be strictly positive"
handle = win32event.CreateSemaphore(
None, token_count, token_count - 1, sem_name # Default security attributes,
)
assert handle != 0, f"Error creating Win32 semaphore {winerror.GetLastError()}"
env = dict(os.environ)
env["MAKEFLAGS"] = " -j{token_count} --jobserver-auth=" + sem_name
return handle, env
def check_sem_count(handle: win32event.PyHANDLE, token_count: int) -> int:
if token_count <= 1:
# Nothing to check here.
return 0
expected_count = token_count - 1
read_count = win32event.ReleaseSemaphore(handle, 1)
if read_count < expected_count:
print(
f"ERROR: {expected_count - read_count} were missing from the token pool (got {read_count}, expected {expected_count})",
file=sys.stderr,
)
return 1
if read_count > expected_count:
print(
f"ERROR: {read_count - expected_count} extra tokens were released to the token pool (got {read_count}, expected {expected_count})",
file=sys.stderr,
)
return 1
return 0
def print_usage() -> int:
print(
"""Example usage:
# Start <command> after setting the server to provide as many tokens
# as available CPUs (the default)
python \\path\\to\\jobserver_pool.py <command>
# Start <command> with a fixed number of tokens
python \\path\\to\\jobserver_pool.py --token-count=10 <command>
# Disable the feature with a non-positive count. This is equivalent
# to running <command> directly.
python \\path\\to\\jobserver_pool.py --token-count=0 <command>
# Use a specific semaphore name
python \\path\\to\\jobserver_pool.py --name my_build_jobs <command>
# Setup jobserver then start new interactive PowerShell
# session, print MAKEFLAGS value, build stuff, then exit.
python \\path\\to\\jobserver_pool.py
$env:MAKEFLAGS
... build stuff ...
exit
"""
)
return 0
else: # !_IS_WINDOWS
def create_pipe(token_count: int) -> T.Tuple[int, int, T.Dict[str, str]]:
"""Create and fill Posix PIPE."""
read_fd, write_fd = os.pipe()
os.set_inheritable(read_fd, True)
os.set_inheritable(write_fd, True)
assert token_count > 0, f"Token count must be strictly positive"
os.write(write_fd, (token_count - 1) * b"x")
env = dict(os.environ)
env["MAKEFLAGS"] = (
f" -j{token_count} --jobserver-fds={read_fd},{write_fd} --jobserver-auth={read_fd},{write_fd}"
)
return read_fd, write_fd, env
def create_fifo(path: str, token_count: int) -> T.Tuple[int, int, T.Dict[str, str]]:
"""Create and fill Posix FIFO."""
if os.path.exists(path):
os.remove(path)
os.mkfifo(path)
read_fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
write_fd = os.open(path, os.O_WRONLY | os.O_NONBLOCK)
assert token_count > 0, f"Token count must be strictly positive"
os.write(write_fd, (token_count - 1) * b"x")
env = dict(os.environ)
env["MAKEFLAGS"] = f" -j{token_count} --jobserver-auth=fifo:" + path
return read_fd, write_fd, env
def print_usage() -> int:
print(
"""Example usage:
# Start <command> after setting the job pool to provide as many tokens
# as available CPUs (the default)
/path/to/jobserver_pool.py <command>
# Start <command> with a fixed number of tokens
/path/to/jobserver_pool.py --token-count=10 <command>
# Disable the feature with a non-positive count. This is equivalent
# to running <command> directly.
/path/to/jobserver_pool.py --token-count=0 <command>
# Use a specific FIFO path
/path/to/jobserver_pool.py --fifo /tmp/my_build_jobs <command>
# Setup jobserver then start new interactive Bash shell
# session, print MAKEFLAGS value, build stuff, then exit.
/path/to/jobserver_pool.py bash -i
echo "$MAKEFLAGS"
... build stuff ...
exit
"""
)
return 0
def check_pipe_tokens(read_fd: int, token_count: int) -> int:
if token_count <= 1: # Nothing to check
return 0
# Remove implicit token from the expected count.
expected_count = token_count - 1
os.set_blocking(read_fd, False)
read_count = 0
while True:
try:
token = os.read(read_fd, 1)
if len(token) == 0: # End of pipe?
break
read_count += 1
except BlockingIOError:
break
if read_count < expected_count:
print(
f"ERROR: {expected_count - read_count} were missing from the token pool (got {read_count}, expected {expected_count})",
file=sys.stderr,
)
return 1
if read_count > expected_count:
print(
f"ERROR: {read_count - expected_count} extra tokens were released to the token pool (got {read_count}, expected {expected_count})",
file=sys.stderr,
)
return 1
return 0
def main() -> int:
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawTextHelpFormatter
)
if _IS_WINDOWS:
parser.add_argument(
"--name",
help=f"Specify semaphore name, default is {_DEFAULT_NAME}",
default=_DEFAULT_NAME,
)
else:
mutex_group = parser.add_mutually_exclusive_group()
mutex_group.add_argument(
"--pipe",
action="store_true",
help="Implement the pool with a Unix pipe (the default)",
)
mutex_group.add_argument(
"--fifo",
help=f"Specify FIFO file path, default is $(pwd)/{_DEFAULT_NAME}",
default=os.path.abspath(_DEFAULT_NAME),
)
parser.add_argument(
"--check",
action="store_true",
help="Check that all job slots were returned to the pool on exit.",
)
parser.add_argument(
"--help-usage", action="store_true", help="Print usage examples."
)
parser.add_argument(
"--token-count",
action="store",
type=int,
default=os.cpu_count(),
help="Set token count, default is available CPUs count",
)
parser.add_argument("command", nargs=argparse.REMAINDER, help="Command to run.")
args = parser.parse_args()
if args.help_usage:
return print_usage()
if not args.command:
parser.error("This script requires at least one command argument!")
token_count = args.token_count
if token_count <= 0:
# Disable the feature.
ret = subprocess.run(args.command)
exit_code = ret.returncode
elif _IS_WINDOWS:
# Run with a Window semaphore.
handle, env = create_sem(args.name, token_count)
try:
ret = subprocess.run(args.command, env=env)
exit_code = ret.returncode
if exit_code == 0 and args.check:
exit_code = check_sem_count(handle, token_count)
finally:
win32api.CloseHandle(handle)
else:
# Run with pipe descriptors.
exit_code = 0
delete_fifo = ""
try:
if args.pipe:
read_fd, write_fd, env = create_pipe(int(args.token_count))
ret = subprocess.run(
args.command, env=env, pass_fds=(read_fd, write_fd)
)
elif args.fifo:
read_fd, write_fd, env = create_fifo(args.fifo, int(args.token_count))
ret = subprocess.run(args.command, env=env)
delete_fifo = args.fifo
exit_code = ret.returncode
if exit_code == 0 and args.check:
exit_code = check_pipe_tokens(read_fd, token_count)
finally:
os.close(read_fd)
os.close(write_fd)
if delete_fifo:
os.remove(delete_fifo)
return exit_code
if __name__ == "__main__":
sys.exit(main())