blob: 19267a71885186b82c36ceef889d6d0bf00d5a5e [file] [log] [blame] [edit]
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Code parsing for coverage.py."""
from __future__ import annotations
import ast
import functools
import collections
import os
import re
import sys
import token
import tokenize
from dataclasses import dataclass
from types import CodeType
from typing import (
cast, Any, Callable, Dict, Iterable, List, Optional, Protocol, Sequence,
Set, Tuple,
)
from coverage import env
from coverage.bytecode import code_objects
from coverage.debug import short_stack
from coverage.exceptions import NoSource, NotPython
from coverage.misc import nice_pair
from coverage.phystokens import generate_tokens
from coverage.types import TArc, TLineNo
class PythonParser:
    """Parse code to find executable lines, excluded lines, etc.

    This information is all based on static analysis: no code execution is
    involved.

    """

    def __init__(
        self,
        text: str | None = None,
        filename: str | None = None,
        exclude: str | None = None,
    ) -> None:
        """
        Source can be provided as `text`, the text itself, or `filename`, from
        which the text will be read.  Excluded lines are those that match
        `exclude`, a regex string.

        Raises `NoSource` if the text has to be read from `filename` and the
        read fails with an OSError.

        """
        # Empty text is still source (an empty module), so test for None
        # rather than truthiness, matching the `text is not None` check below.
        assert text is not None or filename, "PythonParser needs either text or filename"
        self.filename = filename or "<code>"
        if text is not None:
            self.text: str = text
        else:
            from coverage.python import get_python_source
            try:
                self.text = get_python_source(self.filename)
            except OSError as err:
                raise NoSource(f"No source for code: '{self.filename}': {err}") from err

        self.exclude = exclude

        # The parsed AST of the text.
        self._ast_root: ast.AST | None = None

        # The normalized line numbers of the statements in the code. Exclusions
        # are taken into account, and statements are adjusted to their first
        # lines.
        self.statements: set[TLineNo] = set()

        # The normalized line numbers of the excluded lines in the code,
        # adjusted to their first lines.
        self.excluded: set[TLineNo] = set()

        # The raw_* attributes are only used in this class, and in
        # lab/parser.py to show how this class is working.

        # The line numbers that start statements, as reported by the line
        # number table in the bytecode.
        self.raw_statements: set[TLineNo] = set()

        # The raw line numbers of excluded lines of code, as marked by pragmas.
        self.raw_excluded: set[TLineNo] = set()

        # The line numbers of class definitions.
        self.raw_classdefs: set[TLineNo] = set()

        # The line numbers of docstring lines.
        self.raw_docstrings: set[TLineNo] = set()

        # Internal detail, used by lab/parser.py.
        self.show_tokens = False

        # A dict mapping line numbers to lexical statement starts for
        # multi-line statements.
        self._multiline: dict[TLineNo, TLineNo] = {}

        # Lazily-created arc data, and missing arc descriptions.
        self._all_arcs: set[TArc] | None = None
        self._missing_arc_fragments: TArcFragments | None = None

        # Lazily-created result of exit_counts(), memoized per instance.
        self._exit_counts: dict[TLineNo, int] | None = None

    def lines_matching(self, regex: str) -> set[TLineNo]:
        """Find the lines matching a regex.

        Returns a set of line numbers, the lines that contain a match for
        `regex`. The entire line needn't match, just a part of it.
        Handles multiline regex patterns.

        """
        regex_c = re.compile(regex, re.MULTILINE)
        matches: set[TLineNo] = set()

        # Count newlines incrementally from the previous match start, so the
        # text before each match is only scanned once.
        last_start = 0
        last_start_line = 0
        for match in regex_c.finditer(self.text):
            start, end = match.span()
            start_line = last_start_line + self.text.count('\n', last_start, start)
            end_line = last_start_line + self.text.count('\n', last_start, end)
            # start_line/end_line are 0-based newline counts; line numbers are 1-based.
            matches.update(self._multiline.get(i, i) for i in range(start_line + 1, end_line + 2))
            last_start = start
            last_start_line = start_line
        return matches

    def _raw_parse(self) -> None:
        """Parse the source to find the interesting facts about its lines.

        A handful of attributes are updated.

        """
        # Find lines which match an exclusion pattern.
        if self.exclude:
            self.raw_excluded = self.lines_matching(self.exclude)
        self.excluded = set(self.raw_excluded)

        # The current number of indents.
        indent: int = 0
        # An exclusion comment will exclude an entire clause at this indent.
        exclude_indent: int = 0
        # Are we currently excluding lines?
        excluding: bool = False
        # The line number of the first line in a multi-line statement.
        first_line: int = 0
        # Is the file empty?
        empty: bool = True
        # Parenthesis (and bracket) nesting level.
        nesting: int = 0

        assert self.text is not None
        tokgen = generate_tokens(self.text)
        for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
            if self.show_tokens:                # pragma: debugging
                print("%10s %5s %-20r %r" % (
                    tokenize.tok_name.get(toktype, toktype),
                    nice_pair((slineno, elineno)), ttext, ltext,
                ))
            if toktype == token.INDENT:
                indent += 1
            elif toktype == token.DEDENT:
                indent -= 1
            elif toktype == token.OP:
                if ttext == ":" and nesting == 0:
                    should_exclude = (
                        self.excluded.intersection(range(first_line, elineno + 1))
                    )
                    if not excluding and should_exclude:
                        # Start excluding a suite.  We trigger off of the colon
                        # token so that the #pragma comment will be recognized on
                        # the same line as the colon.
                        self.excluded.add(elineno)
                        exclude_indent = indent
                        excluding = True
                elif ttext in "([{":
                    nesting += 1
                elif ttext in ")]}":
                    nesting -= 1
            elif toktype == token.NEWLINE:
                if first_line and elineno != first_line:
                    # We're at the end of a line, and we've ended on a
                    # different line than the first line of the statement,
                    # so record a multi-line range.
                    for line in range(first_line, elineno + 1):
                        self._multiline[line] = first_line
                first_line = 0

            if ttext.strip() and toktype != tokenize.COMMENT:
                # A non-white-space token.
                empty = False
                if not first_line:
                    # The token is not white space, and is the first in a statement.
                    first_line = slineno
                    # Check whether to end an excluded suite.
                    if excluding and indent <= exclude_indent:
                        excluding = False
                    if excluding:
                        self.excluded.add(elineno)

        # Find the starts of the executable statements.
        if not empty:
            byte_parser = ByteParser(self.text, filename=self.filename)
            self.raw_statements.update(byte_parser._find_statements())

        # The first line of modules can lie and say 1 always, even if the first
        # line of code is later. If so, map 1 to the actual first line of the
        # module.
        if env.PYBEHAVIOR.module_firstline_1 and self._multiline:
            self._multiline[1] = min(self.raw_statements)

        self.excluded = self.first_lines(self.excluded)

        # AST lets us find classes, docstrings, and decorator-affected
        # functions and classes.
        assert self._ast_root is not None
        for node in ast.walk(self._ast_root):
            # Find class definitions.
            if isinstance(node, ast.ClassDef):
                self.raw_classdefs.add(node.lineno)
            # Find docstrings.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef, ast.Module)):
                if node.body:
                    first = node.body[0]
                    if (
                        isinstance(first, ast.Expr)
                        and isinstance(first.value, ast.Constant)
                        and isinstance(first.value.value, str)
                    ):
                        self.raw_docstrings.update(
                            range(first.lineno, cast(int, first.end_lineno) + 1)
                        )
            # Exclusions carry from decorators and signatures to the bodies of
            # functions and classes.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)):
                first_line = min((d.lineno for d in node.decorator_list), default=node.lineno)
                if self.excluded.intersection(range(first_line, node.lineno + 1)):
                    self.excluded.update(range(first_line, cast(int, node.end_lineno) + 1))

    def first_line(self, lineno: TLineNo) -> TLineNo:
        """Return the first line number of the statement including `lineno`.

        Negative line numbers are mapped through their absolute value and
        negated again, so the sign of `lineno` is preserved.

        """
        # Deliberately not wrapped in functools.lru_cache: a cache on a method
        # is shared by all instances and keyed on `self`, so it keeps parsers
        # alive and can return answers computed before `_multiline` was
        # filled in.  This is only a dict lookup anyway.
        if lineno < 0:
            return -self._multiline.get(-lineno, -lineno)
        return self._multiline.get(lineno, lineno)

    def first_lines(self, linenos: Iterable[TLineNo]) -> set[TLineNo]:
        """Map the line numbers in `linenos` to the correct first line of the
        statement.

        Returns a set of the first lines.

        """
        return {self.first_line(l) for l in linenos}

    def translate_lines(self, lines: Iterable[TLineNo]) -> set[TLineNo]:
        """Implement `FileReporter.translate_lines`."""
        return self.first_lines(lines)

    def translate_arcs(self, arcs: Iterable[TArc]) -> set[TArc]:
        """Implement `FileReporter.translate_arcs`."""
        return {(self.first_line(a), self.first_line(b)) for (a, b) in arcs}

    def parse_source(self) -> None:
        """Parse source text to find executable lines, excluded lines, etc.

        Sets the .excluded and .statements attributes, normalized to the first
        line of multi-line statements.

        Raises `NotPython` if the text can't be tokenized or parsed.

        """
        try:
            self._ast_root = ast.parse(self.text)
            self._raw_parse()
        except (tokenize.TokenError, IndentationError, SyntaxError) as err:
            if hasattr(err, "lineno"):
                lineno = err.lineno         # IndentationError
            else:
                lineno = err.args[1][0]     # TokenError
            raise NotPython(
                f"Couldn't parse '{self.filename}' as Python source: " +
                f"{err.args[0]!r} at line {lineno}",
            ) from err

        ignore = self.excluded | self.raw_docstrings
        starts = self.raw_statements - ignore
        self.statements = self.first_lines(starts) - ignore

    def arcs(self) -> set[TArc]:
        """Get information about the arcs available in the code.

        Returns a set of line number pairs.  Line numbers have been normalized
        to the first line of multi-line statements.

        """
        if self._all_arcs is None:
            self._analyze_ast()
            assert self._all_arcs is not None
        return self._all_arcs

    def _analyze_ast(self) -> None:
        """Run the AstArcAnalyzer and save its results.

        `_all_arcs` is the set of arcs in the code.

        """
        assert self._ast_root is not None
        aaa = AstArcAnalyzer(self._ast_root, self.raw_statements, self._multiline)
        aaa.analyze()

        self._all_arcs = set()
        for l1, l2 in aaa.arcs:
            fl1 = self.first_line(l1)
            fl2 = self.first_line(l2)
            # Arcs that collapse onto a single statement aren't arcs at all.
            if fl1 != fl2:
                self._all_arcs.add((fl1, fl2))

        self._missing_arc_fragments = aaa.missing_arc_fragments

    def exit_counts(self) -> dict[TLineNo, int]:
        """Get a count of exits from each line.

        Excluded lines are excluded.

        The result is computed once per parser and the same dict is returned
        on later calls.

        """
        if self._exit_counts is None:
            exit_counts: dict[TLineNo, int] = collections.defaultdict(int)
            for l1, l2 in self.arcs():
                if l1 < 0:
                    # Don't ever report -1 as a line number
                    continue
                if l1 in self.excluded:
                    # Don't report excluded lines as line numbers.
                    continue
                if l2 in self.excluded:
                    # Arcs to excluded lines shouldn't count.
                    continue
                exit_counts[l1] += 1

            # Class definitions have one extra exit, so remove one for each:
            for l in self.raw_classdefs:
                # Ensure key is there: class definitions can include excluded lines.
                if l in exit_counts:
                    exit_counts[l] -= 1

            self._exit_counts = exit_counts
        return self._exit_counts

    def missing_arc_description(
        self,
        start: TLineNo,
        end: TLineNo,
        executed_arcs: Iterable[TArc] | None = None,
    ) -> str:
        """Provide an English sentence describing a missing arc.

        `start` and `end` are the two ends of the arc.  `executed_arcs`, if
        given, is used to recognize a one-line callable that was never started.

        """
        if self._missing_arc_fragments is None:
            self._analyze_ast()
            assert self._missing_arc_fragments is not None

        actual_start = start

        if (
            executed_arcs and
            end < 0 and end == -start and
            (end, start) not in executed_arcs and
            (end, start) in self._missing_arc_fragments
        ):
            # It's a one-line callable, and we never even started it,
            # and we have a message about not starting it.
            start, end = end, start

        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])

        msgs = []
        for smsg, emsg in fragment_pairs:
            if emsg is None:
                if end < 0:
                    # Hmm, maybe we have a one-line callable, let's check.
                    if (-end, end) in self._missing_arc_fragments:
                        return self.missing_arc_description(-end, end)
                    emsg = "didn't jump to the function exit"
                else:
                    emsg = "didn't jump to line {lineno}"
            emsg = emsg.format(lineno=end)

            msg = f"line {actual_start} {emsg}"
            if smsg is not None:
                msg += f" because {smsg.format(lineno=actual_start)}"

            msgs.append(msg)

        return " or ".join(msgs)
class ByteParser:
    """Parse bytecode to understand the structure of code."""

    def __init__(
        self,
        text: str,
        code: CodeType | None = None,
        filename: str | None = None,
    ) -> None:
        """Wrap `code`, or compile `text` as `filename` when no code is given."""
        self.text = text
        if code is None:
            assert filename is not None
            # We only get here if earlier ast parsing succeeded, so no need to
            # catch errors.
            code = compile(text, filename, "exec", dont_inherit=True)
        self.code = code

    def child_parsers(self) -> Iterable[ByteParser]:
        """Iterate over all the code objects nested within this one.

        The iteration includes `self` as its first value.

        """
        return (ByteParser(self.text, code=nested) for nested in code_objects(self.code))

    def _line_numbers(self) -> Iterable[TLineNo]:
        """Yield the line numbers possible in this code object.

        Uses `co_lines()` when the code object has it, otherwise decodes
        co_lnotab (described in Python/compile.c).  Produces a sequence:
        l0, l1, ...

        """
        code = self.code
        if hasattr(code, "co_lines"):
            # PYVERSIONS: new in 3.10
            # Entries with no line (None or 0) are skipped.
            yield from (entry[2] for entry in code.co_lines() if entry[2])
            return

        # Adapted from dis.py in the standard library.
        lnotab = code.co_lnotab
        previous = None
        current = code.co_firstlineno
        for byte_delta, line_delta in zip(lnotab[0::2], lnotab[1::2]):
            if byte_delta and current != previous:
                yield current
                previous = current
            # Line increments are signed bytes.
            if line_delta >= 0x80:
                line_delta -= 0x100
            current += line_delta
        if current != previous:
            yield current

    def _find_statements(self) -> Iterable[TLineNo]:
        """Find the statements in `self.code`.

        Produce a sequence of line numbers that start statements.  Recurses
        into all code objects reachable from `self.code`.

        """
        for parser in self.child_parsers():
            # Get all of the lineno information from this code.
            yield from parser._line_numbers()
#
# AST analysis
#
@dataclass(order=True, frozen=True)
class ArcStart:
    """The information needed to start an arc.

    `lineno` is the line number the arc starts from.

    `cause` is an English text fragment used as the `startmsg` for
    AstArcAnalyzer.missing_arc_fragments.  It describes why an arc wasn't
    executed, so it should read well in a sentence of the form
    "Line 17 didn't run because {cause}."  The fragment can include "{lineno}"
    to have `lineno` interpolated into it.

    As an example, this code::

        if something(x):        # line 1
            func(x)             # line 2
        more_stuff()            # line 3

    would have two ArcStarts:

    - ArcStart(1, "the condition on line 1 was always true")
    - ArcStart(1, "the condition on line 1 was never true")

    The first is used for the arc from 1 to 3, giving a message like
    "line 1 didn't jump to line 3 because the condition on line 1 was always true."

    The second is used for the arc from 1 to 2, giving a message like
    "line 1 didn't jump to line 2 because the condition on line 1 was never true."

    """

    lineno: TLineNo
    cause: str = ""
class TAddArcFn(Protocol):
    """The call signature of AstArcAnalyzer.add_arc()."""

    def __call__(
        self,
        start: TLineNo,
        end: TLineNo,
        smsg: str | None = None,
        emsg: str | None = None,
    ) -> None:
        ...
# Maps an arc to the list of (startmsg, endmsg) fragment pairs that can
# describe why the arc is missing.  Either fragment of a pair may be None.
TArcFragments = Dict[TArc, List[Tuple[Optional[str], Optional[str]]]]
class Block:
    """
    Blocks need to handle various exiting statements in their own ways.

    Every method here takes a set of exits, and a callable `add_arc` that can
    be used to add arcs if needed.  The return value is True if the exits were
    handled, or False if the search should continue up the block stack.

    """
    # pylint: disable=unused-argument

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process break exits."""
        # Because break can only appear in loops, and most subclasses
        # implement process_break_exits, this function is never reached.
        raise AssertionError

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process continue exits."""
        # Because continue can only appear in loops, and most subclasses
        # implement process_continue_exits, this function is never reached.
        raise AssertionError

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process raise exits: not handled at this level."""
        return False

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process return exits: not handled at this level."""
        return False
class LoopBlock(Block):
    """A block on the block stack representing a `for` or `while` loop."""

    def __init__(self, start: TLineNo) -> None:
        # The line number where the loop starts.
        self.start = start
        # A set of ArcStarts, the arcs from break statements exiting this loop.
        self.break_exits: set[ArcStart] = set()

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Breaks are only collected here; no arc is added by this method.
        self.break_exits |= exits
        return True

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # A continue jumps back to the line where the loop starts.
        loop_top = self.start
        for xit in exits:
            add_arc(xit.lineno, loop_top, xit.cause)
        return True
class FunctionBlock(Block):
    """A block on the block stack representing a function definition."""

    def __init__(self, start: TLineNo, name: str) -> None:
        # The line number where the function starts.
        self.start = start
        # The name of the function.
        self.name = name

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # A raise leaves the function: the arc ends at the negated start line.
        emsg = f"didn't except from function {self.name!r}"
        for xit in exits:
            add_arc(xit.lineno, -self.start, xit.cause, emsg)
        return True

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # A return leaves the function: the arc ends at the negated start line.
        emsg = f"didn't return from function {self.name!r}"
        for xit in exits:
            add_arc(xit.lineno, -self.start, xit.cause, emsg)
        return True
class TryBlock(Block):
    """A block on the block stack representing a `try` block."""

    def __init__(self, handler_start: TLineNo | None, final_start: TLineNo | None) -> None:
        # The line number of the first "except" handler, if any.
        self.handler_start = handler_start
        # The line number of the "finally:" clause, if any.
        self.final_start = final_start

        # The ArcStarts for breaks/continues/returns/raises inside the "try:"
        # that need to route through the "finally:" clause.
        self.break_from: set[ArcStart] = set()
        self.continue_from: set[ArcStart] = set()
        self.raise_from: set[ArcStart] = set()
        self.return_from: set[ArcStart] = set()

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Without a "finally:", this try block doesn't intercept the break.
        if self.final_start is None:
            return False
        self.break_from.update(exits)
        return True

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Without a "finally:", this try block doesn't intercept the continue.
        if self.final_start is None:
            return False
        self.continue_from.update(exits)
        return True

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # A raise goes to the first handler if there is one; otherwise it is
        # saved to be routed through the "finally:" clause.
        if self.handler_start is None:
            assert self.final_start is not None
            self.raise_from.update(exits)
        else:
            for xit in exits:
                add_arc(xit.lineno, self.handler_start, xit.cause)
        return True

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Without a "finally:", this try block doesn't intercept the return.
        if self.final_start is None:
            return False
        self.return_from.update(exits)
        return True
class WithBlock(Block):
    """A block on the block stack representing a `with` block."""

    def __init__(self, start: TLineNo) -> None:
        # We only ever use this block if it is needed, so that we don't have to
        # check this setting in all the methods.
        assert env.PYBEHAVIOR.exit_through_with

        # The line number of the with statement.
        self.start = start

        # The ArcStarts for breaks/continues/returns/raises inside the "with:"
        # that need to go through the with-statement while exiting.
        self.break_from: set[ArcStart] = set()
        self.continue_from: set[ArcStart] = set()
        self.return_from: set[ArcStart] = set()

    def _process_exits(
        self,
        exits: set[ArcStart],
        add_arc: TAddArcFn,
        from_set: set[ArcStart] | None = None,
    ) -> bool:
        """Helper to process the four kinds of exits.

        Adds an arc from each exit to the with statement's line, and records
        the exits in `from_set` if one is given.  Always returns True.

        """
        with_line = self.start
        for xit in exits:
            add_arc(xit.lineno, with_line, xit.cause)
        if from_set is not None:
            from_set.update(exits)
        return True

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        return self._process_exits(exits, add_arc, from_set=self.break_from)

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        return self._process_exits(exits, add_arc, from_set=self.continue_from)

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Raises are not recorded in any *_from set.
        return self._process_exits(exits, add_arc)

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        return self._process_exits(exits, add_arc, from_set=self.return_from)
class NodeList(ast.AST):
    """A synthetic fictitious node, containing a sequence of nodes.

    This is used when collapsing optimized if-statements, to represent the
    unconditional execution of one of the clauses.

    """

    def __init__(self, body: Sequence[ast.AST]) -> None:
        self.body = body
        # The list takes the line number of its first node.
        first_node = body[0]
        self.lineno = first_node.lineno
# TODO: Shouldn't the cause messages join with "and" instead of "or"?
def _make_expression_code_method(noun: str) -> Callable[[AstArcAnalyzer, ast.AST], None]:
    """Build a `_code_object__*` method for an expression-based callable.

    `noun` names the construct in the messages of the two arcs the returned
    method adds: one entering the callable's line and one leaving it.

    """

    def _code_object__expression_callable(self: AstArcAnalyzer, node: ast.AST) -> None:
        start = self.line_for_node(node)
        entry_msg = f"didn't run the {noun} on line {start}"
        exit_msg = f"didn't finish the {noun} on line {start}"
        self.add_arc(-start, start, None, entry_msg)
        self.add_arc(start, -start, None, exit_msg)

    return _code_object__expression_callable
class AstArcAnalyzer:
"""Analyze source text with an AST to find executable code paths.
The .analyze() method does the work, and populates these attributes:
`arcs`: a set of (from, to) pairs of the arcs possible in the code.
`missing_arc_fragments`: a dict mapping (from, to) arcs to lists of
message fragments explaining why the arc is missing from execution::
{ (start, end): [(startmsg, endmsg), ...], }
For an arc starting from line 17, they should be usable to form complete
sentences like: "Line 17 {endmsg} because {startmsg}".
"""
def __init__(
    self,
    root_node: ast.AST,
    statements: set[TLineNo],
    multiline: dict[TLineNo, TLineNo],
) -> None:
    """Set up an analysis of `root_node`.

    `statements` are statement line numbers and `multiline` maps line numbers
    to the first line of their statement.  The statements are stored mapped
    through `multiline`.

    """
    self.root_node = root_node
    # TODO: I think this is happening in too many places.
    self.statements = {multiline.get(lineno, lineno) for lineno in statements}
    self.multiline = multiline

    # Turn on AST dumps with an environment variable.
    # $set_env.py: COVERAGE_AST_DUMP - Dump the AST nodes when parsing code.
    if bool(int(os.getenv("COVERAGE_AST_DUMP", "0"))):      # pragma: debugging
        # Dump the AST so that failing tests have helpful output.
        print(f"Statements: {self.statements}")
        print(f"Multiline map: {self.multiline}")
        dumpkw: dict[str, Any] = {}
        if sys.version_info >= (3, 9):
            dumpkw["indent"] = 4
        print(ast.dump(self.root_node, include_attributes=True, **dumpkw))

    # The results of analyze(), and the stack of blocks used to build them.
    self.arcs: set[TArc] = set()
    self.missing_arc_fragments: TArcFragments = collections.defaultdict(list)
    self.block_stack: list[Block] = []

    # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
    self.debug = bool(int(os.getenv("COVERAGE_TRACK_ARCS", "0")))
def analyze(self) -> None:
    """Examine the AST tree from `self.root_node` to determine possible arcs.

    Every node in the tree whose class has a `_code_object__<ClassName>`
    method on this analyzer is passed to that method.

    """
    for node in ast.walk(self.root_node):
        handler = getattr(self, "_code_object__" + type(node).__name__, None)
        if handler is None:
            continue
        handler(node)
# Code object dispatchers: _code_object__*
#
# These methods are used by analyze() as the start of the analysis.
# There is one for each construct with a code object.
def _code_object__Module(self, node: ast.Module) -> None:
    """Add the arcs entering, running through, and leaving a module."""
    start = self.line_for_node(node)
    if not node.body:
        # Empty module.
        self.add_arc(-start, start)
        self.add_arc(start, -start)
        return

    # The body is entered from -start, and each of its exits leaves to -start.
    for xit in self.body_exits(node.body, from_start=ArcStart(-start)):
        self.add_arc(xit.lineno, -start, xit.cause, "didn't exit the module")
def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None:
    """Add the arcs for the body of a function definition."""
    start = self.line_for_node(node)
    # The FunctionBlock is on the stack only while the body is analyzed, so
    # that the exits falling off the end of the body are processed as returns
    # from this function.
    function_block = FunctionBlock(start=start, name=node.name)
    self.block_stack.append(function_block)
    body_ends = self.body_exits(node.body, from_start=ArcStart(-start))
    self.process_return_exits(body_ends)
    self.block_stack.pop()


# Async function definitions are analyzed exactly like regular ones.
_code_object__AsyncFunctionDef = _code_object__FunctionDef
def _code_object__ClassDef(self, node: ast.ClassDef) -> None:
    """Add the arcs entering, running through, and leaving a class body."""
    start = self.line_for_node(node)
    self.add_arc(-start, start)
    body_ends = self.body_exits(node.body, from_start=ArcStart(start))
    exit_msg = f"didn't exit the body of class {node.name!r}"
    for xit in body_ends:
        self.add_arc(xit.lineno, -start, xit.cause, exit_msg)
# Expression-based callables all get the same kind of handler, differing only
# in the noun used in their messages.  The comprehension handlers are only
# defined when env.PYBEHAVIOR.comprehensions_are_functions is true.
_code_object__Lambda = _make_expression_code_method("lambda")
_code_object__GeneratorExp = _make_expression_code_method("generator expression")
if env.PYBEHAVIOR.comprehensions_are_functions:
    _code_object__DictComp = _make_expression_code_method("dictionary comprehension")
    _code_object__SetComp = _make_expression_code_method("set comprehension")
    _code_object__ListComp = _make_expression_code_method("list comprehension")
def add_arc(
    self,
    start: TLineNo,
    end: TLineNo,
    smsg: str | None = None,
    emsg: str | None = None,
) -> None:
    """Add an arc, including message fragments to use if it is missing.

    The `(smsg, emsg)` pair is recorded only if at least one of the two
    fragments is given.

    """
    arc = (start, end)
    if self.debug:                      # pragma: debugging
        print(f"\nAdding possible arc: ({start}, {end}): {smsg!r}, {emsg!r}")
        print(short_stack())
    self.arcs.add(arc)

    if not (smsg is None and emsg is None):
        self.missing_arc_fragments[arc].append((smsg, emsg))
def nearest_blocks(self) -> Iterable[Block]:
    """Yield the blocks in nearest-to-farthest order."""
    # The nearest block is the one most recently pushed on the stack.
    stack = self.block_stack
    return reversed(stack)
def line_for_node(self, node: ast.AST) -> TLineNo:
"""What is the right line number to use for this node?
This dispatches to _line__Node functions where needed.
"""
node_name = node.__class__.__name__
handler = cast(
Optional[Callable[[ast.AST], TLineNo]],
getattr(self, "_line__" + node_name, None),
)
if handler is not None:
return handler(node)
else:
return node.lineno
# First lines: _line__*
#
# Dispatched by line_for_node, each method knows how to identify the first
# line number in the node, as Python will report it.
def _line_decorated(self, node: ast.FunctionDef) -> TLineNo:
    """Compute first line number for things that can be decorated (classes and functions)."""
    # With decorators, the first line is the line of the first decorator.
    decorators = node.decorator_list
    return decorators[0].lineno if decorators else node.lineno
def _line__Assign(self, node: ast.Assign) -> TLineNo:
    """An assignment's line is the line of the value being assigned."""
    assigned_value = node.value
    return self.line_for_node(assigned_value)
# Class definitions can be decorated, so they use the decorated-line logic.
_line__ClassDef = _line_decorated
def _line__Dict(self, node: ast.Dict) -> TLineNo:
    """A dict literal's line is the line of its first key, if it has one."""
    if not node.keys:
        return node.lineno
    first_key = node.keys[0]
    if first_key is None:
        # Unpacked dict literals `{**{"a":1}}` have None as the key,
        # use the value in that case.
        return node.values[0].lineno
    return first_key.lineno
# Function definitions, sync and async, can be decorated too.
_line__FunctionDef = _line_decorated
_line__AsyncFunctionDef = _line_decorated
def _line__List(self, node: ast.List) -> TLineNo:
    """A list literal's line is the line of its first element, if it has one."""
    elements = node.elts
    return self.line_for_node(elements[0]) if elements else node.lineno
def _line__Module(self, node: ast.Module) -> TLineNo:
    """Find the line a module starts on."""
    if not env.PYBEHAVIOR.module_firstline_1 and node.body:
        return self.line_for_node(node.body[0])
    # Either modules always start at line 1 on this Python, or the module is
    # empty: empty modules have no line number, they always start at 1.
    return 1
# The node types that just flow to the next node with no complications.
# node_exits() gives a node with no _handle__* method a single exit from its
# own line; under env.TESTING it raises if such a node's type isn't listed here.
OK_TO_DEFAULT = {
    "AnnAssign", "Assign", "Assert", "AugAssign", "Delete", "Expr", "Global",
    "Import", "ImportFrom", "Nonlocal", "Pass",
}
def node_exits(self, node: ast.AST) -> set[ArcStart]:
"""Find the set of arc starts that exit this node.
Return a set of ArcStarts, exits from this node to the next. Because a
node represents an entire sub-tree (including its children), the exits
from a node can be arbitrarily complex::
if something(1):
if other(2):
doit(3)
else:
doit(5)
There are three exits from line 1: they start at lines 1, 3 and 5.
There are two exits from line 2: lines 3 and 5.
"""
node_name = node.__class__.__name__
handler = cast(
Optional[Callable[[ast.AST], Set[ArcStart]]],
getattr(self, "_handle__" + node_name, None),
)
if handler is not None:
arc_starts = handler(node)
else:
# No handler: either it's something that's ok to default (a simple
# statement), or it's something we overlooked.
if env.TESTING:
if node_name not in self.OK_TO_DEFAULT:
raise RuntimeError(f"*** Unhandled: {node}") # pragma: only failure
# Default for simple statements: one exit from this node.
arc_starts = {ArcStart(self.line_for_node(node))}
return arc_starts
def body_exits(
    self,
    body: Sequence[ast.AST],
    from_start: ArcStart | None = None,
    prev_starts: set[ArcStart] | None = None,
) -> set[ArcStart]:
    """Find arc starts that exit the body of a compound statement.

    `body` is the body node.  `from_start` is a single `ArcStart` that can
    be the previous line in flow before this body.  `prev_starts` is a set
    of ArcStarts that can be the previous line.  Only one of them should be
    given.

    Also records arcs (using `add_arc`) within the body.

    Returns a set of ArcStarts, the exits from this body.

    """
    if prev_starts is None:
        assert from_start is not None
        prev_starts = {from_start}
    else:
        assert from_start is None

    # Loop over the nodes in the body, making arcs from each one's exits to
    # the next node.
    for stmt in body:
        line = self.line_for_node(stmt)
        if self.multiline.get(line, line) not in self.statements:
            # The statement isn't in the known statements: look for something
            # inside it that is, or skip it entirely.
            replacement = self.find_non_missing_node(stmt)
            if replacement is None:
                continue
            stmt = replacement
            line = self.line_for_node(stmt)
        for origin in prev_starts:
            self.add_arc(origin.lineno, line, origin.cause)
        prev_starts = self.node_exits(stmt)
    return prev_starts
def find_non_missing_node(self, node: ast.AST) -> ast.AST | None:
    """Search `node` looking for a child that has not been optimized away.

    This might return the node you started with, or it will work recursively
    to find a child node in self.statements.

    Returns a node, or None if none of the node remains.

    """
    # This repeats work just done in body_exits, but this duplication
    # means we can avoid a function call in the 99.9999% case of not
    # optimizing away statements.
    line = self.line_for_node(node)
    if self.multiline.get(line, line) in self.statements:
        return node

    # The node itself is gone: ask its _missing__* method, if it has one, for
    # a stand-in.
    missing_fn = cast(
        Optional[Callable[[ast.AST], Optional[ast.AST]]],
        getattr(self, "_missing__" + node.__class__.__name__, None),
    )
    return None if missing_fn is None else missing_fn(node)
# Missing nodes: _missing__*
#
# Entire statements can be optimized away by Python. They will appear in
# the AST, but not the bytecode. These functions are called (by
# find_non_missing_node) to find a node to use instead of the missing
# node. They can return None if the node should truly be gone.
def _missing__If(self, node: ast.If) -> ast.AST | None:
    """Find a stand-in for an `if` statement that isn't in the statements."""
    # If the if-node is missing, then one of its children might still be
    # here, but not both. So return the first of the two that isn't missing.
    # Use a NodeList to hold the clauses as a single node.
    from_body = self.find_non_missing_node(NodeList(node.body))
    if from_body:
        return from_body
    if not node.orelse:
        return None
    return self.find_non_missing_node(NodeList(node.orelse))
def _missing__NodeList(self, node: NodeList) -> ast.AST | None:
    """Find a stand-in for a NodeList whose first line isn't in the statements."""
    # A NodeList might be a mixture of missing and present nodes. Find the
    # ones that are present.
    candidates = (self.find_non_missing_node(child) for child in node.body)
    present = [found for found in candidates if found is not None]

    # Return the simplest representation of the present children.
    if not present:
        return None
    if len(present) == 1:
        return present[0]
    return NodeList(present)
def _missing__While(self, node: ast.While) -> ast.AST | None:
    """Find a stand-in for a `while` statement that isn't in the statements."""
    body_nodes = self.find_non_missing_node(NodeList(node.body))
    if not body_nodes:
        return None

    # Make a synthetic While-true node, placed on the line of what remains of
    # the body.
    lineno = body_nodes.lineno
    true_test = ast.Name()
    true_test.lineno = lineno
    true_test.id = "True"

    new_while = ast.While()
    new_while.lineno = lineno
    new_while.test = true_test
    assert hasattr(body_nodes, "body")
    new_while.body = body_nodes.body
    new_while.orelse = []
    return new_while
def is_constant_expr(self, node: ast.AST) -> str | None:
    """Is this a compile-time constant?

    Returns "Num" for constant nodes, "Name" for the names True, False, None
    and __debug__, and None for anything else.

    """
    if node.__class__.__name__ in ("Constant", "NameConstant", "Num"):
        return "Num"
    if isinstance(node, ast.Name) and node.id in ("True", "False", "None", "__debug__"):
        return "Name"
    return None
# In the fullness of time, these might be good tests to write:
# while EXPR:
# while False:
# listcomps hidden deep in other expressions
# listcomps hidden in lists: x = [[i for i in range(10)]]
# nested function definitions
# Exit processing: process_*_exits
#
# These functions process the four kinds of jump exits: break, continue,
# raise, and return. To figure out where an exit goes, we have to look at
# the block stack context. For example, a break will jump to the nearest
# enclosing loop block, or the nearest enclosing finally block, whichever
# is nearer.
def process_break_exits(self, exits: set[ArcStart]) -> None:
    """Add arcs due to jumps from `exits` being breaks.

    Each block from `self.nearest_blocks()` is offered the exits in turn,
    along with `self.add_arc`. The search stops at the first block whose
    `process_break_exits` returns a true value.
    """
    for block in self.nearest_blocks():  # pragma: always breaks
        handled = block.process_break_exits(exits, self.add_arc)
        if handled:
            return
def process_continue_exits(self, exits: set[ArcStart]) -> None:
    """Add arcs due to jumps from `exits` being continues.

    Each block from `self.nearest_blocks()` is offered the exits in turn,
    along with `self.add_arc`. The search stops at the first block whose
    `process_continue_exits` returns a true value.
    """
    for block in self.nearest_blocks():  # pragma: always breaks
        handled = block.process_continue_exits(exits, self.add_arc)
        if handled:
            return
def process_raise_exits(self, exits: set[ArcStart]) -> None:
    """Add arcs due to jumps from `exits` being raises.

    Each block from `self.nearest_blocks()` is offered the exits in turn,
    along with `self.add_arc`. The search stops at the first block whose
    `process_raise_exits` returns a true value, or when the blocks run out.
    """
    for block in self.nearest_blocks():
        handled = block.process_raise_exits(exits, self.add_arc)
        if handled:
            return
def process_return_exits(self, exits: set[ArcStart]) -> None:
    """Add arcs due to jumps from `exits` being returns.

    Each block from `self.nearest_blocks()` is offered the exits in turn,
    along with `self.add_arc`. The search stops at the first block whose
    `process_return_exits` returns a true value.
    """
    for block in self.nearest_blocks():  # pragma: always breaks
        handled = block.process_return_exits(exits, self.add_arc)
        if handled:
            return
# Node handlers: _handle__*
#
# Each handler deals with a specific AST node type, dispatched from
# node_exits. Handlers return the set of exits from that node, and can
# also call self.add_arc to record arcs they find. These functions mirror
# the Python semantics of each syntactic construct. See the docstring
# for node_exits to understand the concept of exits from a node.
#
# Every node type that represents a statement should have a handler, or it
# should be listed in OK_TO_DEFAULT.
def _handle__Break(self, node: ast.Break) -> set[ArcStart]:
    """Hand a `break` to the enclosing blocks; the statement has no exits."""
    lineno = self.line_for_node(node)
    self.process_break_exits(
        {ArcStart(lineno, cause="the break on line {lineno} wasn't executed")},
    )
    # `break` statement jumps away, no exits from here.
    return set()
def _handle_decorated(self, node: ast.FunctionDef) -> set[ArcStart]:
    """Add arcs for things that can be decorated (classes and functions).

    With decorators present, arcs link each decorator line to the next
    (skipping neighbors that share a line) and the last decorator line to
    the definition line. When `env.PYBEHAVIOR.trace_decorator_line_again` is
    set, arcs are also added back up through the decorator lines, from the
    first decorator line to the definition line, and from the definition
    line to the last decorator line.

    Returns a single exit starting at the definition line (`node.lineno`).
    """
    main_line: TLineNo = node.lineno
    decs = node.decorator_list
    if decs:
        dec_lines = [self.line_for_node(dec_node) for dec_node in decs]
        # Link the decorator lines from top to bottom, then on to the
        # definition line.
        for above, below in zip(dec_lines, dec_lines[1:]):
            if above != below:
                self.add_arc(above, below)
        self.add_arc(dec_lines[-1], main_line)
        if env.PYBEHAVIOR.trace_decorator_line_again:
            for top, bot in zip(dec_lines, dec_lines[1:]):
                self.add_arc(bot, top)
            self.add_arc(dec_lines[0], main_line)
            self.add_arc(main_line, dec_lines[-1])
    # The body is handled in collect_arcs.
    return {ArcStart(main_line)}
_handle__ClassDef = _handle_decorated
def _handle__Continue(self, node: ast.Continue) -> set[ArcStart]:
    """Hand a `continue` to the enclosing blocks; the statement has no exits."""
    lineno = self.line_for_node(node)
    self.process_continue_exits(
        {ArcStart(lineno, cause="the continue on line {lineno} wasn't executed")},
    )
    # `continue` statement jumps away, no exits from here.
    return set()
def _handle__For(self, node: ast.For) -> set[ArcStart]:
    """Add the arcs for a `for` loop and return the loop's exits.

    The loop body is analyzed with a `LoopBlock` on the block stack, and
    every exit of the body is linked back to the line of the loop's
    iterable. The returned exits are the block's `break_exits`, plus either
    the exits of the `else` clause or, without one, the loop line itself.
    """
    loop_top = self.line_for_node(node.iter)
    self.block_stack.append(LoopBlock(start=loop_top))
    never_started = ArcStart(loop_top, cause="the loop on line {lineno} never started")
    # Any exit from the body will go back to the top of the loop.
    for body_exit in self.body_exits(node.body, from_start=never_started):
        self.add_arc(body_exit.lineno, loop_top, body_exit.cause)

    loop_block = self.block_stack.pop()
    assert isinstance(loop_block, LoopBlock)
    loop_exits = loop_block.break_exits
    not_completed = ArcStart(loop_top, cause="the loop on line {lineno} didn't complete")
    if not node.orelse:
        # No else clause: exit from the for line.
        loop_exits.add(not_completed)
    else:
        loop_exits |= self.body_exits(node.orelse, from_start=not_completed)
    return loop_exits
_handle__AsyncFor = _handle__For
# Function definitions, sync and async, can be decorated, so they share the
# handler for decorated things.
_handle__FunctionDef = _handle_decorated
_handle__AsyncFunctionDef = _handle_decorated
def _handle__If(self, node: ast.If) -> set[ArcStart]:
    """Return the exits of an `if`: those of its body plus those of its `else`.

    Both branches start from the line of the test, each with its own cause.
    """
    test_line = self.line_for_node(node.test)
    never_true = ArcStart(test_line, cause="the condition on line {lineno} was never true")
    if_exits = self.body_exits(node.body, from_start=never_true)
    always_true = ArcStart(test_line, cause="the condition on line {lineno} was always true")
    else_exits = self.body_exits(node.orelse, from_start=always_true)
    if_exits |= else_exits
    return if_exits
if sys.version_info >= (3, 10):
    def _handle__Match(self, node: ast.Match) -> set[ArcStart]:
        """Add the arcs for a `match` statement and return its exits.

        Each case pattern line is linked from the previous one (the first
        from the `match` line), and each case body contributes its exits.
        If the last case is not an unguarded pattern that always matches,
        the last pattern line is also an exit, because no case may match.
        """
        start = self.line_for_node(node)
        last_start = start
        exits: set[ArcStart] = set()
        for case in node.cases:
            case_start = self.line_for_node(case.pattern)
            self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
            from_start = ArcStart(
                case_start,
                cause="the pattern on line {lineno} never matched",
            )
            exits |= self.body_exits(case.body, from_start=from_start)
            last_start = case_start
        # case is now the last case, check for wildcard match.
        pattern = case.pattern      # pylint: disable=undefined-loop-variable
        # Dig down to the sub-pattern that decides whether the case can fail:
        # the last alternative of an or-pattern, and the inner pattern of an
        # `... as name` pattern. These can nest in either order, for example
        # `case 1 | _ as x:`.
        while True:
            if isinstance(pattern, ast.MatchOr):
                pattern = pattern.patterns[-1]
            elif isinstance(pattern, ast.MatchAs) and pattern.pattern is not None:
                pattern = pattern.pattern
            else:
                break
        had_wildcard = (
            isinstance(pattern, ast.MatchAs)
            and pattern.pattern is None
            and case.guard is None      # pylint: disable=undefined-loop-variable
        )
        if not had_wildcard:
            exits.add(
                ArcStart(case_start, cause="the pattern on line {lineno} always matched"),
            )
        return exits
def _handle__NodeList(self, node: NodeList) -> set[ArcStart]:
    """Return the exits of the list's body, entered from the list's own line."""
    first_line = self.line_for_node(node)
    return self.body_exits(node.body, from_start=ArcStart(first_line))
def _handle__Raise(self, node: ast.Raise) -> set[ArcStart]:
    """Hand a `raise` to the enclosing blocks; the statement has no exits."""
    lineno = self.line_for_node(node)
    self.process_raise_exits(
        {ArcStart(lineno, cause="the raise on line {lineno} wasn't executed")},
    )
    # `raise` statement jumps away, no exits from here.
    return set()
def _handle__Return(self, node: ast.Return) -> set[ArcStart]:
    """Hand a `return` to the enclosing blocks; the statement has no exits."""
    lineno = self.line_for_node(node)
    self.process_return_exits(
        {ArcStart(lineno, cause="the return on line {lineno} wasn't executed")},
    )
    # `return` statement jumps away, no exits from here.
    return set()
def _handle__Try(self, node: ast.Try) -> set[ArcStart]:
    """Add the arcs for a `try` statement and return its exits.

    The `try` body is analyzed with a `TryBlock` on the block stack. When
    there is a `finally` clause the block stays on the stack through the
    `except` and `else` clauses, so the `finally` clause can pick up the
    jumps made there as well. The `finally` body is then entered from the
    ordinary exits plus every collected break, continue, raise and return,
    and those collected jumps are passed on to the enclosing blocks.

    Without a `finally` clause the exits are those of the body (or of the
    `else` clause, if any) plus those of the handlers. With one, they are
    the exits of the `finally` body, but only if there were exits to begin
    with.
    """
    if node.handlers:
        handler_start = self.line_for_node(node.handlers[0])
    else:
        handler_start = None

    if node.finalbody:
        final_start = self.line_for_node(node.finalbody[0])
    else:
        final_start = None

    # This is true by virtue of Python syntax: have to have either except
    # or finally, or both.
    assert handler_start is not None or final_start is not None
    try_block = TryBlock(handler_start, final_start)
    self.block_stack.append(try_block)

    start = self.line_for_node(node)
    exits = self.body_exits(node.body, from_start=ArcStart(start))

    # We're done with the `try` body, so this block no longer handles
    # exceptions. We keep the block so the `finally` clause can pick up
    # flows from the handlers and `else` clause.
    if node.finalbody:
        try_block.handler_start = None
        if node.handlers:
            # If there are `except` clauses, then raises in the try body
            # will already jump to them. Start this set over for raises in
            # `except` and `else`.
            try_block.raise_from = set()
    else:
        self.block_stack.pop()

    handler_exits: set[ArcStart] = set()

    if node.handlers:
        # Each handler line leads to the next handler line, and each handler
        # body contributes its own exits.
        last_handler_start: TLineNo | None = None
        for handler_node in node.handlers:
            handler_start = self.line_for_node(handler_node)
            if last_handler_start is not None:
                self.add_arc(last_handler_start, handler_start)
            last_handler_start = handler_start
            from_cause = "the exception caught by line {lineno} didn't happen"
            from_start = ArcStart(handler_start, cause=from_cause)
            handler_exits |= self.body_exits(handler_node.body, from_start=from_start)

    if node.orelse:
        # The `else` clause runs after the body, so it starts from the
        # body's exits and replaces them.
        exits = self.body_exits(node.orelse, prev_starts=exits)

    exits |= handler_exits

    if node.finalbody:
        self.block_stack.pop()
        final_from = (                  # You can get to the `finally` clause from:
            exits |                         # the exits of the body or `else` clause,
            try_block.break_from |          # or a `break`,
            try_block.continue_from |       # or a `continue`,
            try_block.raise_from |          # or a `raise`,
            try_block.return_from           # or a `return`.
        )

        final_exits = self.body_exits(node.finalbody, prev_starts=final_from)

        # Pass each kind of collected jump on to the enclosing blocks.
        if try_block.break_from:
            self.process_break_exits(
                self._jumps_through_finally(try_block.break_from, final_exits),
            )

        if try_block.continue_from:
            self.process_continue_exits(
                self._jumps_through_finally(try_block.continue_from, final_exits),
            )

        if try_block.raise_from:
            # Raises are not subject to `finally_jumps_back`: they always
            # continue from the `finally` exits.
            self.process_raise_exits(
                self._combine_finally_starts(try_block.raise_from, final_exits),
            )

        if try_block.return_from:
            self.process_return_exits(
                self._jumps_through_finally(try_block.return_from, final_exits),
            )

        if exits:
            # The finally clause's exits are only exits for the try block
            # as a whole if the try block had some exits to begin with.
            exits = final_exits

    return exits

def _jumps_through_finally(
    self,
    jumps: set[ArcStart],
    final_exits: set[ArcStart],
) -> set[ArcStart]:
    """Work out where break/continue/return `jumps` resume after a `finally`.

    When `env.PYBEHAVIOR.finally_jumps_back` is set, an arc is added from
    every exit of the `finally` body back to each jump line, and `jumps` is
    returned as is, so the jumps continue from their own lines. Otherwise
    the jumps continue from the `finally` exits, relabelled with the causes
    of `jumps` by `_combine_finally_starts`.
    """
    if not env.PYBEHAVIOR.finally_jumps_back:
        return self._combine_finally_starts(jumps, final_exits)
    for jump in jumps:
        lineno = jump.lineno
        cause = jump.cause.format(lineno=lineno)
        for final_exit in final_exits:
            self.add_arc(final_exit.lineno, lineno, cause)
    return jumps
def _combine_finally_starts(
    self,
    starts: set[ArcStart],
    exits: set[ArcStart],
) -> set[ArcStart]:
    """Helper for building the cause of `finally` branches.

    "finally" clauses might not execute their exits, and the causes could
    be due to a failure to execute any of the exits in the try block. So
    we use the causes from `starts` as the causes for `exits`.

    The non-empty causes of `starts`, taken in sorted order and formatted
    with each start's own line number, are joined with " or ". The result
    is a new set with one `ArcStart` per line in `exits`, all carrying that
    joined cause.
    """
    formatted = [
        start.cause.format(lineno=start.lineno)
        for start in sorted(starts)
        if start.cause
    ]
    joined_cause = " or ".join(formatted)
    return {ArcStart(xit.lineno, joined_cause) for xit in exits}
def _handle__While(self, node: ast.While) -> set[ArcStart]:
    """Add the arcs for a `while` loop and return the loop's exits.

    The body is analyzed with a `LoopBlock` on the block stack, and every
    exit of the body is linked back to the top of the loop. The returned
    exits are the block's `break_exits`, plus either the exits of the `else`
    clause or, without one, the test line itself when the test is not a
    constant.
    """
    test_line = self.line_for_node(node.test)
    constant_test = self.is_constant_expr(node.test)
    # With a constant test the loop jumps back to the first body line rather
    # than the test line, unless `keep_constant_test` is set.
    if constant_test and not env.PYBEHAVIOR.keep_constant_test:
        loop_top = self.line_for_node(node.body[0])
    else:
        loop_top = test_line

    self.block_stack.append(LoopBlock(start=loop_top))
    never_true = ArcStart(test_line, cause="the condition on line {lineno} was never true")
    for body_exit in self.body_exits(node.body, from_start=never_true):
        self.add_arc(body_exit.lineno, loop_top, body_exit.cause)

    loop_block = self.block_stack.pop()
    assert isinstance(loop_block, LoopBlock)
    loop_exits = set(loop_block.break_exits)
    always_true = ArcStart(test_line, cause="the condition on line {lineno} was always true")
    if node.orelse:
        loop_exits |= self.body_exits(node.orelse, from_start=always_true)
    elif not constant_test:
        # No `else` clause: you can exit from the start.
        loop_exits.add(always_true)
    return loop_exits
def _handle__With(self, node: ast.With) -> set[ArcStart]:
    """Add the arcs for a `with` statement and return its exits.

    Without `env.PYBEHAVIOR.exit_through_with`, the exits are simply those
    of the body. With it, the body is analyzed with a `WithBlock` on the
    block stack, every exit of the body is linked back to the `with` line,
    and the `with` line becomes the only exit (when the body had any).
    Breaks, continues and returns collected on the block are passed on to
    the enclosing blocks as leaving from the `with` line.
    """
    with_line = self.line_for_node(node)
    exit_through_with = env.PYBEHAVIOR.exit_through_with
    if exit_through_with:
        self.block_stack.append(WithBlock(start=with_line))
    exits = self.body_exits(node.body, from_start=ArcStart(with_line))
    if not exit_through_with:
        return exits

    with_block = self.block_stack.pop()
    assert isinstance(with_block, WithBlock)
    with_exit = {ArcStart(with_line)}
    if exits:
        for body_exit in exits:
            self.add_arc(body_exit.lineno, with_line)
        exits = with_exit

    # Pass each kind of collected jump on, in break, continue, return order.
    for jumps, process_exits in (
        (with_block.break_from, self.process_break_exits),
        (with_block.continue_from, self.process_continue_exits),
        (with_block.return_from, self.process_return_exits),
    ):
        if jumps:
            process_exits(self._combine_finally_starts(jumps, with_exit))
    return exits
_handle__AsyncWith = _handle__With