| #!/usr/bin/env python3 |
| |
| import argparse |
| import os |
| import shlex |
| import subprocess |
| import sys |
| |
| import yaml |
| |
| |
def parse_args():
    """Parse the command-line arguments of the CI job emulator.

    The ``job`` positional is optional: when it is omitted the returned
    namespace carries ``job=None`` rather than argparse aborting with a usage
    error, so the caller can react to a missing job itself (for example by
    listing the available jobs).

    Returns:
        argparse.Namespace: Namespace with one attribute, ``job``, holding the
        key or name given on the command line, or ``None`` if none was given.
    """
    parser = argparse.ArgumentParser(
        description="Replicate and emulate BazelCI job configurations from presubmit.yml."
    )
    parser.add_argument(
        "job",
        # "?" makes the positional optional; without it argparse exits with
        # status 2 before any "no job specified" handling can run.
        nargs="?",
        default=None,
        help="The key or name of the CI job to emulate (e.g., ubuntu_workspace).",
    )
    return parser.parse_args()
| |
| |
def run_cmd(cmd, cwd=None, env=None, shell=False):
    """Echo a command, run it, and report whether it succeeded.

    Args:
        cmd: The command, either a single string or a list of arguments.
        cwd: Directory to run the command in; ``None`` keeps the current one.
        env: Environment mapping for the child process; ``None`` inherits the
            environment of this process.
        shell: When true, the command is handed to the system shell.

    Returns:
        bool: ``True`` if the command exited with status 0; ``False`` if it
        exited with a non-zero status or could not be started at all.
    """
    if shell:
        cmd_str = cmd if isinstance(cmd, str) else " ".join(cmd)
        # Run exactly what is echoed. Passing a list together with shell=True
        # would (on POSIX) treat only the first item as the shell command and
        # the rest as arguments to the shell itself.
        to_run = cmd_str
    else:
        cmd_str = shlex.join(cmd) if isinstance(cmd, list) else str(cmd)
        to_run = cmd

    print(f"\nš Executing: {cmd_str}")
    if cwd and cwd != os.getcwd():
        print(f"š Directory: {cwd}")
    if env and "USE_BAZEL_VERSION" in env:
        print(f"š§ Bazel Version: {env['USE_BAZEL_VERSION']}")
    # The child writes straight to the inherited file descriptor; flush our
    # buffered banner first so it cannot show up after the child's output
    # when stdout is a pipe or a file.
    sys.stdout.flush()

    try:
        res = subprocess.run(to_run, cwd=cwd, env=env, shell=shell)
    except OSError as exc:
        # Raised when the executable or the working directory is missing or
        # not usable; report it like any other failed step instead of
        # letting a traceback escape.
        print(
            f"\nError: could not start command ({exc}): {cmd_str}",
            file=sys.stderr,
        )
        return False

    if res.returncode != 0:
        print(
            f"\nā Command failed with return code {res.returncode}: {cmd_str}",
            file=sys.stderr,
        )
        return False
    return True
| |
| |
def resolve_bazel_version(task_bazel):
    """Return the Bazel version to pin for a task, or ``None`` to pin nothing.

    Args:
        task_bazel: Value of the task's ``bazel`` setting. It may be missing
            (``None``), a string, or a non-string scalar, since a YAML value
            such as ``bazel: 7`` loads as an int.

    Returns:
        The version as a stripped string, or ``None`` when the value is
        empty or is an unexpanded ``${{ ... }}`` placeholder.
    """
    if not task_bazel:
        return None
    # Normalise to text: the result is stored in an environment mapping,
    # which only accepts strings, and ints/floats have no .startswith().
    version = str(task_bazel).strip()
    if not version or version.startswith("${{"):
        return None
    return version
| |
| |
def _task_list(task, key):
    """Return a copy of the list stored under *key*, or [] if missing or null."""
    return list(task.get(key) or [])


def execute_ci_job(job_key, task, repo_root):
    """Run one presubmit task locally: pre-commands, then build, test, coverage.

    Args:
        job_key: Key of the task in the presubmit ``tasks`` mapping.
        task: Configuration mapping of the task.
        repo_root: Directory that ``working_directory`` is resolved against
            and in which commands run when no working directory is set.

    Returns:
        bool: ``True`` when every step succeeded (or there was nothing to
        run); ``False`` as soon as the working directory is unusable or any
        command fails. Later steps are not attempted after a failure.
    """
    job_name = task.get("name", job_key)
    print(f"\n{'=' * 80}\nšÆ Replicating CI Job: {job_key} ('{job_name}')\n{'=' * 80}")

    # Setup working directory
    cwd = repo_root
    working_directory = task.get("working_directory")
    if working_directory:
        cwd = os.path.join(repo_root, working_directory)
        # isdir rather than exists: a regular file of that name would pass an
        # existence check and then make every command launch fail.
        if not os.path.isdir(cwd):
            print(
                f"ā Error: working_directory '{working_directory}' does not exist at '{cwd}'",
                file=sys.stderr,
            )
            return False

    # Setup environment
    env = os.environ.copy()
    bzl_version = resolve_bazel_version(task.get("bazel"))
    if bzl_version:
        env["USE_BAZEL_VERSION"] = bzl_version

    # Execute pre-commands (platform-specific key, run through the shell)
    pre_key = "batch_commands" if sys.platform.startswith("win") else "shell_commands"
    for pre_cmd in _task_list(task, pre_key):
        if not run_cmd(pre_cmd, cwd=cwd, env=env, shell=True):
            return False

    raw_test_flags = _task_list(task, "test_flags")

    test_flags = list(raw_test_flags)
    if "--build_tests_only" not in test_flags:
        test_flags.insert(0, "--build_tests_only")

    # Prefer a dedicated coverage_flags entry when the task defines one.
    # NOTE(review): the fallback to the unmodified test_flags is what this
    # script always did for coverage; confirm it matches BazelCI's behaviour.
    if "coverage_flags" in task:
        coverage_flags = _task_list(task, "coverage_flags")
    else:
        coverage_flags = raw_test_flags

    phases = (
        ("build", "build_targets", _task_list(task, "build_flags")),
        ("test", "test_targets", test_flags),
        ("coverage", "coverage_targets", coverage_flags),
    )
    for verb, targets_key, flags in phases:
        # Drop any "--" the config already contains; exactly one separator is
        # re-inserted between the flags and the targets below.
        targets = [t for t in _task_list(task, targets_key) if t != "--"]
        if not targets:
            continue
        cmd = ["bazel", verb] + flags + ["--"] + targets
        if not run_cmd(cmd, cwd=cwd, env=env):
            return False

    print(f"\nš Successfully replicated CI Job: {job_key}")
    return True
| |
| |
def _print_available_jobs(tasks):
    """Write every task key with its display name to stderr, sorted by key."""
    print("š Available CI Job Keys:", file=sys.stderr)
    for key in sorted(tasks):
        # A task declared with an empty body loads as None.
        name = (tasks[key] or {}).get("name", key)
        print(f" ⢠{key} ({name})", file=sys.stderr)


def main():
    """Entry point: load the presubmit file, pick the requested job and run it.

    The presubmit file is read from ``.bazelci/presubmit.yml`` next to this
    script. The job may be selected by task key or by the task's ``name``.

    Always terminates through ``sys.exit``: status 0 when the job succeeded,
    status 1 when the presubmit file is missing, unparsable or has no tasks,
    when no job or an unknown job was requested, or when the job failed.
    """
    args = parse_args()

    repo_root = os.path.abspath(os.path.dirname(__file__))
    presubmit_path = os.path.join(repo_root, ".bazelci/presubmit.yml")
    if not os.path.exists(presubmit_path):
        print(
            f"ā Error: Presubmit file not found at '{presubmit_path}'",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        with open(presubmit_path, encoding="utf-8") as f:
            presubmit = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        print(
            f"Error: could not parse '{presubmit_path}': {exc}",
            file=sys.stderr,
        )
        sys.exit(1)

    # An empty file loads as None and a scalar-only file as a non-mapping;
    # both are treated the same as a file without a "tasks" section.
    tasks = presubmit.get("tasks") if isinstance(presubmit, dict) else None
    if not tasks or not isinstance(tasks, dict):
        print(
            f"ā Error: No tasks found in '{presubmit_path}'",
            file=sys.stderr,
        )
        sys.exit(1)

    # If no job specified, print available jobs and exit
    if not args.job:
        print("ā Error: No CI job specified. Provide a job key.\n", file=sys.stderr)
        _print_available_jobs(tasks)
        sys.exit(1)

    # Match by key first, then fall back to the first task whose name matches
    job_key = None
    if args.job in tasks:
        job_key = args.job
    else:
        for key, config in tasks.items():
            if (config or {}).get("name") == args.job:
                job_key = key
                break

    if job_key is None:
        print(
            f"ā Error: CI job '{args.job}' not found in '{presubmit_path}'\n",
            file=sys.stderr,
        )
        _print_available_jobs(tasks)
        sys.exit(1)

    success = execute_ci_job(job_key, tasks[job_key] or {}, repo_root)
    sys.exit(0 if success else 1)
| |
| |
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()