blob: 26a85819214ee7467694c0b90a164767bf789c95 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2020 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# A collection of functions to process distribution manifests. These are JSON
# files that contain a list of objects, which follow the schema documented
# in //build/dist/distribution_manifest.gni
import collections
import dataclasses
import filecmp
import json
import os
from typing import (
Any,
Callable,
Iterable,
List,
Set,
DefaultDict,
Dict,
Optional,
Tuple,
)
# An immutable record modelling a single distribution manifest entry once
# expansion is complete. Its fields are, in order: the destination path, the
# source path, and the label associated with the entry.
Entry = collections.namedtuple("Entry", ("destination", "source", "label"))
@dataclasses.dataclass
class ParseResult:
    """The outcome of expanding a partial distribution manifest.

    Instances of this class are returned by expand_partial_manifest_items().
    """

    # The expanded distribution Entry items.
    entries: List[Entry]
    # Messages describing the problems found while parsing. Empty if none.
    errors: List[str]
    # A { destination_path -> elf_runtime_dir } map.
    elf_runtime_map: Dict[str, str]
# This module supports distribution manifests stored as JSON files, which are
# lists of objects (dictionaries in Python), fully documented at:
# //docs/concepts/build_system/internals/manifest_formats.md
#
# Note that it is possible for an expanded manifest to have several entries
# for the same destination path. If all entries have the same source
# (either the same source path, or the same source content!), then they
# can be merged into a single one. Otherwise, it is a build error.
#
# PartialEntry models one such JSON object before expansion: a plain
# dictionary whose keys (e.g. "source", "file", "renamed_source", "copy_from"
# or "elf_runtime_dir") determine how the item is processed.
PartialEntry = Dict[str, str]
def expand_manifest_items_inner(
    manifest_items: Iterable[PartialEntry],
    opened_files: Set[str],
    default_label: Optional[str] = None,
) -> Tuple[List[Entry], List[PartialEntry]]:
    """Expand the content of a distribution manifest file.

    Items that have a "file" key are expanded recursively by loading the
    JSON manifest they point to. Note that this function does not try to
    de-duplicate identical entries, and that it never modifies the
    dictionaries passed through `manifest_items`.

    Args:
        manifest_items: A list of dictionaries, corresponding to the
            content of a manifest file. May be None, which is treated as
            an empty list.
        opened_files: A set of file paths, which will be updated with
            the paths of the files that have been read during expansion.
        default_label: An optional string that will be used as a default
            "label" value if an entry does not have one.
    Returns:
        An (entries, extras) tuple, where `entries` is an Entry list,
        and `extras` is a list of input items that need further processing,
        e.g. renaming entries.
    """
    entries: List[Entry] = []
    extras: List[PartialEntry] = []
    if manifest_items is None:
        return entries, extras

    for raw_item in manifest_items:
        # Work on a private copy: the item is edited below (label injection,
        # "elf_runtime_dir" removal) and the caller's data must stay intact.
        item = dict(raw_item)
        if "label" not in item and default_label is not None:
            item["label"] = default_label

        if "renamed_source" in item:
            # A renaming entry, for now just add it to the 'extras' list to
            # be processed by the caller.
            extras.append(item)
        if "copy_from" in item:
            # A copy entry, for now just add it to the 'extras' list.
            extras.append(item)

        if "source" in item:
            if "elf_runtime_dir" in item:
                # Save a snapshot of the item in 'extras', to be parsed
                # later, then drop the key since Entry has no such field.
                extras.append(dict(item))
                del item["elf_runtime_dir"]
            entries.append(Entry(**item))
        elif "file" in item:
            file_path = item["file"]
            opened_files.add(file_path)
            # JSON documents are UTF-8 encoded; do not depend on the locale.
            with open(file_path, encoding="utf-8") as data_file:
                data = json.load(data_file)
            # The label of the including item (if any) becomes the default
            # label of every item found in the included file.
            new_entries, new_extras = expand_manifest_items_inner(
                data, opened_files, item.get("label")
            )
            entries += new_entries
            extras += new_extras
    return entries, extras
def _resolve_elf_runtime_dirs(
    elf_runtime_entries: Dict[str, List[PartialEntry]],
) -> Tuple[Dict[str, str], List[str]]:
    """Compute the ELF runtime directory of each destination path.

    Args:
        elf_runtime_entries: A { destination_path -> items } map, where
            each item is a manifest item with an "elf_runtime_dir" key.
    Returns:
        An (elf_runtime_map, errors) tuple. `elf_runtime_map` maps each
        destination path that has a single ELF runtime directory to that
        directory. `errors` lists error messages for the destination paths
        that have several different ones, and is empty on success.
    """
    elf_runtime_map: Dict[str, str] = {}
    conflicts: List[PartialEntry] = []
    for dest, items in elf_runtime_entries.items():
        elf_dirs = {item["elf_runtime_dir"] for item in items}
        if len(elf_dirs) > 1:
            conflicts.extend(items)
        else:
            # A destination is only recorded together with at least one item.
            assert len(elf_dirs) == 1
            elf_runtime_map[dest] = elf_dirs.pop()

    errors: List[str] = []
    if conflicts:
        errors.append(
            "ERROR: Entries with same destination path have different ELF runtime dir:"
        )
        for item in sorted(conflicts, key=lambda x: x["destination"]):
            errors.append(
                " - destination=%s source=%s label=%s elf_runtime_dir=%s"
                % (
                    item["destination"],
                    item["source"],
                    item["label"],
                    item["elf_runtime_dir"],
                )
            )
    return elf_runtime_map, errors


def _multi_source_errors(
    entries: Iterable[Entry], renamed_sources: Set[str]
) -> List[str]:
    """Report renamed sources that are provided by several regular entries.

    When the source path of a renaming entry is actually provided by several
    regular entries, it means that one of the latter comes from a resource()
    target, instead of a renamed_binary() one. Unfortunately, there is no
    way to know from the input data which regular entry should be preserved.

    For example:

        resource("bar") {
          outputs = [ "bin/bar" ]
          deps = [ "//src:foo" ]
          sources = [ "$root_build_dir/foo" ]
        }

        renamed_binary("zoo") {
          dest = "bin/zoo"
          source = "$root_build_dir/foo"
          deps = [ "//src:foo" ]
        }

    Would generate:

        {
          "destination": "bin/bar",
          "source": "foo",
          "label": "//whatever:bar",
        }

        {
          "destination": "bin/foo",
          "source": "foo",
          "label": "//src:foo",
        }

        {
          "renamed_source": "foo",
          "destination": "bin/zoo",
          "label": "//whatever:zoo"
        }

    Notice that from the data above, it's impossible to tell whether to
    remove the first or second entry from the final manifest. Since this is
    a rare case, detect it here and generate an error message that explains
    how to solve the issue.

    Args:
        entries: The regular (non-renamed) Entry items.
        renamed_sources: Source paths referenced by renaming entries.
    Returns:
        A list of error messages, empty if there is no ambiguity.
    """
    entries_by_source: Dict[str, Set[Entry]] = collections.defaultdict(set)
    for entry in entries:
        entries_by_source[entry.source].add(entry)

    ambiguous: List[Entry] = []
    for source, source_entries in entries_by_source.items():
        if source in renamed_sources and len(source_entries) > 1:
            ambiguous.extend(source_entries)
    if not ambiguous:
        return []

    errors = ["ERROR: Multiple regular entries with the same source path:"]
    for entry in sorted(ambiguous):
        errors.append(
            " - destination=%s source=%s label=%s"
            % (entry.destination, entry.source, entry.label)
        )
    errors.append(
        "\nThis generally means a mix of renamed_binary() and resource() targets\n"
        "that reference the same source. Try replacing the resource() targets by\n"
        "renamed_binary() ones to fix the problem\n"
    )
    return errors


def expand_partial_manifest_items(
    manifest_items: Iterable[PartialEntry],
    opened_files: Set[str],
    default_label: Optional[str] = None,
) -> ParseResult:
    """Expand the content of a distribution manifest file.

    On top of the recursive expansion of "file" items, this applies the
    renaming entries ("renamed_source" items) and collects the ELF runtime
    directory of each destination path. Note that this function does not
    try to de-duplicate identical entries.

    Args:
        manifest_items: A list of dictionaries, corresponding to the
            content of a manifest file.
        opened_files: A set of file paths, which will be updated with
            the paths of the files that have been read during expansion.
        default_label: An optional string that will be used as a default
            "label" value if an entry does not have one.
    Returns:
        A ParseResult, whose `entries` field is the expanded Entry list,
        whose `errors` field is a list of strings describing errors found
        in the input (empty on success), and whose `elf_runtime_map` field
        maps destination paths to their ELF runtime directory.
    """
    entries, extras = expand_manifest_items_inner(
        manifest_items, opened_files, default_label
    )

    errors: List[str] = []
    # Rename entries with unknown renamed_source path.
    unknown_renames: List[PartialEntry] = []
    renamed_entries: List[Entry] = []
    # Source paths of original entries that are renamed.
    renamed_sources: Set[str] = set()
    # Source paths of original entries that must be preserved.
    persistent_sources: Set[str] = set()
    # Map destination path to the corresponding ELF runtime directory.
    elf_runtime_map: Dict[str, str] = {}

    if extras:
        # Used to verify that each renaming entry references a regular entry.
        source_entry_map = {e.source: e for e in entries}
        # A map that associates with each destination path (e.g. 'bin/foo')
        # the extra items that have an elf_runtime_dir key in it.
        elf_runtime_entries: Dict[
            str, List[PartialEntry]
        ] = collections.defaultdict(list)
        # A map built from all copy entries, that maps their destination path
        # to the corresponding source path.
        copy_reverse_map = {
            e["copy_to"]: e["copy_from"] for e in extras if "copy_from" in e
        }
        for extra in extras:
            if "renamed_source" in extra:
                source = extra["renamed_source"]
                dest = extra["destination"]
                source_entry = source_entry_map.get(source)
                if source_entry is None:
                    # Try with the copy entries.
                    alt_source = copy_reverse_map.get(source)
                    if alt_source:
                        source = alt_source
                        source_entry = source_entry_map.get(source)
                if source_entry is None:
                    unknown_renames.append(extra)
                    continue
                new_entry = source_entry._replace(destination=dest)
                extra_label = extra.get("label")
                if extra_label:
                    new_entry = new_entry._replace(label=extra_label)
                renamed_entries.append(new_entry)
                renamed_sources.add(source)
                if extra.get("keep_original", False):
                    persistent_sources.add(source)
            elif "copy_from" in extra:
                # Already handled by copy_reverse_map above.
                pass
            elif "elf_runtime_dir" in extra:
                elf_runtime_entries[extra["destination"]].append(extra)
            else:
                # Should not happen unless there is a bug in
                # expand_manifest_items_inner. Raised explicitly so that the
                # check is not stripped when running with -O.
                raise AssertionError("Unsupported extra item: %s" % extra)

        # For each destination path, there should be a single ELF runtime
        # dir, so conflicts are reported as errors.
        elf_runtime_map, elf_errors = _resolve_elf_runtime_dirs(
            elf_runtime_entries
        )
        errors += elf_errors

    if unknown_renames:
        errors.append(
            "ERROR: Renamed distribution entries have unknown source destination:"
        )
        for extra in unknown_renames:
            errors.append(" - %s" % json.dumps(extra))

    errors += _multi_source_errors(entries, renamed_sources)

    # Drop the original entries that were renamed, except those that were
    # explicitly marked as "keep_original", then add the renamed ones.
    renamed_sources -= persistent_sources
    entries = [
        e for e in entries if e.source not in renamed_sources
    ] + renamed_entries
    return ParseResult(
        entries=entries, errors=errors, elf_runtime_map=elf_runtime_map
    )
def expand_manifest_items(
    manifest_items: Iterable[PartialEntry],
    opened_files: Set[str],
    default_label: Optional[str] = None,
) -> List[Entry]:
    """Expand a distribution manifest, failing loudly on any input error.

    This is a thin wrapper around expand_partial_manifest_items() that only
    returns the entries. Identical entries are not de-duplicated.

    Args:
        manifest_items: A list of dictionaries, corresponding to the
            content of a manifest file.
        opened_files: A set of file paths, which will be updated with
            the paths of the files that have been read during expansion.
        default_label: An optional string that will be used as a default
            "label" value if an entry does not have one.
    Returns:
        An Entry list.
    Raises:
        Exception: if errors were found in the input. The message contains
            all error lines, joined with newlines.
    """
    parsed = expand_partial_manifest_items(
        manifest_items, opened_files, default_label
    )
    if not parsed.errors:
        return parsed.entries
    raise Exception("\n".join(parsed.errors))
def _entries_have_same_source(
    entry1: Entry, entry2: Entry, opened_files: Set[str]
) -> bool:
    """Return True iff two entries have the same source.

    Args:
        entry1, entry2: input entries to compare.
        opened_files: a set of file paths, updated with the input entries'
            source paths if they need to be opened for comparing their
            content.
    Returns:
        True iff the entries have the same source path, or if the
        paths point to files with the same content.
    """
    if entry1.source == entry2.source:
        return True
    opened_files.add(entry1.source)
    opened_files.add(entry2.source)
    # shallow=False is required to really compare the file contents: by
    # default, filecmp.cmp() considers that two files with the same type,
    # size and modification time are equal without reading them.
    return filecmp.cmp(entry1.source, entry2.source, shallow=False)
def expand_manifest(
    manifest_items: Iterable[Dict[str, str]], opened_files: Set[str]
) -> Tuple[List[Entry], str]:
    """Expand a distribution manifest into a de-duplicated Entry list.

    Entries that share a destination are merged when they have the same
    source (either by path or by content). Entries that share a destination
    but have different sources are reported as conflicts.

    Args:
        manifest_items: A list of dictionaries, corresponding to the
            content of a manifest file. Its expansion may contain
            duplicate entries.
        opened_files: A set of file paths, which will be updated with
            the paths of the files that have been read during the
            expansion and the merge.
    Returns:
        A (merged_entries, error_msg) tuple, where merged_entries is the
        Entry list of the merged entries, sorted by destination path, and
        error_msg is a string of error messages (which happen when
        conflicts are detected), or an empty string in case of success.
    """
    # The entry currently retained for each destination path.
    merged: Dict[str, Entry] = {}
    # For each destination path with two or more entries that have
    # different sources, the set of entries involved.
    conflicts: DefaultDict[str, Set[Entry]] = collections.defaultdict(set)

    for candidate in expand_manifest_items(manifest_items, opened_files):
        dest = candidate.destination
        if dest not in merged:
            merged[dest] = candidate
            continue
        kept = merged[dest]
        if kept == candidate:
            continue
        if _entries_have_same_source(candidate, kept, opened_files):
            # Same source: merge them, filling in the label if the entry
            # retained so far has none.
            if kept.label is None:
                merged[dest] = kept._replace(label=candidate.label)
        else:
            conflicts[dest].update((kept, candidate))

    report: List[str] = []
    for dest, clashing in conflicts.items():
        report.append(
            " Conflicting source paths for destination path: %s\n" % dest
        )
        report.extend(
            " - source=%s label=%s\n" % (e.source, e.label)
            for e in sorted(clashing, key=lambda x: x.source)
        )

    error_msg = ""
    if report:
        error_msg = "ERROR: Conflicting distribution entries!\n" + "".join(
            report
        )
    merged_entries = sorted(merged.values(), key=lambda x: x.destination)
    return merged_entries, error_msg
def distribution_entries_to_string(entries: List[Entry]) -> str:
    """Serialize an Entry list as a JSON-formatted string.

    The entries are sorted first. Each one becomes a JSON object with
    sorted keys, and the output is indented with two spaces.
    """
    ordered = sorted(entries)
    as_dicts = [entry._asdict() for entry in ordered]
    return json.dumps(
        as_dicts, indent=2, sort_keys=True, separators=(",", ": ")
    )
def convert_fini_manifest_to_distribution_entries(
    fini_manifest_lines: Iterable[str], label: str
) -> List[Entry]:
    """Build an Entry list from the lines of a FINI manifest.

    Each line is stripped, then split around its first "=" sign: the left
    part is the destination and the right part is the source.

    Args:
        fini_manifest_lines: An iteration of input lines from the
            FINI manifest.
        label: A GN label that will be applied to all generated
            entries in the resulting list.
    Returns:
        An Entry list, with one item per input line.
    """

    def to_entry(line: str) -> Entry:
        destination, _, source = line.strip().partition("=")
        return Entry(destination=destination, source=source, label=label)

    return [to_entry(line) for line in fini_manifest_lines]
def _rewrite_elf_needed(dep: str) -> Optional[str]:
    """Rewrite an ELF DT_NEEDED dependency name.

    Args:
        dep: dependency name as it appears in ELF DT_NEEDED entry
            (e.g. 'libc.so')
    Returns:
        None if the dependency should be ignored, or the input dependency
        name, possibly rewritten for specific cases
        (e.g. 'libc.so' -> 'ld.so.1')
    """
    if dep == "libzircon.so":
        # libzircon.so being injected by the kernel into user processes, it
        # should not appear in Fuchsia packages, and thus should be ignored.
        return None
    if dep == "libc.so":
        # ld.so.1 acts as both the dynamic loader and C library, so any
        # reference to libc.so should be rewritten as 'ld.so.1'
        return "ld.so.1"
    # For all other cases, just return the unmodified dependency name.
    return dep


def verify_elf_dependencies(
    binary_name: str,
    lib_dir: str,
    deps: Iterable[str],
    get_lib_dependencies: Callable[[str], Optional[List[str]]],
    visited_libraries: Optional[Set[str]] = None,
) -> List[str]:
    """Verify the ELF dependencies of a given ELF binary.

    The dependencies are walked transitively, in breadth-first order
    starting from `deps`, so the result is deterministic. Each missing
    library is reported only once.

    Args:
        binary_name: Name of the binary being verified, only used for error
            messages.
        lib_dir: The directory where the dependency libraries are supposed
            to be at runtime.
        deps: The list of DT_NEEDED dependency names for the current binary.
        get_lib_dependencies: A function that takes a runtime library path
            (e.g. "lib/libfoo.so") and returns the corresponding list of
            DT_NEEDED dependencies for its input, as a list of strings, or
            None if the library is not available.
        visited_libraries: An optional set of file paths, which is updated
            by this function with the paths of the dependency libraries
            visited by this function. Libraries already in the set are not
            visited again. A new empty set is used for each call when this
            is omitted or None.
    Returns:
        A list of error strings, which will be empty in case of success.
    """
    # A default value of None (instead of a set literal) guarantees that
    # state is never shared between unrelated calls.
    if visited_libraries is None:
        visited_libraries = set()

    errors: List[str] = []
    # Paths of the libraries already reported as missing.
    missing: Set[str] = set()

    # Note that circular dependencies are allowed because they do happen in
    # practice, in particular when generating instrumented binaries, e.g. in
    # the 'asan' case between libc.so (a.k.a. ld.so.1), libclang_rt.asan.so,
    # libc++abi.so and libunwind.so. The `visited_libraries` and `missing`
    # sets ensure that each library is only processed once, hence that the
    # loop terminates.
    pending = collections.deque(deps)
    while pending:
        dep = _rewrite_elf_needed(pending.popleft())
        if dep is None:
            continue
        dep_path = os.path.join(lib_dir, dep)
        if dep_path in visited_libraries or dep_path in missing:
            continue
        subdeps = get_lib_dependencies(dep_path)
        if subdeps is None:
            missing.add(dep_path)
            errors.append("%s missing dependency %s" % (binary_name, dep_path))
            continue
        visited_libraries.add(dep_path)
        pending.extend(subdeps)
    return errors