blob: 920543292771e2a8f335457d89fef4b546867d55 [file] [log] [blame]
#!/usr/bin/env python3.8
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Use scrutiny to verify a build."""
import argparse
import difflib
import json
import os
import re
import shlex
import subprocess
import sys
import tempfile
SUPPORTED_TYPES = ['kernel_cmdline', 'bootfs_filelist', 'static_pkgs']
SOFT_TRANSITION_MESSAGE = """
If you are making a change in fuchsia.git that causes this, you need to perform a soft transition:
1: Instead of adding lines as written above, add each line prefixed with a question mark to mark it as transitional.
2: Instead of removing lines as written above, prefix the line with a question mark to mark it as transitional.
3: Check in your fuchsia.git change.
4: For each new line you added in 1, remove the question mark.
5: For each existing line you modified in 2, remove the line.
"""
class GoldenFileChecker(object):
def __init__(self, path):
self._path = path
self._checked = False
try:
with open(path, "r") as f:
self.orig_lines = f.read().strip().splitlines()
except IOError as e:
raise VerificationError(f'Failed to open golden file: {e}')
# Parse file contents.
self.required_contents = set()
self.optional_contents = set()
for line in self.orig_lines:
if line.startswith("#"):
# A leading # indicates a comment, and we ignore the rest of the line.
continue
elif line.startswith("?"):
# A leading ? indicates an optional line, and we allow the presence or
# absence of the noun indicated by the rest of the line
self.optional_contents.add(line[1:])
else:
self.required_contents.add(line)
# Prepare records.
self.observed_and_not_permitted = []
self.observed = set()
def check_match(self, observed_contents):
assert not self._checked
self._checked = True
# Verify that:
# 1) every line in observed_contents is allowed in either
# required_contents or optional_contents
# 2) every line in required_contents is observed
for line in observed_contents:
if line not in self.required_contents and line not in self.optional_contents:
self.observed_and_not_permitted.append(line)
else:
self.observed.add(line)
errors = []
for extra in sorted(list(self.observed_and_not_permitted)):
errors.append(f"'{extra}' is not listed in {self._path} but was found in the build. If the addition to the build was intended, add a line '{extra}' to {self._path}.")
for missing in sorted(list(self.required_contents - self.observed)):
errors.append(f"'{missing}' was declared as required in {self._path} but was not found in the build. If the removal from the build was intended, update {self._path} to remove the line '{missing}'.")
return errors
def print_error(msg):
print(msg, file=sys.stderr)
def main(input_args):
parser = argparse.ArgumentParser()
parser.add_argument(
'--zbi-file', help='Path to the zbi to verify', required=True)
parser.add_argument(
'--blobfs-manifest',
help='Path to blobfs manifest file, required for "static_pkgs"',
required=False)
parser.add_argument(
'--scrutiny',
help='Path to the scrutiny tool used for verifying kernel cmdline',
required=True)
parser.add_argument(
'--far',
help=(
'Path to the far tool used for extracting package, ' +
'required for "static_pkgs"'),
required=False)
parser.add_argument(
'--golden-files',
help=(
'Path to one of the possible golden files to check against, ' +
'there should only be one golden file in normal case, and only ' +
'two golden files, one old file and one new file during a soft ' +
'transition. After the transition, the old golden file should ' +
'be removed and only leave the new golden file.'),
nargs='+',
required=True)
parser.add_argument(
'--stamp', help='Path to the victory file', required=True)
parser.add_argument(
'--type',
help=('The type of the ZBI item to verify'),
choices=SUPPORTED_TYPES,
required=True)
parser.add_argument(
'--depfile',
help=(
'Optional generated depfile listing dynamic deps for the script' +
', required for "static_pkgs"'),
required=False)
args = parser.parse_args(input_args)
if len(args.golden_files) > 2:
print_error(
'At most two optional golden files are supported, ' +
'is there a soft transition already in place? Please wait for ' +
'that to finish before starting a new one.')
try:
verify_build(args)
except VerificationError as e:
print_error(str(e))
return 1
with open(args.stamp, 'w') as stamp_file:
stamp_file.write('Golden!\n')
return 0
def verify_build(args):
"""verify_build verifies a build against specified golden files.
Raises:
VerificationError: If verification fails.
"""
# Check for some necessary files/dirs exist first.
for file in [args.scrutiny, args.zbi_file]:
if not os.path.exists(file):
raise VerificationError('Missing required file: ' + file)
with tempfile.TemporaryDirectory() as tmp:
run_scrutiny_command(
args.scrutiny, ' '.join(
[
'tool.zbi.extract', '--input',
shlex.quote(args.zbi_file), '--output',
shlex.quote(tmp)
]))
last_error = None
for golden_file in args.golden_files:
try:
if args.type == 'kernel_cmdline':
verify_kernel_cmdline(golden_file, tmp)
elif args.type == 'bootfs_filelist':
verify_bootfs_filelist(golden_file, tmp)
elif args.type == 'static_pkgs':
verify_static_pkgs(args, golden_file, tmp)
# Passes the verification, no error thrown.
return
except VerificationError as e:
# Error thrown, we want to record this error and check next
# golden_file.
last_error = e
raise last_error
def verify_kernel_cmdline(kernel_cmdline_golden_file, scrutiny_out):
"""verify_kernel_cmdline verifies the kernel cmdline in ZBI image.
Raises:
VerificationError: If verification fails.
"""
gf_checker = GoldenFileChecker(kernel_cmdline_golden_file)
actual_cmd = []
if os.path.exists(os.path.join(scrutiny_out, 'sections', 'cmdline.blk')):
try:
with open(os.path.join(scrutiny_out, 'sections', 'cmdline.blk'),
'r') as f:
# The cmdline.blk contains a trailing \x00.
cmdline = f.read().strip().rstrip('\x00')
except IOError as e:
raise VerificationError(f'Failed to read cmdline.blk: {e}')
cmdline_args = cmdline.split(' ')
try:
actual_cmd = generate_sorted_cmdline(cmdline_args)
except CmdlineFormatError as e:
raise VerificationError(f'Invalid golden cmdline format: {e}')
errors = gf_checker.check_match(actual_cmd)
if len(errors) > 0:
error_msgs = ['Kernel cmdline mismatch!']
error_msgs.append('')
error_msgs.extend(errors)
error_msgs.append('')
error_msgs.append(f'If you intended to change the kernel command line, please acknowledge it by updating {kernel_cmdline_golden_file} with the added or removed lines.')
error_msgs.append(SOFT_TRANSITION_MESSAGE)
raise VerificationError('\n'.join(error_msgs))
def verify_bootfs_filelist(bootfs_filelist_golden_file, scrutiny_out):
"""verify_bootfs_filelist verifies the bootFS filelist in ZBI image.
Raises:
VerificationError: If verification fails.
"""
gf_checker = GoldenFileChecker(bootfs_filelist_golden_file)
bootfs_folder = os.path.join(scrutiny_out, 'bootfs')
bootfs_files = []
try:
for root, _, files in os.walk(bootfs_folder):
for file in files:
bootfs_files.append(
os.path.relpath(os.path.join(root, file), bootfs_folder))
except IOError as e:
raise VerificationError(f'Failed to walk bootfs folder: {e}')
got_content = sorted(bootfs_files)
errors = gf_checker.check_match(bootfs_files)
if len(errors) > 0:
error_msgs = ['BootFS file list mismatch!']
error_msgs.append('')
error_msgs.extend(errors)
error_msgs.append('')
error_msgs.append(f'If you intended to change the bootfs contents, please acknowledge it by updating {bootfs_filelist_golden_file} with the added or removed lines.')
error_msgs.append(SOFT_TRANSITION_MESSAGE)
raise VerificationError('\n'.join(error_msgs))
def verify_static_pkgs(
args,
golden_file,
scrutiny_out,
):
"""verify_static_pkgs verifies static packages list.
Raises:
VerificationError: If verification fails.
"""
deps = []
if not args.blobfs_manifest:
raise VerificationError(
'"blobfs-manifest" must be specified for "static_pkgs" check')
if not args.far:
raise VerificationError(
'"far" must be specified for "static_pkgs" check')
if not args.depfile:
raise VerificationError(
'"depfile" must be specified for "static_pkgs" check')
try:
system_image_hash = get_system_image_hash(scrutiny_out)
except IOError as e:
raise VerificationError(f'Failed to get devmgr config: {e}')
except KeyError as e:
raise VerificationError(f'Invalid devmgr config: {e}')
try:
blob_manifest = parse_key_value_file(args.blobfs_manifest)
except IOError as e:
raise VerificationError(f'Failed to open blob manifest: {e}')
try:
system_image_blob = os.path.join(
os.path.dirname(args.blobfs_manifest),
blob_manifest[system_image_hash])
# Add system_image_blob as dynamic dependency.
deps.append(system_image_blob)
except KeyError as e:
raise VerificationError(f'System image blob not found: {e}')
system_image_folder = os.path.join(scrutiny_out, 'system_image')
try:
extract_package(args.far, system_image_blob, system_image_folder)
except subprocess.CalledProcessError as e:
raise VerificationError(
f'Failed to extract system_image package: {e.stderr}')
try:
static_packages_hash = parse_key_value_file(
os.path.join(system_image_folder, 'meta',
'contents'))['data/static_packages']
except KeyError:
raise VerificationError(
'No "data/static_packages" found in "system_image"')
except IOError as e:
raise VerificationError(
f'Failed to read system_image/meta/contents file: {e}')
try:
static_packages_blob = os.path.join(
os.path.dirname(args.blobfs_manifest),
blob_manifest[static_packages_hash])
# Add static_packages_blob as dynamic dependency.
deps.append(static_packages_blob)
except KeyError as e:
raise VerificationError(f'Static pkgs blob not found: {e}')
try:
with open(static_packages_blob, 'r') as f:
static_packages_content = f.read().strip()
except IOError as e:
raise VerificationError(f'Failed to read static packages blob: {e}')
# Write depfile.
try:
with open(args.depfile, 'w') as f:
f.write(args.stamp + ': ' + ' '.join(deps) + '\n')
except IOError as e:
raise VerificationError(f'Failed to write depfile: {e}')
pkgs = []
for pkg in static_packages_content.splitlines():
pkgs.append(re.split(r'/[0-9]=', pkg)[0])
gf_checker = GoldenFileChecker(golden_file)
errors = gf_checker.check_match(sorted(pkgs))
if len(errors) > 0:
error_msgs = ['Static packages list mismatch!']
error_msgs.append('')
error_msgs.extend(errors)
error_msgs.append('')
error_msgs.append(f'If you intended to change the list of static packages, please acknowledge it by updating {golden_file} with the added or removed lines.')
error_msgs.append(SOFT_TRANSITION_MESSAGE)
raise VerificationError('\n'.join(error_msgs))
def get_system_image_hash(scrutiny_out):
"""Get the system image merkle root.
Args:
scrutiny_out: the scrutiny output directory.
Raises:
IOError: If fails to read devmgr config.
KeyError: If the config entry for system image hash is not found.
"""
devmgr_config_file = os.path.join(
scrutiny_out, 'bootfs', 'config', 'devmgr')
key_value_map = parse_key_value_file(devmgr_config_file)
return key_value_map['zircon.system.pkgfs.cmd'].replace('bin/pkgsvr+', '')
def run_scrutiny_command(scrutiny_path, command):
"""Runs scrutiny command.
Args:
scrutiny_path: The path to the scrutiny tool.
command: The scrutiny command to run.
Raises:
VerificationError: If the command fails or the output is not
'{"status":"ok"}'.
"""
try:
output = subprocess.run(
[scrutiny_path, '-c', command], capture_output=True,
check=True).stdout
except subprocess.CalledProcessError as e:
raise VerificationError(f'Failed to run scrutiny: {e.stederr}')
try:
if json.loads(output)['status'] != 'ok':
raise VerificationError(f'Unexpected scrutiny output: {output}')
except (KeyError, json.JSONDecodeError) as e:
raise VerificationError(f'Unexpected scrutiny output: {e}')
def extract_package(far_path, package_path, output_dir):
"""Extract a package from a blob using "fx far extract".
Args:
far_path: The path to far tool.
package_path: The path to the package blob file.
output_dir: The output directory to put the extracted package.
Raises:
subprocess.CalledProcessError: If failed to extract.
"""
subprocess.run(
[
far_path, 'extract', '--archive=' + package_path,
'--output=' + output_dir
],
capture_output=True,
check=True)
class CmdlineFormatError(Exception):
"""Exception thrown when kernel cmdline is in invalid format."""
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg
class VerificationError(Exception):
"""Exception thrown when verification fails."""
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg
def generate_sorted_cmdline(cmdline_args):
"""generate_sorted_cmdline generates a kernel cmdline sorted by entry keys.
Raises:
CmdlineFormatError: If the kernel cmdline is not formatted correctly.
"""
cmdline_entries = {}
for entry in cmdline_args:
if len(entry.split('=')) > 2:
raise CmdlineFormatError(
'invalid kernel cmdline, key value pair: ' + entry)
key, _, value = entry.partition('=')
if key in cmdline_entries:
raise CmdlineFormatError('duplicate kernel cmdline key: ' + key)
cmdline_entries[key] = value
return [
('%s=%s' % (key, value)) if value else key
for key, value in sorted(cmdline_entries.items())]
def parse_key_value_file(file_path):
"""Parses a file in 'key=value' format.
Args:
file_path: The path to the file.
Returns:
A {key:value} map
Raises:
IOError: if failed to read the file.
"""
with open(file_path, 'r') as f:
content = f.read()
key_value_map = {}
for line in content.splitlines():
split_array = line.split('=')
if len(split_array) == 2:
key_value_map[split_array[0]] = split_array[1]
return key_value_map
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))