| # Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 |
| # For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt |
| |
| """Coverage data for coverage.py. |
| |
This file once held the 4.x JSON data support, which is now gone. It still
has storage-agnostic helpers, and is kept to avoid changing too many imports.
CoverageData is now defined in sqldata.py, and imported here to keep the
imports working.
| |
| """ |
| |
| from __future__ import annotations |
| |
| import functools |
| import glob |
| import hashlib |
| import os.path |
| |
| from typing import Callable, Iterable |
| |
| from coverage.exceptions import CoverageException, NoDataError |
| from coverage.files import PathAliases |
| from coverage.misc import Hasher, file_be_gone, human_sorted, plural |
| from coverage.sqldata import CoverageData |
| |
| |
| def line_counts(data: CoverageData, fullpath: bool = False) -> dict[str, int]: |
| """Return a dict summarizing the line coverage data. |
| |
    Keys are based on the file names, and values are the number of executed
    lines. If `fullpath` is true, then the keys are the full pathnames of
    the files; otherwise they are the basenames of the files.
| |
| Returns a dict mapping file names to counts of lines. |
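
    For example, a minimal sketch with a hypothetical data file measuring two
    source files::

        data = CoverageData()
        data.read()
        line_counts(data)                 # {'mod.py': 12, 'util.py': 30}
        line_counts(data, fullpath=True)  # {'/src/mod.py': 12, '/src/util.py': 30}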
| |
| """ |
| summ = {} |
| filename_fn: Callable[[str], str] |
| if fullpath: |
| # pylint: disable=unnecessary-lambda-assignment |
| filename_fn = lambda f: f |
| else: |
| filename_fn = os.path.basename |
| for filename in data.measured_files(): |
| lines = data.lines(filename) |
| assert lines is not None |
| summ[filename_fn(filename)] = len(lines) |
| return summ |
| |
| |
| def add_data_to_hash(data: CoverageData, filename: str, hasher: Hasher) -> None: |
| """Contribute `filename`'s data to the `hasher`. |
| |
| `hasher` is a `coverage.misc.Hasher` instance to be updated with |
| the file's data. It should only get the results data, not the run |
| data. |
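
    For example, to fold one file's results into a cache key (the file name
    here is hypothetical)::

        hasher = Hasher()
        add_data_to_hash(data, "mod.py", hasher)
        key = hasher.hexdigest()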
| |
| """ |
| if data.has_arcs(): |
| hasher.update(sorted(data.arcs(filename) or [])) |
| else: |
| hasher.update(sorted_lines(data, filename)) |
| hasher.update(data.file_tracer(filename)) |
| |
| |
| def combinable_files(data_file: str, data_paths: Iterable[str] | None = None) -> list[str]: |
| """Make a list of data files to be combined. |
| |
| `data_file` is a path to a data file. `data_paths` is a list of files or |
| directories of files. |
| |
| Returns a list of absolute file paths. |
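
    For example, with hypothetical parallel data files on disk::

        combinable_files("/src/.coverage")
        # ['/src/.coverage.host.1234.567890',
        #  '/src/.coverage.host.1235.567891']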
| """ |
| data_dir, local = os.path.split(os.path.abspath(data_file)) |
| |
| data_paths = data_paths or [data_dir] |
| files_to_combine = [] |
| for p in data_paths: |
| if os.path.isfile(p): |
| files_to_combine.append(os.path.abspath(p)) |
| elif os.path.isdir(p): |
            pattern = glob.escape(os.path.join(os.path.abspath(p), local)) + ".*"
| files_to_combine.extend(glob.glob(pattern)) |
| else: |
| raise NoDataError(f"Couldn't combine from non-existent path '{p}'") |
| |
| # SQLite might have made journal files alongside our database files. |
| # We never want to combine those. |
| files_to_combine = [fnm for fnm in files_to_combine if not fnm.endswith("-journal")] |
| |
| # Sorting isn't usually needed, since it shouldn't matter what order files |
| # are combined, but sorting makes tests more predictable, and makes |
| # debugging more understandable when things go wrong. |
| return sorted(files_to_combine) |
| |
| |
| def combine_parallel_data( |
| data: CoverageData, |
| aliases: PathAliases | None = None, |
| data_paths: Iterable[str] | None = None, |
| strict: bool = False, |
| keep: bool = False, |
| message: Callable[[str], None] | None = None, |
| ) -> None: |
| """Combine a number of data files together. |
| |
| `data` is a CoverageData. |
| |
    Treat `data.base_filename()` as a file prefix, and combine the data from
    all of the data files starting with that prefix plus a dot.
| |
| If `aliases` is provided, it's a `PathAliases` object that is used to |
| re-map paths to match the local machine's. |
| |
    If `data_paths` is provided, it is a list of directories or files to
    combine. Directories are searched for files that start with
    `data.base_filename()` plus dot as a prefix, and those files are combined.

    If `data_paths` is not provided, then the directory portion of
    `data.base_filename()` is used as the directory to search for data files.
| |
    Unless `keep` is True, every data file found and combined is then deleted
    from disk. If a file cannot be read, a warning will be issued, and the
    file will not be deleted.
| |
    If `strict` is true, an error is raised if there are no files to combine,
    or if none of them could be combined.
| |
| `message` is a function to use for printing messages to the user. |
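
    A typical call, with hypothetical paths and output::

        data = CoverageData("/src/.coverage")
        combine_parallel_data(data, message=print)
        # Combined data file .coverage.host.1234.567890
        # Combined data file .coverage.host.1235.567891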
| |
| """ |
| files_to_combine = combinable_files(data.base_filename(), data_paths) |
| |
| if strict and not files_to_combine: |
| raise NoDataError("No data to combine") |
| |
| if aliases is None: |
| map_path = None |
| else: |
| map_path = functools.lru_cache(maxsize=None)(aliases.map) |
| |
| file_hashes = set() |
| combined_any = False |
| |
| for f in files_to_combine: |
| if f == data.data_filename(): |
| # Sometimes we are combining into a file which is one of the |
| # parallel files. Skip that file. |
| if data._debug.should("dataio"): |
| data._debug.write(f"Skipping combining ourself: {f!r}") |
| continue |
| |
| try: |
| rel_file_name = os.path.relpath(f) |
| except ValueError: |
            # ValueError can be raised under Windows when os.getcwd() returns a
            # folder on a different drive than f, in which case we keep the
            # original value of f instead of its relative path.
| rel_file_name = f |
| |
| with open(f, "rb") as fobj: |
| hasher = hashlib.new("sha3_256") |
| hasher.update(fobj.read()) |
| sha = hasher.digest() |
| combine_this_one = sha not in file_hashes |
| |
| delete_this_one = not keep |
| if combine_this_one: |
| if data._debug.should("dataio"): |
| data._debug.write(f"Combining data file {f!r}") |
| file_hashes.add(sha) |
| try: |
| new_data = CoverageData(f, debug=data._debug) |
| new_data.read() |
| except CoverageException as exc: |
| if data._warn: |
| # The CoverageException has the file name in it, so just |
| # use the message as the warning. |
| data._warn(str(exc)) |
| if message: |
| message(f"Couldn't combine data file {rel_file_name}: {exc}") |
| delete_this_one = False |
| else: |
| data.update(new_data, map_path=map_path) |
| combined_any = True |
| if message: |
| message(f"Combined data file {rel_file_name}") |
| else: |
| if message: |
| message(f"Skipping duplicate data {rel_file_name}") |
| |
| if delete_this_one: |
| if data._debug.should("dataio"): |
| data._debug.write(f"Deleting data file {f!r}") |
| file_be_gone(f) |
| |
| if strict and not combined_any: |
| raise NoDataError("No usable data files") |
| |
| |
| def debug_data_file(filename: str) -> None: |
| """Implementation of 'coverage debug data'.""" |
| data = CoverageData(filename) |
| filename = data.data_filename() |
| print(f"path: {filename}") |
| if not os.path.exists(filename): |
| print("No data collected: file doesn't exist") |
| return |
| data.read() |
| print(f"has_arcs: {data.has_arcs()!r}") |
| summary = line_counts(data, fullpath=True) |
| filenames = human_sorted(summary.keys()) |
| nfiles = len(filenames) |
| print(f"{nfiles} file{plural(nfiles)}:") |
| for f in filenames: |
| line = f"{f}: {summary[f]} line{plural(summary[f])}" |
| plugin = data.file_tracer(f) |
| if plugin: |
| line += f" [{plugin}]" |
| print(line) |
| |
| |
| def sorted_lines(data: CoverageData, filename: str) -> list[int]: |
| """Get the sorted lines for a file, for tests.""" |
| lines = data.lines(filename) |
| return sorted(lines or []) |