# blob: 27b9d9b6eaa96195f739fa42c99188e941edc009 [file] [log] [blame]
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import functools
import re
from recipe_engine import recipe_api
from PB.infra.roller_metadata import RollerMetadata
class CIPDEnsureApi(recipe_api.RecipeApi):
    """APIs for CIPD ensure files."""

    # Patterns for the CIPD template variables that may appear in a package
    # name. The optional "=a,b,c" suffix of ${os} / ${arch} is captured as
    # group 1 and treated as an allowlist by expand_packages_by_platforms().
    # Both captures stop at the first "}" so that one variable never swallows
    # a second variable that appears later in the same package name.
    OS_RE = re.compile(r"\${os=?([^}]*)}")
    ARCH_RE = re.compile(r"\${arch=?([^}]*)}")
    PLATFORM_RE = re.compile(r"\${platform}")

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Maps (package_name, str(ensure_file)) -> result of
        # api.cipd.ensure_tool(), so each tool is only ensured once.
        self._installed_tools = {}
        # Created in initialize(); guards _installed_tools in __call__().
        self._lock = None

    def initialize(self):
        self._lock = self.m.futures.make_bounded_semaphore()

    # NOTE: lru_cache on a method keys on (self, manifest_path), so the path
    # must be hashable and the read step runs at most once per distinct path.
    @functools.lru_cache(maxsize=None)
    def get_roller_metadata(self, manifest_path):
        """Read a roller metadata file and return its contents.

        Args:
            manifest_path (Path): Path to roller metadata jsonpb which can be
                parsed into a RollerMetadata proto message.

        Returns:
            RollerMetadata: Tool metadata from manifest.
        """
        return self.m.file.read_proto(
            "read roller metadata",
            manifest_path,
            RollerMetadata,
            "JSONPB",
            test_proto=RollerMetadata(
                ref="latest",
                do_not_autoroll=False,
                tag="version",
            ),
        )

    def get_packages(self, step_name, ensure_file, test_data=""):
        """Get packages in a CIPD ensure file.

        Args:
            step_name (str): Name of the step.
            ensure_file (Path): Path to ensure file.
            test_data (str): Ensure file contents to use when the read step
                is run in test mode.

        Returns:
            dict: Mapping of packages to versions.

        Raises:
            ValueError: If a package line does not consist of exactly a
                package name and a version.
        """
        packages = {}
        with self.m.step.nest(step_name):
            lines = self.m.file.read_text(
                "read ensure file",
                ensure_file,
                test_data=test_data,
            ).split("\n")
            for line in lines:
                # A valid package line is a non-empty line that starts with an
                # alphabetic character.
                if line[:1].isalpha():
                    # Split on any run of whitespace so that tabs, repeated
                    # spaces, or a trailing "\r" do not break parsing or leak
                    # into the version string.
                    package, version = line.split()
                    packages[package] = version
        return packages

    def get_platforms(self, step_name, ensure_file):
        """Get platforms specified in a CIPD ensure file.

        Args:
            step_name (str): Name of the step.
            ensure_file (Path): Path to ensure file.

        Returns:
            list(str): List of platforms.
        """
        platforms = []
        with self.m.step.nest(step_name):
            lines = self.m.file.read_text("read ensure file", ensure_file).split("\n")
            for line in lines:
                # Get platforms from "$VerifiedPlatform <platform>...".
                if line.startswith("$VerifiedPlatform"):
                    # Whitespace-agnostic split: repeated or trailing spaces
                    # must not produce empty-string "platforms".
                    platforms.extend(line.split()[1:])
        return platforms

    @staticmethod
    def _parse_allowlist(match):
        """Return the comma-separated values captured by an OS/ARCH match.

        Returns an empty list when there is no match or no "=..." suffix.
        """
        if not match:
            return []
        allowed = match.group(1)
        return allowed.split(",") if allowed else []

    def expand_packages_by_platforms(self, packages, platforms):
        """Expand packages by platforms.

        e.g. for platforms ["linux-amd64", "linux-arm64", "mac-amd64"],

        ["pkg/${platform}"] -> ["pkg/linux-amd64", "pkg/linux-arm64", "pkg/mac-amd64"]
        ["pkg/${os}-${arch}"] -> ["pkg/linux-amd64", "pkg/linux-arm64", "pkg/mac-amd64"]
        ["pkg/${os=linux}-${arch}"] -> ["pkg/linux-amd64", "pkg/linux-arm64"]
        ["pkg/${os=mac}-${arch=amd64}"] -> ["pkg/mac-amd64"]
        ["pkg/${os=linux,mac}"] -> ["pkg/linux", "pkg/mac"]

        Args:
            packages (list(str)): Package names which may contain CIPD magic
                strings.
            platforms (list(str)): List of platform names, each of the form
                "<os>-<arch>".

        Returns:
            list(str): Expanded list of platform-specific packages, sorted
                and de-duplicated. If `platforms` is empty, `packages` is
                returned unchanged.
        """
        if not platforms:
            return packages

        # Split every platform once up front; these values do not depend on
        # the package being expanded.
        parsed_platforms = []
        for platform in platforms:
            os_name, arch = platform.split("-")
            parsed_platforms.append((platform, os_name, arch))
        all_oses = {os_name for _, os_name, _ in parsed_platforms}
        all_arches = {arch for _, _, arch in parsed_platforms}

        expanded_packages = set()
        for package in packages:
            match_platform = self.PLATFORM_RE.search(package)
            match_os = self.OS_RE.search(package)
            match_arch = self.ARCH_RE.search(package)

            # ${platform} takes precedence and has no allowlist.
            if match_platform:
                for platform, _, _ in parsed_platforms:
                    expanded_packages.add(
                        package.replace(match_platform.group(0), platform)
                    )
                continue

            # No magic strings at all: the package is platform-independent.
            if not match_os and not match_arch:
                expanded_packages.add(package)
                continue

            # An empty allowlist means all values are valid.
            os_allowlist = set(self._parse_allowlist(match_os)) or all_oses
            arch_allowlist = set(self._parse_allowlist(match_arch)) or all_arches

            for _, os_name, arch in parsed_platforms:
                # Only variables that actually appear in the package name
                # restrict which platforms it expands to.
                if match_os and os_name not in os_allowlist:
                    continue
                if match_arch and arch not in arch_allowlist:
                    continue
                expanded = package
                if match_os:
                    expanded = expanded.replace(match_os.group(0), os_name)
                if match_arch:
                    expanded = expanded.replace(match_arch.group(0), arch)
                expanded_packages.add(expanded)

        return sorted(expanded_packages)

    def update_packages(
        self,
        step_name,
        ensure_file,
        packages,
        version,
    ):
        """Updates an ensure file's packages to the target version.

        NOTE: This is a custom parser. This should be relatively safe because
        the ensure file format is very stable, but we should eventually make
        parsing natively supported by the CIPD client for robustness' sake. See
        go/rolling-cipd-ensure-files.

        Args:
            step_name (str): Name of the step.
            ensure_file (Path): Path to ensure file.
            packages (seq(str)): Packages to update.
            version (str): Target version to update to.

        Returns:
            list(str): The packages that were updated.
        """
        package_names = set(packages)
        with self.m.step.nest(step_name):
            # Split on "\n" (not splitlines()) so that re-joining preserves
            # the file's trailing newline.
            lines = self.m.file.read_text("read ensure file", ensure_file).split("\n")
            modified_lines = []
            updated_packages = []
            for line in lines:
                # Lines which do not start with the name of a package are
                # comments, settings, or directives, which should remain
                # untouched.
                #
                # Match on the full token before the first space to ensure
                # that we don't accidentally touch a package whose name is a
                # prefix of another package.
                name, separator, current_version = line.partition(" ")
                if (
                    separator
                    and name in package_names
                    # Compare the whole pinned version rather than a suffix,
                    # so e.g. "xyz123" is not mistaken for already being at
                    # "123".
                    and current_version.strip() != version
                ):
                    line = f"{name} {version}"
                    updated_packages.append(name)
                modified_lines.append(line)
            self.m.file.write_text(
                "write ensure file", ensure_file, "\n".join(modified_lines)
            )
        return updated_packages

    def __call__(self, ensure_file, package_name, executable_path=None):
        """Given an ensure file, ensure a tool within the file and return its Path.

        Results are cached per (package_name, ensure_file), so repeated calls
        for the same tool only run the ensure steps once.

        Args:
            ensure_file (Path): Path to ensure file.
            package_name (str): Package name within the ensure file.
            executable_path (Path): Passed to api.cipd.ensure_tool().

        Returns:
            Path: Path to tool.

        Raises:
            KeyError: If `package_name` is not listed in the ensure file.
        """
        cache_key = (package_name, str(ensure_file))
        # Hold the lock across the check and the install so that concurrent
        # callers do not both ensure the same tool.
        with self._lock:
            if cache_key not in self._installed_tools:
                with self.m.step.nest(f"ensure {package_name}"):
                    packages = self.get_packages(
                        "get packages",
                        ensure_file,
                        test_data=f"{package_name} version:pinned-version",
                    )
                    self._installed_tools[cache_key] = self.m.cipd.ensure_tool(
                        package_name,
                        packages[package_name],
                        executable_path=executable_path,
                    )
            return self._installed_tools[cache_key]