blob: 1252e414799684245d4d75ebf017be77cc50f110 [file] [log] [blame] [edit]
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""Coverage data for coverage.py.
This file had the 4.x JSON data support, which is now gone. This file still
has storage-agnostic helpers, and is kept to avoid changing too many imports.
CoverageData is now defined in sqldata.py, and imported here to keep the
imports working.
"""
from __future__ import annotations
import functools
import glob
import hashlib
import os.path
from typing import Callable, Iterable
from coverage.exceptions import CoverageException, NoDataError
from coverage.files import PathAliases
from coverage.misc import Hasher, file_be_gone, human_sorted, plural
from coverage.sqldata import CoverageData
def line_counts(data: CoverageData, fullpath: bool = False) -> dict[str, int]:
    """Summarize how many lines were executed in each measured file.

    Every file reported by `data.measured_files()` contributes one entry to
    the result.  The key is the file name exactly as recorded when `fullpath`
    is true, and only its basename otherwise.  The value is the number of
    lines recorded for that file.  With basenames, files sharing a basename
    share a key, and the file visited last supplies the count.

    Returns a dict mapping file names to counts of lines.

    """
    # Choose the key-making function once, instead of testing `fullpath` on
    # every iteration.
    key_for: Callable[[str], str] = (lambda f: f) if fullpath else os.path.basename

    counts = {}
    for measured in data.measured_files():
        executed = data.lines(measured)
        # A file listed as measured is expected to have line data.
        assert executed is not None
        counts[key_for(measured)] = len(executed)
    return counts
def add_data_to_hash(data: CoverageData, filename: str, hasher: Hasher) -> None:
    """Feed the measured results for `filename` into `hasher`.

    `hasher` is a `coverage.misc.Hasher` instance.  It is updated twice:
    first with the sorted arcs for the file (if `data` has arcs) or the
    sorted line numbers (if it doesn't), and then with the file tracer
    recorded for the file.  Only results data is contributed, not run data.

    """
    if not data.has_arcs():
        measured = sorted_lines(data, filename)
    else:
        # A file with no recorded arcs contributes an empty list.
        measured = sorted(data.arcs(filename) or [])
    hasher.update(measured)
    hasher.update(data.file_tracer(filename))
def combinable_files(data_file: str, data_paths: Iterable[str] | None = None) -> list[str]:
    """Find the data files that should be combined.

    `data_file` is a path to a data file; its base name is the prefix that
    parallel data files share.  `data_paths` is a list of files or
    directories.  A file is taken as-is.  A directory is searched for entries
    named like `data_file`'s base name followed by a dot and a suffix.  If
    `data_paths` is empty or not given, the directory holding `data_file`
    is searched.

    Raises `NoDataError` if an entry in `data_paths` is neither an existing
    file nor an existing directory.

    Returns a sorted list of absolute file paths.

    """
    data_dir, local = os.path.split(os.path.abspath(data_file))

    found = []
    for p in data_paths or [data_dir]:
        if os.path.isfile(p):
            found.append(os.path.abspath(p))
            continue
        if not os.path.isdir(p):
            raise NoDataError(f"Couldn't combine from non-existent path '{p}'")
        # Escape the literal prefix so that glob metacharacters in the
        # directory or file name are not treated as wildcards.
        prefix = os.path.join(os.path.abspath(p), local)
        found.extend(glob.glob(glob.escape(prefix) + ".*"))

    # SQLite might have made journal files alongside our database files, and
    # we never want to combine those.  The order of combining shouldn't
    # matter, but sorting keeps tests predictable and debugging easier to
    # follow when things go wrong.
    return sorted(fnm for fnm in found if not fnm.endswith("-journal"))
def combine_parallel_data(
    data: CoverageData,
    aliases: PathAliases | None = None,
    data_paths: Iterable[str] | None = None,
    strict: bool = False,
    keep: bool = False,
    message: Callable[[str], None] | None = None,
) -> None:
    """Merge a set of parallel data files into `data`.

    `data` is a CoverageData.  Its base file name is treated as a prefix:
    the files to combine are the ones `combinable_files` finds for that
    prefix and `data_paths`.

    `aliases`, if provided, is a `PathAliases` object whose `map` method is
    used to re-map the paths in the combined data.

    `data_paths`, if provided, is a list of directories or files to combine.
    Directories are searched for files named with the prefix plus a dot.
    If it is not provided, the directory portion of the data file name is
    searched.

    Each file's contents are hashed, and a file whose contents duplicate an
    earlier file is skipped instead of being combined again.  The file that
    `data` itself is stored in is never combined.

    Unless `keep` is true, each file is deleted once it has been combined or
    skipped as a duplicate.  A file that can't be read produces a warning,
    and is left on disk.

    If `strict` is true, a `NoDataError` is raised when no files are found
    to combine, or when none of the files found could be combined.

    `message` is a function to use for printing messages to the user.

    """
    files_to_combine = combinable_files(data.base_filename(), data_paths)
    if strict and not files_to_combine:
        raise NoDataError("No data to combine")

    # Memoize the alias lookup, so each distinct path is only re-mapped once.
    map_path = None if aliases is None else functools.lru_cache(maxsize=None)(aliases.map)

    seen_digests = set()
    combined_any = False

    for f in files_to_combine:
        if f == data.data_filename():
            # The destination file can itself be one of the parallel files.
            # Never combine it into itself.
            if data._debug.should("dataio"):
                data._debug.write(f"Skipping combining ourself: {f!r}")
            continue

        try:
            rel_file_name = os.path.relpath(f)
        except ValueError:
            # On Windows, relpath fails when the current directory is on a
            # different drive than f.  Messages show f unchanged in that case.
            rel_file_name = f

        with open(f, "rb") as fobj:
            sha = hashlib.new("sha3_256", fobj.read()).digest()

        delete_this_one = not keep
        if sha in seen_digests:
            if message:
                message(f"Skipping duplicate data {rel_file_name}")
        else:
            if data._debug.should("dataio"):
                data._debug.write(f"Combining data file {f!r}")
            seen_digests.add(sha)
            try:
                new_data = CoverageData(f, debug=data._debug)
                new_data.read()
            except CoverageException as exc:
                if data._warn:
                    # The CoverageException has the file name in it, so the
                    # message alone is enough for the warning.
                    data._warn(str(exc))
                if message:
                    message(f"Couldn't combine data file {rel_file_name}: {exc}")
                # Keep unreadable files around, whatever `keep` says.
                delete_this_one = False
            else:
                data.update(new_data, map_path=map_path)
                combined_any = True
                if message:
                    message(f"Combined data file {rel_file_name}")

        if delete_this_one:
            if data._debug.should("dataio"):
                data._debug.write(f"Deleting data file {f!r}")
            file_be_gone(f)

    if strict and not combined_any:
        raise NoDataError("No usable data files")
def debug_data_file(filename: str) -> None:
    """Implementation of 'coverage debug data'.

    Prints the path of the data file that `filename` resolves to.  If that
    file exists, it is read, and the report continues with whether it has
    arcs, how many files were measured, and one line per measured file
    giving its line count and, if it has one, its file tracer.

    """
    data = CoverageData(filename)
    # Report the path the CoverageData object actually uses.
    filename = data.data_filename()
    print(f"path: {filename}")
    if not os.path.exists(filename):
        print("No data collected: file doesn't exist")
        return

    data.read()
    print(f"has_arcs: {data.has_arcs()!r}")

    summary = line_counts(data, fullpath=True)
    filenames = human_sorted(summary.keys())
    nfiles = len(filenames)
    print(f"{nfiles} file{plural(nfiles)}:")
    for f in filenames:
        line = f"{f}: {summary[f]} line{plural(summary[f])}"
        plugin = data.file_tracer(f)
        # Only files with a file tracer get the bracketed suffix.
        print((line + f" [{plugin}]") if plugin else line)
def sorted_lines(data: CoverageData, filename: str) -> list[int]:
    """Get the sorted lines for a file, for tests.

    Returns an empty list when `data.lines(filename)` gives back nothing
    (either None or an empty collection).

    """
    measured = data.lines(filename)
    if not measured:
        return []
    return sorted(measured)