blob: c4822e4ebf27fbe6f3fb9d6b99fffad3f96ec1f2 [file] [log] [blame] [edit]
# Copyright 2024 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import os
import pathlib
import re
import typing
from collections import OrderedDict
import build_dir
import data
import fx_cmd
from async_utils.command import AsyncCommand
def get_fuchsia_dir() -> pathlib.Path:
    """Locate the user's Fuchsia checkout from the environment.

    The location is read from the FUCHSIA_DIR environment variable.

    Returns:
        pathlib.Path: The value of FUCHSIA_DIR wrapped in a Path.

    Raises:
        RuntimeError: FUCHSIA_DIR is unset or set to an empty string.
    """
    checkout = os.environ.get("FUCHSIA_DIR", "")
    if checkout:
        return pathlib.Path(checkout)
    raise RuntimeError("Missing FUCHSIA_DIR environment variable.")
async def git_collector() -> list[data.Result]:
    """Report whether the checkout's HEAD matches JIRI_HEAD.

    Runs `git rev-parse HEAD JIRI_HEAD` against the `.git` directory of
    the Fuchsia checkout and compares the first two lines of its output.

    Returns:
        list[data.Result]: A single error result if git exits non-zero.
        Otherwise a single SOURCE item keyed "is_in_jiri_head" whose
        value is True when the two lines are equal, or when git printed
        fewer than two lines.
    """
    fuchsia_dir = get_fuchsia_dir()
    git = await AsyncCommand.create(
        "git",
        "--no-optional-locks",
        f"--git-dir={str(fuchsia_dir)}/.git",
        "rev-parse",
        "HEAD",
        "JIRI_HEAD",
    )
    completed = await git.run_to_completion()
    if completed.return_code != 0:
        return [data.Result(error="git command failed")]

    # With fewer than two revisions there is nothing to compare, so the
    # optimistic default (True) is kept.
    revisions = completed.stdout.splitlines()
    is_at_head = len(revisions) < 2 or revisions[0] == revisions[1]

    head_item = data.Item(
        category=data.Category.SOURCE,
        key="is_in_jiri_head",
        title="Is fuchsia source project in JIRI_HEAD?",
        value=is_at_head,
    )
    return [data.Result(item=head_item)]
async def jiri_collector() -> list[data.Result]:
    """Collect data from JIRI for display.

    Runs `jiri override -list` through fx and reports whether it printed
    any lines.

    Returns:
        list[data.Result]: A single SOURCE item keyed
        "has_jiri_overrides". The value is False when the command exits
        non-zero or prints nothing.
    """
    jiri = await fx_cmd.FxCmd().start("jiri", "override", "-list")
    completed = await jiri.run_to_completion()

    # A failing command is reported the same way as "no overrides".
    has_overrides = completed.return_code == 0 and bool(
        completed.stdout.splitlines()
    )

    overrides_item = data.Item(
        category=data.Category.SOURCE,
        key="has_jiri_overrides",
        title="Has Jiri overrides?",
        value=has_overrides,
        notes="output of 'jiri override -list'",
    )
    return [data.Result(item=overrides_item)]
async def environment_collector() -> list[data.Result]:
    """Collect data from the incoming environment for display.

    Reports the current build directory (or an error result if it cannot
    be determined) and, when FUCHSIA_NODENAME is set to a non-empty
    value, the device name together with a note on how it was set.

    Returns:
        list[data.Result]: Results from the environment.
    """
    collected: list[data.Result] = []

    try:
        build_directory = build_dir.get_build_directory()
    except build_dir.GetBuildDirectoryError as e:
        collected.append(
            data.Result(error=f"Failed to get build directory: {e}")
        )
    else:
        collected.append(
            data.Result(
                item=data.Item(
                    category=data.Category.ENVIRONMENT,
                    key="build_dir",
                    title="Current build directory",
                    value=str(build_directory),
                )
            )
        )

    # FUCHSIA_NODENAME is unconditionally set for all fx invocations; an
    # empty value is treated as "no device".
    node_name = os.environ.get("FUCHSIA_NODENAME")
    if node_name:
        # A separate environment variable records where FUCHSIA_NODENAME
        # came from.
        came_from_file = (
            os.environ.get("FUCHSIA_NODENAME_IS_FROM_FILE") == "true"
        )
        collected.append(
            data.Result(
                item=data.Item(
                    category=data.Category.ENVIRONMENT,
                    key="device_name",
                    title="Device name",
                    value=node_name,
                    notes=(
                        "set by `fx set-device`"
                        if came_from_file
                        else "set by fx -t"
                    ),
                )
            )
        )

    return collected
async def args_gn_collector() -> list[data.Result]:
    """Collect data from args.gn for display.

    The args.gn file in the current build directory is run through
    `gn format --dump-tree=json`, and the resulting JSON abstract syntax
    tree (AST) is walked to find imported files and the final values of
    assigned variables. Board/product imports and the variables listed
    in ARGS_TITLE_MAP are reported, plus "compilation_mode".

    Processing is best effort: AST entries of an unexpected shape are
    skipped, and `+=` / `-=` with a non-list operand treats that operand
    as a one-element list instead of aborting the whole collector.

    Returns:
        list[data.Result]: Results from args.gn. Empty if args.gn does
        not exist; a single error result if `gn format` fails or its
        output is not a JSON object.

    Raises:
        Whatever build_dir.get_build_directory() raises is propagated;
        it is not handled here.
    """
    results: list[data.Result] = []

    # This dict provides titles and optionally notes for variables
    # retrieved from args.gn. If a variable is not in this list,
    # it is not included in the command output.
    ARGS_TITLE_MAP: OrderedDict[
        str, typing.Tuple[str, str | None]
    ] = OrderedDict(
        boards=("Board", None),
        products=("Product", None),
        main_pb_label=("Product bundle", "set with `fx set-main-pb`"),
        universe_package_labels=(
            "Universe packages",
            "--with argument of `fx set`",
        ),
        base_package_labels=(
            "Base packages",
            "--with-base argument of `fx set`",
        ),
        cache_package_labels=(
            "Cache packages",
            "--with-cache argument of `fx set`",
        ),
        host_labels=("Host labels", "--with-host argument of `fx set`"),
        developer_test_labels=(
            "Developer tests",
            "--with-test argument of `fx set`",
        ),
    )

    # Locate the args.gn file and run it through `gn format`. This
    # provides a JSON abstract syntax tree (AST) of the parsed file
    # which we can then process to find the current values of the
    # variables assigned in the user's settings.
    build_dir_path = build_dir.get_build_directory()
    args_gn_path = build_dir_path / "args.gn"
    if not args_gn_path.exists():
        return []

    command = await fx_cmd.FxCmd().start(
        "gn", "format", "--dump-tree=json", str(args_gn_path)
    )
    result = await command.run_to_completion()
    if result.return_code != 0:
        return [data.Result(error="Failed to process args.gn")]

    try:
        json_result: typing.Any = json.loads(result.stdout)
    except json.JSONDecodeError:
        return [data.Result(error="Failed to parse args.gn")]
    if not isinstance(json_result, dict):
        # Valid JSON, but not the object-shaped tree the walk below needs.
        return [data.Result(error="Failed to parse args.gn")]

    # Get rid of wrapping quotes and whitespace around AST values.
    STRIP_CHARS = "\"' \n"

    def _extract_type(
        value: typing.Any, target_type: str
    ) -> str | list[str] | None:
        """Helper to extract values from the gn AST JSON.

        Args:
            value (typing.Any): Incoming value to process.
            target_type (str): Type we are looking for. "LITERAL" or
                "IDENTIFIER".

        Returns:
            str | list[str] | None: Extracted value from the tree, or
            None if not found.
        """
        if isinstance(value, list):
            # Find the first matching element of incoming lists.
            for element in value:
                found = _extract_type(element, target_type)
                if found is not None:
                    return found
            return None

        if not isinstance(value, dict):
            return None
        if value.get("type") not in (target_type, "LIST"):
            return None

        # This is the value we are looking for, either as a single
        # value or a list.
        single = value.get("value")
        if isinstance(single, str):
            return single.strip(STRIP_CHARS)

        children = value.get("child")
        if isinstance(children, list):
            # Accumulate the children of the requested type; anything
            # else (including malformed entries) is ignored.
            return [
                child["value"].strip(STRIP_CHARS)
                for child in children
                if isinstance(child, dict)
                and child.get("type") == target_type
                and isinstance(child.get("value"), str)
            ]
        return None

    def _as_list(operand: str | list[str]) -> list[str]:
        """Return a new list; a lone string becomes a one-element list."""
        return [operand] if isinstance(operand, str) else list(operand)

    # Will contain the final list of imported files.
    imports: list[str] = []
    # Will contain the final values of assigned variables in the file.
    assigned_variables: dict[str, str | list[str]] = {}

    # Process each item contained in the args.gn AST.
    for item in json_result.get("child") or []:
        if not isinstance(item, dict):
            continue
        node_type = item.get("type")
        node_value = item.get("value")

        if node_type == "FUNCTION" and node_value == "import":
            # This is an import of the form `import("//some/file.gni")`
            # Extract the file name being imported.
            imported = _extract_type(item.get("child"), "LITERAL")
            if isinstance(imported, str):
                imports.append(imported)
            elif isinstance(imported, list):
                imports.extend(imported)
            continue

        if node_type != "BINARY":
            continue

        # This is an assignment of one of the following types:
        #   "=":  Direct assignment
        #   "+=": List append
        #   "-=": List subtraction
        # Get the variable name (IDENTIFIER) and the value to process
        # (LITERAL).
        identifier = _extract_type(item.get("child"), "IDENTIFIER")
        value = _extract_type(item.get("child"), "LITERAL")
        if not isinstance(identifier, str) or value is None:
            continue

        if node_value == "=":
            # Handle direct assignment by overriding the value.
            assigned_variables[identifier] = value
        elif node_value == "+=":
            # Handle append by extending the existing value, if one
            # exists; otherwise start from an empty list. Two strings
            # are concatenated; any other scalar operand is treated as
            # a one-element list so unexpected input cannot crash the
            # collector (previously guarded only by `assert`).
            existing = assigned_variables.get(identifier, [])
            if isinstance(existing, str) and isinstance(value, str):
                assigned_variables[identifier] = existing + value
            else:
                assigned_variables[identifier] = _as_list(
                    existing
                ) + _as_list(value)
        elif node_value == "-=":
            # Handle list subtraction by creating a new list containing
            # only those elements of the existing value that are not
            # present in the operand of the subtraction.
            subtraction_set = set(_as_list(value))
            assigned_variables[identifier] = [
                val
                for val in _as_list(assigned_variables.get(identifier, []))
                if val not in subtraction_set
            ]

    # If a product bundle is set, we want to show its name instead of
    # its label.
    main_pb_label_val = assigned_variables.get("main_pb_label")
    if isinstance(main_pb_label_val, str) and main_pb_label_val:
        product_bundles: typing.Any = []
        product_bundles_path = build_dir_path / "product_bundles.json"
        if product_bundles_path.exists():
            try:
                with open(product_bundles_path, "r", encoding="utf-8") as f:
                    product_bundles = json.load(f)
            except (ValueError, OSError):
                # ValueError covers both malformed JSON and undecodable
                # bytes; either way fall back to showing the label.
                product_bundles = []
        if not isinstance(product_bundles, list):
            product_bundles = []

        for pb in product_bundles:
            if not isinstance(pb, dict):
                continue
            # The label in product_bundles.json might have a toolchain.
            label = pb.get("label", "")
            if isinstance(label, str) and label.startswith(main_pb_label_val):
                name = pb.get("name")
                # Keep showing the label if the entry has no usable name.
                if isinstance(name, str) and name:
                    assigned_variables["main_pb_label"] = name
                break

    # Parse the import names into pairs of (type, value).
    # For instance, `import("//boards/x64.gni")` would be parsed into
    #   type="boards", value="x64", and
    # `import("//foo/bar/boards/x64-fb.gni")` would be parsed into
    #   type="boards", value="x64-fb"
    # Types are filtered against ARGS_TITLE_MAP for inclusion.
    # Types are "boards" and "products"
    # The dot before "gni" is escaped so it only matches a literal ".".
    import_regex = re.compile(r".*(boards|products)\/([a-zA-Z0-9_-]+)\.gni")
    for import_name in imports:
        match = import_regex.match(import_name)
        if match is None:
            continue
        import_kind = match.group(1)
        title, _ = ARGS_TITLE_MAP.get(import_kind) or (import_kind, None)
        results.append(
            data.Result(
                item=data.Item(
                    category=data.Category.BUILD,
                    key=import_kind,
                    value=match.group(2),
                    title=title,
                    notes=import_name,
                )
            )
        )

    # Process all of the assigned variables, filtered against
    # ARGS_TITLE_MAP for inclusion.
    for variable, final_value in assigned_variables.items():
        if variable not in ARGS_TITLE_MAP:
            continue
        title, notes = ARGS_TITLE_MAP[variable]
        results.append(
            data.Result(
                item=data.Item(
                    category=data.Category.BUILD,
                    key=variable,
                    title=title,
                    value=final_value,
                    notes=notes,
                )
            )
        )

    # Compute some extra values that are a function of some assigned
    # variables. This item is always emitted; its value is None when
    # args.gn does not assign compilation_mode.
    compilation_mode = assigned_variables.get("compilation_mode")
    results.append(
        data.Result(
            item=data.Item(
                category=data.Category.BUILD,
                key="compilation_mode",
                title="Compilation mode",
                value=compilation_mode,
            )
        )
    )
    return results