# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# Pylint is not smart enough to infer the return type of methods with a custom
# property decorator like @cached_property, so we have to disable some spurious
# warnings from cached property accesses. See
# https://github.com/PyCQA/pylint/issues/3484
#
# pylint: disable=no-member
import collections
from contextlib import contextmanager
import itertools
import re
from recipe_engine import recipe_api
from RECIPE_MODULES.fuchsia.utils import memoize
Range = collections.namedtuple("Range", "start end")
FORMATTING_MESSAGE = """File not formatted properly.
Run the following to format:
"""
MISSING_COMMIT_TAG_MESSAGE = 'The change description should start with a commit tag like "[tag] Change Description".'
# Skip the inclusivity check on a whole section (disable/enable pair). Please
# do not change the order of these lines.
INCLUSIVE_DISABLE_RE = re.compile(r"inclusive-language:\s*disable")
INCLUSIVE_ENABLE_RE = re.compile(r"inclusive-language:\s*enable")
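# Skip the inclusivity check on a single line.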
INCLUSIVE_IGNORE_RE = re.compile(r"inclusive-language:\s*ignore")
# The list of non-inclusive words and their suggested replacements should match
# https://fuchsia.dev/fuchsia-src/contribute/respectful_code?hl=en#what_are_examples_of_terminology_to_be_avoided
# inclusive-language: disable
INCLUSIVE_WORD_REPLACEMENTS = {
"master": ["primary", "controller", "leader", "host"],
"slave": [
"replica",
"subordinate",
"secondary",
"follower",
"device",
"peripheral",
],
"whitelist": ["allowlist", "exception list", "inclusion list"],
"blacklist": ["denylist", "blocklist", "exclusion list"],
"insane": ["unexpected", "catastrophic", "incoherent"],
"sane": ["expected", "appropriate", "sensible", "valid"],
"crazy": ["unexpected", "catastrophic", "incoherent"],
"redline": ["priority line", "limit", "soft limit"],
}
# inclusive-language: enable
def _analyzer_name(analyzer_func):
"""Return a normalized name for the analyzer function."""
return analyzer_func.__name__.lstrip("_").lower()
class TriciumAnalyzeApi(recipe_api.RecipeApi):
"""API for running analyses on Tricium."""
_FILENAME_RE = re.compile(r"^\+\+\+\ [^/]+/(.*)")
_CHUNK_RE = re.compile(
r"^@@ \-(?P<before_line>\d+)(,(?P<before_count>\d+))? \+(?P<after_line>\d+)(,(?P<after_count>\d+))?",
)
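    # E.g. the hunk header "@@ -10,2 +12,3 @@" yields before_line=10,
    # before_count=2, after_line=12, after_count=3.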
def __init__(self, *args, **kwargs):
super(TriciumAnalyzeApi, self).__init__(*args, **kwargs)
self._ext_to_analyzers = {
".c": [self._ClangFormat, self._ClangTidy],
".cc": [self._ClangFormat, self._ClangTidy],
".cml": [self._CmlFormat],
".cpp": [self._ClangFormat, self._ClangTidy],
".dart": [self._DartFmt],
".h": [self._ClangFormat, self._ClangTidy],
".hh": [self._ClangFormat, self._ClangTidy],
".hpp": [self._ClangFormat, self._ClangTidy],
".fidl": [self._FidlFormat, self._FidlLint],
".gn": [self._GNFormat],
".gni": [self._GNFormat],
".go": [self._GoFmt, self._GoVet],
".md": [self._MdLint],
".py": [self._Black, self._Yapf],
".rs": [self._RustFmt],
".star": [self._Yapf],
".ts": [self._ClangFormat],
".triage": [self._Json5Format],
}
        # Analyzer tools that are always produced as part of a Fuchsia build,
        # rather than distributed as prebuilts.
self._built_tools = [
"cmc",
"fidl-format",
"fidl-lint",
"formatjson5",
"mdlint",
]
# The paths to these tools may be set directly by the recipe.
self.black = None
self.go = None
self.gofmt = None
self.yapf = None
self.checkout = None
self._build_results = None
# Whether to suggest the use of the fx tool.
# The tool only works properly when run in fuchsia.git or one of its sub-directories.
self.suggest_fx = True
def __call__(
self, filenames, fint_params_path, enabled_analyzers, enabled_luci_analyzers=()
):
"""Check for errors in the given files.
Runs the relevant language's analyzers over each file and posts
Tricium comments if errors are found.
Args:
            filenames (seq(str)): Relative paths to files in the checkout.
                Must be strings rather than Path objects.
            fint_params_path (str): Path to a file containing instructions
                for building Fuchsia. If set, we'll assume we're operating
                on a Fuchsia checkout, and build all the necessary tools
                along with generated sources.
            enabled_analyzers (seq(str)): Names of enabled analyzers.
            enabled_luci_analyzers (seq(str)): Names of enabled analyzers
                supported by the recipe engine's tricium recipe module.
"""
enabled_analyzers = [a.lower() for a in enabled_analyzers]
if fint_params_path:
assert self.checkout
self._build_results = self.m.build.with_options(
checkout=self.checkout, fint_params_path=fint_params_path
)
with self.m.step.defer_results():
with self.m.step.nest("check for inclusivity"):
self._check_for_inclusivity("", contents=self._commit_message())
for filename in filenames:
self._check_for_inclusivity(filename)
if enabled_luci_analyzers:
with self.m.step.nest("run luci analyzers"):
self._run_luci_analyzers(
enabled_luci_analyzers,
# cwd is None if we're still in start_dir.
checkout_dir=self.m.context.cwd or self.m.path["start_dir"],
filenames=filenames,
)
for filename in filenames:
analyzers = self._analyzers_for_file(filename, enabled_analyzers)
if not analyzers:
continue
with self.m.step.nest("analyze %s" % filename):
for analyzer_func in analyzers:
analyzer_func(filename)
self.m.tricium.write_comments()
def _check_for_inclusivity(self, filename, contents=""):
if not contents:
contents = self.m.file.read_text(
"read %s" % filename,
self.m.context.cwd.join(filename),
include_log=False,
)
change_diff = self.m.git(
"get change diff for %s" % filename,
"diff-tree",
"--no-commit-id",
"--diff-filter=d",
"-U0",
"HEAD",
"--",
filename,
stdout=self.m.raw_io.output(),
).stdout
change_line_ranges = self._get_ranges_from_diff(
change_diff, include_before=False, include_after=True
)
else:
change_line_ranges = [Range(1, len(contents.splitlines()) + 1)]
content_lines = contents.splitlines()
enabled = True
line_indices_per_word = {}
for i, line in enumerate(content_lines):
if not enabled:
if not INCLUSIVE_ENABLE_RE.search(line):
continue
enabled = True
if INCLUSIVE_DISABLE_RE.search(line):
enabled = False
continue
if INCLUSIVE_IGNORE_RE.search(line):
continue
            # Line indices used by Tricium and in change_diff are 1-based, so
            # add 1 to the 0-based enumerate index.
line_index = i + 1
for word in INCLUSIVE_WORD_REPLACEMENTS:
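                # \b restricts matches to whole words, so flagged terms are
                # not matched as substrings of longer identifiers.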
word_regex = re.compile(r"\b%s\b" % word)
line_indices = line_indices_per_word.get(word, [])
if word_regex.search(line):
for r in change_line_ranges:
if line_index >= r.start and line_index < r.end:
line_indices.append(line_index)
break
line_indices_per_word[word] = line_indices
        for word, replacements in INCLUSIVE_WORD_REPLACEMENTS.items():
line_indices = line_indices_per_word.get(word, [])
# If a non-inclusive word appears too many times in a file, combine
# all occurrences into one comment to avoid overwhelming Tricium
# with too many comments.
if len(line_indices) > 3:
self.m.tricium.add_comment(
"Inclusivity",
"Please avoid '%s' found on lines %s. Suggested replacements: %s.\n\nSee https://fuchsia.dev/fuchsia-src/contribute/respectful_code"
% (word, str(line_indices), str(replacements)),
filename,
)
else:
for i in line_indices:
self.m.tricium.add_comment(
"Inclusivity",
"Please avoid '%s'. Suggested replacements: %s.\n\nSee https://fuchsia.dev/fuchsia-src/contribute/respectful_code"
% (word, str(replacements)),
filename,
start_line=i,
)
def _run_luci_analyzers(self, enabled_luci_analyzers, checkout_dir, filenames):
all_analyzers = self.m.tricium.analyzers.by_name()
self.m.tricium.run_legacy(
[all_analyzers[name] for name in enabled_luci_analyzers],
input_base=checkout_dir,
affected_files=filenames,
commit_message=self._commit_message(),
# Don't emit comments yet. We'll handle that ourselves after running
# non-LUCI analyzers.
emit=False,
)
def _analyzers_for_file(self, filename, enabled_analyzers):
assert isinstance(
filename, str
), "filenames must be string paths relative to the checkout"
_, ext = self.m.path.splitext(filename)
return [
analyzer_func
for analyzer_func in self._ext_to_analyzers.get(ext, [])
if _analyzer_name(analyzer_func) in enabled_analyzers
]
@contextmanager
def _diff_format(self, category, filename, cmd_format="fx format-code --files=%s"):
"""Checks for diffs after running an auto-formatter.
If there's a diff in the lines that were touched by the CL under
test, adds a comment on the CL.
"""
# This step gets the changed ranges between parent commit and current CL.
# The diff will have following format:
#
# diff --git a/filename b/filename
# --- a/filename
        # +++ b/filename
        # @@ -PARENT_START_LINE[,LINE_COUNT] +CL_START_LINE[,LINE_COUNT] @@ CONTEXT
# CHANGE_DETAILS
#
change_diff = self.m.git(
"get change diff",
"diff-tree",
"--no-commit-id",
"--diff-filter=d",
"-U0",
"HEAD",
"--",
filename,
stdout=self.m.raw_io.output(),
).stdout
change_line_ranges = self._get_ranges_from_diff(
change_diff, include_before=False, include_after=True
)
self.m.step.active_result.presentation.logs["change_line_ranges"] = str(
change_line_ranges
)
# The caller should run the formatter within the `with` block that
# called this function.
yield
# This step gets the changed ranges between current CL and formatted
# CL. The diff will have following format:
#
# diff --git a/filename b/filename
# --- a/filename
        # +++ b/filename
        # @@ -CL_START_LINE[,LINE_COUNT] +FORMATTED_CL_START_LINE[,LINE_COUNT] @@ CONTEXT
# CHANGE_DETAILS
#
formatted_diff = self.m.git(
"get formatted diff",
"diff-index",
"--no-commit-id",
"--diff-filter=d",
"-U0",
"HEAD",
"--",
filename,
stdout=self.m.raw_io.output(),
).stdout
        # change_line_ranges holds the ranges of lines changed in the current
        # CL relative to its parent commit; formatted_line_ranges holds the
        # ranges of CL lines that were changed by the formatter. The
        # intersection of the two reveals the CL lines affected by the
        # formatter.
formatted_line_ranges = self._get_ranges_from_diff(
formatted_diff, include_before=True, include_after=False
)
self.m.step.active_result.presentation.logs["formatted_line_ranges"] = str(
formatted_line_ranges
)
        # Ideally we'd have a generic way to support self.suggest_fx == False
        # in this function. However, today only one analyzer actually needs
        # this, and restructuring the code around that use case would add net
        # complexity. If we start supporting this for many analyzers we should
        # reconsider, perhaps by having a class per analyzer rather than just
        # a function.
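        # E.g. if the CL changed [Range(3, 6)] and the formatter changed
        # [Range(5, 9)], the intersection is [Range(5, 6)]: those CL lines
        # still need formatting.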
intersection = self._intersect_ranges(change_line_ranges, formatted_line_ranges)
if intersection:
self.m.tricium.add_comment(
"Format/%s" % category,
"%s%s" % (FORMATTING_MESSAGE, cmd_format % filename),
filename,
)
self.m.git("reset", "reset", "--hard", "HEAD")
def _Black(self, filename):
with self._diff_format(
"Black",
filename,
cmd_format="black %s\n"
"If black isn't in your PATH, see http://go/fxi-cookbook#getting-the-infra-source-code",
):
self.m.step("black", [self.black, filename])
def _FidlFormat(self, filename):
assert self.checkout
        # FIDL test files are often purposefully formatted in unrecommended
        # ways, so they should be skipped.
if str(filename).endswith(".test.fidl"):
return
with self._diff_format("FidlFormat", filename):
with self.m.step.nest("fidl-format"):
fidl_format_path = self._build_results.tool("fidl-format")
self.m.step("run", [fidl_format_path, "-i", filename])
def _CmlFormat(self, filename):
assert self.checkout
with self._diff_format("CmlFormat", filename):
with self.m.step.nest("cmc"):
cmc_path = self._build_results.tool("cmc")
self.m.step(
"run",
[cmc_path, "format", "--cml", "--in-place", filename],
)
def _GoFmt(self, filename):
with self._diff_format("GoFmt", filename):
with self.m.step.nest("gofmt"):
if not self.gofmt:
self.gofmt = self._build_results.tool("gofmt")
self.m.step("run", [self.gofmt, "-w", "-s", filename])
def _GNFormat(self, filename):
with self._diff_format("GNFormat", filename):
with self.m.step.nest("gn format"):
self.m.step("run", [self._build_results.tool("gn"), "format", filename])
def _RustFmt(self, filename):
with self._diff_format("RustFmt", filename):
with self.m.step.nest("rustfmt"):
self.m.step(
"run",
[
self._build_results.tool("rustfmt"),
"--config-path={}".format(
self.checkout.root_dir.join("rustfmt.toml")
),
"--unstable-features",
"--skip-children",
filename,
],
)
def _Yapf(self, filename):
cmd_format = "fx format-code --files=%s"
if not self.suggest_fx:
cmd_format = "yapf --in-place %s"
with self._diff_format("YAPF", filename, cmd_format):
with self.m.step.nest("yapf"):
if not self.yapf:
self.yapf = self._build_results.tool("yapf")
self.m.step("run", [self.yapf, "--in-place", filename])
def _DartFmt(self, filename):
with self._diff_format("DartFmt", filename):
with self.m.step.nest("dart format"):
self.m.step(
"run", [self._build_results.tool("dart"), "format", filename]
)
def _ClangFormat(self, filename):
with self._diff_format("ClangFormat", filename), self.m.step.nest(
"clang-format"
):
paths = self.m.git(
"get file diff",
"diff",
"-U0",
"--no-color",
"HEAD^",
"--",
filename,
stdout=self.m.raw_io.output(),
)
self.m.python(
name="clang-format-diff.py",
script=self._build_results.tool("clang-format-diff"),
args=[
"-p1",
"-i",
"-style=file",
"-fallback-style=Google",
"-sort-includes",
"-binary",
self._build_results.tool("clang-format"),
],
stdin=self.m.raw_io.input_text(data=paths.stdout),
)
def _capitalize_msg(self, message):
if not message or message[0].isupper():
return message
return message[0].upper() + message[1:]
def _FidlLint(self, filename):
assert self.checkout
        # FIDL test files often purposefully use syntax that does not follow
        # linting rules, so they should be skipped.
if str(filename).endswith(".test.fidl"):
return
with self.m.step.nest("fidl-lint"):
fidl_lint_path = self._build_results.tool("fidl-lint")
results = self.m.step(
"run",
[fidl_lint_path, "--format=json", filename],
ok_ret=(0, 1),
stdout=self.m.json.output(),
).stdout
for result in results:
capitalized_msg = self._capitalize_msg(result["message"]) + "."
capitalized_desc = ""
for suggestion in result.get("suggestions", ()):
if "description" in suggestion:
capitalized_desc += (
self._capitalize_msg(suggestion["description"]) + ". "
)
if capitalized_desc:
capitalized_msg = capitalized_msg + " " + capitalized_desc[:-1]
self.m.tricium.add_comment(
"Lint/FidlLint",
capitalized_msg,
# All file paths reported to tricium should be relative to the root of the git repo.
# The caller ensures that cwd is the root of the git repo.
self.m.path.relpath(
result["path"], self.m.path.abspath(self.m.context.cwd)
),
start_line=result["start_line"],
start_char=result["start_char"],
end_line=result["end_line"],
end_char=result["end_char"],
)
def _GoVet(self, filename):
with self.m.step.nest("go vet") as presentation:
cwd = self.m.context.cwd
package_dir = cwd.join(self.m.path.dirname(filename))
package_warnings = self._go_vet_package(package_dir)
if not package_warnings:
return
presentation.logs["warnings"] = self.m.json.dumps(
package_warnings, indent=2
).splitlines()
for warning in package_warnings:
warning_file = self.m.path.relpath(warning.path, cwd)
if warning_file != filename:
continue
self.m.tricium.add_comment(
"Lint/GoVet",
warning.message,
# All file paths reported to tricium should be relative
# to the root of the git repo. The caller ensures that
# cwd is the root of the git repo.
filename,
start_line=warning.line,
end_line=warning.line,
start_char=warning.char,
end_char=warning.char,
)
_GoVetWarning = collections.namedtuple("GoVetWarning", "path message line char")
@memoize
def _go_vet_package(self, package_dir):
with self.m.context(cwd=package_dir):
if not self.go:
self.go = self._build_results.tool("go")
step = self.m.step(
"run",
[self.go, "vet", "-json"],
stderr=self.m.raw_io.output(),
ok_ret="any",
)
if step.retcode:
# With the -json flag set, `go vet` will only return a
# non-zero retcode if the Go code is not compilable. If the
# code is actually not compilable by the Fuchsia build
# system then that will be caught in CQ; otherwise it's
# likely just not compilable by the native Go toolchain
# because it relies on generated Go files produced by ninja.
# So we can skip vetting this code, since Tricium warnings
# are best-effort anyway.
step.presentation.step_text = "failed to compile, skipping"
return None
stderr_lines = step.stderr.splitlines()
step.presentation.logs["stderr"] = stderr_lines
# Unfortunately `go vet -json` does not output only valid JSON, so
# we have to parse the output manually.
# Look at the test cases in examples/ for the expected output format.
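        # A sketch of the stderr shape this parser expects (inferred from the
        # parsing logic and test data, not an authoritative `go vet` spec):
        #   # example.com/pkg
        #   {
        #       "example.com/pkg": {
        #           "unreachable": [
        #               {"posn": "/abs/file.go:17:2", "message": "unreachable code"}
        #           ]
        #       }
        #   }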
parsed_output = None
current_entry_lines = []
for line in stderr_lines:
if current_entry_lines:
current_entry_lines.append(line)
# Ends the JSON object
if line == "}":
parsed_output = self.m.json.loads("\n".join(current_entry_lines))
break
# Empty JSON object
elif line == "{}":
parsed_output = {}
break
# Start new non-empty JSON object
elif line == "{":
assert not current_entry_lines
current_entry_lines.append(line)
assert parsed_output is not None, "invalid go vet output"
go_vet_warnings = []
for package_warnings in parsed_output.values():
# Each package's warnings are grouped by the warning type (e.g.
# "unreachable"), but we don't care about the warning type because
# the full warning message is available for each warning.
for warning in itertools.chain(*package_warnings.values()):
abspath, line, column = warning["posn"].split(":")
go_vet_warnings.append(
self._GoVetWarning(
path=abspath,
line=int(line),
# go vet emits 1-based column indices, but tricium
# expects 0-based.
char=int(column) - 1,
message=warning["message"],
)
)
return go_vet_warnings
def _ClangTidy(self, filename):
assert self.checkout
with self.m.step.nest("clang-tidy"):
clang_tidy = self._build_results.tool("clang-tidy")
clang_tidy_diff = self._build_results.tool("clang-tidy-diff")
warnings_file = self.m.path["cleanup"].join("clang_tidy_fixes.yaml")
diff = self.m.git(
"get file diff",
"diff",
"-U0",
"--no-color",
"HEAD^",
"--",
filename,
stdout=self.m.raw_io.output(),
)
with self.m.context(cwd=self.checkout.root_dir):
clang_tidy_args = [
"-p1",
"-path",
self._build_results.compdb_path,
"-export-fixes",
warnings_file,
"-clang-tidy-binary",
clang_tidy,
]
step_result = self.m.python(
name="clang-tidy-diff.py",
script=clang_tidy_diff,
args=clang_tidy_args,
stdin=self.m.raw_io.input_text(data=diff.stdout),
# This script may return 1 if there are compile
# errors -- that's okay, since this is a linter
# check. We'll log them below.
ok_ret=(0, 1),
venv=self.resource("clang-tidy-diff.vpython"),
)
if step_result.retcode:
self.m.step.active_result.presentation.status = "WARNING"
errors = self._parse_warnings(warnings_file)
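            # mock_add_paths only affects recipe tests: it registers the path
            # as existing under simulation so the m.path.exists check below is
            # exercised; it's a no-op in real runs.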
self.m.path.mock_add_paths("[START_DIR]/path/to/file.cpp")
# We iterate through all produced error sets...
for check in errors:
# ...and for each check, iterate through all the errors it produced...
for err in errors[check]:
# ...and extract the information from that error for a comment.
error_filepath = self.m.path.abspath(
self._build_results.build_dir.join(
err["DiagnosticMessage"]["FilePath"]
)
)
if (
not self.m.path.exists(error_filepath)
or err["DiagnosticMessage"]["FilePath"] == ""
):
continue # pragma: no cover
# Extract the line and character for this warning.
sline, schar = self._get_line_from_offset(
error_filepath, err["DiagnosticMessage"]["FileOffset"]
)
# Add the comment to Tricium.
self.m.tricium.add_comment(
"Lint/ClangTidy",
"%s: %s"
% (
err["DiagnosticName"],
err["DiagnosticMessage"]["Message"],
),
# All file paths reported to tricium should be relative to the root of the git repo.
# The caller ensures that cwd is the root of the git repo.
self.m.path.relpath(
str(err["DiagnosticMessage"]["FilePath"]),
self.m.path.abspath(self.m.context.cwd),
),
start_line=sline,
start_char=schar,
end_line=sline,
end_char=schar,
)
def _Json5Format(self, filename):
assert self.checkout
with self._diff_format("Json5Format", filename):
with self.m.step.nest("json5"):
formatjson5_path = self._build_results.tool("formatjson5")
self.m.step(
"run",
[formatjson5_path, "--replace", filename],
)
def _MdLint(self, filename):
output = self._run_mdlint_once()
for finding in output.get(filename, []):
# mdlint's output is already of the format that tricium expects.
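            # I.e. each finding is a dict of tricium.add_comment keyword
            # arguments (path, message, position fields, etc.; inferred from
            # this call rather than from mdlint documentation).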
self.m.tricium.add_comment(**finding)
@memoize
def _run_mdlint_once(self):
assert self.checkout
mdlint = self._build_results.tool("mdlint")
with self.m.step.nest("mdlint"):
step = self.m.step(
"run",
[
mdlint,
"--root-dir",
"docs",
"--filter-filenames",
"governance/rfcs",
"--enable",
"all",
"--json",
],
stderr=self.m.json.output(),
step_test_data=lambda: self.m.json.test_api.output_stream([], "stderr"),
ok_ret=(0, 1),
)
findings = collections.defaultdict(list)
for finding in step.stderr:
findings[finding["path"]].append(finding)
return findings
def _parse_warnings(self, warnings_file):
"""Parse all warnings output by clang-tidy.
        Clang-Tidy emits warnings as YAML entries like:
            - DiagnosticName: 'check name'
              DiagnosticMessage:
                Message: 'error message'
                FilePath: 'file path'
                FileOffset: <offset (int)>
                Replacements:
                  - FilePath: 'replacement file path'
                    Offset: <replacement start offset (int)>
                    Length: <replacement length (int)>
                    ReplacementText: 'replacement text'
Args:
            warnings_file (Path): Path to the YAML file of warnings written by
                the clang-tidy binary.
Returns:
            A dict of parsed warnings grouped by check name.
            Schema:
                {
                    '<check name>': [
                        {
                            'DiagnosticName': 'check name',
                            'DiagnosticMessage': {
                                'Message': 'error message',
                                'FilePath': 'file path',
                                'FileOffset': <offset (int)>,
                                'Replacements': [
                                    {
                                        'FilePath': 'replacement file path',
                                        'Offset': <replacement start offset (int)>,
                                        'Length': <replacement length (int)>,
                                        'ReplacementText': 'replacement text'
                                    },
                                    ...
                                ]
                            }
                        },
                        ...
                    ],
                    '<other check name>': [ ... ]
                }
"""
self.m.path.mock_add_paths(warnings_file)
if not self.m.path.exists(warnings_file):
return {} # pragma: no cover
parsed_results = self.m.python(
"load yaml",
self.resource("parse_yaml.py"),
args=[warnings_file],
stdout=self.m.json.output(),
).stdout
if not parsed_results:
return {}
all_warnings = {}
for warning in parsed_results["Diagnostics"]:
if warning["DiagnosticName"] not in all_warnings:
all_warnings[warning["DiagnosticName"]] = []
all_warnings[warning["DiagnosticName"]].append(warning)
return all_warnings
def _get_line_from_offset(self, path, offset):
"""Get the file line and char number from a file offset.
Clang-Tidy emits warnings that mark the location of the error by the char
offset from the beginning of the file. This converts that number into a line
and char position.
Args:
path (str): Path to file.
offset (int): Offset to convert.
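        Example:
            For a file containing the two lines "ab" and "cd", offset 3 (the
            "c") returns (2, 0); lines are 1-based and chars are 0-based.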
"""
file_data = self._read_file(path)
line = 1
char = 0
for i, c in enumerate(file_data):
if c == "\n":
line += 1
char = 0
else:
char += 1
if i + 1 == offset:
return line, char
return 0, 0
@memoize # Only read a file once even if it has multiple analysis errors.
def _read_file(self, path):
return self.m.file.read_text(
"read %s" % path,
path,
test_data="""test
d
newlineoutput""",
)
def _get_ranges_from_diff(self, diff, include_before=False, include_after=False):
"""Compute sequence of ranges of changed lines from diff.
The diff *must* contain only one file.
Args:
diff (str): Unified diff.
            include_before (bool): Whether to include line ranges from the
                base of the diff (i.e., before the changes in the diff were
                made).
            include_after (bool): Whether to include line ranges from the
                target of the diff (i.e., after the changes in the diff were
                made).
        """
ranges = []
found_filename = False
for line in diff.splitlines():
if self._FILENAME_RE.search(line):
assert not found_filename, "diff contains multiple files"
found_filename = True
match = self._CHUNK_RE.search(line)
if not match:
continue
if include_before:
start_line = int(match.group("before_line"))
line_count = 1
if match.group("before_count"):
line_count = int(match.group("before_count"))
ranges.append(Range(start_line, start_line + line_count))
if include_after:
start_line = int(match.group("after_line"))
line_count = 1
if match.group("after_count"):
line_count = int(match.group("after_count"))
ranges.append(Range(start_line, start_line + line_count))
return ranges
def _intersect_ranges(self, ranges1, ranges2):
"""Given two lists of line ranges, find their intersection.
Each range *includes* its start and end lines.
Assumes that within each list, the ranges are non-overlapping and
sorted in increasing order.
Example:
ranges1: [(1, 5), (7, 12), (100, 101)]
ranges2: [(2, 8), (8, 9)]
output: [(2, 5), (7, 9)]
"""
ranges = []
i1 = i2 = 0
while i1 < len(ranges1) and i2 < len(ranges2):
r1, r2 = ranges1[i1], ranges2[i2]
# We found a pair of overlapping ranges, so record a new range
# corresponding to the overlap between the two.
if r1.end >= r2.start and r2.end >= r1.start:
points = sorted([r1.start, r1.end, r2.start, r2.end])
ranges.append(Range(points[1], points[2]))
if r1.end < r2.end:
i1 += 1
else:
i2 += 1
# If one range ends at the same line that the next range starts, merge
# them into a single range.
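        # E.g. the raw intersections for the docstring example above are
        # [(2, 5), (7, 8), (8, 9)]; (7, 8) and (8, 9) merge into (7, 9).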
merged_ranges = []
i = 0
while i < len(ranges):
if i + 1 < len(ranges) and ranges[i].end == ranges[i + 1].start:
merged = Range(ranges[i].start, ranges[i + 1].end)
merged_ranges.append(merged)
i += 2
else:
merged_ranges.append(ranges[i])
i += 1
return merged_ranges
def check_commit_message(self):
"""Checks if the "Commit-Message-has-tags" Gerrit label is unset."""
with self.m.step.nest("check commit tags"):
# If commit message tags are required for the repo, the label value
# will always be a non-null dict. The dict will be empty if the
# label is unset.
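            # E.g. labels == {"Commit-Message-has-tags": {}} means the label
            # is required but unset, whereas a missing key means the repo
            # doesn't require commit tags.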
if (
self._gerrit_change()["labels"].get("Commit-Message-has-tags", None)
== {}
):
self.m.tricium.add_comment(
"Format/CommitTag",
MISSING_COMMIT_TAG_MESSAGE,
"",
)
@memoize
def _gerrit_change(self):
change = self.m.buildbucket.build.input.gerrit_changes[0]
details = self.m.gerrit.change_details(
name="get change details",
change_id=str(change.change),
# Retrieve full commit message for all revisions, since the patchset
# that triggered this build may not be the current (latest) patchset
# so we'll need to retrieve the commit message from an older
# patchset.
query_params=["ALL_COMMITS", "ALL_REVISIONS"],
host=change.host,
test_data=self.m.json.test_api.output(
{
"labels": {},
"current_revision": "123abc",
"revisions": {
"123abc": {
"_number": change.patchset,
"commit": {"message": "[foo] Add tests"},
}
},
}
),
).json.output
# Gerrit's "change details" endpoint doesn't support requesting a
# specific patchset, so the "current_revision" field will always point
# to the latest patchset available, even if it's newer than the
# patchset that triggered the current build. So make sure that we only
# look at the patchset that triggered this build.
for sha, revision in details["revisions"].items():
if revision["_number"] == change.patchset:
details["current_revision"] = sha
return details
def _commit_message(self):
change = self._gerrit_change()
current_revision = change["current_revision"]
return change["revisions"][current_revision]["commit"]["message"]