| # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html |
| # For details: https://github.com/PyCQA/pylint/blob/master/LICENSE |
| |
| import csv |
| import operator |
| import platform |
| import sys |
| from collections import Counter |
| from io import StringIO |
| from typing import Dict, List, Optional, Tuple |
| |
| import pytest |
| from _pytest.config import Config |
| |
| from pylint import checkers |
| from pylint.lint import PyLinter |
| from pylint.testutils.constants import _EXPECTED_RE, _OPERATORS, UPDATE_OPTION |
| from pylint.testutils.functional_test_file import ( |
| FunctionalTestFile, |
| NoFileError, |
| parse_python_version, |
| ) |
| from pylint.testutils.output_line import OutputLine |
| from pylint.testutils.reporter_for_tests import FunctionalTestReporter |
| from pylint.utils import utils |
| |
| |
| class LintModuleTest: |
| maxDiff = None |
| |
| def __init__(self, test_file: FunctionalTestFile, config: Optional[Config] = None): |
| _test_reporter = FunctionalTestReporter() |
| self._linter = PyLinter() |
| self._linter.set_reporter(_test_reporter) |
| self._linter.config.persistent = 0 |
| checkers.initialize(self._linter) |
| self._linter.disable("suppressed-message") |
| self._linter.disable("locally-disabled") |
| self._linter.disable("useless-suppression") |
| try: |
| self._linter.read_config_file(test_file.option_file) |
| if self._linter.cfgfile_parser.has_option("MASTER", "load-plugins"): |
| plugins = utils._splitstrip( |
| self._linter.cfgfile_parser.get("MASTER", "load-plugins") |
| ) |
| self._linter.load_plugin_modules(plugins) |
| self._linter.load_config_file() |
| except NoFileError: |
| pass |
| self._test_file = test_file |
| self._config = config |
| |
| def setUp(self): |
| if self._should_be_skipped_due_to_version(): |
| pytest.skip( |
| "Test cannot run with Python %s." |
| % sys.version.split(" ", maxsplit=1)[0] |
| ) |
| missing = [] |
| for requirement in self._test_file.options["requires"]: |
| try: |
| __import__(requirement) |
| except ImportError: |
| missing.append(requirement) |
| if missing: |
| pytest.skip("Requires %s to be present." % ",".join(missing)) |
| except_implementations = self._test_file.options["except_implementations"] |
| if except_implementations: |
| implementations = [i.strip() for i in except_implementations.split(",")] |
| if platform.python_implementation() in implementations: |
| msg = "Test cannot run with Python implementation %r" |
| pytest.skip(msg % platform.python_implementation()) |
| excluded_platforms = self._test_file.options["exclude_platforms"] |
| if excluded_platforms: |
| platforms = [p.strip() for p in excluded_platforms.split(",")] |
| if sys.platform.lower() in platforms: |
| pytest.skip("Test cannot run on platform %r" % sys.platform) |
| |
| def runTest(self): |
| self._runTest() |
| |
| def _should_be_skipped_due_to_version(self): |
| return ( |
| sys.version_info < self._test_file.options["min_pyver"] |
| or sys.version_info > self._test_file.options["max_pyver"] |
| ) |
| |
| def __str__(self): |
| return f"{self._test_file.base} ({self.__class__.__module__}.{self.__class__.__name__})" |
| |
| @staticmethod |
| def get_expected_messages(stream): |
| """Parses a file and get expected messages. |
| |
| :param stream: File-like input stream. |
| :type stream: enumerable |
| :returns: A dict mapping line,msg-symbol tuples to the count on this line. |
| :rtype: dict |
| """ |
| messages = Counter() |
| for i, line in enumerate(stream): |
| match = _EXPECTED_RE.search(line) |
| if match is None: |
| continue |
| line = match.group("line") |
| if line is None: |
| line = i + 1 |
| elif line.startswith("+") or line.startswith("-"): |
| line = i + 1 + int(line) |
| else: |
| line = int(line) |
| |
| version = match.group("version") |
| op = match.group("op") |
| if version: |
| required = parse_python_version(version) |
| if not _OPERATORS[op](sys.version_info, required): |
| continue |
| |
| for msg_id in match.group("msgs").split(","): |
| messages[line, msg_id.strip()] += 1 |
| return messages |
| |
| @staticmethod |
| def multiset_difference( |
| expected_entries: Counter, actual_entries: Counter |
| ) -> Tuple[Counter, Dict[str, int]]: |
| """Takes two multisets and compares them. |
| |
| A multiset is a dict with the cardinality of the key as the value.""" |
| missing = expected_entries.copy() |
| missing.subtract(actual_entries) |
| unexpected = {} |
| for key, value in list(missing.items()): |
| if value <= 0: |
| missing.pop(key) |
| if value < 0: |
| unexpected[key] = -value |
| return missing, unexpected |
| |
| # pylint: disable=consider-using-with |
| def _open_expected_file(self): |
| try: |
| return open(self._test_file.expected_output) |
| except FileNotFoundError: |
| return StringIO("") |
| |
| # pylint: disable=consider-using-with |
| def _open_source_file(self): |
| if self._test_file.base == "invalid_encoded_data": |
| return open(self._test_file.source) |
| if "latin1" in self._test_file.base: |
| return open(self._test_file.source, encoding="latin1") |
| return open(self._test_file.source, encoding="utf8") |
| |
| def _get_expected(self): |
| with self._open_source_file() as f: |
| expected_msgs = self.get_expected_messages(f) |
| if not expected_msgs: |
| return Counter(), [] |
| with self._open_expected_file() as f: |
| expected_output_lines = [ |
| OutputLine.from_csv(row) for row in csv.reader(f, "test") |
| ] |
| return expected_msgs, expected_output_lines |
| |
| def _get_actual(self): |
| messages = self._linter.reporter.messages |
| messages.sort(key=lambda m: (m.line, m.symbol, m.msg)) |
| received_msgs = Counter() |
| received_output_lines = [] |
| for msg in messages: |
| assert ( |
| msg.symbol != "fatal" |
| ), f"Pylint analysis failed because of '{msg.msg}'" |
| received_msgs[msg.line, msg.symbol] += 1 |
| received_output_lines.append(OutputLine.from_msg(msg)) |
| return received_msgs, received_output_lines |
| |
| def _runTest(self): |
| __tracebackhide__ = True # pylint: disable=unused-variable |
| modules_to_check = [self._test_file.source] |
| self._linter.check(modules_to_check) |
| expected_messages, expected_output = self._get_expected() |
| actual_messages, actual_output = self._get_actual() |
| assert ( |
| expected_messages == actual_messages |
| ), self.error_msg_for_unequal_messages( |
| actual_messages, expected_messages, actual_output |
| ) |
| self._check_output_text(expected_messages, expected_output, actual_output) |
| |
| def error_msg_for_unequal_messages( |
| self, actual_messages, expected_messages, actual_output: List[OutputLine] |
| ): |
| msg = ['Wrong results for file "%s":' % (self._test_file.base)] |
| missing, unexpected = self.multiset_difference( |
| expected_messages, actual_messages |
| ) |
| if missing: |
| msg.append("\nExpected in testdata:") |
| msg.extend(" %3d: %s" % msg for msg in sorted(missing)) |
| if unexpected: |
| msg.append("\nUnexpected in testdata:") |
| msg.extend(" %3d: %s" % msg for msg in sorted(unexpected)) # type: ignore |
| error_msg = "\n".join(msg) |
| if self._config and self._config.getoption("verbose") > 0: |
| error_msg += "\n\nActual pylint output for this file:\n" |
| error_msg += "\n".join(str(o) for o in actual_output) |
| return error_msg |
| |
| def error_msg_for_unequal_output(self, expected_lines, received_lines) -> str: |
| missing = set(expected_lines) - set(received_lines) |
| unexpected = set(received_lines) - set(expected_lines) |
| error_msg = ( |
| f"Wrong output for '{self._test_file.base}.txt':\n" |
| "You can update the expected output automatically with: '" |
| f"python tests/test_functional.py {UPDATE_OPTION} -k " |
| f'"test_functional[{self._test_file.base}]"\'\n\n' |
| ) |
| sort_by_line_number = operator.attrgetter("lineno") |
| if missing: |
| error_msg += "\n- Missing lines:\n" |
| for line in sorted(missing, key=sort_by_line_number): |
| error_msg += f"{line}\n" |
| if unexpected: |
| error_msg += "\n- Unexpected lines:\n" |
| for line in sorted(unexpected, key=sort_by_line_number): |
| error_msg += f"{line}\n" |
| return error_msg |
| |
| def _check_output_text(self, _, expected_output, actual_output): |
| """This is a function because we want to be able to update the text in LintModuleOutputUpdate""" |
| assert expected_output == actual_output, self.error_msg_for_unequal_output( |
| expected_output, actual_output |
| ) |