blob: 011b5a2f62ad24ec08d8b55d2e24d80a382cf4b0 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import datetime
import json
import subprocess
from pathlib import Path
from fuchsia_task_lib import Terminal
def run(*command):
try:
return subprocess.check_output(
command,
text=True,
).strip()
except subprocess.CalledProcessError as e:
print(e.stdout)
raise e
def run_checked(*command):
try:
subprocess.run(
command,
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
).check_returncode()
return True
except subprocess.CalledProcessError as e:
return False
def print_title(msg):
print(f'\n{Terminal.bold("-- {} --".format(msg))}\n')
def parse_args():
'''Parses arguments.'''
parser = argparse.ArgumentParser()
def path_arg(type='file'):
def arg(path):
path = Path(path)
if path.is_file() != (type == 'file') or path.is_dir() != (type == 'directory'):
parser.error(f'Path "{path}" is not a {type}!')
return path
return arg
parser.add_argument(
'--ffx',
type=path_arg(),
help='A path to the ffx tool.',
required=True,
)
parser.add_argument(
'--name',
type=str,
help='The name of the configuration.',
required=True,
)
parser.add_argument(
"--expected_emulator",
type=str,
help='The expected name of the running emulator',
required=False,
)
parser.add_argument(
"--expected_package_repo",
type=str,
help='The expected package repository',
required=False,
)
parser.add_argument(
"--expected_product_bundle",
type=str,
help='The expected product bundle url',
required=False,
)
parser.add_argument(
"--expected_product_bundle_repo",
type=str,
help='The expected product bundle repo name',
required=False,
)
parser.add_argument(
"--expected_sdk_version",
type=str,
help='The expected sdk version',
required=False,
)
parser.add_argument(
"--expected_product_name",
type=str,
help='The expected product name',
required=False,
)
return parser.parse_args()
class Emulator:
def __init__(self, active, name):
self.active = active
self.name = name
class BuildInfo:
def __init__(self, sdk_version, product_config):
self.sdk_version = sdk_version
self.product_config = product_config
def all_emulators(args):
raw_emulators = run(args.ffx, "emu", "list")
if not raw_emulators:
return []
emulators = []
for emulator in raw_emulators.split('\n'):
emulator_entries = [v for v in emulator.split(' ') if v]
(active, name) = (emulator_entries[0], emulator_entries[1])
emulators.append(Emulator(active == '[Active]', name))
return emulators
def all_package_repo_names(args):
repos = json.loads(run(args.ffx, "--machine", "JSON", "repository", "list"))
return [repo["name"] for repo in repos]
def target_reachable(args, target):
return run_checked(args.ffx, '--target', target, "target", 'wait', '-t', "5")
def get_current_default_target(args):
return run(args.ffx, "target", "default", "get")
def is_repo_registered_with_target(args, repo, target):
# target repository list does not support JSON output so we have to parse the output.
# this is fragile so migrate once json is supported.
output = run(args.ffx, "target", "repository", "list")
output_rows = output.split("\n")
for i, row in enumerate(output_rows):
if row.find(repo) > 0:
for j in range(i, len(output_rows)):
if output_rows[j].startswith("+"):
break
if output_rows[j].find(target) > 0:
return True
return False
def build_info_for_target(args, target):
if not target_reachable(args, target):
return ""
result = json.loads(run(args.ffx, '--target', target, 'target', 'show', '--json'))
sdk_version = ""
product_config = ""
for entry in result:
if entry['label'] == 'build':
for child in entry['child']:
label = child['label']
if label == 'version':
sdk_version = child['value']
elif label == 'product':
product_config = child['value']
return BuildInfo(sdk_version=sdk_version, product_config=product_config)
def is_product_bundle_downloaded(args):
# product-bundle list does not support JSON output so we have to parse the output.
# this is fragile so migrate once json is supported.
output = run(args.ffx, 'product-bundle', 'list')
pb_url = args.expected_product_bundle
for line in output.split('\n'):
if line.find(pb_url) >= 0 and line.startswith('*'):
return True
return False
def show_header(args):
sdk_version = run(args.ffx, 'sdk', 'version')
print('')
print(f'{Terminal.bold("Status of development environment: {}".format(args.name))}')
print(f' - Current SDK Version: {sdk_version}')
def show_target_summary(args, current_default_target, print_pass, print_fail):
print_title('Checking Target Status')
if current_default_target:
print_pass(f'default target set to "{current_default_target}"')
else:
print_fail('no default target specified')
print(f'\n{Terminal.bold("Known Targets:")}')
try:
targets = json.loads(run(args.ffx, "--machine", "json", "target", "list"))
except:
targets = []
if len(targets) > 0:
[print(f'- {t["nodename"]} ({t["target_type"]})') for t in targets]
else:
print_fail('no known targets')
if args.expected_emulator:
show_emulator_status(args, args.expected_emulator, current_default_target, print_pass, print_fail)
if current_default_target != args.expected_emulator:
show_reachability_of_target(args, current_default_target, print_pass, print_fail)
def show_emulator_status(args, emulator, default_target, print_pass, print_fail):
print_title('Checking status of emulator "{}"'.format(emulator))
if emulator == default_target:
print_pass(f'expected emulator "{emulator}" is the default target')
else:
print_fail(f'expected "{emulator}" to be the default target but it is not')
show_reachability_of_target(args, emulator, print_pass, print_fail)
def show_reachability_of_target(args, target, print_pass, print_fail):
is_reachable = target_reachable(args, target)
if is_reachable:
print_pass(f'{target} is running and reachable')
show_build_info_status(args, target, print_pass, print_fail)
else:
print_fail(f'{target} is not running')
def show_build_info_status(args, target, print_pass, print_fail):
build_info = build_info_for_target(args, target)
if args.expected_sdk_version:
try:
build_date = datetime.datetime.fromisoformat(build_info.sdk_version)
print(f'Cannot determine SDK version for "{target}"')
print(f'{target} is running a build from {build_info.sdk_version}')
except:
if build_info.sdk_version == args.expected_sdk_version:
print_pass(f'{target} is running the expected sdk version of "{args.expected_sdk_version}"')
else:
print_fail(f'{target} is running sdk version "{build_info.sdk_version}" which does not match the expected "{args.expected_sdk_version}"')
if args.expected_product_name:
expected_product_config = args.expected_product_name.split(".")[0]
if build_info.product_config == expected_product_config:
print_pass(f'{target} is running the expected product "{expected_product_config}"')
else:
print_fail(f'{target} is running product "{build_info.product_config}" which does not match the expected "{expected_product_config}"')
def show_package_repo_status(args, default_target, print_pass, print_fail):
print_title('Checking status of package repositories')
print(f'{Terminal.bold("Known Package Repositories:")}')
# FIX IF NOT HERE
repos = all_package_repo_names(args)
if repos:
[print(f' - {r}') for r in repos]
else:
print_fail('No active repositories')
current_default_repo = run(args.ffx, "repository", "default", "get")
if args.expected_package_repo:
print('')
if current_default_repo == args.expected_package_repo:
print_pass(f'package repository "{current_default_repo}" is the default repository')
else:
print_fail(f'expected repository "{args.expected_package_repo}" to be the default repository but it is not')
show_status_of_repository_registration(args, args.expected_package_repo, default_target, print_pass, print_fail)
def show_status_of_repository_registration(args, repo, default_target, print_pass, print_fail):
if is_repo_registered_with_target(args, repo, default_target):
print_pass(f'package repository "{repo}" is registered with the default target')
else:
print_fail(f'package repository "{repo}" is not registered with the default target')
if args.expected_emulator:
if is_repo_registered_with_target(args, repo, args.expected_emulator):
print_pass(f'package repository "{repo}" is registered with emulator "{args.expected_emulator}"')
else:
print_fail(f'package repository "{repo}" is not registered with emulator "{args.expected_emulator}"')
def show_product_bundle_status(args, default_target, print_pass, print_fail):
if not args.expected_product_bundle:
return
print_title("Checking status of product bundle")
if is_product_bundle_downloaded(args):
print_pass(f'{args.expected_product_bundle} is downloaded.')
else:
print_fail(f'{args.expected_product_bundle} is not downloaded.')
if args.expected_product_bundle_repo:
repos = all_package_repo_names(args)
if args.expected_product_bundle_repo in repos:
print_pass(f'package repository hosting the product bundle packages "{args.expected_product_bundle_repo}" exists.')
else:
print_fail(f'product bundle does not have a package repository "{args.expected_product_bundle_repo}"')
show_status_of_repository_registration(args, args.expected_product_bundle_repo, default_target, print_pass, print_fail)
def main():
args = parse_args()
default_target = get_current_default_target(args)
failures = []
def print_pass(msg):
print(f' {Terminal.green("PASS")} - {msg}')
def print_fail(msg):
failures.append(msg)
print(f' {Terminal.red("FAIL")} - {msg}')
show_header(args)
show_target_summary(args, default_target, print_pass, print_fail)
show_package_repo_status(args, default_target, print_pass, print_fail)
show_product_bundle_status(args, default_target, print_pass, print_fail)
if len(failures) > 0:
print('')
print(f'{Terminal.bold("Some checks failed. To fix these problems run:")}')
print(" bazel run {}".format(args.name))
if __name__ == '__main__':
main()