blob: 209f49ae83d3a87b41ec2566945c873c86f1ec3f [file] [log] [blame]
#!/usr/bin/env python
# Copyright (C) 2012-2019 Steven Myint
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Removes unused imports and unused variables as reported by pyflakes."""
import ast
import collections
import difflib
import fnmatch
import io
import logging
import os
import re
import signal
import string
import sys
import tokenize
import distutils.sysconfig
import pyflakes.api
import pyflakes.messages
import pyflakes.reporter
__version__ = '1.5.0'
_LOGGER = logging.getLogger('autoflake')
_LOGGER.propagate = False
ATOMS = frozenset([tokenize.NAME, tokenize.NUMBER, tokenize.STRING])
EXCEPT_REGEX = re.compile(r'^\s*except [\s,()\w]+ as \w+:$')
PYTHON_SHEBANG_REGEX = re.compile(r'^#!.*\bpython[3]?\b\s*$')
MAX_PYTHON_FILE_DETECTION_BYTES = 1024
def standard_paths():
"""Yield paths to standard modules."""
for is_plat_spec in [True, False]:
# Yield lib paths.
path = distutils.sysconfig.get_python_lib(
standard_lib=True,
plat_specific=is_plat_spec,
)
yield from os.listdir(path)
# Yield lib-dynload paths.
dynload_path = os.path.join(path, 'lib-dynload')
if os.path.isdir(dynload_path):
yield from os.listdir(dynload_path)
def standard_package_names():
"""Yield standard module names."""
for name in standard_paths():
if name.startswith('_') or '-' in name:
continue
if '.' in name and not name.endswith(('so', 'py', 'pyc')):
continue
yield name.split('.')[0]
IMPORTS_WITH_SIDE_EFFECTS = {'antigravity', 'rlcompleter', 'this'}
# In case they are built into CPython.
BINARY_IMPORTS = {
'datetime', 'grp', 'io', 'json', 'math', 'multiprocessing',
'parser', 'pwd', 'string', 'operator', 'os', 'sys', 'time',
}
SAFE_IMPORTS = (
frozenset(standard_package_names()) -
IMPORTS_WITH_SIDE_EFFECTS |
BINARY_IMPORTS
)
def unused_import_line_numbers(messages):
"""Yield line numbers of unused imports."""
for message in messages:
if isinstance(message, pyflakes.messages.UnusedImport):
yield message.lineno
def unused_import_module_name(messages):
"""Yield line number and module name of unused imports."""
pattern = r'\'(.+?)\''
for message in messages:
if isinstance(message, pyflakes.messages.UnusedImport):
module_name = re.search(pattern, str(message))
if module_name:
module_name = module_name.group()[1:-1]
yield (message.lineno, module_name)
def star_import_used_line_numbers(messages):
"""Yield line number of star import usage."""
for message in messages:
if isinstance(message, pyflakes.messages.ImportStarUsed):
yield message.lineno
def star_import_usage_undefined_name(messages):
"""Yield line number, undefined name, and its possible origin module."""
for message in messages:
if isinstance(message, pyflakes.messages.ImportStarUsage):
undefined_name = message.message_args[0]
module_name = message.message_args[1]
yield (message.lineno, undefined_name, module_name)
def unused_variable_line_numbers(messages):
"""Yield line numbers of unused variables."""
for message in messages:
if isinstance(message, pyflakes.messages.UnusedVariable):
yield message.lineno
def duplicate_key_line_numbers(messages, source):
"""Yield line numbers of duplicate keys."""
messages = [
message for message in messages
if isinstance(message, pyflakes.messages.MultiValueRepeatedKeyLiteral)
]
if messages:
# Filter out complex cases. We don't want to bother trying to parse
# this stuff and get it right. We can do it on a key-by-key basis.
key_to_messages = create_key_to_messages_dict(messages)
lines = source.split('\n')
for (key, messages) in key_to_messages.items():
good = True
for message in messages:
line = lines[message.lineno - 1]
key = message.message_args[0]
if not dict_entry_has_key(line, key):
good = False
if good:
for message in messages:
yield message.lineno
def create_key_to_messages_dict(messages):
"""Return dict mapping the key to list of messages."""
dictionary = collections.defaultdict(lambda: [])
for message in messages:
dictionary[message.message_args[0]].append(message)
return dictionary
def check(source):
"""Return messages from pyflakes."""
reporter = ListReporter()
try:
pyflakes.api.check(source, filename='<string>', reporter=reporter)
except (AttributeError, RecursionError, UnicodeDecodeError):
pass
return reporter.messages
class StubFile:
"""Stub out file for pyflakes."""
def write(self, *_):
"""Stub out."""
class ListReporter(pyflakes.reporter.Reporter):
"""Accumulate messages in messages list."""
def __init__(self):
"""Initialize.
Ignore errors from Reporter.
"""
ignore = StubFile()
pyflakes.reporter.Reporter.__init__(self, ignore, ignore)
self.messages = []
def flake(self, message):
"""Accumulate messages."""
self.messages.append(message)
def extract_package_name(line):
"""Return package name in import statement."""
assert '\\' not in line
assert '(' not in line
assert ')' not in line
assert ';' not in line
if line.lstrip().startswith(('import', 'from')):
word = line.split()[1]
else:
# Ignore doctests.
return None
package = word.split('.')[0]
assert ' ' not in package
return package
def multiline_import(line, previous_line=''):
"""Return True if import is spans multiples lines."""
for symbol in '()':
if symbol in line:
return True
return multiline_statement(line, previous_line)
def multiline_statement(line, previous_line=''):
"""Return True if this is part of a multiline statement."""
for symbol in '\\:;':
if symbol in line:
return True
sio = io.StringIO(line)
try:
list(tokenize.generate_tokens(sio.readline))
return previous_line.rstrip().endswith('\\')
except (SyntaxError, tokenize.TokenError):
return True
class PendingFix:
"""Allows a rewrite operation to span multiple lines.
In the main rewrite loop, every time a helper function returns a
``PendingFix`` object instead of a string, this object will be called
with the following line.
"""
def __init__(self, line):
"""Analyse and store the first line."""
self.accumulator = collections.deque([line])
def __call__(self, line):
"""Process line considering the accumulator.
Return self to keep processing the following lines or a string
with the final result of all the lines processed at once.
"""
raise NotImplementedError('Abstract method needs to be overwritten')
def _valid_char_in_line(char, line):
"""Return True if a char appears in the line and is not commented."""
comment_index = line.find('#')
char_index = line.find(char)
valid_char_in_line = (
char_index >= 0 and
(comment_index > char_index or comment_index < 0)
)
return valid_char_in_line
def _top_module(module_name):
"""Return the name of the top level module in the hierarchy."""
if module_name[0] == '.':
return '%LOCAL_MODULE%'
return module_name.split('.')[0]
def _modules_to_remove(unused_modules, safe_to_remove=SAFE_IMPORTS):
"""Discard unused modules that are not safe to remove from the list."""
return [x for x in unused_modules if _top_module(x) in safe_to_remove]
def _segment_module(segment):
"""Extract the module identifier inside the segment.
It might be the case the segment does not have a module (e.g. is composed
just by a parenthesis or line continuation and whitespace). In this
scenario we just keep the segment... These characters are not valid in
identifiers, so they will never be contained in the list of unused modules
anyway.
"""
return segment.strip(string.whitespace + ',\\()') or segment
class FilterMultilineImport(PendingFix):
"""Remove unused imports from multiline import statements.
This class handles both the cases: "from imports" and "direct imports".
Some limitations exist (e.g. imports with comments, lines joined by ``;``,
etc). In these cases, the statement is left unchanged to avoid problems.
"""
IMPORT_RE = re.compile(r'\bimport\b\s*')
INDENTATION_RE = re.compile(r'^\s*')
BASE_RE = re.compile(r'\bfrom\s+([^ ]+)')
SEGMENT_RE = re.compile(
r'([^,\s]+(?:[\s\\]+as[\s\\]+[^,\s]+)?[,\s\\)]*)', re.M,
)
# ^ module + comma + following space (including new line and continuation)
IDENTIFIER_RE = re.compile(r'[^,\s]+')
def __init__(
self, line, unused_module=(), remove_all_unused_imports=False,
safe_to_remove=SAFE_IMPORTS, previous_line='',
):
"""Receive the same parameters as ``filter_unused_import``."""
self.remove = unused_module
self.parenthesized = '(' in line
self.from_, imports = self.IMPORT_RE.split(line, maxsplit=1)
match = self.BASE_RE.search(self.from_)
self.base = match.group(1) if match else None
self.give_up = False
if not remove_all_unused_imports:
if self.base and _top_module(self.base) not in safe_to_remove:
self.give_up = True
else:
self.remove = _modules_to_remove(self.remove, safe_to_remove)
if '\\' in previous_line:
# Ignore tricky things like "try: \<new line> import" ...
self.give_up = True
self.analyze(line)
PendingFix.__init__(self, imports)
def is_over(self, line=None):
"""Return True if the multiline import statement is over."""
line = line or self.accumulator[-1]
if self.parenthesized:
return _valid_char_in_line(')', line)
return not _valid_char_in_line('\\', line)
def analyze(self, line):
"""Decide if the statement will be fixed or left unchanged."""
if any(ch in line for ch in ';:#'):
self.give_up = True
def fix(self, accumulated):
"""Given a collection of accumulated lines, fix the entire import."""
old_imports = ''.join(accumulated)
ending = get_line_ending(old_imports)
# Split imports into segments that contain the module name +
# comma + whitespace and eventual <newline> \ ( ) chars
segments = [x for x in self.SEGMENT_RE.findall(old_imports) if x]
modules = [_segment_module(x) for x in segments]
keep = _filter_imports(modules, self.base, self.remove)
# Short-circuit if no import was discarded
if len(keep) == len(segments):
return self.from_ + 'import ' + ''.join(accumulated)
fixed = ''
if keep:
# Since it is very difficult to deal with all the line breaks and
# continuations, let's use the code layout that already exists and
# just replace the module identifiers inside the first N-1 segments
# + the last segment
templates = list(zip(modules, segments))
templates = templates[:len(keep) - 1] + templates[-1:]
# It is important to keep the last segment, since it might contain
# important chars like `)`
fixed = ''.join(
template.replace(module, keep[i])
for i, (module, template) in enumerate(templates)
)
# Fix the edge case: inline parenthesis + just one surviving import
if self.parenthesized and any(ch not in fixed for ch in '()'):
fixed = fixed.strip(string.whitespace + '()') + ending
# Replace empty imports with a "pass" statement
empty = len(fixed.strip(string.whitespace + '\\(),')) < 1
if empty:
indentation = self.INDENTATION_RE.search(self.from_).group(0)
return indentation + 'pass' + ending
return self.from_ + 'import ' + fixed
def __call__(self, line=None):
"""Accumulate all the lines in the import and then trigger the fix."""
if line:
self.accumulator.append(line)
self.analyze(line)
if not self.is_over(line):
return self
if self.give_up:
return self.from_ + 'import ' + ''.join(self.accumulator)
return self.fix(self.accumulator)
def _filter_imports(imports, parent=None, unused_module=()):
# We compare full module name (``a.module`` not `module`) to
# guarantee the exact same module as detected from pyflakes.
sep = '' if parent and parent[-1] == '.' else '.'
def full_name(name):
return name if parent is None else parent + sep + name
return [x for x in imports if full_name(x) not in unused_module]
def filter_from_import(line, unused_module):
"""Parse and filter ``from something import a, b, c``.
Return line without unused import modules, or `pass` if all of the
module in import is unused.
"""
(indentation, imports) = re.split(
pattern=r'\bimport\b',
string=line, maxsplit=1,
)
base_module = re.search(
pattern=r'\bfrom\s+([^ ]+)',
string=indentation,
).group(1)
imports = re.split(pattern=r'\s*,\s*', string=imports.strip())
filtered_imports = _filter_imports(imports, base_module, unused_module)
# All of the import in this statement is unused
if not filtered_imports:
return get_indentation(line) + 'pass' + get_line_ending(line)
indentation += 'import '
return (
indentation +
', '.join(sorted(filtered_imports)) +
get_line_ending(line)
)
def break_up_import(line):
"""Return line with imports on separate lines."""
assert '\\' not in line
assert '(' not in line
assert ')' not in line
assert ';' not in line
assert '#' not in line
assert not line.lstrip().startswith('from')
newline = get_line_ending(line)
if not newline:
return line
(indentation, imports) = re.split(
pattern=r'\bimport\b',
string=line, maxsplit=1,
)
indentation += 'import '
assert newline
return ''.join([
indentation + i.strip() + newline
for i in sorted(imports.split(','))
])
def filter_code(
source, additional_imports=None,
expand_star_imports=False,
remove_all_unused_imports=False,
remove_duplicate_keys=False,
remove_unused_variables=False,
ignore_init_module_imports=False,
):
"""Yield code with unused imports removed."""
imports = SAFE_IMPORTS
if additional_imports:
imports |= frozenset(additional_imports)
del additional_imports
messages = check(source)
if ignore_init_module_imports:
marked_import_line_numbers = frozenset()
else:
marked_import_line_numbers = frozenset(
unused_import_line_numbers(messages),
)
marked_unused_module = collections.defaultdict(lambda: [])
for line_number, module_name in unused_import_module_name(messages):
marked_unused_module[line_number].append(module_name)
if expand_star_imports and not (
# See explanations in #18.
re.search(r'\b__all__\b', source) or
re.search(r'\bdel\b', source)
):
marked_star_import_line_numbers = frozenset(
star_import_used_line_numbers(messages),
)
if len(marked_star_import_line_numbers) > 1:
# Auto expanding only possible for single star import
marked_star_import_line_numbers = frozenset()
else:
undefined_names = []
for line_number, undefined_name, _ \
in star_import_usage_undefined_name(messages):
undefined_names.append(undefined_name)
if not undefined_names:
marked_star_import_line_numbers = frozenset()
else:
marked_star_import_line_numbers = frozenset()
if remove_unused_variables:
marked_variable_line_numbers = frozenset(
unused_variable_line_numbers(messages),
)
else:
marked_variable_line_numbers = frozenset()
if remove_duplicate_keys:
marked_key_line_numbers = frozenset(
duplicate_key_line_numbers(messages, source),
)
else:
marked_key_line_numbers = frozenset()
line_messages = get_messages_by_line(messages)
sio = io.StringIO(source)
previous_line = ''
result = None
for line_number, line in enumerate(sio.readlines(), start=1):
if isinstance(result, PendingFix):
result = result(line)
elif '#' in line:
result = line
elif line_number in marked_import_line_numbers:
result = filter_unused_import(
line,
unused_module=marked_unused_module[line_number],
remove_all_unused_imports=remove_all_unused_imports,
imports=imports,
previous_line=previous_line,
)
elif line_number in marked_variable_line_numbers:
result = filter_unused_variable(line)
elif line_number in marked_key_line_numbers:
result = filter_duplicate_key(
line, line_messages[line_number],
line_number, marked_key_line_numbers,
source,
)
elif line_number in marked_star_import_line_numbers:
result = filter_star_import(line, undefined_names)
else:
result = line
if not isinstance(result, PendingFix):
yield result
previous_line = line
def get_messages_by_line(messages):
"""Return dictionary that maps line number to message."""
line_messages = {}
for message in messages:
line_messages[message.lineno] = message
return line_messages
def filter_star_import(line, marked_star_import_undefined_name):
"""Return line with the star import expanded."""
undefined_name = sorted(set(marked_star_import_undefined_name))
return re.sub(r'\*', ', '.join(undefined_name), line)
def filter_unused_import(
line, unused_module, remove_all_unused_imports,
imports, previous_line='',
):
"""Return line if used, otherwise return None."""
# Ignore doctests.
if line.lstrip().startswith('>'):
return line
if multiline_import(line, previous_line):
filt = FilterMultilineImport(
line, unused_module,
remove_all_unused_imports,
imports, previous_line,
)
return filt()
is_from_import = line.lstrip().startswith('from')
if ',' in line and not is_from_import:
return break_up_import(line)
package = extract_package_name(line)
if not remove_all_unused_imports and package not in imports:
return line
if ',' in line:
assert is_from_import
return filter_from_import(line, unused_module)
else:
# We need to replace import with "pass" in case the import is the
# only line inside a block. For example,
# "if True:\n import os". In such cases, if the import is
# removed, the block will be left hanging with no body.
return (
get_indentation(line) +
'pass' +
get_line_ending(line)
)
def filter_unused_variable(line, previous_line=''):
"""Return line if used, otherwise return None."""
if re.match(EXCEPT_REGEX, line):
return re.sub(r' as \w+:$', ':', line, count=1)
elif multiline_statement(line, previous_line):
return line
elif line.count('=') == 1:
split_line = line.split('=')
assert len(split_line) == 2
value = split_line[1].lstrip()
if ',' in split_line[0]:
return line
if is_literal_or_name(value):
# Rather than removing the line, replace with it "pass" to avoid
# a possible hanging block with no body.
value = 'pass' + get_line_ending(line)
return get_indentation(line) + value
else:
return line
def filter_duplicate_key(
line, message, line_number, marked_line_numbers,
source, previous_line='',
):
"""Return '' if first occurrence of the key otherwise return `line`."""
if marked_line_numbers and line_number == sorted(marked_line_numbers)[0]:
return ''
return line
def dict_entry_has_key(line, key):
"""Return True if `line` is a dict entry that uses `key`.
Return False for multiline cases where the line should not be removed by
itself.
"""
if '#' in line:
return False
result = re.match(r'\s*(.*)\s*:\s*(.*),\s*$', line)
if not result:
return False
try:
candidate_key = ast.literal_eval(result.group(1))
except (SyntaxError, ValueError):
return False
if multiline_statement(result.group(2)):
return False
return candidate_key == key
def is_literal_or_name(value):
"""Return True if value is a literal or a name."""
try:
ast.literal_eval(value)
return True
except (SyntaxError, ValueError):
pass
if value.strip() in ['dict()', 'list()', 'set()']:
return True
# Support removal of variables on the right side. But make sure
# there are no dots, which could mean an access of a property.
return re.match(r'^\w+\s*$', value)
def useless_pass_line_numbers(source):
"""Yield line numbers of unneeded "pass" statements."""
sio = io.StringIO(source)
previous_token_type = None
last_pass_row = None
last_pass_indentation = None
previous_line = ''
for token in tokenize.generate_tokens(sio.readline):
token_type = token[0]
start_row = token[2][0]
line = token[4]
is_pass = (token_type == tokenize.NAME and line.strip() == 'pass')
# Leading "pass".
if (
start_row - 1 == last_pass_row and
get_indentation(line) == last_pass_indentation and
token_type in ATOMS and
not is_pass
):
yield start_row - 1
if is_pass:
last_pass_row = start_row
last_pass_indentation = get_indentation(line)
# Trailing "pass".
if (
is_pass and
previous_token_type != tokenize.INDENT and
not previous_line.rstrip().endswith('\\')
):
yield start_row
previous_token_type = token_type
previous_line = line
def filter_useless_pass(source):
"""Yield code with useless "pass" lines removed."""
try:
marked_lines = frozenset(useless_pass_line_numbers(source))
except (SyntaxError, tokenize.TokenError):
marked_lines = frozenset()
sio = io.StringIO(source)
for line_number, line in enumerate(sio.readlines(), start=1):
if line_number not in marked_lines:
yield line
def get_indentation(line):
"""Return leading whitespace."""
if line.strip():
non_whitespace_index = len(line) - len(line.lstrip())
return line[:non_whitespace_index]
else:
return ''
def get_line_ending(line):
"""Return line ending."""
non_whitespace_index = len(line.rstrip()) - len(line)
if not non_whitespace_index:
return ''
else:
return line[non_whitespace_index:]
def fix_code(
source, additional_imports=None, expand_star_imports=False,
remove_all_unused_imports=False, remove_duplicate_keys=False,
remove_unused_variables=False, ignore_init_module_imports=False,
):
"""Return code with all filtering run on it."""
if not source:
return source
# pyflakes does not handle "nonlocal" correctly.
if 'nonlocal' in source:
remove_unused_variables = False
filtered_source = None
while True:
filtered_source = ''.join(
filter_useless_pass(
''.join(
filter_code(
source,
additional_imports=additional_imports,
expand_star_imports=expand_star_imports,
remove_all_unused_imports=remove_all_unused_imports,
remove_duplicate_keys=remove_duplicate_keys,
remove_unused_variables=remove_unused_variables,
ignore_init_module_imports=ignore_init_module_imports,
),
),
),
)
if filtered_source == source:
break
source = filtered_source
return filtered_source
def fix_file(filename, args, standard_out=None):
"""Run fix_code() on a file."""
if standard_out is None:
standard_out = sys.stdout
encoding = detect_encoding(filename)
with open_with_encoding(filename, encoding=encoding) as input_file:
return _fix_file(
input_file, filename, args, args['write_to_stdout'],
standard_out, encoding=encoding,
)
def _fix_file(
input_file, filename, args, write_to_stdout, standard_out,
encoding=None,
):
source = input_file.read()
original_source = source
isInitFile = os.path.basename(filename) == '__init__.py'
if args['ignore_init_module_imports'] and isInitFile:
ignore_init_module_imports = True
else:
ignore_init_module_imports = False
filtered_source = fix_code(
source,
additional_imports=(
args['imports'].split(',') if args['imports'] else None
),
expand_star_imports=args['expand_star_imports'],
remove_all_unused_imports=args['remove_all_unused_imports'],
remove_duplicate_keys=args['remove_duplicate_keys'],
remove_unused_variables=args['remove_unused_variables'],
ignore_init_module_imports=ignore_init_module_imports,
)
if original_source != filtered_source:
if args['check']:
standard_out.write(
f'{filename}: Unused imports/variables detected\n',
)
sys.exit(1)
if write_to_stdout:
standard_out.write(filtered_source)
elif args['in_place']:
with open_with_encoding(
filename, mode='w',
encoding=encoding,
) as output_file:
output_file.write(filtered_source)
_LOGGER.info('Fixed %s', filename)
else:
diff = get_diff_text(
io.StringIO(original_source).readlines(),
io.StringIO(filtered_source).readlines(),
filename,
)
standard_out.write(''.join(diff))
elif write_to_stdout:
standard_out.write(filtered_source)
else:
if args['check'] and not args['quiet']:
standard_out.write('No issues detected!\n')
else:
_LOGGER.debug('Clean %s: nothing to fix', filename)
def open_with_encoding(
filename, encoding, mode='r',
limit_byte_check=-1,
):
"""Return opened file with a specific encoding."""
if not encoding:
encoding = detect_encoding(filename, limit_byte_check=limit_byte_check)
return open(
filename, mode=mode, encoding=encoding,
newline='',
) # Preserve line endings
def detect_encoding(filename, limit_byte_check=-1):
"""Return file encoding."""
try:
with open(filename, 'rb') as input_file:
encoding = _detect_encoding(input_file.readline)
# Check for correctness of encoding.
with open_with_encoding(filename, encoding) as input_file:
input_file.read(limit_byte_check)
return encoding
except (LookupError, SyntaxError, UnicodeDecodeError):
return 'latin-1'
def _detect_encoding(readline):
"""Return file encoding."""
try:
encoding = tokenize.detect_encoding(readline)[0]
return encoding
except (LookupError, SyntaxError, UnicodeDecodeError):
return 'latin-1'
def get_diff_text(old, new, filename):
"""Return text of unified diff between old and new."""
newline = '\n'
diff = difflib.unified_diff(
old, new,
'original/' + filename,
'fixed/' + filename,
lineterm=newline,
)
text = ''
for line in diff:
text += line
# Work around missing newline (http://bugs.python.org/issue2142).
if not line.endswith(newline):
text += newline + r'\ No newline at end of file' + newline
return text
def _split_comma_separated(string):
"""Return a set of strings."""
return {text.strip() for text in string.split(',') if text.strip()}
def is_python_file(filename):
"""Return True if filename is Python file."""
if filename.endswith('.py'):
return True
try:
with open_with_encoding(
filename,
None,
limit_byte_check=MAX_PYTHON_FILE_DETECTION_BYTES,
) as f:
text = f.read(MAX_PYTHON_FILE_DETECTION_BYTES)
if not text:
return False
first_line = text.splitlines()[0]
except (OSError, IndexError):
return False
if not PYTHON_SHEBANG_REGEX.match(first_line):
return False
return True
def is_exclude_file(filename, exclude):
"""Return True if file matches exclude pattern."""
base_name = os.path.basename(filename)
if base_name.startswith('.'):
return True
for pattern in exclude:
if fnmatch.fnmatch(base_name, pattern):
return True
if fnmatch.fnmatch(filename, pattern):
return True
return False
def match_file(filename, exclude):
"""Return True if file is okay for modifying/recursing."""
if is_exclude_file(filename, exclude):
_LOGGER.debug('Skipped %s: matched to exclude pattern', filename)
return False
if not os.path.isdir(filename) and not is_python_file(filename):
return False
return True
def find_files(filenames, recursive, exclude):
"""Yield filenames."""
while filenames:
name = filenames.pop(0)
if recursive and os.path.isdir(name):
for root, directories, children in os.walk(name):
filenames += [
os.path.join(root, f) for f in children
if match_file(
os.path.join(root, f),
exclude,
)
]
directories[:] = [
d for d in directories
if match_file(
os.path.join(root, d),
exclude,
)
]
else:
if not is_exclude_file(name, exclude):
yield name
else:
_LOGGER.debug('Skipped %s: matched to exclude pattern', name)
def process_pyproject_toml(toml_file_path):
"""Extract config mapping from pyproject.toml file."""
import toml
return toml.load(toml_file_path).get('tool', {}).get('autoflake', None)
def process_setup_cfg(cfg_file_path):
"""Extract config mapping from setup.cfg file."""
import configparser
reader = configparser.ConfigParser()
reader.read(cfg_file_path)
if not reader.has_section('autoflake'):
return None
return reader['autoflake']
def merge_configuration_file(args):
"""Merge configuration from a file into args."""
# Configuration file parsers {filename: parser function}.
CONFIG_FILES = {
'pyproject.toml': process_pyproject_toml,
'setup.cfg': process_setup_cfg,
}
BOOL_TYPES = {
'1': True,
'yes': True,
'true': True,
'on': True,
'0': False,
'no': False,
'false': False,
'off': False,
}
# Traverse the file tree common to all files given as argument looking for
# a configuration file
config_path = os.path.commonpath([
os.path.abspath(file)
for file in args.files
])
config = None
while True:
for config_file, processor in CONFIG_FILES.items():
config_file_path = os.path.join(
os.path.join(config_path, config_file),
)
if os.path.isfile(config_file_path):
config = processor(config_file_path)
if config is not None:
break
if config is not None:
break
config_path, tail = os.path.split(config_path)
if not tail:
break
if config:
# merge config
for name, value in config.items():
if name in {'imports', 'exclude'}:
# comma separated list properties
if isinstance(value, list) and all(
isinstance(val, str) for val in value
):
value = ','.join(str(val) for val in value)
if not isinstance(value, str):
_LOGGER.error(
"'%s' in the config file should be a comma separated"
' string or list of strings',
name,
)
return False
current_value = getattr(args, name, None)
if current_value is not None:
value = ','.join((value, current_value))
setattr(args, name, value)
elif name in {
'check', 'expand-star-imports', 'ignore-init-module-imports',
'in-place', 'recursive', 'remove-all-unused-imports',
'remove-duplicate-keys', 'remove-unused-variables',
}:
# boolean properties
if isinstance(value, str):
value = BOOL_TYPES.get(value, value)
if not isinstance(value, bool):
_LOGGER.error(
"'%s' in the config file should be a boolean", name,
)
return False
if value:
setattr(args, name.replace('-', '_'), value)
else:
_LOGGER.error("'%s' is not a valid configuration option", name)
return False
return True
def _main(argv, standard_out, standard_error, standard_input=None):
"""Return exit status.
0 means no error.
"""
import argparse
parser = argparse.ArgumentParser(description=__doc__, prog='autoflake')
parser.add_argument(
'-c', '--check', action='store_true',
help='return error code if changes are needed',
)
parser.add_argument(
'-r', '--recursive', action='store_true',
help='drill down directories recursively',
)
parser.add_argument(
'-j', '--jobs', type=int, metavar='n', default=0,
help='number of parallel jobs; '
'match CPU count if value is 0 (default: 0)',
)
parser.add_argument(
'--exclude', metavar='globs',
help='exclude file/directory names that match these '
'comma-separated globs',
)
parser.add_argument(
'--imports',
help='by default, only unused standard library '
'imports are removed; specify a comma-separated '
'list of additional modules/packages',
)
parser.add_argument(
'--expand-star-imports', action='store_true',
help='expand wildcard star imports with undefined '
'names; this only triggers if there is only '
'one star import in the file; this is skipped if '
'there are any uses of `__all__` or `del` in the '
'file',
)
parser.add_argument(
'--remove-all-unused-imports', action='store_true',
help='remove all unused imports (not just those from '
'the standard library)',
)
parser.add_argument(
'--ignore-init-module-imports', action='store_true',
help='exclude __init__.py when removing unused '
'imports',
)
parser.add_argument(
'--remove-duplicate-keys', action='store_true',
help='remove all duplicate keys in objects',
)
parser.add_argument(
'--remove-unused-variables', action='store_true',
help='remove unused variables',
)
parser.add_argument(
'--version', action='version',
version='%(prog)s ' + __version__,
)
parser.add_argument(
'--quiet', action='store_true',
help='Suppress output if there are no issues',
)
parser.add_argument(
'-v', '--verbose', action='count', dest='verbosity',
default=0, help='print more verbose logs (you can '
'repeat `-v` to make it more verbose)',
)
parser.add_argument(
'--stdin-display-name', dest='stdin_display_name',
default='stdin',
help='the name used when processing input from stdin',
)
parser.add_argument('files', nargs='+', help='files to format')
group = parser.add_mutually_exclusive_group()
group.add_argument(
'-i', '--in-place', action='store_true',
help='make changes to files instead of printing diffs',
)
group.add_argument(
'-s', '--stdout', action='store_true',
dest='write_to_stdout',
help=(
'print changed text to stdout. defaults to true '
'when formatting stdin, or to false otherwise'
),
)
args = parser.parse_args(argv[1:])
if standard_error is None:
_LOGGER.addHandler(logging.NullHandler())
else:
_LOGGER.addHandler(logging.StreamHandler(standard_error))
loglevels = [logging.WARNING, logging.INFO, logging.DEBUG]
try:
loglevel = loglevels[args.verbosity]
except IndexError: # Too much -v
loglevel = loglevels[-1]
_LOGGER.setLevel(loglevel)
if not merge_configuration_file(args):
return 1
if args.remove_all_unused_imports and args.imports:
_LOGGER.error('Using both --remove-all and --imports is redundant')
return 1
if args.exclude:
args.exclude = _split_comma_separated(args.exclude)
else:
args.exclude = set()
if args.jobs < 1:
args.jobs = os.cpu_count() or 1
filenames = list(set(args.files))
failure = False
# convert argparse namespace to a dict so that it can be serialized
# by multiprocessing
args = vars(args)
files = list(find_files(filenames, args['recursive'], args['exclude']))
if args['jobs'] == 1 or len(files) == 1 or args['jobs'] == 1 or \
'-' in files or standard_out is not None:
for name in files:
if name == '-':
_fix_file(
standard_input, args['stdin_display_name'],
args=args, write_to_stdout=True,
standard_out=standard_out or sys.stdout,
)
else:
try:
fix_file(name, args=args, standard_out=standard_out)
except OSError as exception:
_LOGGER.error(str(exception))
failure = True
else:
import multiprocessing
with multiprocessing.Pool(args['jobs']) as pool:
futs = []
for name in files:
fut = pool.apply_async(fix_file, args=(name, args))
futs.append(fut)
for fut in futs:
try:
fut.get()
except OSError as exception:
_LOGGER.error(str(exception))
failure = True
return 1 if failure else 0
def main():
"""Command-line entry point."""
try:
# Exit on broken pipe.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except AttributeError: # pragma: no cover
# SIGPIPE is not available on Windows.
pass
try:
return _main(
sys.argv,
standard_out=None,
standard_error=sys.stderr,
standard_input=sys.stdin,
)
except KeyboardInterrupt: # pragma: no cover
return 2 # pragma: no cover
if __name__ == '__main__':
sys.exit(main())