# blob: a8aeff34d937287c5c1f661b34c4ac9299da476c [file] [log] [blame]
# Copyright 2021 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common utilities needed by rules_fuchsia rules."""
load(
"@fuchsia_sdk//:workspace_output_base.bzl",
_WORKSPACE_OUTPUT_BASE = "WORKSPACE_OUTPUT_BASE",
)
load(":providers.bzl", "FuchsiaProvidersInfo")
# Public alias of the value loaded above as _WORKSPACE_OUTPUT_BASE.
WORKSPACE_OUTPUT_BASE = _WORKSPACE_OUTPUT_BASE
# Characters that normalized_target_name replaces with "."; stored as a list of
# single-character strings.
_INVALID_LABEL_CHARACTERS = "\"!%@^_#$&'()*+,;<=>?[]{|}~/".elems()
def normalized_target_name(label):
    """Lowercases a label string and replaces invalid characters with ".".

    Args:
        label: The label string to normalize.

    Returns:
        The lowercased string in which every character listed in
        _INVALID_LABEL_CHARACTERS has been replaced by ".".
    """
    normalized = label.lower()
    for invalid_char in _INVALID_LABEL_CHARACTERS:
        normalized = normalized.replace(invalid_char, ".")
    return normalized
def label_name(label):
    """Reduces a label string to its final single-word component.

    Examples:
        //foo/bar -> bar
        :bar      -> bar
        //foo:bar -> bar

    Args:
        label: The label string.

    Returns:
        The text after the last "/" and, within that, after the last ":".
    """
    last_path_segment = label.rpartition("/")[2]
    return last_path_segment.rpartition(":")[2]
def get_project_execroot(ctx):
    """Gets the project/workspace execroot relative to the output base.

    See https://bazel.build/docs/output_directories.

    Args:
        ctx: The rule context; only `workspace_name` is read.

    Returns:
        The string "execroot/<workspace name>".
    """
    workspace = ctx.workspace_name
    return "execroot/%s" % workspace
def get_target_execroot(ctx, target):
    """Gets the execroot for a given target, relative to the project execroot.

    See https://bazel.build/docs/output_directories.

    Args:
        ctx: The rule context; only `workspace_name` is read.
        target: A target whose DefaultInfo has a runfiles manifest.

    Returns:
        The directory of the target's runfiles manifest joined with the
        workspace name.
    """
    manifest = target[DefaultInfo].files_to_run.runfiles_manifest
    return "/".join([manifest.dirname, ctx.workspace_name])
def stub_executable(ctx):
    """Creates a placeholder script that prints an error and exits with 1.

    Args:
        ctx: The rule context used to declare and write the script.

    Returns:
        The declared File `<label name>_fail.sh`, written as executable.
    """
    script_template = """#!/bin/bash
echo "---------------------------------------------------------"
echo "ERROR: Attempting to run a target or dependency that is not runnable"
echo "Got {target}"
echo "---------------------------------------------------------"
exit 1
"""
    stub = ctx.actions.declare_file(ctx.label.name + "_fail.sh")
    ctx.actions.write(
        output = stub,
        content = script_template.format(target = ctx.attr.name),
        is_executable = True,
    )
    return stub
def flatten(elements):
    """Flattens arbitrarily nested lists/tuples into a flat list, keeping order.

    Args:
        elements: A sequence whose items may themselves be lists or tuples.

    Returns:
        A new list of the non-list, non-tuple items in their original order.
    """
    flat = []

    # Stack whose last item is the next element in the original order.
    pending = reversed(list(elements))

    # Starlark has no `while`; the length of the string form is used as an
    # upper bound on the number of steps.
    for _ in range(len(str(pending))):
        if not pending:
            return flat
        head = pending.pop()
        if type(head) in ("list", "tuple"):
            # Push children reversed so the first child is popped next.
            pending.extend(reversed(head))
        else:
            flat.append(head)
    fail("Unable to flatten list!")
def collect_runfiles(ctx, *elements, ignore_types = []):
    """Merges files, targets and runfiles objects into a single runfiles object.

    Args:
        ctx: The rule context used to create runfiles objects.
        *elements: Any nesting of lists/tuples containing Targets, Files or
            runfiles objects.
        ignore_types: Type names (as returned by `type()`) that are skipped
            instead of causing a failure.

    Returns:
        A runfiles object merging everything that was collected.
    """
    collected = []
    for item in flatten(elements):
        kind = type(item)
        if kind == "runfiles":
            collected.append(item)
            continue
        if kind == "File":
            collected.append(ctx.runfiles([item]))
            continue
        if kind == "Target":
            info = item[DefaultInfo]
            collected.append(info.default_runfiles)

            # Also keep the executable and its runfiles manifest when both exist.
            launcher = info.files_to_run
            if launcher.executable and launcher.runfiles_manifest:
                collected.append(ctx.runfiles([
                    launcher.executable,
                    launcher.runfiles_manifest,
                ]))
            continue
        if kind not in ignore_types:
            fail("Unable to get runfiles from %s: %s" % (kind, str(item)))

    return ctx.runfiles().merge_all(collected)
def wrap_executable(ctx, executable, *arguments, script_name = None):
    """Wraps an executable with predefined command line arguments.

    Creates a bash wrapper script that invokes the underlying executable with
    the predefined arguments, followed by whatever arguments the wrapper itself
    receives at run time.

    Args:
        ctx: The rule context used to declare and write the wrapper.
        executable: A Target (its DefaultInfo executable is used), a File
            (its short_path is used) or a string path.
        *arguments: Files (serialized as their short_path) or strings.
        script_name: Name of the wrapper file; defaults to
            `run_${target_name}.sh`.

    Returns:
        A tuple `(wrapper, runfiles)`: the wrapper script File and the runfiles
        collected from the executable and the arguments (strings are ignored).
    """
    wrapper = ctx.actions.declare_file(script_name or "run_%s.sh" % ctx.attr.name)

    # Convert the executable into a path.
    executable_str = executable
    if type(executable) == "Target":
        executable_str = executable[DefaultInfo].files_to_run.executable

    # Convert file arguments into strings and single-quote every word. Inside
    # single quotes bash treats a backslash literally, so an embedded quote has
    # to be written as '\'' (close quote, escaped quote, reopen quote).
    command = []
    for arg in [executable_str] + list(arguments):
        text = arg.short_path if type(arg) == "File" else arg
        command.append("'%s'" % text.replace("'", "'\\''"))

    # Quote the resolved path and "$@" so that paths and run-time arguments
    # containing whitespace are not split into several words.
    ctx.actions.write(wrapper, """#!/bin/bash
"$(readlink -f %s)" %s "$@"
""" % (command[0], " ".join(command[1:])), is_executable = True)

    return wrapper, collect_runfiles(ctx, executable, arguments, ignore_types = ["string"])
def _add_providers_info(implementation):
    """Wraps a rule implementation so its result goes through track_providers.

    Args:
        implementation: A rule implementation function taking `ctx`.

    Returns:
        A function taking `ctx` that returns
        `track_providers(implementation(ctx))`.
    """

    def _tracked_impl(ctx):
        returned_providers = implementation(ctx)
        return track_providers(returned_providers)

    return _tracked_impl
def _add_default_executable(implementation):
    """Wraps a rule implementation so it always returns a DefaultInfo.

    Args:
        implementation: A rule implementation function taking `ctx` and
            returning a list of providers.

    Returns:
        A function taking `ctx` that appends a DefaultInfo with a stub
        executable to the returned list when none is present.
    """

    def _with_default_executable(ctx):
        result = implementation(ctx)
        has_default_info = any([type(p) == "DefaultInfo" for p in result])
        if has_default_info:
            return result
        result.append(DefaultInfo(executable = stub_executable(ctx)))
        return result

    return _with_default_executable
def rule_variants(implementation, variants = [], attrs = {}, **rule_kwargs):
    """Creates variants of a rule.

    Valid variants:
        - None: Behaves like `rule` natively.
        - "executable": Sets executable = True and adds a stub executable if
            DefaultInfo is not provided by implementation.
        - "test": Sets test = True and adds a stub executable if DefaultInfo is
            not provided by implementation.

    Args:
        implementation: The rule implementation function.
        variants: The variants to create, in order.
        attrs: Rule attributes; a `_variant` string attribute is added to each
            variant.
        **rule_kwargs: All other arguments are forwarded to `rule`.

    Returns:
        A list with one rule per entry of `variants`, in the same order.
    """
    rules = []
    for variant in variants:
        if variant == None:
            wrapped = implementation
        else:
            wrapped = _add_default_executable(implementation)
        variant_attrs = dict(attrs, _variant = attr.string(default = variant or ""))
        rules.append(rule(
            implementation = _add_providers_info(wrapped),
            attrs = variant_attrs,
            executable = variant == "executable",
            test = variant == "test",
            **rule_kwargs
        ))
    return rules
def rule_variant(implementation, variant = None, attrs = {}, **rule_kwargs):
    """Creates a single variant of a rule.

    See rule_variants for argument descriptions.

    Returns:
        The one rule produced by rule_variants for `variant`.
    """
    created = rule_variants(
        implementation = implementation,
        variants = [variant],
        attrs = attrs,
        **rule_kwargs
    )
    return created[0]
def track_providers(providers):
    """Appends a FuchsiaProvidersInfo recording the non-DefaultInfo providers.

    Args:
        providers: A list of provider instances.

    Returns:
        A new list: `providers` followed by one FuchsiaProvidersInfo whose
        `providers` field holds every entry that is not a DefaultInfo.
    """
    tracked = []
    for provider in providers:
        if type(provider) == "DefaultInfo":
            continue
        tracked.append(provider)
    return providers + [FuchsiaProvidersInfo(providers = tracked)]
def forward_providers(ctx, target, *providers, rename_executable = None):
    """Builds a provider list that re-exposes a target's providers.

    Args:
        ctx: The rule context used to declare the executable symlink.
        target: The target whose providers are forwarded.
        *providers: Provider types to forward when `target` has them.
        rename_executable: Optional name for the symlink to the target's
            executable; when unset the name is "_" + the executable's basename.

    Returns:
        A list of: the requested providers present on `target`, then the
        providers recorded in the target's FuchsiaProvidersInfo (if any), then
        a DefaultInfo.
    """
    forwarded_default_info = target[DefaultInfo]
    files_to_run = forwarded_default_info.files_to_run

    if files_to_run and files_to_run.executable:
        original_executable = files_to_run.executable

        # An executable must be an output of the rule that returns it, so
        # expose the original through a symlink declared by this rule.
        symlink_name = rename_executable or "_" + original_executable.basename
        executable_symlink = ctx.actions.declare_file(symlink_name)
        ctx.actions.symlink(
            output = executable_symlink,
            target_file = original_executable,
            is_executable = True,
        )

        if rename_executable:
            # Swap the original executable for the renamed symlink in `files`.
            remaining_files = [
                f
                for f in forwarded_default_info.files.to_list()
                if f != original_executable
            ]
            forwarded_files = depset([executable_symlink] + remaining_files)
        else:
            forwarded_files = forwarded_default_info.files

        forwarded_default_info = DefaultInfo(
            files = forwarded_files,
            runfiles = forwarded_default_info.default_runfiles,
            executable = executable_symlink,
        )

    if FuchsiaProvidersInfo in target:
        tracked_providers = target[FuchsiaProvidersInfo].providers
    else:
        tracked_providers = struct(providers = []).providers

    requested = [target[Provider] for Provider in providers if Provider in target]
    return requested + tracked_providers + [forwarded_default_info]
def _forward_providers(ctx):
    """Rule implementation that forwards the providers of `ctx.attr.actual`."""
    aliased_target = ctx.attr.actual
    return forward_providers(ctx, aliased_target)
# The three flavors of the alias rule (plain, executable, test), in the same
# order as `variants`. Each one uses _forward_providers as its implementation.
_alias, _alias_for_executable, _alias_for_test = rule_variants(
    variants = (None, "executable", "test"),
    implementation = _forward_providers,
    attrs = {
        "actual": attr.label(
            doc = "The test workflow entity target to alias.",
            providers = [FuchsiaProvidersInfo],
            mandatory = True,
        ),
    },
)
def alias(*, name, executable, testonly = False, **kwargs):
    """Creates an alias target that forwards the providers of another target.

    A custom alias macro is needed because of a limitation of the native one:
    https://github.com/bazelbuild/bazel/issues/10893

    The underlying target must be created with `rule_variant(s)` or manually
    include `FuchsiaProvidersInfo` in order to forward providers.

    Args:
        name: Name of the alias target.
        executable: Whether the alias should be an executable target.
        testonly: Passed through to the rule; together with `executable` it
            selects the test flavor of the alias rule.
        **kwargs: Forwarded to the selected alias rule.

    Returns:
        Whatever the selected alias rule invocation returns.
    """
    if not executable:
        alias_rule = _alias
    elif testonly:
        alias_rule = _alias_for_test
    else:
        alias_rule = _alias_for_executable
    return alias_rule(
        name = name,
        testonly = testonly,
        **kwargs
    )
def filter(obj, value = None, exclude = True):
    """Recursively removes matching fields/elements from an object by mutating.

    Nested dicts and lists are traversed and are never removed themselves; only
    non-container entries are compared against `value`.

    Args:
        obj: The dict or list to filter in place.
        value: The value that entries are compared against.
        exclude: If True, entries equal to `value` are removed. If False,
            entries not equal to `value` are removed.

    Returns:
        The same `obj` that was passed in, after mutation.
    """
    if type(obj) not in ("dict", "list"):
        fail("Unsupported data type.")

    nested_fields = [obj]

    # Since dictionaries and lists can be represented as DAGs, this represents
    # one filter operation within an iterative BFS.
    def filter_next():
        current = nested_fields.pop(0)

        # Lists and dictionaries can both be represented as key-value pairs.
        if type(current) == "dict":
            entries = current.items()
        else:
            # Visit list indices from last to first: removing an element then
            # never shifts the index of an element that is still to be visited.
            entries = reversed(list(enumerate(current)))

        for k, nested in entries:
            if type(nested) in ("dict", "list"):
                # Add a nested object to the BFS queue.
                nested_fields.append(nested)
            elif (nested == value) == exclude:
                # Remove the matching value's field by mutating the object.
                current.pop(k)

    # Using an iterative BFS to filter all matching values within `obj` should
    # take less than `len(str(obj))` iterations.
    for _ in range(len(str(obj))):
        # Empty nested_fields means that we're done with our BFS.
        if not nested_fields:
            return obj
        filter_next()

    # In case the previous assumption is violated.
    fail("Unable to filter all none values!")
def make_resource_struct(src, dest):
    """Bundles a source and a destination into a struct.

    Args:
        src: Stored as the `src` field.
        dest: Stored as the `dest` field.

    Returns:
        A struct with fields `src` and `dest`.
    """
    return struct(src = src, dest = dest)
def get_runfiles(target):
    """Lists the files referenced by a target's root symlinks.

    Args:
        target: A target providing DefaultInfo.

    Returns:
        A list with the `target_file` of every entry in the target's
        `default_runfiles.root_symlinks`.
    """
    root_symlinks = target[DefaultInfo].default_runfiles.root_symlinks
    files = []
    for symlink in root_symlinks.to_list():
        files.append(symlink.target_file)
    return files
def is_lib(file):
    """Tells whether a file's basename looks like a shared library name.

    Libs all end with .so or .so followed by a semantic version.
    Examples: libname.so, libname.so.1, libname.so.1.1

    Args:
        file: An object with a `basename` string attribute.

    Returns:
        True if the basename contains ".so" and everything after its last
        occurrence consists only of digits and dots; False otherwise.
    """
    _, separator, version_suffix = file.basename.rpartition(".so")
    if separator != ".so":
        return False
    return all([ch.isdigit() or ch == "." for ch in version_suffix.elems()])