# Copyright 2019 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
from contextlib import contextmanager
import functools
import itertools
import re

from recipe_engine import recipe_api

# A half-open span of 1-based line numbers: `start` is included and `end` is
# excluded (membership is tested as `start <= line < end`).
Range = collections.namedtuple("Range", "start end")

# Prefix of every formatting comment; the command that fixes the file is
# appended to it.
FORMATTING_MESSAGE = """File not formatted properly.
Run the following to format:

"""

# NOTE(review): not referenced in the visible part of this file; presumably
# used by a commit-message check elsewhere -- confirm.
MISSING_COMMIT_TAG_MESSAGE = 'The change description should start with a commit tag like "[tag] Change Description".'

# Markers that switch the inclusivity check off and back on for a whole
# section. Please do not change the order of these lines.
INCLUSIVE_DISABLE_RE = re.compile(r"inclusive-language:\s*disable")
INCLUSIVE_ENABLE_RE = re.compile(r"inclusive-language:\s*enable")

# Marker that skips the inclusivity check for the single line it appears on.
INCLUSIVE_IGNORE_RE = re.compile(r"inclusive-language:\s*ignore")

# This mapping is a backup copy of the canonical source file:
# //tools/mdlint/rules/respectful_code_words.json which implements
# https://fuchsia.dev/fuchsia-src/contribute/respectful_code
# This version of the file was copied from
# https://fuchsia.googlesource.com/fuchsia/+/e9939747816166d1b72a255bd9648e99b1da9f71/tools/mdlint/rules/respectful_code_words.json.
# Each key is a word to flag; its value lists the suggested replacements.
# inclusive-language: disable
INCLUSIVE_WORD_REPLACEMENTS = {
    "blackhat": ["illegal", "unethical"],
    "blacklist": ["denylist", "blocklist"],
    "blacklisted": ["denied", "blocked"],
    "blacklisting": ["denying", "blocking"],
    "blacklists": ["denylists", "blocklists"],
    "citizen": ["priority"],
    "citizens": ["priorities"],
    "cop": ["build gardener", "build monitor", "supervisor", "primary"],
    "cops": ["build gardeners", "build monitors", "supervisors", "primaries"],
    "crazier": ["unexpected", "catastrophic", "incoherent"],
    "crazies": ["unexpected", "catastrophes", "incoherences"],
    "craziest": ["unexpected", "catastrophic", "incoherent"],
    "crazy": ["unexpected", "catastrophic", "incoherent"],
    "cripple": ["slow down"],
    "crippled": ["slowed down"],
    "cripples": ["slows down"],
    "crippling": ["slowing down"],
    "dummies": ["placeholders", "samples", "copies", "prototypes", "mock-up"],
    "dummy": ["placeholder", "sample", "copy", "prototype", "mock-up"],
    "ghetto": ["no suggestion"],
    "grandfather": [
        "legacy clause",
        "exempt",
        "existing",
        "holdover",
        "carryover",
        "baseline",
    ],
    "grandfathered": [
        "legacy",
        "exempt",
        "existing",
        "holdover",
        "carryover",
        "baseline",
    ],
    "grandfathering": [
        "legacy clause",
        "exempt",
        "existing",
        "holdover",
        "carryover",
        "baseline",
    ],
    "grandfathers": [
        "legacy clauses",
        "exempt",
        "existing",
        "holdovers",
        "carryovers",
        "baselines",
    ],
    "guru": ["expert", "teacher"],
    "insane": ["unexpected", "catastrophic", "incoherent"],
    "man-hour": ["work hour", "person hour"],
    "man-in-the-middle": ["person-in-the-middle"],
    "manned": ["staffed", "attended to", "crewed"],
    "manning": ["staffing", "attending to"],
    "manpower": ["workforce", "staff"],
    "master": ["main", "primary"],
    "masters": ["mains", "primaries"],
    "native": ["core", "built-in", "machine code", "platform-specific"],
    "pow-wow": ["meeting", "huddle", "talk", "summit"],
    "powwow": ["meeting", "huddle", "talk", "summit"],
    "primitive": ["alpha", "nascent"],
    "redline": ["priority line", "memory limit", "maximum"],
    "redlined": ["hit the maximum", "hit the memory limit"],
    "redlining": ["hitting the maximum", "hitting the memory limit"],
    "sane": ["valid", "sound", "rational", "sensible"],
    "sanity": [
        "check",
        "quick check",
        "confidence check",
        "coherence check",
        "calibration check",
    ],
    "slave": ["secondary", "replica", "subsidiary"],
    "slaves": ["secondaries", "replicas", "subsidiaries"],
    "whitehat": ["ethical"],
    "whitelist": ["allowlist", "safelist", "approvelist"],
    "whitelisted": ["allowlisted", "safelisted", "approvelisted"],
    "whitelisting": ["allowlisting", "safelisting", "approvelisting"],
    "whitelists": ["allowlists", "safelists", "approvelists"],
}
# inclusive-language: enable


def _analyzer_name(analyzer_func):
    """Return a normalized name for the analyzer function."""
    return analyzer_func.__name__.lstrip("_").lower()


class TriciumAnalyzeApi(recipe_api.RecipeApi):
    """API for running analyses on Tricium."""

    # Matches the "+++ <prefix>/<path>" line of a diff and captures everything
    # after the first "/" as the file path.
    _FILENAME_RE = re.compile(r"^\+\+\+\ [^/]+/(.*)")
    # Matches a hunk header of the form "@@ -<line>[,<count>] +<line>[,<count>]".
    # The start lines and the optional line counts are exposed as named groups.
    _CHUNK_RE = re.compile(
        r"^@@ \-(?P<before_line>\d+)(,(?P<before_count>\d+))? \+(?P<after_line>\d+)(,(?P<after_count>\d+))?",
    )

    def __init__(self, *args, **kwargs):
        """Set up the extension-to-analyzer table and the default settings."""
        super().__init__(*args, **kwargs)

        # Maps a file extension to the analyzer methods that apply to it. An
        # analyzer is only run when its normalized name is also enabled by
        # the caller.
        self._ext_to_analyzers = {
            ".c": [self._ClangFormat, self._ClangTidy],
            ".cc": [self._ClangFormat, self._ClangTidy],
            ".cml": [self._CmlFormat],
            ".cpp": [self._ClangFormat, self._ClangTidy],
            ".dart": [self._DartFmt],
            ".h": [self._ClangFormat, self._ClangTidy],
            ".hh": [self._ClangFormat, self._ClangTidy],
            ".hpp": [self._ClangFormat, self._ClangTidy],
            ".fidl": [self._FidlFormat, self._FidlLint],
            ".gn": [self._GNFormat],
            ".gni": [self._GNFormat],
            ".go": [self._GoFmt, self._GoVet],
            ".md": [self._MdLint],
            ".py": [self._Black, self._Yapf],
            ".rs": [self._RustFmt],
            ".star": [self._Yapf],
            ".ts": [self._ClangFormat],
            ".triage": [self._Json5Format],
        }

        # Analyzer tools that are always produced as part of a Fuchsia build,
        # rather than distributed as prebuilts.
        self._built_tools = [
            "cmc",
            "fidl-format",
            "fidl-lint",
            "formatjson5",
            "mdlint",
        ]

        # The paths to these tools may be set directly by the recipe. When
        # `go`, `gofmt` or `yapf` is left unset, the corresponding analyzer
        # looks the tool up in `build_results` on first use.
        self.black = None
        self.go = None
        self.gofmt = None
        self.yapf = None

        # Set by the recipe; several analyzers assert that `checkout` is set
        # and obtain their tools from `build_results`.
        self.checkout = None
        self.build_results = None
        # Whether to suggest the use of the fx tool.
        # The tool only works properly when run in fuchsia.git or one of its sub-directories.
        self.suggest_fx = True
        # Used to fail/pass builds.
        self.has_comments = False

    def __call__(
        self,
        filenames,
        enabled_analyzers,
        enabled_luci_analyzers=(),
    ):
        """Analyze the given files and write Tricium comments for findings.

        The inclusivity check always runs, first on the commit message and
        then on every file. LUCI analyzers, staticlints and the per-extension
        analyzers only run when they are enabled. All collected comments are
        written once at the end.

        Args:
            filenames (seq(str)): Paths of files, relative to the checkout.
                Must be strings rather than Path objects.
            enabled_analyzers (seq(str)): Names of enabled analyzers; they
                are compared in lowercase.
            enabled_luci_analyzers (seq(str)): Names of enabled analyzers
                supported by the recipe engine's tricium recipe module.
        """
        # Every run starts without any recorded comments.
        self.has_comments = False
        enabled_analyzers = [name.lower() for name in enabled_analyzers]

        if self.build_results:  # pragma: no cover
            assert self.checkout, "`build_results` must be set with `checkout`"

        with self.m.step.defer_results():
            with self.m.step.nest("check for inclusivity"):
                # The commit message is checked under an empty filename.
                self._check_for_inclusivity("", contents=self._commit_message())
                for path in filenames:
                    self._check_for_inclusivity(path)

            if enabled_luci_analyzers:
                with self.m.step.nest("run luci analyzers"):
                    # cwd is None if we're still in start_dir.
                    checkout_dir = self.m.context.cwd or self.m.path["start_dir"]
                    self._run_luci_analyzers(
                        enabled_luci_analyzers,
                        checkout_dir=checkout_dir,
                        filenames=filenames,
                    )

            # TODO(fxbug.dev/82386): Move most analyzer logic into staticlints.
            # staticlints runs on all files at once.
            if "staticlints" in enabled_analyzers:
                self._run_staticlints(filenames)

            for path in filenames:
                file_analyzers = self._analyzers_for_file(path, enabled_analyzers)
                if file_analyzers:
                    with self.m.step.nest(f"analyze {path}"):
                        for run_analyzer in file_analyzers:
                            run_analyzer(path)

            self.m.tricium.write_comments()

    def _check_for_inclusivity(self, filename, contents=""):
        """Add Tricium comments for non-inclusive words in a file or a text.

        When `contents` is given, every line of it is checked. Otherwise
        `filename` is read from the current directory and only the lines
        touched by HEAD are checked. Sections between the disable/enable
        markers, lines carrying the ignore marker, and URLs are skipped.

        A word found on more than three lines produces a single file-level
        comment listing the lines; otherwise each line gets its own comment.

        Args:
            filename (str): Path of the file relative to the current
                directory, or "" when the text does not come from a file
                (e.g. the commit message).
            contents (str): Text to check instead of reading `filename`.
        """
        if contents or not filename:
            # Text handed in directly has no diff, so every line counts as
            # changed. With neither text nor filename (e.g. an empty commit
            # message) there is no file to read and nothing to check.
            content_lines = contents.splitlines()
            change_line_ranges = [Range(1, len(content_lines) + 1)]
        else:
            # cwd is None if we're still in start_dir.
            cwd = self.m.context.cwd or self.m.path["start_dir"]
            contents = self.m.file.read_text(
                f"read {filename}",
                cwd.join(filename),
                include_log=False,
            )
            change_diff = self.m.git(
                f"get change diff for {filename}",
                "diff-tree",
                "--no-commit-id",
                "--diff-filter=d",
                "-U0",
                "HEAD",
                "--",
                filename,
                stdout=self.m.raw_io.output_text(),
            ).stdout
            change_line_ranges = self._get_ranges_from_diff(
                change_diff, include_before=False, include_after=True
            )
            content_lines = contents.splitlines()

        # Ideally, we would read the canonical source for inclusive words:
        # https://source.corp.google.com/fuchsia/tools/mdlint/rules/respectful_code_words.json
        # However, that file resides in the Fuchsia repository, and not all
        # recipes check out that repo. Therefore, we default to the hard-coded
        # version and use the canonical one if present.
        inclusive_word_replacements = INCLUSIVE_WORD_REPLACEMENTS
        # `checkout` is unset for recipes without a checkout, in which case
        # there is no canonical file to look for.
        inclusive_file = (
            self.checkout.root_dir.join(
                "tools", "mdlint", "rules", "respectful_code_words.json"
            )
            if self.checkout
            else None
        )
        if inclusive_file is not None and self.m.path.exists(inclusive_file):
            inclusive_word_replacements = self.m.file.read_json(
                name="reading inclusive words file",
                source=inclusive_file,
                test_data={"foo": ["bar", "baz"], "master": ["main", "primary"]},
                include_log=False,
            )

        # Compile every pattern once instead of once per line. Words are
        # escaped so that they are always matched literally.
        url_regex = re.compile(r"\w+://[^\s]*")
        word_regexes = {
            word: re.compile(r"\b%s\b" % re.escape(word))
            for word in inclusive_word_replacements
        }

        enabled = True
        line_indices_per_word = {}
        # The line indices used by tricium and in change_diff start with 1.
        for line_index, line in enumerate(content_lines, start=1):
            if not enabled:
                if not INCLUSIVE_ENABLE_RE.search(line):
                    continue
                enabled = True
            if INCLUSIVE_DISABLE_RE.search(line):
                enabled = False
                continue
            if INCLUSIVE_IGNORE_RE.search(line):
                continue

            # Only lines touched by the change are reported.
            in_change = any(
                r.start <= line_index < r.end for r in change_line_ranges
            )
            if in_change:
                # Exclude URLs in the check.
                line_without_urls = url_regex.sub("", line)
                for word, word_regex in word_regexes.items():
                    if word_regex.search(line_without_urls):
                        line_indices_per_word.setdefault(word, []).append(line_index)

        for word, replacements in inclusive_word_replacements.items():
            line_indices = line_indices_per_word.get(word, [])
            suggestion = (
                f"Suggested replacements: {replacements}."
                "\n\nSee https://fuchsia.dev/fuchsia-src/contribute/respectful_code"
            )
            # If a non-inclusive word appears too many times in a file, combine
            # all occurrences into one comment to avoid overwhelming Tricium
            # with too many comments.
            if len(line_indices) > 3:
                self.m.tricium.add_comment(
                    "Inclusivity",
                    f"Please avoid '{word}' found on lines {line_indices}. {suggestion}",
                    filename,
                )
            else:
                for found_line in line_indices:
                    self.m.tricium.add_comment(
                        "Inclusivity",
                        f"Please avoid '{word}'. {suggestion}",
                        filename,
                        start_line=found_line,
                    )

    def _run_luci_analyzers(self, enabled_luci_analyzers, checkout_dir, filenames):
        """Run the named LUCI analyzers over `filenames`.

        Args:
            enabled_luci_analyzers (seq(str)): Names of the analyzers to run;
                each must be known to the tricium recipe module.
            checkout_dir (Path): Base directory passed as the input base.
            filenames (seq(str)): Affected files.
        """
        analyzers_by_name = self.m.tricium.analyzers.by_name()
        selected = [analyzers_by_name[name] for name in enabled_luci_analyzers]
        self.m.tricium.run_legacy(
            selected,
            input_base=checkout_dir,
            affected_files=filenames,
            commit_message=self._commit_message(),
            # Comments are not emitted here; that happens after the non-LUCI
            # analyzers have run.
            emit=False,
        )

    def _analyzers_for_file(self, filename, enabled_analyzers):
        """Return the enabled analyzer methods that apply to `filename`.

        Analyzers are selected by file extension and kept only when their
        normalized name appears in `enabled_analyzers`.
        """
        assert isinstance(
            filename, str
        ), "filenames must be string paths relative to the checkout"
        extension = self.m.path.splitext(filename)[1]
        candidates = self._ext_to_analyzers.get(extension, [])
        return [
            candidate
            for candidate in candidates
            if _analyzer_name(candidate) in enabled_analyzers
        ]

    def _run_staticlints(self, filenames):
        """Run staticlints on all files at once and record its findings.

        Findings are added as Tricium comments before a non-zero exit code
        of the tool is turned into a step failure.
        """
        assert self.checkout
        cmd = [
            self.build_results.tool("staticlints"),
            "-checkout-dir",
            self.checkout.root_dir,
            "-build-dir",
            self.build_results.build_dir,
            "-files-json",
            self.m.json.input([{"path": f} for f in filenames]),
            "-output-json",
            # The output might be empty if the command fails, so it is
            # captured raw rather than assumed to be valid JSON.
            self.m.raw_io.output(add_output_log=True),
        ]
        # We don't want to raise an exception until after emitting any
        # comments. As of 2022-02 the Tricium service ignores comments from
        # failed builds, but that is likely to change in the future so we do
        # want to make a best effort at emitting comments even if some
        # analysis steps fail.
        lint_step = self.m.step("run staticlints", cmd, ok_ret="any")

        raw_output = lint_step.raw_io.output
        findings = self.m.json.loads(raw_output) if raw_output.strip() else []
        # staticlints emits JSON objects whose fields correspond exactly to
        # those of the Tricium comment schema.
        for finding in findings:
            self.m.tricium.add_comment(**finding)

        if lint_step.retcode:
            lint_step.presentation.status = self.m.step.FAILURE
            self.m.step.raise_on_failure(lint_step)

    @contextmanager
    def _diff_format(self, category, filename, cmd_format="fx format-code --files=%s"):
        """Checks for diffs after running an auto-formatter.

        If there's a diff in the lines that were touched by the CL under
        test, adds a comment on the CL. The caller runs the formatter inside
        the `with` block. The checkout is reset to HEAD when the block is
        left, also when the formatter or the follow-up diff raised, so that
        later analyzers never see a partially formatted tree.

        Args:
            category (str): Suffix of the comment category ("Format/<category>").
            filename (str): File being formatted.
            cmd_format (str): Format string with a single `%s` placeholder
                for the filename; it is shown to the user as the command that
                fixes the formatting.
        """
        # This step gets the changed ranges between parent commit and current CL.
        # The diff will have following format:
        #
        # diff --git a/filename b/filename
        # --- a/filename
        # +++ b/filename
        # @@ -PARENT_START_LINE[,LINE_COUNT] +CL_START_LINE[,LINE_COUNT] @@ CONTEXT
        # CHANGE_DETAILS
        #
        change_diff = self.m.git(
            "get change diff",
            "diff-tree",
            "--no-commit-id",
            "--diff-filter=d",
            "-U0",
            "HEAD",
            "--",
            filename,
            stdout=self.m.raw_io.output_text(),
        ).stdout
        change_line_ranges = self._get_ranges_from_diff(
            change_diff, include_before=False, include_after=True
        )
        self.m.step.active_result.presentation.logs["change_line_ranges"] = str(
            change_line_ranges
        )

        try:
            # The caller should run the formatter within the `with` block that
            # called this function.
            yield

            # This step gets the changed ranges between current CL and formatted
            # CL. The diff will have following format:
            #
            # diff --git a/filename b/filename
            # --- a/filename
            # +++ b/filename
            # @@ -CL_START_LINE[,LINE_COUNT] +FORMATTED_CL_START_LINE[,LINE_COUNT] @@ CONTEXT
            # CHANGE_DETAILS
            #
            formatted_diff = self.m.git(
                "get formatted diff",
                "diff-index",
                "--no-commit-id",
                "--diff-filter=d",
                "-U0",
                "HEAD",
                "--",
                filename,
                stdout=self.m.raw_io.output_text(),
            ).stdout
            # change_line_ranges holds the lines of the current CL that differ
            # from its parent commit; formatted_line_ranges holds the lines of
            # the current CL that the formatter changed. Their intersection is
            # the part of the CL that is affected by the formatter.
            formatted_line_ranges = self._get_ranges_from_diff(
                formatted_diff, include_before=True, include_after=False
            )
            self.m.step.active_result.presentation.logs["formatted_line_ranges"] = str(
                formatted_line_ranges
            )

            # Ideally we'd have a generic way to support self.suggest_fx == False in this
            # function. However today there's only one analyzer that actually needs this,
            # and restructuring the code around this use case would add complexity on net.
            # If we start supporting this for many analyzers we should reconsider, perhaps
            # by having a class per analyzer rather than just a function.
            intersection = self._intersect_ranges(
                change_line_ranges, formatted_line_ranges
            )
            if intersection:
                self.has_comments = True
                self.m.tricium.add_comment(
                    f"Format/{category}",
                    f"{FORMATTING_MESSAGE}{cmd_format % filename}",
                    filename,
                )
        finally:
            # Always discard the formatter's edits, even on failure.
            self.m.git("reset", "reset", "--hard", "HEAD")

    def _Black(self, filename):
        """Check the formatting of `filename` with black."""
        fix_hint = (
            "black %s\n"
            "If black isn't in your PATH, see http://go/fxi-cookbook#getting-the-infra-source-code"
        )
        with self._diff_format("Black", filename, cmd_format=fix_hint):
            black_cmd = [self.black, filename]
            self.m.step("black", black_cmd)

    def _FidlFormat(self, filename):
        """Check the formatting of a FIDL file with fidl-format."""
        assert self.checkout
        # FIDL test files are often purposefully formatted in unrecommended
        # ways, so they are skipped.
        is_test_file = str(filename).endswith(".test.fidl")
        if is_test_file:
            return
        with self._diff_format("FidlFormat", filename), self.m.step.nest(
            "fidl-format"
        ):
            formatter = self.build_results.tool("fidl-format")
            self.m.step("run", [formatter, "-i", filename])

    def _CmlFormat(self, filename):
        """Check the formatting of a CML file with `cmc format`."""
        assert self.checkout
        with self._diff_format("CmlFormat", filename), self.m.step.nest("cmc"):
            cmc = self.build_results.tool("cmc")
            format_cmd = [cmc, "format", "--in-place", filename]
            self.m.step("run", format_cmd)

    def _GoFmt(self, filename):
        """Check the formatting of a Go file with gofmt."""
        with self._diff_format("GoFmt", filename), self.m.step.nest("gofmt"):
            # Fall back to the tool from the build when the recipe did not
            # provide a gofmt path.
            self.gofmt = self.gofmt or self.build_results.tool("gofmt")
            self.m.step("run", [self.gofmt, "-w", "-s", filename])

    def _GNFormat(self, filename):
        """Check the formatting of a GN file with `gn format`."""
        with self._diff_format("GNFormat", filename), self.m.step.nest("gn format"):
            gn = self.build_results.tool("gn")
            self.m.step("run", [gn, "format", filename])

    def _RustFmt(self, filename):
        """Check the formatting of a Rust file with rustfmt.

        The `rustfmt.toml` at the root of the checkout is used as config.
        """
        assert self.checkout
        with self._diff_format("RustFmt", filename), self.m.step.nest("rustfmt"):
            rustfmt = self.build_results.tool("rustfmt")
            config_path = self.checkout.root_dir.join("rustfmt.toml")
            rustfmt_cmd = [
                rustfmt,
                f"--config-path={config_path}",
                "--unstable-features",
                "--skip-children",
                filename,
            ]
            self.m.step("run", rustfmt_cmd)

    def _Yapf(self, filename):
        """Check the formatting of `filename` with yapf.

        The suggested fix command uses fx unless `suggest_fx` is disabled.
        """
        cmd_format = (
            "fx format-code --files=%s" if self.suggest_fx else "yapf --in-place %s"
        )
        with self._diff_format("YAPF", filename, cmd_format), self.m.step.nest(
            "yapf"
        ):
            # Fall back to the tool from the build when the recipe did not
            # provide a yapf path.
            self.yapf = self.yapf or self.build_results.tool("yapf")
            self.m.step("run", [self.yapf, "--in-place", filename])

    def _DartFmt(self, filename):
        """Check the formatting of a Dart file with `dart format`."""
        with self._diff_format("DartFmt", filename), self.m.step.nest(
            "dart format"
        ):
            dart = self.build_results.tool("dart")
            self.m.step("run", [dart, "format", filename])

    def _ClangFormat(self, filename):
        """Check the formatting of `filename` with clang-format.

        The diff of the file against HEAD^ is piped into clang-format-diff,
        which is invoked with `-i` to edit the file in place.
        """
        with self._diff_format(
            "ClangFormat",
            filename,
            cmd_format="fx format-code --changed-lines --files=%s",
        ):
            with self.m.step.nest("clang-format"):
                diff_step = self.m.git(
                    "get file diff",
                    "diff",
                    "-U0",
                    "--no-color",
                    "HEAD^",
                    "--",
                    filename,
                    stdout=self.m.raw_io.output_text(),
                )

                formatter_args = [
                    self.build_results.tool("clang-format-diff"),
                    "-p1",
                    "-i",
                    "-style=file",
                    "-fallback-style=Google",
                    "-sort-includes",
                    "-binary",
                    self.build_results.tool("clang-format"),
                ]
                self.m.python3(
                    "clang-format-diff.py",
                    formatter_args,
                    stdin=self.m.raw_io.input_text(data=diff_step.stdout),
                )

    def _capitalize_msg(self, message):
        if not message or message[0].isupper():
            return message
        return message[0].upper() + message[1:]

    def _FidlLint(self, filename):
        """Run fidl-lint on a FIDL file and record its findings.

        The message of every finding is capitalized and terminated with a
        period, followed by the descriptions of its suggestions in the same
        form.
        """
        assert self.checkout
        # FIDL test files often purposefully use syntax that does not follow
        # linting rules, so they are skipped.
        if str(filename).endswith(".test.fidl"):
            return

        with self.m.step.nest("fidl-lint"):
            fidl_lint_path = self.build_results.tool("fidl-lint")
            lint_step = self.m.step(
                "run",
                [fidl_lint_path, "--format=json", filename],
                ok_ret=(0, 1),
                stdout=self.m.json.output(),
            )

            for result in lint_step.stdout:
                sentences = [self._capitalize_msg(result["message"]) + "."]
                sentences.extend(
                    self._capitalize_msg(suggestion["description"]) + "."
                    for suggestion in result.get("suggestions", ())
                    if "description" in suggestion
                )
                result["message"] = " ".join(sentences)
                # fidl-lint's JSON output already conforms to the Tricium
                # comment schema so there's no need to parse it.
                self.m.tricium.add_comment(**result)

    def _GoVet(self, filename):
        """Run `go vet` on the package of `filename` and record its warnings.

        The whole package is vetted, but only the warnings that belong to
        `filename` are turned into comments.
        """
        with self.m.step.nest("go vet") as presentation:
            cwd = self.m.context.cwd
            package_dir = cwd.join(self.m.path.dirname(filename))
            # No result and an empty result are both treated as "no warnings".
            package_warnings = self._go_vet_package(package_dir) or []
            if package_warnings:
                warnings_json = self.m.json.dumps(package_warnings, indent=2)
                presentation.logs["warnings"] = warnings_json.splitlines()

        for warning in package_warnings:
            if self.m.path.relpath(warning.path, cwd) != filename:
                continue
            self.m.tricium.add_comment(
                "Lint/GoVet",
                warning.message,
                # All file paths reported to tricium should be relative
                # to the root of the git repo. The caller ensures that
                # cwd is the root of the git repo.
                filename,
                start_line=warning.line,
                end_line=warning.line,
                start_char=warning.char,
                end_char=warning.char + 1,
            )

    # A single `go vet` finding: `path` and `line` locate it, `char` is the
    # 0-based column and `message` is the warning text.
    _GoVetWarning = collections.namedtuple("GoVetWarning", "path message line char")

    # Results are cached per (self, package_dir), so a package shared by
    # several files is only vetted once.
    @functools.lru_cache(maxsize=None)
    def _go_vet_package(self, package_dir):
        """Run `go vet -json` in `package_dir` and parse its warnings.

        Args:
            package_dir (Path): Directory of the Go package to vet.

        Returns:
            A list of `_GoVetWarning`, or None when `go vet` exited with a
            non-zero code and the package is skipped.
        """
        with self.m.context(cwd=package_dir):
            if not self.go:
                self.go = self.build_results.tool("go")
            step = self.m.step(
                "run",
                [self.go, "vet", "-json"],
                stderr=self.m.raw_io.output_text(),
                ok_ret="any",
            )
            if step.retcode:
                # With the -json flag set, `go vet` will only return a
                # non-zero retcode if the Go code is not compilable. If the
                # code is actually not compilable by the Fuchsia build
                # system then that will be caught in CQ; otherwise it's
                # likely just not compilable by the stock Go toolchain
                # because it relies on generated Go files produced by ninja.
                # So we can skip vetting this code, since Tricium warnings
                # are best-effort anyway.
                step.presentation.step_text = "failed to compile, skipping"
                return None

        stderr_lines = step.stderr.splitlines()
        step.presentation.logs["stderr"] = stderr_lines
        # Unfortunately `go vet -json` does not output only valid JSON, so
        # we have to parse the output manually.
        # Look at the test cases in examples/ for the expected output format.
        # Only the first top-level JSON object in the output is parsed.
        parsed_output = None
        current_entry_lines = []
        for line in stderr_lines:
            if current_entry_lines:
                current_entry_lines.append(line)
                # Ends the JSON object
                if line == "}":
                    parsed_output = self.m.json.loads("\n".join(current_entry_lines))
                    break
            # Empty JSON object
            elif line == "{}":
                parsed_output = {}
                break
            # Start new non-empty JSON object
            elif line == "{":
                current_entry_lines.append(line)

        assert parsed_output is not None, "invalid go vet output"

        go_vet_warnings = []
        for package_warnings in parsed_output.values():
            # Each package's warnings are grouped by the warning type (e.g.
            # "unreachable"), but we don't care about the warning type because
            # the full warning message is available for each warning.
            for warning in itertools.chain.from_iterable(package_warnings.values()):
                # "posn" has the form "<path>:<line>:<column>". Split from the
                # right so that a path which itself contains ":" stays intact.
                abspath, line, column = warning["posn"].rsplit(":", 2)
                go_vet_warnings.append(
                    self._GoVetWarning(
                        path=abspath,
                        line=int(line),
                        # go vet emits 1-based column indices, but tricium
                        # expects 0-based.
                        char=int(column) - 1,
                        message=warning["message"],
                    )
                )

        return go_vet_warnings

    def _ClangTidy(self, filename):
        """Run clang-tidy on the changed lines of `filename`.

        The diff of the file against HEAD^ is piped into clang-tidy-diff,
        the exported fixes file is parsed, and every reported diagnostic whose
        file exists is added as a Tricium comment.
        """
        assert self.checkout

        with self.m.step.nest("clang-tidy"):
            clang_tidy = self.build_results.tool("clang-tidy")
            clang_tidy_diff = self.build_results.tool("clang-tidy-diff")
            warnings_file = self.m.path["cleanup"].join("clang_tidy_fixes.yaml")

            diff_step = self.m.git(
                "get file diff",
                "diff",
                "-U0",
                "--no-color",
                "HEAD^",
                "--",
                filename,
                stdout=self.m.raw_io.output_text(),
            )

            with self.m.context(cwd=self.checkout.root_dir):
                tidy_args = [
                    "-p1",
                    "-path",
                    self.build_results.compdb_path,
                    "-export-fixes",
                    warnings_file,
                    "-clang-tidy-binary",
                    clang_tidy,
                ]
                launcher = [
                    "vpython3",
                    "-vpython-spec",
                    self.resource("clang-tidy-diff.vpython"),
                    clang_tidy_diff,
                ]

                step_result = self.m.step(
                    name="clang-tidy-diff.py",
                    cmd=launcher + tidy_args,
                    stdin=self.m.raw_io.input_text(data=diff_step.stdout),
                    # The script may return 1 if there are compile errors.
                    # That is acceptable for a linter check; the step is
                    # only marked as a warning below.
                    ok_ret=(0, 1),
                )

                if step_result.retcode:
                    self.m.step.active_result.presentation.status = "WARNING"
                errors = self._parse_warnings(warnings_file)

                self.m.path.mock_add_paths(
                    self.checkout.root_dir.join("path", "to", "file.cpp")
                )
                # `errors` maps each check to the diagnostics it produced;
                # every diagnostic becomes one comment.
                for check in errors:
                    for err in errors[check]:
                        diagnostic = err["DiagnosticMessage"]
                        reported_path = diagnostic["FilePath"]
                        error_filepath = self.m.path.abspath(
                            self.build_results.build_dir.join(reported_path)
                        )
                        if (
                            not self.m.path.exists(error_filepath)
                            or reported_path == ""
                        ):
                            continue  # pragma: no cover

                        # Translate the file offset into a line and character.
                        sline, schar = self._get_line_from_offset(
                            error_filepath, diagnostic["FileOffset"]
                        )
                        # (0, 0) denotes a file level comment, which has no
                        # extent; anything else spans a single character.
                        end_line, end_char = (
                            (0, 0) if (sline, schar) == (0, 0) else (sline, schar + 1)
                        )

                        self.m.tricium.add_comment(
                            "Lint/ClangTidy",
                            f"{err['DiagnosticName']}: {diagnostic['Message']}",
                            # All file paths reported to tricium should be relative to the root of the git repo.
                            # The caller ensures that cwd is the root of the git repo.
                            self.m.path.relpath(
                                str(reported_path),
                                self.m.path.abspath(self.m.context.cwd),
                            ),
                            start_line=sline,
                            start_char=schar,
                            end_line=end_line,
                            end_char=end_char,
                        )

    def _Json5Format(self, filename):
        """Rewrite `filename` in place with the `formatjson5` tool.

        The formatter runs inside the `_diff_format("Json5Format", filename)`
        context manager, in a step nested under "json5".

        Args:
            filename: File passed to `formatjson5 --replace`.
        """
        assert self.checkout
        with self._diff_format("Json5Format", filename), self.m.step.nest("json5"):
            formatter = self.build_results.tool("formatjson5")
            # --replace makes the tool overwrite the file rather than print it.
            command = [formatter, "--replace", filename]
            self.m.step("run", command)

    def _MdLint(self, filename):
        """Forward the mdlint findings recorded for `filename` to Tricium.

        Args:
            filename: Key looked up in the mapping returned by
                `_run_mdlint_once`; a file with no entry yields no comments.
        """
        findings_by_path = self._run_mdlint_once()
        file_findings = findings_by_path.get(filename, [])
        for mdlint_finding in file_findings:
            # Each finding is expanded directly into keyword arguments, so its
            # keys have to line up with what add_comment accepts.
            self.m.tricium.add_comment(**mdlint_finding)

    @functools.lru_cache(maxsize=None)
    def _run_mdlint_once(self):
        """Run mdlint and return its findings grouped by path.

        The result is memoized by `functools.lru_cache`, so repeated calls on
        the same object reuse the findings from the first run instead of
        invoking the tool again.

        Returns:
            A `collections.defaultdict(list)` mapping each finding's "path"
            value to the findings reported for that path, in output order.
        """
        assert self.checkout
        mdlint = self.build_results.tool("mdlint")
        command = [
            mdlint,
            "--root-dir",
            "docs",
            "--filter-filenames",
            "governance/rfcs",
            "--enable",
            "all",
            "--json",
        ]
        with self.m.step.nest("mdlint"):
            step = self.m.step(
                "run",
                command,
                # The JSON findings are collected from stderr.
                stderr=self.m.json.output(),
                step_test_data=lambda: self.m.json.test_api.output_stream([], "stderr"),
                # Exit code 1 is accepted alongside 0 -- presumably the
                # "findings were reported" status; TODO confirm.
                ok_ret=(0, 1),
            )
        by_path = collections.defaultdict(list)
        for entry in step.stderr:
            by_path[entry["path"]].append(entry)
        return by_path

    def _parse_warnings(self, warnings_file):
        """Load a clang-tidy warnings file and group its diagnostics by check.

        The file is read as YAML and is expected to hold a top-level
        "Diagnostics" list whose entries each carry a "DiagnosticName".

        Args:
            warnings_file: Path of the YAML warnings file to load.

        Returns:
            A dict mapping each "DiagnosticName" to the list of diagnostics
            with that name, in file order. Every diagnostic is stored exactly
            as it was parsed; no fields are added, renamed or converted.
            An empty dict is returned when the file does not exist or parses
            to an empty value.
        """
        # NOTE(review): mock_add_paths looks like a test-only helper; confirm
        # that calling it unconditionally here is intended.
        self.m.path.mock_add_paths(warnings_file)
        if not self.m.path.exists(warnings_file):
            return {}  # pragma: no cover
        step_name = f"load {self.m.path.basename(warnings_file)}"
        parsed = self.m.yaml.read_file(step_name, warnings_file)
        if not parsed:
            return {}
        by_check = {}
        for diagnostic in parsed["Diagnostics"]:
            by_check.setdefault(diagnostic["DiagnosticName"], []).append(diagnostic)
        return by_check

    def _get_line_from_offset(self, path, offset):
        """Translate an offset into a file into a (line, char) position.

        Clang-Tidy reports where a warning applies as an offset from the
        start of the file; this turns that offset into a line number and a
        position within that line.

        Args:
            path (str): Path to file.
            offset (int): Offset to convert, counted in characters of the
                text returned by `_read_file`.

        Returns:
            A `(line, char)` tuple. `line` is 1-based and `char` is the number
            of characters between the start of that line and the offset.
            `(0, 0)` is returned when the offset is not within
            `1..len(file contents)`, which includes an offset of 0.
        """
        contents = self._read_file(path)
        if not 1 <= offset <= len(contents):
            return 0, 0
        # Every newline before the offset pushes the position down one line.
        line = 1 + contents.count("\n", 0, offset)
        # rfind yields -1 when no newline precedes the offset, which makes the
        # line start index 0.
        line_start = contents.rfind("\n", 0, offset) + 1
        return line, offset - line_start

    # Memoized so that a file with several analysis errors is only read once.
    @functools.lru_cache(maxsize=None)
    def _read_file(self, path):
        """Return the text of `path`, read through a step named "read <path>"."""
        step_name = f"read {path}"
        # Three-line sample passed as the step's test_data.
        placeholder = "test\nd\nnewlineoutput"
        return self.m.file.read_text(step_name, path, test_data=placeholder)

    def _get_ranges_from_diff(self, diff, include_before=False, include_after=False):
        """Collect the line ranges named by the hunk headers of a diff.

        The diff *must* contain only one file; a second filename line trips
        an assertion.

        Args:
            diff (str): Unified diff.
            include_before (bool): Whether to include line ranges from the
                base of the diff (i.e., before the changes in the diff were
                made).
            include_after (bool): Whether to include line ranges from the
                target of the diff (i.e., after the changes in the diff were
                made).

        Returns:
            A list of `Range(start, start + count)` in diff order, where
            `start` and `count` are taken from each hunk header and a missing
            count is treated as 1. When both flags are set, each hunk
            contributes its "before" range followed by its "after" range.
        """
        # Sides of each hunk header to record, in emission order.
        sides = []
        if include_before:
            sides.append("before")
        if include_after:
            sides.append("after")

        collected = []
        seen_filename = False
        for diff_line in diff.splitlines():
            if self._FILENAME_RE.search(diff_line):
                assert not seen_filename, "diff contains multiple files"
                seen_filename = True
            hunk = self._CHUNK_RE.search(diff_line)
            if hunk is None:
                continue
            for side in sides:
                first = int(hunk.group(f"{side}_line"))
                raw_count = hunk.group(f"{side}_count")
                # The count is optional in a hunk header and defaults to one.
                span = int(raw_count) if raw_count else 1
                collected.append(Range(first, first + span))
        return collected

    def _intersect_ranges(self, ranges1, ranges2):
        """Given two lists of line ranges, find their intersection.

        Each range *includes* its start and end lines.

        Assumes that within each list, the ranges are sorted in increasing
        order and do not overlap by more than a shared boundary line.

        Intersections that touch (one ends on the line where the next starts)
        are merged into a single range, including chains of three or more
        touching intersections.

        Args:
            ranges1: List of `Range` objects.
            ranges2: List of `Range` objects.

        Returns:
            A list of `Range` objects covering the lines present in both
            inputs, in increasing order.

        Example:
            ranges1: [(1, 5), (7, 12), (100, 101)]
            ranges2: [(2, 8), (8, 9)]
            output: [(2, 5), (7, 9)]
        """
        overlaps = []
        i1 = i2 = 0
        while i1 < len(ranges1) and i2 < len(ranges2):
            r1, r2 = ranges1[i1], ranges2[i2]
            # We found a pair of overlapping ranges, so record a new range
            # corresponding to the overlap between the two: the middle two of
            # the four sorted endpoints.
            if r1.end >= r2.start and r2.end >= r1.start:
                points = sorted([r1.start, r1.end, r2.start, r2.end])
                overlaps.append(Range(points[1], points[2]))
            # Advance whichever list's current range finishes first; the other
            # range may still overlap with what follows.
            if r1.end < r2.end:
                i1 += 1
            else:
                i2 += 1

        # If one range ends at the same line that the next range starts, merge
        # them into a single range. Comparing against the last *merged* range
        # (rather than pairing up neighbours) lets a whole chain of touching
        # ranges collapse into one.
        merged_ranges = []
        for current in overlaps:
            if merged_ranges and merged_ranges[-1].end == current.start:
                merged_ranges[-1] = Range(merged_ranges[-1].start, current.end)
            else:
                merged_ranges.append(current)

        return merged_ranges

    def check_commit_message(self):
        """Add a Tricium comment when "Commit-Message-has-tags" is unset.

        The label is read from the "labels" entry of `_gerrit_change()`. A
        "Format/CommitTag" comment with an empty path is added only when the
        label's value is an empty dict.
        """
        with self.m.step.nest("check commit tags"):
            labels = self._gerrit_change()["labels"]
            tag_label = labels.get("Commit-Message-has-tags", None)
            # If commit message tags are required for the repo, the label
            # value will always be a non-null dict, and that dict is empty
            # while the label is unset. A missing label (None) therefore
            # means there is nothing to report.
            if tag_label == {}:
                self.m.tricium.add_comment(
                    "Format/CommitTag", MISSING_COMMIT_TAG_MESSAGE, ""
                )

    @functools.lru_cache(maxsize=None)
    def _gerrit_change(self):
        """Fetch Gerrit details for the first Gerrit change of this build.

        The result is memoized by `functools.lru_cache`, so the lookup step
        runs once per object.

        Returns:
            The JSON output of the change-details query, with
            "current_revision" overwritten to the SHA of the revision whose
            "_number" equals the patchset that triggered this build (when
            such a revision is present).
        """
        change = self.m.buildbucket.build.input.gerrit_changes[0]
        # Canned response supplied as the step's test data.
        canned_details = {
            "labels": {},
            "current_revision": "123abc",
            "revisions": {
                "123abc": {
                    "_number": change.patchset,
                    "commit": {"message": "[foo] Add tests"},
                }
            },
        }
        response = self.m.gerrit.change_details(
            name="get change details",
            change_id=str(change.change),
            # Retrieve full commit message for all revisions, since the patchset
            # that triggered this build may not be the current (latest) patchset
            # so we'll need to retrieve the commit message from an older
            # patchset.
            query_params=["ALL_COMMITS", "ALL_REVISIONS"],
            host=change.host,
            test_data=self.m.json.test_api.output(canned_details),
        )
        details = response.json.output
        # Gerrit's "change details" endpoint doesn't support requesting a
        # specific patchset, so the "current_revision" field will always point
        # to the latest patchset available, even if it's newer than the
        # patchset that triggered the current build. So make sure that we only
        # look at the patchset that triggered this build.
        triggering_patchset = change.patchset
        for revision_sha, revision_info in details["revisions"].items():
            if revision_info["_number"] == triggering_patchset:
                details["current_revision"] = revision_sha
        return details

    def _commit_message(self):
        """Return the commit message of the change's current revision.

        The revision is the one named by "current_revision" in the details
        returned by `_gerrit_change()`.
        """
        details = self._gerrit_change()
        revision_sha = details["current_revision"]
        commit = details["revisions"][revision_sha]["commit"]
        return commit["message"]
