# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
| |
| import functools |
| import re |
| |
| from recipe_engine import recipe_api |
| |
| from PB.infra.roller_metadata import RollerMetadata |
| |
| |
class CIPDEnsureApi(recipe_api.RecipeApi):
    """APIs for CIPD ensure files."""

    # Patterns for the magic strings that may appear in package names. For
    # ${os...} and ${arch...}, group 1 captures the optional comma-separated
    # allowlist that follows "=". The capture stops at the first "}" so that a
    # name containing several magic strings (in any order) is matched one
    # magic string at a time.
    OS_RE = re.compile(r"\${os=?([^}]*)}")
    ARCH_RE = re.compile(r"\${arch=?([^}]*)}")
    PLATFORM_RE = re.compile(r"\${platform}")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Tool paths already returned by __call__(), keyed by
        # (package name, ensure file, executable path).
        self._installed_tools = {}
        # Guards _installed_tools; created in initialize() because it needs
        # self.m.
        self._lock = None

    def initialize(self):
        """Create the semaphore that serializes tool installation."""
        self._lock = self.m.futures.make_bounded_semaphore()

    # NOTE: lru_cache keys on (self, manifest_path), so manifest_path must be
    # hashable and each metadata file is read at most once per API instance.
    @functools.lru_cache(maxsize=None)
    def get_roller_metadata(self, manifest_path):
        """
        Read a roller metadata file and return its contents.

        Args:
          manifest_path (Path): Path to roller metadata jsonpb which can be
            parsed into a RollerMetadata proto message.

        Returns:
          RollerMetadata: Tool metadata from manifest.
        """
        return self.m.file.read_proto(
            "read roller metadata",
            manifest_path,
            RollerMetadata,
            "JSONPB",
            test_proto=RollerMetadata(
                ref="latest",
                do_not_autoroll=False,
                tag="version",
            ),
        )

    def get_packages(self, step_name, ensure_file, test_data=""):
        """Get packages in a CIPD ensure file.

        Args:
          step_name (str): Name of the step.
          ensure_file (Path): Path to ensure file.
          test_data (str): Ensure file contents to use for the read step in
            tests.

        Returns:
          dict: Mapping of packages to versions.

        Raises:
          ValueError: If a package line does not consist of exactly a
            package name and a version.
        """
        packages = {}
        with self.m.step.nest(step_name):
            lines = self.m.file.read_text(
                "read ensure file",
                ensure_file,
                test_data=test_data,
            ).split("\n")
            for line in lines:
                # A valid package line is a non-empty line that starts with an
                # alphabetic character.
                if line[:1].isalpha():
                    # Split on any run of whitespace so that repeated
                    # spaces, tabs, or a trailing "\r" neither break the
                    # unpacking nor leak into the version string.
                    package, version = line.split()
                    packages[package] = version
        return packages

    def get_platforms(self, step_name, ensure_file):
        """Get platforms specified in a CIPD ensure file.

        Args:
          step_name (str): Name of the step.
          ensure_file (Path): Path to ensure file.

        Returns:
          list(str): List of platforms.
        """
        platforms = []
        with self.m.step.nest(step_name):
            lines = self.m.file.read_text("read ensure file", ensure_file).split("\n")
            for line in lines:
                # Get platforms from "$VerifiedPlatform <platform>...".
                if line.startswith("$VerifiedPlatform"):
                    # Whitespace-splitting avoids empty-string "platforms"
                    # when entries are separated by more than one space or
                    # the line has trailing whitespace.
                    platforms.extend(line.split()[1:])
        return platforms

    def expand_packages_by_platforms(self, packages, platforms):
        """Expand packages by platforms.

        e.g. for platforms ["linux-amd64", "linux-arm64", "mac-amd64"],

        ["pkg/${platform}"] -> ["pkg/linux-amd64", "pkg/linux-arm64", "pkg/mac-amd64"]
        ["pkg/${os}-${arch}"] -> ["pkg/linux-amd64", "pkg/linux-arm64", "pkg/mac-amd64"]
        ["pkg/${os=linux}-${arch}"] -> ["pkg/linux-amd64", "pkg/linux-arm64"]
        ["pkg/${os=mac}-${arch=amd64}"] -> ["pkg/mac-amd64"]
        ["pkg/${os=linux,mac}"] -> ["pkg/linux", "pkg/mac"]

        Args:
          packages (list(str)): Package names which may contain CIPD magic
            strings.
          platforms (list(str)): List of platform names, each of the form
            "<os>-<arch>".

        Returns:
          list(str): Expanded list of platform-specific packages, sorted and
            de-duplicated. If `platforms` is empty, `packages` is returned
            unchanged.
        """
        if not platforms:
            return packages

        def parse_allowlist(match):
            # No magic string, or a magic string without "=...", means no
            # allowlist was specified.
            if not match:
                return []
            allowed = match.group(1)
            return allowed.split(",") if allowed else []

        expanded_packages = set()

        for package in packages:
            match_platform = self.PLATFORM_RE.search(package)
            match_os = self.OS_RE.search(package)
            match_arch = self.ARCH_RE.search(package)
            # An empty allowlist means every value seen in `platforms` is
            # valid.
            os_allowlist = parse_allowlist(match_os) or [
                p.split("-")[0] for p in platforms
            ]
            arch_allowlist = parse_allowlist(match_arch) or [
                p.split("-")[1] for p in platforms
            ]

            for platform in platforms:
                os_name, arch = platform.split("-")
                # Replace all instances ${platform}, ${os}, and ${arch}. Apply
                # allowlists if specified by the package.
                if match_platform:
                    expanded_packages.add(
                        package.replace(match_platform.group(0), platform)
                    )
                elif match_os and match_arch:
                    if os_name in os_allowlist and arch in arch_allowlist:
                        expanded_packages.add(
                            package.replace(match_os.group(0), os_name).replace(
                                match_arch.group(0), arch
                            )
                        )
                elif match_os:
                    if os_name in os_allowlist:
                        expanded_packages.add(
                            package.replace(match_os.group(0), os_name)
                        )
                elif match_arch:
                    if arch in arch_allowlist:
                        expanded_packages.add(
                            package.replace(match_arch.group(0), arch)
                        )
                else:
                    # No magic strings: the package is platform-independent.
                    expanded_packages.add(package)

        return sorted(expanded_packages)

    def update_packages(
        self,
        step_name,
        ensure_file,
        packages,
        version,
    ):
        """Updates an ensure file's packages to the target version.

        NOTE: This is a custom parser. This should be relatively safe because
        the ensure file format is very stable, but we should eventually make
        parsing natively supported by the CIPD client for robustness' sake. See
        go/rolling-cipd-ensure-files.

        The file is always written back, even if no package changed.

        Args:
          step_name (str): Name of the step.
          ensure_file (Path): Path to ensure file.
          packages (seq(str)): Packages to update.
          version (str): Target version to update to.

        Returns:
          list(str): The packages that were updated.
        """
        with self.m.step.nest(step_name):
            # Split on "\n" (not splitlines()) so that a trailing newline
            # survives the round trip through "\n".join() below.
            lines = self.m.file.read_text("read ensure file", ensure_file).split("\n")
            modified_lines = []
            updated_packages = []

            for line in lines:
                for package in packages:
                    # Lines which do not start with the name of a package are
                    # comments, settings, or directives, which should remain
                    # untouched.
                    #
                    # Look for the terminating space character to ensure that we
                    # don't accidentally touch a package whose name is a prefix
                    # of another package.
                    prefix = f"{package} "
                    # Compare the pinned version exactly. A suffix check would
                    # treat e.g. "version:12" as already being at "2", and
                    # would see trailing whitespace as a version change.
                    if (
                        line.startswith(prefix)
                        and line[len(prefix) :].strip() != version
                    ):
                        line = f"{package} {version}"
                        updated_packages.append(package)
                        break
                modified_lines.append(line)

            self.m.file.write_text(
                "write ensure file", ensure_file, "\n".join(modified_lines)
            )
            return updated_packages

    def __call__(self, ensure_file, package_name, executable_path=None):
        """
        Given an ensure file, ensure a tool within the file and return its Path.

        Results are cached per (package, ensure file, executable path), so
        repeated calls with the same arguments do not re-run any steps.

        Args:
          ensure_file (Path): Path to ensure file.
          package_name (str): Package name within the ensure file.
          executable_path (Path): Passed to api.cipd.ensure_tool().

        Returns:
          Path: Path to tool.

        Raises:
          KeyError: If `package_name` is not listed in the ensure file.
        """
        # executable_path is part of the key because it is forwarded to
        # ensure_tool(); without it, a later request for a different
        # executable in the same package would get the first call's result.
        cache_key = (package_name, str(ensure_file), str(executable_path))

        with self._lock:
            if cache_key not in self._installed_tools:
                with self.m.step.nest(f"ensure {package_name}"):
                    packages = self.get_packages(
                        "get packages",
                        ensure_file,
                        test_data=f"{package_name} version:pinned-version",
                    )
                    self._installed_tools[cache_key] = self.m.cipd.ensure_tool(
                        package_name,
                        packages[package_name],
                        executable_path=executable_path,
                    )

        return self._installed_tools[cache_key]