blob: c3af8a0613bfd80312dd0d0fb2fcc9165ee352ce [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common utilities needed by infra rules."""
_INVALID_LABEL_CHARACTERS = "\"!%@^_#$&'()*+,;<=>?[]{|}~/".elems()
def normalized_target_name(target):
label = target.name.lower()
for c in _INVALID_LABEL_CHARACTERS:
label = label.replace(c, ".")
return label
def get_project_execroot(ctx):
# Gets the project/workspace execroot relative to the output base.
# See https://bazel.build/docs/output_directories.
return "execroot/%s" % ctx.workspace_name
def get_target_execroot(ctx, target):
# Gets the execroot for a given target, relative to the project execroot.
# See https://bazel.build/docs/output_directories.
return target[DefaultInfo].files_to_run.runfiles_manifest.dirname + "/" + ctx.workspace_name
def stub_executable(ctx):
"""Returns a stub executable that fails with a message."""
executable_file = ctx.actions.declare_file(ctx.label.name + "_fail.sh")
content = """#!/bin/bash
echo "---------------------------------------------------------"
echo "ERROR: Attempting to run a target or dependency that is not runnable"
echo "Got {target}"
echo "---------------------------------------------------------"
exit 1
""".format(target = ctx.attr.name)
ctx.actions.write(
output = executable_file,
content = content,
is_executable = True,
)
return executable_file
def flatten(elements):
"""Flattens an arbitrarily nested list of lists to non-list elements."""
result = []
unprocessed = list(elements)
for _ in range(len(str(unprocessed))):
if not unprocessed:
return result
elem = unprocessed.pop()
if type(elem) == "list":
unprocessed.extend(elem)
else:
result.append(elem)
fail("Unable to flatten list!")
def collect_runfiles(ctx, *elements):
"""Collects multiple types of elements (...files, ...targets, ...runfiles) into runfiles."""
# Map to runfiles objects.
runfiles = []
for elem in flatten(elements):
if type(elem) == "Target":
runfiles.append(elem[DefaultInfo].default_runfiles)
runfiles.append(ctx.runfiles(elem[DefaultInfo].files.to_list()))
files_to_run = elem[DefaultInfo].files_to_run
if files_to_run.executable and files_to_run.runfiles_manifest:
runfiles.append(ctx.runfiles([
files_to_run.executable,
files_to_run.runfiles_manifest,
]))
elif type(elem) == "File":
runfiles.append(ctx.runfiles([elem]))
elif type(elem) == "runfiles":
runfiles.append(elem)
elif elem != None:
fail("Unable to get runfiles from %s: %s" % (type(elem), str(elem)))
# Merges runfiles for a given target.
return ctx.runfiles().merge_all(runfiles)
def filter(obj, value = None, exclude = True):
"""Recursively removes matching fields/elements from an object by mutating."""
if type(obj) not in ("dict", "list"):
fail("Unsupported data type.")
nested_fields = [obj]
# Since dictionaries and lists can be represented as DAGs, this represents
# one filter operation within an iterative BFS.
def filter_next():
obj = nested_fields.pop(0)
# Lists and dictionaries can both be represented as key-value pairs.
for k, nested in (obj.items() if type(obj) == "dict" else enumerate(obj)):
if type(nested) in ("dict", "list"):
# Add a nested object to the BFS queue.
nested_fields.append(nested)
elif (nested == value) == exclude:
# Remove the matching value's field by mutating the object.
obj.pop(k)
# Using and iterative BFS to filter all matching values within `obj` should
# take less than `len(str(obj))` iterations.
for _ in range(len(str(obj))):
# Empty nested_fields means that we're done with our BFS.
if not nested_fields:
return obj
filter_next()
# In case the previous assumption is violated.
fail("Unable to filter all none values!")