"""Transform a mypy AST to the IR form (Intermediate Representation).
For example, consider a function like this:
def f(x: int) -> int:
return x * 2 + 1
It would be translated to something that conceptually looks like this:
r0 = 2
r1 = 1
r2 = x * r0 :: int
r3 = r2 + r1 :: int
return r3
"""
from typing import (
TypeVar, Callable, Dict, List, Tuple, Optional, Union, Sequence, Set, Any, cast
)
from typing_extensions import overload, NoReturn
from collections import OrderedDict
from abc import abstractmethod
import importlib.util
import itertools
from mypy.build import Graph
from mypy.nodes import (
MypyFile, SymbolNode, Statement, FuncItem, FuncDef, ReturnStmt, AssignmentStmt, OpExpr,
IntExpr, NameExpr, LDEF, Var, IfStmt, UnaryExpr, ComparisonExpr, WhileStmt, CallExpr,
IndexExpr, Block, Expression, ListExpr, ExpressionStmt, MemberExpr, ForStmt, RefExpr, Lvalue,
BreakStmt, ContinueStmt, ConditionalExpr, OperatorAssignmentStmt, TupleExpr, ClassDef,
TypeInfo, Import, ImportFrom, ImportAll, DictExpr, StrExpr, CastExpr, TempNode,
PassStmt, PromoteExpr, AssignmentExpr, AwaitExpr, BackquoteExpr, AssertStmt, BytesExpr,
ComplexExpr, Decorator, DelStmt, DictionaryComprehension, EllipsisExpr, EnumCallExpr, ExecStmt,
FloatExpr, GeneratorExpr, GlobalDecl, LambdaExpr, ListComprehension, SetComprehension,
NamedTupleExpr, NewTypeExpr, NonlocalDecl, OverloadedFuncDef, PrintStmt, RaiseStmt,
RevealExpr, SetExpr, SliceExpr, StarExpr, SuperExpr, TryStmt, TypeAliasExpr, TypeApplication,
TypeVarExpr, TypedDictExpr, UnicodeExpr, WithStmt, YieldFromExpr, YieldExpr, GDEF, ARG_POS,
ARG_OPT, ARG_NAMED, ARG_NAMED_OPT, ARG_STAR, ARG_STAR2, is_class_var, op_methods
)
from mypy.types import (
Type, Instance, CallableType, NoneTyp, TupleType, UnionType, AnyType, TypeVarType, PartialType,
TypeType, Overloaded, TypeOfAny, UninhabitedType, UnboundType, TypedDictType,
LiteralType,
get_proper_type,
)
from mypy.visitor import ExpressionVisitor, StatementVisitor
from mypy.checkexpr import map_actuals_to_formals
from mypy.state import strict_optional_set
from mypy.util import split_target
from mypyc.common import (
ENV_ATTR_NAME, NEXT_LABEL_ATTR_NAME, TEMP_ATTR_NAME, LAMBDA_NAME,
MAX_LITERAL_SHORT_INT, TOP_LEVEL_NAME, SELF_NAME, decorator_helper_name,
FAST_ISINSTANCE_MAX_SUBCLASSES, PROPSET_PREFIX
)
from mypyc.prebuildvisitor import PreBuildVisitor
from mypyc.ops import (
BasicBlock, AssignmentTarget, AssignmentTargetRegister, AssignmentTargetIndex,
AssignmentTargetAttr, AssignmentTargetTuple, Environment, Op, LoadInt, RType, Value, Register,
Return, FuncIR, Assign, Branch, Goto, RuntimeArg, Call, Box, Unbox, Cast, RTuple, Unreachable,
TupleGet, TupleSet, ClassIR, NonExtClassInfo, RInstance, ModuleIR, ModuleIRs, GetAttr, SetAttr,
LoadStatic, InitStatic, MethodCall, INVALID_FUNC_DEF, int_rprimitive, float_rprimitive,
bool_rprimitive, list_rprimitive, is_list_rprimitive, dict_rprimitive, set_rprimitive,
str_rprimitive, tuple_rprimitive, none_rprimitive, is_none_rprimitive, object_rprimitive,
exc_rtuple,
PrimitiveOp, ControlOp, OpDescription, RegisterOp,
is_object_rprimitive, LiteralsMap, FuncSignature, VTableAttr, VTableMethod, VTableEntries,
NAMESPACE_TYPE, NAMESPACE_MODULE,
RaiseStandardError, LoadErrorValue, NO_TRACEBACK_LINE_NO, FuncDecl,
FUNC_NORMAL, FUNC_STATICMETHOD, FUNC_CLASSMETHOD,
RUnion, is_optional_type, optional_value_type, all_concrete_classes
)
from mypyc.ops_primitive import binary_ops, unary_ops, func_ops, method_ops, name_ref_ops
from mypyc.ops_list import (
list_append_op, list_extend_op, list_len_op, new_list_op, to_list, list_pop_last
)
from mypyc.ops_tuple import list_tuple_op, new_tuple_op
from mypyc.ops_dict import (
new_dict_op, dict_get_item_op, dict_set_item_op, dict_update_in_display_op,
)
from mypyc.ops_set import new_set_op, set_add_op, set_update_op
from mypyc.ops_misc import (
none_op, none_object_op, true_op, false_op, iter_op, next_op, next_raw_op,
check_stop_op, send_op, yield_from_except_op, coro_op,
py_getattr_op, py_setattr_op, py_delattr_op, py_hasattr_op,
py_call_op, py_call_with_kwargs_op, py_method_call_op,
fast_isinstance_op, bool_op, new_slice_op, not_implemented_op,
type_op, pytype_from_template_op, import_op, get_module_dict_op,
ellipsis_op, method_new_op, type_is_op, type_object_op, py_calc_meta_op,
dataclass_sleight_of_hand,
)
from mypyc.ops_exc import (
raise_exception_op, raise_exception_with_tb_op, reraise_exception_op,
error_catch_op, restore_exc_info_op, exc_matches_op, get_exc_value_op,
get_exc_info_op, keep_propagating_op, set_stop_iteration_value,
)
from mypyc.genops_for import ForGenerator, ForRange, ForList, ForIterable, ForEnumerate, ForZip
from mypyc.rt_subtype import is_runtime_subtype
from mypyc.subtype import is_subtype
from mypyc.sametype import is_same_type, is_same_method_signature
from mypyc.crash import catch_errors
from mypyc.options import CompilerOptions
from mypyc.errors import Errors
GenFunc = Callable[[], None]
DictEntry = Tuple[Optional[Value], Value]
class UnsupportedException(Exception):
pass
# The stubs for callable contextmanagers are busted so cast it to the
# right type...
F = TypeVar('F', bound=Callable[..., Any])
strict_optional_dec = cast(Callable[[F], F], strict_optional_set(True))
def build_type_map(mapper: 'Mapper',
modules: List[MypyFile],
graph: Graph,
types: Dict[Expression, Type],
options: CompilerOptions,
errors: Errors) -> None:
# Collect all classes defined in everything we are compiling
classes = []
for module in modules:
module_classes = [node for node in module.defs if isinstance(node, ClassDef)]
classes.extend([(module, cdef) for cdef in module_classes])
# Collect all class mappings so that we can bind arbitrary class name
# references even if there are import cycles.
for module, cdef in classes:
class_ir = ClassIR(cdef.name, module.fullname(), is_trait(cdef),
is_abstract=cdef.info.is_abstract)
# If global optimizations are disabled, turn off tracking of class children
if not options.global_opts:
class_ir.children = None
mapper.type_to_ir[cdef.info] = class_ir
# Figure out which classes need to be compiled as non-extension classes.
mark_non_ext_classes(mapper.type_to_ir)
# Populate structural information in class IR for extension classes.
for module, cdef in classes:
with catch_errors(module.path, cdef.line):
if mapper.type_to_ir[cdef.info].is_ext_class:
prepare_class_def(module.path, module.fullname(), cdef, errors, mapper)
else:
prepare_non_ext_class_def(module.path, module.fullname(), cdef, errors, mapper)
# Collect all the functions also. We collect from the symbol table
# so that we can easily pick out the right copy of a function that
# is conditionally defined.
for module in modules:
for name, node in module.names.items():
# We need to filter out functions that are imported or
# aliases. The best way to do this seems to be by
# checking that the fullname matches.
if (isinstance(node.node, (FuncDef, Decorator, OverloadedFuncDef))
and node.fullname == module.fullname() + '.' + name):
prepare_func_def(module.fullname(), None, get_func_def(node.node), mapper)
# TODO: what else?
@strict_optional_dec # Turn on strict optional for any type manipulations we do
def build_ir(modules: List[MypyFile],
graph: Graph,
types: Dict[Expression, Type],
mapper: 'Mapper',
options: CompilerOptions,
errors: Errors) -> ModuleIRs:
build_type_map(mapper, modules, graph, types, options, errors)
result = OrderedDict() # type: ModuleIRs
# Generate IR for all modules.
module_names = [mod.fullname() for mod in modules]
class_irs = []
for module in modules:
# First pass to determine free symbols.
pbv = PreBuildVisitor()
module.accept(pbv)
# Second pass.
builder = IRBuilder(
module.fullname(), types, graph, errors, mapper, module_names, pbv, options
)
builder.visit_mypy_file(module)
module_ir = ModuleIR(
module.fullname(),
list(builder.imports),
builder.functions,
builder.classes,
builder.final_names
)
result[module.fullname()] = module_ir
class_irs.extend(builder.classes)
# Compute vtables.
for cir in class_irs:
if cir.is_ext_class:
compute_vtable(cir)
return result
def is_trait_decorator(d: Expression) -> bool:
return isinstance(d, RefExpr) and d.fullname == 'mypy_extensions.trait'
def is_trait(cdef: ClassDef) -> bool:
return any(is_trait_decorator(d) for d in cdef.decorators)
def is_dataclass_decorator(d: Expression) -> bool:
return (
(isinstance(d, RefExpr) and d.fullname == 'dataclasses.dataclass')
or (
isinstance(d, CallExpr)
and isinstance(d.callee, RefExpr)
and d.callee.fullname == 'dataclasses.dataclass'
)
)
def is_dataclass(cdef: ClassDef) -> bool:
return any(is_dataclass_decorator(d) for d in cdef.decorators)
def is_extension_class(cdef: ClassDef) -> bool:
if any(not is_trait_decorator(d) and not is_dataclass_decorator(d)
for d in cdef.decorators):
return False
elif (cdef.info.metaclass_type and cdef.info.metaclass_type.type.fullname() not in (
'abc.ABCMeta', 'typing.TypingMeta', 'typing.GenericMeta')):
return False
return True
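# For illustration (hypothetical examples): a class decorated with
# functools.total_ordering, or one whose metaclass is something other than
# ABCMeta/TypingMeta/GenericMeta, fails the checks above and is compiled as a
# non-extension class.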
def mark_non_ext_classes(class_map: Dict[TypeInfo, ClassIR]) -> None:
"""
Mark which classes should be compiled as non-extension classes.
Classes in the chain of base classes of a non-extension class
will all be marked as non-extension because currently
non-extension classes cannot inherit from extension classes.
"""
visit_first = list(class_map.keys())
visit_second = [] # type: List[TypeInfo]
# First pass to gather all non-extension classes without
# considering base class chains
for typ in visit_first:
ir = class_map[typ]
ir.is_ext_class = is_extension_class(typ.defn)
if not ir.is_ext_class:
visit_second.append(typ)
# FIXME: Just reject these?
# Second pass to propagate non-extension markings up the base class
# chains of classes marked as non-extension classes during the first pass.
for typ in visit_second:
todo = [typ]
while todo:
child = todo.pop()
for parent in child.bases:
if parent.type in class_map:
parent_ir = class_map[parent.type]
if not parent_ir.is_ext_class:
continue
parent_ir.is_ext_class = False
todo.append(parent.type)
def get_func_def(op: Union[FuncDef, Decorator, OverloadedFuncDef]) -> FuncDef:
if isinstance(op, OverloadedFuncDef):
assert op.impl
op = op.impl
if isinstance(op, Decorator):
op = op.func
return op
def specialize_parent_vtable(cls: ClassIR, parent: ClassIR) -> VTableEntries:
"""Generate the part of a vtable corresponding to a parent class or trait"""
updated = []
for entry in parent.vtable_entries:
if isinstance(entry, VTableMethod):
# Find the original method corresponding to this vtable entry.
# (This may not be the method in the entry, if it was overridden.)
orig_parent_method = entry.cls.get_method(entry.name)
assert orig_parent_method
method_cls = cls.get_method_and_class(entry.name)
if method_cls:
child_method, defining_cls = method_cls
# TODO: emit a wrapper for __init__ that raises or something
if (is_same_method_signature(orig_parent_method.sig, child_method.sig)
or orig_parent_method.name == '__init__'):
entry = VTableMethod(entry.cls, entry.name, child_method)
else:
entry = VTableMethod(entry.cls, entry.name,
defining_cls.glue_methods[(entry.cls, entry.name)])
else:
# If it is an attribute from a trait, we need to find out
# the real class it got mixed in at and point to that.
if parent.is_trait:
_, origin_cls = cls.attr_details(entry.name)
entry = VTableAttr(origin_cls, entry.name, entry.is_setter)
updated.append(entry)
return updated
def compute_vtable(cls: ClassIR) -> None:
"""Compute the vtable structure for a class."""
if cls.vtable is not None: return
if not cls.is_generated:
cls.has_dict = any(x.inherits_python for x in cls.mro)
for t in cls.mro[1:]:
# Make sure all ancestors are processed first
compute_vtable(t)
# Merge attributes from traits into the class
if not t.is_trait:
continue
for name, typ in t.attributes.items():
if not cls.is_trait and not any(name in b.attributes for b in cls.base_mro):
cls.attributes[name] = typ
cls.vtable = {}
if cls.base:
assert cls.base.vtable is not None
cls.vtable.update(cls.base.vtable)
cls.vtable_entries = specialize_parent_vtable(cls, cls.base)
# Include the vtable from the parent classes, but handle method overrides.
entries = cls.vtable_entries
# Traits need to have attributes in the vtable, since the
# attributes can be at different places in different classes, but
# regular classes can just directly get them.
if cls.is_trait:
# Traits also need to pull in vtable entries for non-trait
# parent classes explicitly.
for t in cls.mro:
for attr in t.attributes:
if attr in cls.vtable:
continue
cls.vtable[attr] = len(entries)
entries.append(VTableAttr(t, attr, is_setter=False))
entries.append(VTableAttr(t, attr, is_setter=True))
all_traits = [t for t in cls.mro if t.is_trait]
for t in [cls] + cls.traits:
for fn in itertools.chain(t.methods.values()):
# TODO: don't generate a new entry when we overload without changing the type
if fn == cls.get_method(fn.name):
cls.vtable[fn.name] = len(entries)
entries.append(VTableMethod(t, fn.name, fn))
# Compute vtables for all of the traits that the class implements
if not cls.is_trait:
for trait in all_traits:
compute_vtable(trait)
cls.trait_vtables[trait] = specialize_parent_vtable(cls, trait)
class Mapper:
"""Keep track of mappings from mypy concepts to IR concepts.
This state is shared across all modules being compiled in all
compilation groups.
"""
def __init__(self, group_map: Dict[str, Optional[str]]) -> None:
self.group_map = group_map
self.type_to_ir = {} # type: Dict[TypeInfo, ClassIR]
self.func_to_decl = {} # type: Dict[SymbolNode, FuncDecl]
# LiteralsMap maps literal values to a static name. Each
# compilation group has its own LiteralsMap. (Since they can't
# share literals.)
self.literals = {
v: OrderedDict() for v in group_map.values()
} # type: Dict[Optional[str], LiteralsMap]
def type_to_rtype(self, typ: Optional[Type]) -> RType:
if typ is None:
return object_rprimitive
typ = get_proper_type(typ)
if isinstance(typ, Instance):
if typ.type.fullname() == 'builtins.int':
return int_rprimitive
elif typ.type.fullname() == 'builtins.float':
return float_rprimitive
elif typ.type.fullname() == 'builtins.str':
return str_rprimitive
elif typ.type.fullname() == 'builtins.bool':
return bool_rprimitive
elif typ.type.fullname() == 'builtins.list':
return list_rprimitive
# Dict subclasses are at least somewhat common and we
# specifically support them, so make sure that dict operations
# get optimized on them.
elif any(cls.fullname() == 'builtins.dict' for cls in typ.type.mro):
return dict_rprimitive
elif typ.type.fullname() == 'builtins.set':
return set_rprimitive
elif typ.type.fullname() == 'builtins.tuple':
return tuple_rprimitive # Varying-length tuple
elif typ.type in self.type_to_ir:
return RInstance(self.type_to_ir[typ.type])
else:
return object_rprimitive
elif isinstance(typ, TupleType):
# Use our unboxed tuples for raw tuples but fall back to
# being boxed for NamedTuple.
if typ.partial_fallback.type.fullname() == 'builtins.tuple':
return RTuple([self.type_to_rtype(t) for t in typ.items])
else:
return tuple_rprimitive
elif isinstance(typ, CallableType):
return object_rprimitive
elif isinstance(typ, NoneTyp):
return none_rprimitive
elif isinstance(typ, UnionType):
return RUnion([self.type_to_rtype(item)
for item in typ.items])
elif isinstance(typ, AnyType):
return object_rprimitive
elif isinstance(typ, TypeType):
return object_rprimitive
elif isinstance(typ, TypeVarType):
# Erase type variable to upper bound.
# TODO: Erase to union if object has value restriction?
return self.type_to_rtype(typ.upper_bound)
elif isinstance(typ, PartialType):
assert typ.var.type is not None
return self.type_to_rtype(typ.var.type)
elif isinstance(typ, Overloaded):
return object_rprimitive
elif isinstance(typ, TypedDictType):
return dict_rprimitive
elif isinstance(typ, LiteralType):
return self.type_to_rtype(typ.fallback)
elif isinstance(typ, (UninhabitedType, UnboundType)):
# Sure, whatever!
return object_rprimitive
# I think we've covered everything that is supposed to
# actually show up, so anything else is a bug somewhere.
assert False, 'unexpected type %s' % type(typ)
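# Illustration of the mapping above (informal, hypothetical inputs): List[int]
# maps to list_rprimitive, Tuple[int, str] to
# RTuple([int_rprimitive, str_rprimitive]), and Optional[int] to
# RUnion([int_rprimitive, none_rprimitive]).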
def get_arg_rtype(self, typ: Type, kind: int) -> RType:
if kind == ARG_STAR:
return tuple_rprimitive
elif kind == ARG_STAR2:
return dict_rprimitive
else:
return self.type_to_rtype(typ)
def fdef_to_sig(self, fdef: FuncDef) -> FuncSignature:
if isinstance(fdef.type, CallableType):
arg_types = [self.get_arg_rtype(typ, kind)
for typ, kind in zip(fdef.type.arg_types, fdef.type.arg_kinds)]
ret = self.type_to_rtype(fdef.type.ret_type)
else:
# Handle unannotated functions
arg_types = [object_rprimitive for arg in fdef.arguments]
ret = object_rprimitive
args = [RuntimeArg(arg.variable.name(), arg_type, arg.kind)
for arg, arg_type in zip(fdef.arguments, arg_types)]
# We force certain dunder methods to return objects to support letting them
# return NotImplemented. It also avoids some pointless boxing and unboxing,
# since tp_richcompare needs an object anyways.
if fdef.name() in ('__eq__', '__ne__', '__lt__', '__gt__', '__le__', '__ge__'):
ret = object_rprimitive
return FuncSignature(args, ret)
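# Illustration for fdef_to_sig above (hypothetical function): for
# 'def f(x: int, *args: str) -> bool' the signature becomes roughly
#     FuncSignature([RuntimeArg('x', int_rprimitive, ARG_POS),
#                    RuntimeArg('args', tuple_rprimitive, ARG_STAR)],
#                   bool_rprimitive)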
def literal_static_name(self, module: str,
value: Union[int, float, complex, str, bytes]) -> str:
# Literals are shared between modules in a compilation group
# but not outside the group.
literals = self.literals[self.group_map.get(module)]
# Include type to distinguish between 1 and 1.0, and so on.
key = (type(value), value)
if key not in literals:
if isinstance(value, str):
prefix = 'unicode_'
else:
prefix = type(value).__name__ + '_'
literals[key] = prefix + str(len(literals))
return literals[key]
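# Illustration for literal_static_name above (hypothetical; the numeric suffix
# depends on registration order): within one group the literal 1 might map to
# 'int_0' and 1.0 to 'float_1', while str literals use the 'unicode_' prefix.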
def prepare_func_def(module_name: str, class_name: Optional[str],
fdef: FuncDef, mapper: Mapper) -> FuncDecl:
kind = FUNC_STATICMETHOD if fdef.is_static else (
FUNC_CLASSMETHOD if fdef.is_class else FUNC_NORMAL)
decl = FuncDecl(fdef.name(), class_name, module_name, mapper.fdef_to_sig(fdef), kind)
mapper.func_to_decl[fdef] = decl
return decl
def prepare_method_def(ir: ClassIR, module_name: str, cdef: ClassDef, mapper: Mapper,
node: Union[FuncDef, Decorator]) -> None:
if isinstance(node, FuncDef):
ir.method_decls[node.name()] = prepare_func_def(module_name, cdef.name, node, mapper)
elif isinstance(node, Decorator):
# TODO: do something about abstract methods here. Currently, they are handled just like
# normal methods.
decl = prepare_func_def(module_name, cdef.name, node.func, mapper)
if not node.decorators:
ir.method_decls[node.name()] = decl
elif isinstance(node.decorators[0], MemberExpr) and node.decorators[0].name == 'setter':
# Make property setter name different than getter name so there are no
# name clashes when generating C code, and property lookup at the IR level
# works correctly.
decl.name = PROPSET_PREFIX + decl.name
decl.is_prop_setter = True
ir.method_decls[PROPSET_PREFIX + node.name()] = decl
if node.func.is_property:
assert node.func.type
decl.is_prop_getter = True
ir.property_types[node.name()] = decl.sig.ret_type
def is_valid_multipart_property_def(prop: OverloadedFuncDef) -> bool:
# Checks to ensure supported property decorator semantics
if len(prop.items) == 2:
getter = prop.items[0]
setter = prop.items[1]
if isinstance(getter, Decorator) and isinstance(setter, Decorator):
if getter.func.is_property and len(setter.decorators) == 1:
if isinstance(setter.decorators[0], MemberExpr):
if setter.decorators[0].name == "setter":
return True
return False
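# Illustration of the supported form checked above (hypothetical example):
#
#     class C:
#         @property
#         def x(self) -> int: ...
#         @x.setter
#         def x(self, value: int) -> None: ...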
def can_subclass_builtin(builtin_base: str) -> bool:
# BaseException and dict are special cased.
return builtin_base in (
('builtins.Exception', 'builtins.LookupError', 'builtins.IndexError',
'builtins.Warning', 'builtins.UserWarning', 'builtins.ValueError',
'builtins.object', ))
def prepare_class_def(path: str, module_name: str, cdef: ClassDef,
errors: Errors, mapper: Mapper) -> None:
ir = mapper.type_to_ir[cdef.info]
info = cdef.info
# We sort the table for determinism here on Python 3.5
for name, node in sorted(info.names.items()):
# Currently all plugin generated methods are dummies and not included.
if node.plugin_generated:
continue
if isinstance(node.node, Var):
assert node.node.type, "Class member %s missing type" % name
if not node.node.is_classvar and name != '__slots__':
ir.attributes[name] = mapper.type_to_rtype(node.node.type)
elif isinstance(node.node, (FuncDef, Decorator)):
prepare_method_def(ir, module_name, cdef, mapper, node.node)
elif isinstance(node.node, OverloadedFuncDef):
# Handle case for property with both a getter and a setter
if node.node.is_property:
if is_valid_multipart_property_def(node.node):
for item in node.node.items:
prepare_method_def(ir, module_name, cdef, mapper, item)
else:
errors.error("Unsupported property decorator semantics", path, cdef.line)
# Handle case for regular function overload
else:
assert node.node.impl
prepare_method_def(ir, module_name, cdef, mapper, node.node.impl)
# Check for subclassing from builtin types
for cls in info.mro:
# Special case exceptions and dicts
# XXX: How do we handle *other* things??
if cls.fullname() == 'builtins.BaseException':
ir.builtin_base = 'PyBaseExceptionObject'
elif cls.fullname() == 'builtins.dict':
ir.builtin_base = 'PyDictObject'
elif cls.fullname().startswith('builtins.'):
if not can_subclass_builtin(cls.fullname()):
# Note that if we try to subclass a C extension class that
# isn't in builtins, bad things will happen and we won't
# catch it here! But this should catch a lot of the most
# common pitfalls.
errors.error("Inheriting from most builtin types is unimplemented",
path, cdef.line)
if ir.builtin_base:
ir.attributes.clear()
# Set up a constructor decl
init_node = cdef.info['__init__'].node
if not ir.is_trait and not ir.builtin_base and isinstance(init_node, FuncDef):
init_sig = mapper.fdef_to_sig(init_node)
defining_ir = mapper.type_to_ir.get(init_node.info)
# If there is a nontrivial __init__ that wasn't defined in an
# extension class, we need to make the constructor take *args,
# **kwargs so it can call tp_init.
if ((defining_ir is None or not defining_ir.is_ext_class
or cdef.info['__init__'].plugin_generated)
and init_node.info.fullname() != 'builtins.object'):
init_sig = FuncSignature(
[init_sig.args[0],
RuntimeArg("args", tuple_rprimitive, ARG_STAR),
RuntimeArg("kwargs", dict_rprimitive, ARG_STAR2)],
init_sig.ret_type)
ctor_sig = FuncSignature(init_sig.args[1:], RInstance(ir))
ir.ctor = FuncDecl(cdef.name, None, module_name, ctor_sig)
mapper.func_to_decl[cdef.info] = ir.ctor
# Set up the parent class
bases = [mapper.type_to_ir[base.type] for base in info.bases
if base.type in mapper.type_to_ir]
if not all(c.is_trait for c in bases[1:]):
errors.error("Non-trait bases must appear first in parent list", path, cdef.line)
ir.traits = [c for c in bases if c.is_trait]
mro = []
base_mro = []
for cls in info.mro:
if cls not in mapper.type_to_ir:
if cls.fullname() != 'builtins.object':
ir.inherits_python = True
continue
base_ir = mapper.type_to_ir[cls]
if not base_ir.is_trait:
base_mro.append(base_ir)
mro.append(base_ir)
if cls.defn.removed_base_type_exprs or not base_ir.is_ext_class:
ir.inherits_python = True
base_idx = 1 if not ir.is_trait else 0
if len(base_mro) > base_idx:
ir.base = base_mro[base_idx]
if not all(base_mro[i].base == base_mro[i + 1] for i in range(len(base_mro) - 1)):
errors.error("Non-trait MRO must be linear", path, cdef.line)
ir.mro = mro
ir.base_mro = base_mro
for base in bases:
if base.children is not None:
base.children.append(ir)
if is_dataclass(cdef):
ir.is_augmented = True
def prepare_non_ext_class_def(path: str, module_name: str, cdef: ClassDef,
errors: Errors, mapper: Mapper) -> None:
ir = mapper.type_to_ir[cdef.info]
info = cdef.info
for name, node in info.names.items():
if isinstance(node.node, (FuncDef, Decorator)):
prepare_method_def(ir, module_name, cdef, mapper, node.node)
elif isinstance(node.node, OverloadedFuncDef):
# Handle case for property with both a getter and a setter
if node.node.is_property:
errors.error("Non-extension classes do not support property setters",
path, cdef.line)
# Handle case for regular function overload
else:
errors.error("Non-extension classes do not support overlaoded functions",
path, cdef.line)
def concrete_arg_kind(kind: int) -> int:
"""Find the concrete version of an arg kind that is being passed."""
if kind == ARG_OPT:
return ARG_POS
elif kind == ARG_NAMED_OPT:
return ARG_NAMED
else:
return kind
class FuncInfo(object):
"""Contains information about functions as they are generated."""
def __init__(self,
fitem: FuncItem = INVALID_FUNC_DEF,
name: str = '',
class_name: Optional[str] = None,
namespace: str = '',
is_nested: bool = False,
contains_nested: bool = False,
is_decorated: bool = False,
in_non_ext: bool = False) -> None:
self.fitem = fitem
self.name = name if not is_decorated else decorator_helper_name(name)
self.class_name = class_name
self.ns = namespace
# Callable classes implement the '__call__' method, and are used to represent functions
# that are nested inside of other functions.
self._callable_class = None # type: Optional[ImplicitClass]
# Environment classes are ClassIR instances that contain attributes representing the
# variables in the environment of the function they correspond to. Environment classes are
# generated for functions that contain nested functions.
self._env_class = None # type: Optional[ClassIR]
# Generator classes implement the '__next__' method, and are used to represent generators
# returned by generator functions.
self._generator_class = None # type: Optional[GeneratorClass]
# Environment class registers are the local registers associated with instances of an
# environment class, used for getting and setting attributes. curr_env_reg is the register
# associated with the current environment.
self._curr_env_reg = None # type: Optional[Value]
# These are flags denoting whether a given function is nested, contains a nested function,
# is decorated, or is within a non-extension class.
self.is_nested = is_nested
self.contains_nested = contains_nested
self.is_decorated = is_decorated
self.in_non_ext = in_non_ext
# TODO: add field for ret_type: RType = none_rprimitive
def namespaced_name(self) -> str:
return '_'.join(x for x in [self.name, self.class_name, self.ns] if x)
@property
def is_generator(self) -> bool:
return self.fitem.is_generator or self.fitem.is_coroutine
@property
def callable_class(self) -> 'ImplicitClass':
assert self._callable_class is not None
return self._callable_class
@callable_class.setter
def callable_class(self, cls: 'ImplicitClass') -> None:
self._callable_class = cls
@property
def env_class(self) -> ClassIR:
assert self._env_class is not None
return self._env_class
@env_class.setter
def env_class(self, ir: ClassIR) -> None:
self._env_class = ir
@property
def generator_class(self) -> 'GeneratorClass':
assert self._generator_class is not None
return self._generator_class
@generator_class.setter
def generator_class(self, cls: 'GeneratorClass') -> None:
self._generator_class = cls
@property
def curr_env_reg(self) -> Value:
assert self._curr_env_reg is not None
return self._curr_env_reg
class ImplicitClass(object):
"""Contains information regarding classes that are generated as a result of nested functions or
generated functions, but not explicitly defined in the source code.
"""
def __init__(self, ir: ClassIR) -> None:
# The ClassIR instance associated with this class.
self.ir = ir
# The register associated with the 'self' instance for this class.
self._self_reg = None # type: Optional[Value]
# Environment class registers are the local registers associated with instances of an
# environment class, used for getting and setting attributes. curr_env_reg is the register
# associated with the current environment. prev_env_reg is the self.__mypyc_env__ field
# associated with the previous environment.
self._curr_env_reg = None # type: Optional[Value]
self._prev_env_reg = None # type: Optional[Value]
@property
def self_reg(self) -> Value:
assert self._self_reg is not None
return self._self_reg
@self_reg.setter
def self_reg(self, reg: Value) -> None:
self._self_reg = reg
@property
def curr_env_reg(self) -> Value:
assert self._curr_env_reg is not None
return self._curr_env_reg
@curr_env_reg.setter
def curr_env_reg(self, reg: Value) -> None:
self._curr_env_reg = reg
@property
def prev_env_reg(self) -> Value:
assert self._prev_env_reg is not None
return self._prev_env_reg
@prev_env_reg.setter
def prev_env_reg(self, reg: Value) -> None:
self._prev_env_reg = reg
class GeneratorClass(ImplicitClass):
def __init__(self, ir: ClassIR) -> None:
super().__init__(ir)
# This register holds the label number that the '__next__' function should go to the next
# time it is called.
self._next_label_reg = None # type: Optional[Value]
self._next_label_target = None # type: Optional[AssignmentTarget]
# These registers hold the error values for the generator object for the case that the
# 'throw' function is called.
self.exc_regs = None # type: Optional[Tuple[Value, Value, Value]]
# Holds the arg passed to send
self.send_arg_reg = None # type: Optional[Value]
# The switch block is used to decide which block to jump to, based on the value held in the
# next-label register.
self.switch_block = BasicBlock()
self.blocks = [] # type: List[BasicBlock]
@property
def next_label_reg(self) -> Value:
assert self._next_label_reg is not None
return self._next_label_reg
@next_label_reg.setter
def next_label_reg(self, reg: Value) -> None:
self._next_label_reg = reg
@property
def next_label_target(self) -> AssignmentTarget:
assert self._next_label_target is not None
return self._next_label_target
@next_label_target.setter
def next_label_target(self, target: AssignmentTarget) -> None:
self._next_label_target = target
# Infrastructure for special casing calls to builtin functions in a
# programmatic way. Most special cases should be handled using the
# data driven "primitive ops" system, but certain operations require
# special handling that has access to the AST/IR directly and can make
# decisions/optimizations based on it.
#
# For example, we use specializers to statically emit the length of a
# fixed length tuple and to emit optimized code for any/all calls with
# generator comprehensions as the argument.
#
# Specializers are attempted before compiling the arguments to the
# function. Specializers can return None to indicate that they failed
# and the call should be compiled normally. Otherwise they should emit
# code for the call and return a value containing the result.
#
# Specializers take three arguments: the IRBuilder, the CallExpr being
# compiled, and the RefExpr that is the left hand side of the call.
#
# Specializers can operate on methods as well, and are keyed on the
# name and RType in that case.
Specializer = Callable[['IRBuilder', CallExpr, RefExpr], Optional[Value]]
specializers = {} # type: Dict[Tuple[str, Optional[RType]], Specializer]
def specialize_function(
name: str, typ: Optional[RType] = None) -> Callable[[Specializer], Specializer]:
"""Decorator to register a function as being a specializer."""
def wrapper(f: Specializer) -> Specializer:
specializers[name, typ] = f
return f
return wrapper
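# Illustrative sketch of registering a specializer (hypothetical; not one of
# the specializers defined elsewhere in this module):
#
#     @specialize_function('builtins.len')
#     def translate_len_example(builder: 'IRBuilder', expr: CallExpr,
#                               callee: RefExpr) -> Optional[Value]:
#         if not_a_case_we_handle(expr):
#             return None  # fall back to compiling the call normally
#         return emit_specialized_code(builder, expr)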
class NonlocalControl:
"""Represents a stack frame of constructs that modify nonlocal control flow.
The nonlocal control flow constructs are break, continue, and
return, and their behavior is modified by a number of other
constructs. The most obvious is loops, which override where break
and continue jump to, but also `except` (which needs to clear
exc_info when left) and (eventually) finally blocks (which need to
ensure that the finally block is always executed when leaving the
try/except blocks).
"""
@abstractmethod
def gen_break(self, builder: 'IRBuilder', line: int) -> None: pass
@abstractmethod
def gen_continue(self, builder: 'IRBuilder', line: int) -> None: pass
@abstractmethod
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None: pass
class BaseNonlocalControl(NonlocalControl):
def gen_break(self, builder: 'IRBuilder', line: int) -> None:
assert False, "break outside of loop"
def gen_continue(self, builder: 'IRBuilder', line: int) -> None:
assert False, "continue outside of loop"
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None:
builder.add(Return(value))
class LoopNonlocalControl(NonlocalControl):
def __init__(self, outer: NonlocalControl,
continue_block: BasicBlock, break_block: BasicBlock) -> None:
self.outer = outer
self.continue_block = continue_block
self.break_block = break_block
def gen_break(self, builder: 'IRBuilder', line: int) -> None:
builder.add(Goto(self.break_block))
def gen_continue(self, builder: 'IRBuilder', line: int) -> None:
builder.add(Goto(self.continue_block))
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None:
self.outer.gen_return(builder, value, line)
class GeneratorNonlocalControl(BaseNonlocalControl):
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None:
# Assign an invalid next label number so that the next time __next__ is called, we jump to
# the case in which StopIteration is raised.
builder.assign(builder.fn_info.generator_class.next_label_target,
builder.add(LoadInt(-1)),
line)
# Raise a StopIteration containing a field for the value that should be returned. Before
# doing so, create a new block without an error handler set so that the implicitly thrown
# StopIteration isn't caught by except blocks inside of the generator function.
builder.error_handlers.append(None)
builder.goto_new_block()
# Skip creating a traceback frame when we raise here, because
# we don't care about the traceback frame and it is kind of
# expensive since raising StopIteration is an extremely common case.
# Also we call a special internal function to set StopIteration instead of
# using RaiseStandardError because the obvious thing doesn't work if the
# value is a tuple (???).
builder.primitive_op(set_stop_iteration_value, [value], NO_TRACEBACK_LINE_NO)
builder.add(Unreachable())
builder.error_handlers.pop()
class CleanupNonlocalControl(NonlocalControl):
"""Abstract nonlocal control that runs some cleanup code. """
def __init__(self, outer: NonlocalControl) -> None:
self.outer = outer
@abstractmethod
def gen_cleanup(self, builder: 'IRBuilder', line: int) -> None: ...
def gen_break(self, builder: 'IRBuilder', line: int) -> None:
self.gen_cleanup(builder, line)
self.outer.gen_break(builder, line)
def gen_continue(self, builder: 'IRBuilder', line: int) -> None:
self.gen_cleanup(builder, line)
self.outer.gen_continue(builder, line)
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None:
self.gen_cleanup(builder, line)
self.outer.gen_return(builder, value, line)
class TryFinallyNonlocalControl(NonlocalControl):
def __init__(self, target: BasicBlock) -> None:
self.target = target
self.ret_reg = None # type: Optional[Register]
def gen_break(self, builder: 'IRBuilder', line: int) -> None:
builder.error("break inside try/finally block is unimplemented", line)
def gen_continue(self, builder: 'IRBuilder', line: int) -> None:
builder.error("continue inside try/finally block is unimplemented", line)
def gen_return(self, builder: 'IRBuilder', value: Value, line: int) -> None:
if self.ret_reg is None:
self.ret_reg = builder.alloc_temp(builder.ret_types[-1])
builder.add(Assign(self.ret_reg, value))
builder.add(Goto(self.target))
class ExceptNonlocalControl(CleanupNonlocalControl):
"""Nonlocal control for except blocks.
Just makes sure that sys.exc_info always gets restored when we leave.
This is super annoying.
"""
def __init__(self, outer: NonlocalControl, saved: Union[Value, AssignmentTarget]) -> None:
super().__init__(outer)
self.saved = saved
def gen_cleanup(self, builder: 'IRBuilder', line: int) -> None:
builder.primitive_op(restore_exc_info_op, [builder.read(self.saved)], line)
class FinallyNonlocalControl(CleanupNonlocalControl):
"""Nonlocal control for finally blocks.
Just makes sure that sys.exc_info always gets restored when we
leave and the return register is decrefed if it isn't null.
"""
def __init__(self, outer: NonlocalControl, ret_reg: Optional[Value], saved: Value) -> None:
super().__init__(outer)
self.ret_reg = ret_reg
self.saved = saved
def gen_cleanup(self, builder: 'IRBuilder', line: int) -> None:
# Do an error branch on the return value register, which
# may be undefined. This will allow it to be properly
# decrefed if it is not null. This is kind of a hack.
if self.ret_reg:
target = BasicBlock()
builder.add(Branch(self.ret_reg, target, target, Branch.IS_ERROR))
builder.activate_block(target)
# Restore the old exc_info
target, cleanup = BasicBlock(), BasicBlock()
builder.add(Branch(self.saved, target, cleanup, Branch.IS_ERROR))
builder.activate_block(cleanup)
builder.primitive_op(restore_exc_info_op, [self.saved], line)
builder.goto_and_activate(target)
class IRBuilder(ExpressionVisitor[Value], StatementVisitor[None]):
def __init__(self,
current_module: str,
types: Dict[Expression, Type],
graph: Graph,
errors: Errors,
mapper: Mapper,
modules: List[str],
pbv: PreBuildVisitor,
options: CompilerOptions) -> None:
self.current_module = current_module
self.types = types
self.graph = graph
self.environment = Environment()
self.environments = [self.environment]
self.ret_types = [] # type: List[RType]
self.blocks = [] # type: List[List[BasicBlock]]
self.functions = [] # type: List[FuncIR]
self.classes = [] # type: List[ClassIR]
self.final_names = [] # type: List[Tuple[str, RType]]
self.modules = set(modules)
self.callable_class_names = set() # type: Set[str]
self.options = options
# These variables keep track of the number of lambdas, implicit indices, and implicit
# iterators instantiated so we avoid name conflicts. The indices and iterators are
# instantiated from for-loops.
self.lambda_counter = 0
self.temp_counter = 0
# These variables are populated from the first-pass PreBuildVisitor.
self.free_variables = pbv.free_variables
self.prop_setters = pbv.prop_setters
self.encapsulating_funcs = pbv.encapsulating_funcs
self.nested_fitems = pbv.nested_funcs.keys()
self.fdefs_to_decorators = pbv.funcs_to_decorators
# This list operates similarly to a function call stack for nested functions. Whenever a
# function definition begins to be generated, a FuncInfo instance is added to the stack,
# and information about that function (e.g. whether it is nested, its environment class to
# be generated) is stored in that FuncInfo instance. When the function is done being
# generated, its corresponding FuncInfo is popped off the stack.
self.fn_info = FuncInfo(INVALID_FUNC_DEF, '', '')
self.fn_infos = [self.fn_info] # type: List[FuncInfo]
# This list operates as a stack of constructs that modify the
# behavior of nonlocal control flow constructs.
self.nonlocal_control = [] # type: List[NonlocalControl]
# Stack of except handler entry blocks
self.error_handlers = [None] # type: List[Optional[BasicBlock]]
self.errors = errors
self.mapper = mapper
# Notionally a list of all of the modules imported by the
# module being compiled, but stored as an OrderedDict so we
# can also do quick lookups.
self.imports = OrderedDict() # type: OrderedDict[str, None]
def visit_mypy_file(self, mypyfile: MypyFile) -> None:
if mypyfile.fullname() in ('typing', 'abc'):
# These modules are special; their contents are currently all
# built-in primitives.
return
self.module_path = mypyfile.path
self.module_name = mypyfile.fullname()
classes = [node for node in mypyfile.defs if isinstance(node, ClassDef)]
# Collect all classes.
for cls in classes:
ir = self.mapper.type_to_ir[cls.info]
self.classes.append(ir)
self.enter(FuncInfo(name='<top level>'))
# Make sure we have a builtins import
self.gen_import('builtins', -1)
# Generate ops.
for node in mypyfile.defs:
self.accept(node)
self.maybe_add_implicit_return()
# Generate special function representing module top level.
blocks, env, ret_type, _ = self.leave()
sig = FuncSignature([], none_rprimitive)
func_ir = FuncIR(FuncDecl(TOP_LEVEL_NAME, None, self.module_name, sig), blocks, env,
traceback_name="<module>")
self.functions.append(func_ir)
def handle_ext_method(self, cdef: ClassDef, fdef: FuncDef) -> None:
# Perform the function of visit_method for methods inside extension classes.
name = fdef.name()
class_ir = self.mapper.type_to_ir[cdef.info]
func_ir, func_reg = self.gen_func_item(fdef, name, self.mapper.fdef_to_sig(fdef), cdef)
self.functions.append(func_ir)
if self.is_decorated(fdef):
# Obtain the function name in order to construct the name of the helper function.
_, _, name = fdef.fullname().rpartition('.')
helper_name = decorator_helper_name(name)
# Read the PyTypeObject representing the class, get the callable object
# representing the non-decorated method
typ = self.load_native_type_object(cdef.fullname)
orig_func = self.py_get_attr(typ, helper_name, fdef.line)
# Decorate the non-decorated method
decorated_func = self.load_decorated_func(fdef, orig_func)
# Set the callable object representing the decorated method as an attribute of the
# extension class.
self.primitive_op(py_setattr_op,
[typ, self.load_static_unicode(name), decorated_func], fdef.line)
if fdef.is_property:
# If there is a property setter, it will be processed after the getter;
# we populate the optional setter field with None for now.
assert name not in class_ir.properties
class_ir.properties[name] = (func_ir, None)
elif fdef in self.prop_setters:
# The respective property getter must have been processed already
assert name in class_ir.properties
getter_ir, _ = class_ir.properties[name]
class_ir.properties[name] = (getter_ir, func_ir)
class_ir.methods[func_ir.decl.name] = func_ir
# If this overrides a parent class method with a different type, we need
# to generate a glue method to mediate between them.
for cls in class_ir.mro[1:]:
if (name in cls.method_decls and name != '__init__'
and not is_same_method_signature(class_ir.method_decls[name].sig,
cls.method_decls[name].sig)):
# TODO: Support contravariant subtyping in the input argument for
# property setters. Need to make a special glue method for handling this,
# similar to gen_glue_property.
if fdef.is_property:
f = self.gen_glue_property(cls.method_decls[name].sig, func_ir, class_ir,
cls, fdef.line)
else:
f = self.gen_glue_method(cls.method_decls[name].sig, func_ir, class_ir,
cls, fdef.line)
class_ir.glue_methods[(cls, name)] = f
self.functions.append(f)
def handle_non_ext_method(
self, non_ext: NonExtClassInfo, cdef: ClassDef, fdef: FuncDef) -> None:
# Perform the function of visit_method for methods inside non-extension classes.
name = fdef.name()
func_ir, func_reg = self.gen_func_item(fdef, name, self.mapper.fdef_to_sig(fdef), cdef)
assert func_reg is not None
self.functions.append(func_ir)
if self.is_decorated(fdef):
# The undecorated method is a generated callable class
orig_func = func_reg
func_reg = self.load_decorated_func(fdef, orig_func)
# TODO: Support property setters in non-extension classes
if fdef.is_property:
prop = self.load_module_attr_by_fullname('builtins.property', fdef.line)
func_reg = self.py_call(prop, [func_reg], fdef.line)
elif self.mapper.func_to_decl[fdef].kind == FUNC_CLASSMETHOD:
cls_meth = self.load_module_attr_by_fullname('builtins.classmethod', fdef.line)
func_reg = self.py_call(cls_meth, [func_reg], fdef.line)
elif self.mapper.func_to_decl[fdef].kind == FUNC_STATICMETHOD:
stat_meth = self.load_module_attr_by_fullname('builtins.staticmethod', fdef.line)
func_reg = self.py_call(stat_meth, [func_reg], fdef.line)
self.add_to_non_ext_dict(non_ext, name, func_reg, fdef.line)
def visit_method(
self, cdef: ClassDef, non_ext: Optional[NonExtClassInfo], fdef: FuncDef) -> None:
if non_ext:
self.handle_non_ext_method(non_ext, cdef, fdef)
else:
self.handle_ext_method(cdef, fdef)
def is_constant(self, e: Expression) -> bool:
"""Check whether we allow an expression to appear as a default value.
We don't currently properly support storing the evaluated
values for default arguments and default attribute values, so
we restrict what expressions we allow. We allow literals of
primitive types, None, and references to Final global
variables.
"""
return (isinstance(e, (StrExpr, BytesExpr, IntExpr, FloatExpr))
or (isinstance(e, UnaryExpr) and e.op == '-'
and isinstance(e.expr, (IntExpr, FloatExpr)))
or (isinstance(e, TupleExpr)
and all(self.is_constant(e) for e in e.items))
or (isinstance(e, RefExpr) and e.kind == GDEF
and (e.fullname in ('builtins.True', 'builtins.False', 'builtins.None')
or (isinstance(e.node, Var) and e.node.is_final))))
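# Illustration of defaults accepted by is_constant above (hypothetical class):
#
#     class C:
#         x = 5            # literal: allowed
#         y = -1.5         # negated literal: allowed
#         t = (1, 'a')     # tuple of constants: allowed
#         z = compute()    # arbitrary call: not allowed (a warning is emitted)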
def generate_attr_defaults(self, cdef: ClassDef) -> None:
"""Generate an initialization method for default attr values (from class vars)"""
cls = self.mapper.type_to_ir[cdef.info]
if cls.builtin_base:
return
# Pull out all assignments in classes in the mro so we can initialize them
# TODO: Support nested statements
default_assignments = []
for info in reversed(cdef.info.mro):
if info not in self.mapper.type_to_ir:
continue
for stmt in info.defn.defs.body:
if (isinstance(stmt, AssignmentStmt)
and isinstance(stmt.lvalues[0], NameExpr)
and not is_class_var(stmt.lvalues[0])
and not isinstance(stmt.rvalue, TempNode)):
if stmt.lvalues[0].name == '__slots__':
continue
# Skip type annotated assignments in dataclasses
if is_dataclass(cdef) and stmt.type:
continue
default_assignments.append(stmt)
if not default_assignments:
return
self.enter(FuncInfo())
self.ret_types[-1] = bool_rprimitive
rt_args = (RuntimeArg(SELF_NAME, RInstance(cls)),)
self_var = self.read(self.add_self_to_env(cls), -1)
for stmt in default_assignments:
lvalue = stmt.lvalues[0]
assert isinstance(lvalue, NameExpr)
if not stmt.is_final_def and not self.is_constant(stmt.rvalue):
self.warning('Unsupported default attribute value', stmt.rvalue.line)
# If the attribute is initialized to None and type isn't optional,
# don't initialize it to anything.
attr_type = cls.attr_type(lvalue.name)
if isinstance(stmt.rvalue, RefExpr) and stmt.rvalue.fullname == 'builtins.None':
if (not is_optional_type(attr_type) and not is_object_rprimitive(attr_type)
and not is_none_rprimitive(attr_type)):
continue
val = self.coerce(self.accept(stmt.rvalue), attr_type, stmt.line)
self.add(SetAttr(self_var, lvalue.name, val, -1))
self.add(Return(self.primitive_op(true_op, [], -1)))
blocks, env, ret_type, _ = self.leave()
ir = FuncIR(
FuncDecl('__mypyc_defaults_setup',
cls.name, self.module_name,
FuncSignature(rt_args, ret_type)),
blocks, env)
self.functions.append(ir)
cls.methods[ir.name] = ir
def finish_non_ext_dict(self, non_ext: NonExtClassInfo, line: int) -> None:
# Add __annotations__ to the class dict.
self.primitive_op(dict_set_item_op,
[non_ext.dict, self.load_static_unicode('__annotations__'),
non_ext.anns], -1)
# We add a __doc__ attribute so that if the non-extension class is decorated with the
# dataclass decorator, dataclass will not try to look for __text_signature__.
# https://github.com/python/cpython/blob/3.7/Lib/dataclasses.py#L957
filler_doc_str = 'mypyc filler docstring'
self.add_to_non_ext_dict(
non_ext, '__doc__', self.load_static_unicode(filler_doc_str), line)
self.add_to_non_ext_dict(
non_ext, '__module__', self.load_static_unicode(self.module_name), line)
def load_non_ext_class(self, ir: ClassIR, non_ext: NonExtClassInfo, line: int) -> Value:
cls_name = self.load_static_unicode(ir.name)
self.finish_non_ext_dict(non_ext, line)
metaclass = self.primitive_op(type_object_op, [], line)
metaclass = self.primitive_op(py_calc_meta_op, [metaclass, non_ext.bases], line)
class_type_obj = self.py_call(metaclass,
[cls_name, non_ext.bases, non_ext.dict],
line)
return class_type_obj
def load_decorated_class(self, cdef: ClassDef, type_obj: Value) -> Value:
"""
Given a decorated ClassDef and a register containing a non-extension representation of the
ClassDef created via the type constructor, applies the corresponding decorator functions
on that decorated ClassDef and returns a register containing the decorated ClassDef.
"""
decorators = cdef.decorators
dec_class = type_obj
for d in reversed(decorators):
decorator = d.accept(self)
assert isinstance(decorator, Value)
dec_class = self.py_call(decorator, [dec_class], dec_class.line)
return dec_class
def populate_non_ext_bases(self, cdef: ClassDef) -> Value:
"""
Populate the base-class tuple passed to the metaclass constructor
for non-extension classes.
"""
ir = self.mapper.type_to_ir[cdef.info]
bases = []
for cls in cdef.info.mro[1:]:
if cls.fullname() == 'builtins.object':
continue
# Add the current class to the base class's list of concrete subclasses
if cls in self.mapper.type_to_ir:
base_ir = self.mapper.type_to_ir[cls]
if base_ir.children is not None:
base_ir.children.append(ir)
base = self.load_global_str(cls.name(), cdef.line)
bases.append(base)
return self.primitive_op(new_tuple_op, bases, cdef.line)
def add_to_non_ext_dict(self, non_ext: NonExtClassInfo,
key: str, val: Value, line: int) -> None:
# Add an attribute entry into the class dict of a non-extension class.
key_unicode = self.load_static_unicode(key)
self.primitive_op(dict_set_item_op, [non_ext.dict, key_unicode, val], line)
def add_non_ext_class_attr(self, non_ext: NonExtClassInfo, lvalue: NameExpr,
stmt: AssignmentStmt, cdef: ClassDef,
attr_to_cache: List[Lvalue]) -> None:
"""
Add a class attribute to __annotations__ of a non-extension class. If the
attribute is assigned to a value, it is also added to __dict__.
"""
# We populate __annotations__ because dataclasses uses it to determine
# which attributes to compute on.
# TODO: Maybe generate more precise types for annotations
key = self.load_static_unicode(lvalue.name)
typ = self.primitive_op(type_object_op, [], stmt.line)
self.primitive_op(dict_set_item_op, [non_ext.anns, key, typ], stmt.line)
# Only add the attribute to the __dict__ if the assignment is of the form:
# x: type = value (don't add attributes of the form 'x: type' to the __dict__).
if not isinstance(stmt.rvalue, TempNode):
rvalue = self.accept(stmt.rvalue)
self.add_to_non_ext_dict(non_ext, lvalue.name, rvalue, stmt.line)
# We cache enum attributes to speed up enum attribute lookup since they
# are final.
if cdef.info.bases and cdef.info.bases[0].type.fullname() == 'enum.Enum':
attr_to_cache.append(lvalue)
def setup_non_ext_dict(self, cdef: ClassDef, bases: Value) -> Value:
"""
Initialize the class dictionary for a non-extension class. This class dictionary
is passed to the metaclass constructor.
"""
# Check if the metaclass defines a __prepare__ method, and if so, call it.
metaclass = self.primitive_op(type_object_op, [], cdef.line)
metaclass = self.primitive_op(py_calc_meta_op, [metaclass, bases],
cdef.line)
has_prepare = self.primitive_op(py_hasattr_op,
[metaclass,
self.load_static_unicode('__prepare__')], cdef.line)
non_ext_dict = self.alloc_temp(dict_rprimitive)
true_block, false_block, exit_block, = BasicBlock(), BasicBlock(), BasicBlock()
self.add_bool_branch(has_prepare, true_block, false_block)
self.activate_block(true_block)
cls_name = self.load_static_unicode(cdef.name)
prepare_meth = self.py_get_attr(metaclass, '__prepare__', cdef.line)
prepare_dict = self.py_call(prepare_meth, [cls_name, bases], cdef.line)
self.assign(non_ext_dict, prepare_dict, cdef.line)
self.goto(exit_block)
self.activate_block(false_block)
self.assign(non_ext_dict, self.primitive_op(new_dict_op, [], cdef.line), cdef.line)
self.goto(exit_block)
self.activate_block(exit_block)
return non_ext_dict
def cache_class_attrs(self, attrs_to_cache: List[Lvalue], cdef: ClassDef) -> None:
"""Add class attributes to be cached to the global cache"""
typ = self.load_native_type_object(cdef.fullname)
for lval in attrs_to_cache:
assert isinstance(lval, NameExpr)
rval = self.py_get_attr(typ, lval.name, cdef.line)
self.init_final_static(lval, rval, cdef.name)
def dataclass_non_ext_info(self, cdef: ClassDef) -> Optional[NonExtClassInfo]:
"""Set up a NonExtClassInfo to track dataclass attributes.
In addition to setting up a normal extension class for dataclasses,
we also collect its class attributes like a non-extension class so
that we can hand them to the dataclass decorator.
"""
if is_dataclass(cdef):
return NonExtClassInfo(
self.primitive_op(new_dict_op, [], cdef.line),
self.add(TupleSet([], cdef.line)),
self.primitive_op(new_dict_op, [], cdef.line),
)
else:
return None
def dataclass_finalize(
self, cdef: ClassDef, non_ext: NonExtClassInfo, type_obj: Value) -> None:
"""Generate code to finish instantiating a dataclass.
This works by replacing all of the attributes on the class
(which will be descriptors) with whatever they would be in a
non-extension class, calling dataclass, then switching them back.
The resulting class is an extension class and instances of it do not
have a __dict__ (unless something else requires it).
All methods written explicitly in the source are compiled and
may be called through the vtable while the methods generated
by dataclasses are interpreted and may not be.
(If we just called dataclass without doing this, it would think that all
of the descriptors for our attributes are default values and generate an
incorrect constructor. We need to do the switch so that dataclass gets the
appropriate defaults.)
"""
self.finish_non_ext_dict(non_ext, cdef.line)
dec = self.accept(next(d for d in cdef.decorators if is_dataclass_decorator(d)))
self.primitive_op(
dataclass_sleight_of_hand, [dec, type_obj, non_ext.dict, non_ext.anns], cdef.line)
def visit_class_def(self, cdef: ClassDef) -> None:
ir = self.mapper.type_to_ir[cdef.info]
# Currently, we only create non-extension classes for classes that are
# decorated or inherit from Enum. Classes decorated with @trait do not
# apply here, and are handled in a different way.
if ir.is_ext_class:
# If the class is not decorated, generate an extension class for it.
type_obj = self.allocate_class(cdef) # type: Optional[Value]
non_ext = None # type: Optional[NonExtClassInfo]
dataclass_non_ext = self.dataclass_non_ext_info(cdef)
else:
non_ext_bases = self.populate_non_ext_bases(cdef)
non_ext_dict = self.setup_non_ext_dict(cdef, non_ext_bases)
# We populate __annotations__ for non-extension classes
# because dataclasses uses it to determine which attributes to compute on.
# TODO: Maybe generate more precise types for annotations
non_ext_anns = self.primitive_op(new_dict_op, [], cdef.line)
non_ext = NonExtClassInfo(non_ext_dict, non_ext_bases, non_ext_anns)
dataclass_non_ext = None
type_obj = None
attrs_to_cache = [] # type: List[Lvalue]
for stmt in cdef.defs.body:
if isinstance(stmt, OverloadedFuncDef) and stmt.is_property:
if not ir.is_ext_class:
# Properties with both getters and setters are not
# supported in non-extension classes.
self.error("Property setters not supported in non-extension classes",
stmt.line)
for item in stmt.items:
with self.catch_errors(stmt.line):
self.visit_method(cdef, non_ext, get_func_def(item))
elif isinstance(stmt, (FuncDef, Decorator, OverloadedFuncDef)):
# Ignore plugin generated methods (since they have no
# bodies to compile and will need to have the bodies
# provided by some other mechanism.)
if cdef.info.names[stmt.name()].plugin_generated:
continue
with self.catch_errors(stmt.line):
self.visit_method(cdef, non_ext, get_func_def(stmt))
elif isinstance(stmt, PassStmt):
continue
elif isinstance(stmt, AssignmentStmt):
if len(stmt.lvalues) != 1:
self.error("Multiple assignment in class bodies not supported", stmt.line)
continue
lvalue = stmt.lvalues[0]
if not isinstance(lvalue, NameExpr):
self.error("Only assignment to variables is supported in class bodies",
stmt.line)
continue
# We want to collect class variables in a dictionary for both real
# non-extension classes and fake dataclass ones.
var_non_ext = non_ext or dataclass_non_ext
if var_non_ext:
self.add_non_ext_class_attr(var_non_ext, lvalue, stmt, cdef, attrs_to_cache)
if non_ext:
continue
# Variable declaration with no body
if isinstance(stmt.rvalue, TempNode):
continue
# Only treat marked class variables as class variables.
if not (is_class_var(lvalue) or stmt.is_final_def):
continue
typ = self.load_native_type_object(cdef.fullname)
value = self.accept(stmt.rvalue)
self.primitive_op(
py_setattr_op, [typ, self.load_static_unicode(lvalue.name), value], stmt.line)
if self.non_function_scope() and stmt.is_final_def:
self.init_final_static(lvalue, value, cdef.name)
elif isinstance(stmt, ExpressionStmt) and isinstance(stmt.expr, StrExpr):
# Docstring. Ignore
pass
else:
self.error("Unsupported statement in class body", stmt.line)
if not non_ext: # That is, an extension class
self.generate_attr_defaults(cdef)
self.create_ne_from_eq(cdef)
if dataclass_non_ext:
assert type_obj
self.dataclass_finalize(cdef, dataclass_non_ext, type_obj)
else:
# Dynamically create the class via the type constructor
non_ext_class = self.load_non_ext_class(ir, non_ext, cdef.line)
non_ext_class = self.load_decorated_class(cdef, non_ext_class)
# Save the decorated class
self.add(InitStatic(non_ext_class, cdef.name, self.module_name, NAMESPACE_TYPE))
# Add the non-extension class to the dict
self.primitive_op(dict_set_item_op,
[self.load_globals_dict(), self.load_static_unicode(cdef.name),
non_ext_class], cdef.line)
# Cache any cachable class attributes
self.cache_class_attrs(attrs_to_cache, cdef)
# Set this attribute back to None until the next non-extension class is visited.
self.non_ext_info = None
def create_mypyc_attrs_tuple(self, ir: ClassIR, line: int) -> Value:
attrs = [name for ancestor in ir.mro for name in ancestor.attributes]
if ir.inherits_python:
attrs.append('__dict__')
return self.primitive_op(new_tuple_op,
[self.load_static_unicode(attr) for attr in attrs],
line)
def allocate_class(self, cdef: ClassDef) -> Value:
# OK AND NOW THE FUN PART
base_exprs = cdef.base_type_exprs + cdef.removed_base_type_exprs
if base_exprs:
bases = [self.accept(x) for x in base_exprs]
tp_bases = self.primitive_op(new_tuple_op, bases, cdef.line)
else:
tp_bases = self.add(LoadErrorValue(object_rprimitive, is_borrowed=True))
modname = self.load_static_unicode(self.module_name)
template = self.add(LoadStatic(object_rprimitive, cdef.name + "_template",
self.module_name, NAMESPACE_TYPE))
# Create the class
tp = self.primitive_op(pytype_from_template_op,
[template, tp_bases, modname], cdef.line)
# Immediately fix up the trait vtables, before doing anything with the class.
ir = self.mapper.type_to_ir[cdef.info]
if not ir.is_trait and not ir.builtin_base:
self.add(Call(
FuncDecl(cdef.name + '_trait_vtable_setup',
None, self.module_name,
FuncSignature([], bool_rprimitive)), [], -1))
# Populate a '__mypyc_attrs__' field containing the list of attrs
self.primitive_op(py_setattr_op, [
tp, self.load_static_unicode('__mypyc_attrs__'),
self.create_mypyc_attrs_tuple(self.mapper.type_to_ir[cdef.info], cdef.line)],
cdef.line)
# Save the class
self.add(InitStatic(tp, cdef.name, self.module_name, NAMESPACE_TYPE))
# Add it to the dict
self.primitive_op(dict_set_item_op,
[self.load_globals_dict(), self.load_static_unicode(cdef.name),
tp], cdef.line)
return tp
def gen_import(self, id: str, line: int) -> None:
self.imports[id] = None
needs_import, out = BasicBlock(), BasicBlock()
first_load = self.load_module(id)
comparison = self.binary_op(first_load, self.none_object(), 'is not', line)
self.add_bool_branch(comparison, out, needs_import)
self.activate_block(needs_import)
value = self.primitive_op(import_op, [self.load_static_unicode(id)], line)
self.add(InitStatic(value, id, namespace=NAMESPACE_MODULE))
self.goto_and_activate(out)
def visit_import(self, node: Import) -> None:
if node.is_mypy_only:
return
globals = self.load_globals_dict()
for node_id, as_name in node.ids:
self.gen_import(node_id, node.line)
# Update the globals dict with the appropriate module:
# * For 'import foo.bar as baz' we add 'foo.bar' with the name 'baz'
# * For 'import foo.bar' we add 'foo' with the name 'foo'
# Typically we then ignore these entries and access things directly
# via the module static, but we will use the globals version for modules
# that mypy couldn't find, since it doesn't analyze module references
# from those properly.
# Note that we miscompile imports inside of functions here, similarly to
# the 'import from' case below.
if as_name:
name = as_name
base = node_id
else:
base = name = node_id.split('.')[0]
# Python 3.7 has a nice 'PyImport_GetModule' function that we can't use :(
mod_dict = self.primitive_op(get_module_dict_op, [], node.line)
obj = self.primitive_op(dict_get_item_op,
[mod_dict, self.load_static_unicode(base)], node.line)
self.translate_special_method_call(
globals, '__setitem__', [self.load_static_unicode(name), obj],
result_type=None, line=node.line)
def visit_import_from(self, node: ImportFrom) -> None:
if node.is_mypy_only:
return
module_state = self.graph[self.module_name]
if module_state.ancestors is not None and module_state.ancestors:
module_package = module_state.ancestors[0]
else:
module_package = ''
id = importlib.util.resolve_name('.' * node.relative + node.id, module_package)
self.gen_import(id, node.line)
module = self.load_module(id)
# Copy everything into our module's dict.
# Note that we miscompile import from inside of functions here,
# since that case *shouldn't* load it into the globals dict.
# This probably doesn't matter much and the code runs basically right.
globals = self.load_globals_dict()
for name, maybe_as_name in node.names:
# If one of the things we are importing is a module,
# import it as a module also.
fullname = id + '.' + name
if fullname in self.graph or fullname in module_state.suppressed:
self.gen_import(fullname, node.line)
as_name = maybe_as_name or name
obj = self.py_get_attr(module, name, node.line)
self.translate_special_method_call(
globals, '__setitem__', [self.load_static_unicode(as_name), obj],
result_type=None, line=node.line)
def visit_import_all(self, node: ImportAll) -> None:
if node.is_mypy_only:
return
self.gen_import(node.id, node.line)
def gen_glue_method(self, sig: FuncSignature, target: FuncIR,
cls: ClassIR, base: ClassIR, line: int) -> FuncIR:
"""Generate glue methods that mediate between different method types in subclasses.
For example, if we have:
class A:
def f(self, x: int) -> object: ...
then it is totally permissible to have a subclass
class B(A):
def f(self, x: object) -> int: ...
since '(object) -> int' is a subtype of '(int) -> object' by the usual
contra/co-variant function subtyping rules.
The trickiness here is that int and object have different
runtime representations in mypyc, so A.f and B.f have
different signatures at the native C level. To deal with this,
we need to generate glue methods that mediate between the
different versions by coercing the arguments and return
values.
"""
self.enter(FuncInfo())
self.ret_types[-1] = sig.ret_type
rt_args = list(sig.args)
if target.decl.kind == FUNC_NORMAL:
rt_args[0] = RuntimeArg(sig.args[0].name, RInstance(cls))
# The environment operates on Vars, so we make some up
fake_vars = [(Var(arg.name), arg.type) for arg in rt_args]
args = [self.read(self.environment.add_local_reg(var, type, is_arg=True), line)
for var, type in fake_vars]
arg_names = [arg.name for arg in rt_args]
arg_kinds = [concrete_arg_kind(arg.kind) for arg in rt_args]
retval = self.call(target.decl, args, arg_kinds, arg_names, line)
retval = self.coerce(retval, sig.ret_type, line)
self.add(Return(retval))
blocks, env, ret_type, _ = self.leave()
return FuncIR(
FuncDecl(target.name + '__' + base.name + '_glue',
cls.name, self.module_name,
FuncSignature(rt_args, ret_type),
target.decl.kind),
blocks, env)
def gen_glue_property(self, sig: FuncSignature, target: FuncIR, cls: ClassIR, base: ClassIR,
line: int) -> FuncIR:
"""Similarly to methods, properties of derived types can be covariantly subtyped. Thus,
properties also require glue. However, this only requires the return type to change.
Further, instead of a method call, an attribute get is performed."""
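# Illustrative sketch only: the generated property glue behaves roughly like
#     def prop__Base_glue(self): return coerce(self.<prop>, <base return type>)
# i.e. it reads the attribute and coerces it to the base class's declared type.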
self.enter(FuncInfo())
rt_arg = RuntimeArg(SELF_NAME, RInstance(cls))
arg = self.read(self.add_self_to_env(cls), line)
self.ret_types[-1] = sig.ret_type
retval = self.add(GetAttr(arg, target.name, line))
retbox = self.coerce(retval, sig.ret_type, line)
self.add(Return(retbox))
blocks, env, return_type, _ = self.leave()
return FuncIR(
FuncDecl(target.name + '__' + base.name + '_glue',
cls.name, self.module_name, FuncSignature([rt_arg], return_type)),
blocks, env)
def assign_if_null(self, target: AssignmentTargetRegister,
get_val: Callable[[], Value], line: int) -> None:
"""Generate blocks for registers that NULL values."""
error_block, body_block = BasicBlock(), BasicBlock()
self.add(Branch(target.register, error_block, body_block, Branch.IS_ERROR))
self.activate_block(error_block)
self.add(Assign(target.register, self.coerce(get_val(), target.register.type, line)))
self.goto(body_block)
self.activate_block(body_block)
def gen_glue_ne_method(self, cls: ClassIR, line: int) -> FuncIR:
"""Generate a __ne__ method from a __eq__ method. """
self.enter(FuncInfo())
rt_args = (RuntimeArg("self", RInstance(cls)), RuntimeArg("rhs", object_rprimitive))
# The environment operates on Vars, so we make some up
fake_vars = [(Var(arg.name), arg.type) for arg in rt_args]
args = [self.read(self.environment.add_local_reg(var, type, is_arg=True), line)
for var, type in fake_vars] # type: List[Value]
self.ret_types[-1] = object_rprimitive
# If __eq__ returns NotImplemented, then __ne__ should also
not_implemented_block, regular_block = BasicBlock(), BasicBlock()
eqval = self.add(MethodCall(args[0], '__eq__', [args[1]], line))
not_implemented = self.primitive_op(not_implemented_op, [], line)
self.add(Branch(
self.binary_op(eqval, not_implemented, 'is', line),
not_implemented_block,
regular_block,
Branch.BOOL_EXPR))
self.activate_block(regular_block)
retval = self.coerce(self.unary_op(eqval, 'not', line), object_rprimitive, line)
self.add(Return(retval))
self.activate_block(not_implemented_block)
self.add(Return(not_implemented))
blocks, env, ret_type, _ = self.leave()
return FuncIR(
FuncDecl('__ne__', cls.name, self.module_name,
FuncSignature(rt_args, ret_type)),
blocks, env)
def create_ne_from_eq(self, cdef: ClassDef) -> None:
cls = self.mapper.type_to_ir[cdef.info]
if cls.has_method('__eq__') and not cls.has_method('__ne__'):
f = self.gen_glue_ne_method(cls, cdef.line)
cls.method_decls['__ne__'] = f.decl
cls.methods['__ne__'] = f
self.functions.append(f)
def calculate_arg_defaults(self,
fn_info: FuncInfo,
env: Environment,
func_reg: Optional[Value]) -> None:
"""Calculate default argument values and store them.
They are stored in statics for top-level functions and in
the function objects for nested functions (constant values are
not stored; they are just recomputed on demand).
"""
fitem = fn_info.fitem
for arg in fitem.arguments:
# Constant values don't get stored but just recomputed
if arg.initializer and not self.is_constant(arg.initializer):
value = self.coerce(self.accept(arg.initializer),
env.lookup(arg.variable).type, arg.line)
if not fn_info.is_nested:
name = fitem.fullname() + '.' + arg.variable.name()
self.add(InitStatic(value, name, self.module_name))
else:
assert func_reg is not None
self.add(SetAttr(func_reg, arg.variable.name(), value, arg.line))
def gen_arg_defaults(self) -> None:
"""Generate blocks for arguments that have default values.
If the passed value is an error value, then assign the default
value to the argument.
"""
fitem = self.fn_info.fitem
for arg in fitem.arguments:
if arg.initializer:
target = self.environment.lookup(arg.variable)
def get_default() -> Value:
assert arg.initializer is not None
# If it is constant, don't bother storing it
if self.is_constant(arg.initializer):
return self.accept(arg.initializer)
# Because gen_arg_defaults runs before calculate_arg_defaults, we
# add the static/attribute to final_names/the class here.
elif not self.fn_info.is_nested:
name = fitem.fullname() + '.' + arg.variable.name()
self.final_names.append((name, target.type))
return self.add(LoadStatic(target.type, name, self.module_name))
else:
name = arg.variable.name()
self.fn_info.callable_class.ir.attributes[name] = target.type
return self.add(
GetAttr(self.fn_info.callable_class.self_reg, name, arg.line))
assert isinstance(target, AssignmentTargetRegister)
self.assign_if_null(target,
get_default,
arg.initializer.line)
def gen_func_item(self,
fitem: FuncItem,
name: str,
sig: FuncSignature,
cdef: Optional[ClassDef] = None,
) -> Tuple[FuncIR, Optional[Value]]:
# TODO: do something about abstract methods.
"""Generates and returns the FuncIR for a given FuncDef.
If the given FuncItem is a nested function, then we generate a callable class representing
the function and use that instead of the actual function. If the given FuncItem contains a
nested function, then we generate an environment class so that inner nested functions can
access the environment of the given FuncDef.
Consider the following nested function.
def a() -> None:
def b() -> None:
def c() -> None:
return None
return None
return None
The classes generated would look something like the following.
has pointer to +-------+
+--------------------------> | a_env |
| +-------+
| ^
| | has pointer to
+-------+ associated with +-------+
| b_obj | -------------------> | b_env |
+-------+ +-------+
^
|
+-------+ has pointer to |
| c_obj | --------------------------+
+-------+
"""
func_reg = None # type: Optional[Value]
# We treat lambdas as always being nested because we always generate
# a class for lambdas, no matter where they are. (It would probably also
# work to special case toplevel lambdas and generate a non-class function.)
is_nested = fitem in self.nested_fitems or isinstance(fitem, LambdaExpr)
contains_nested = fitem in self.encapsulating_funcs.keys()
is_decorated = fitem in self.fdefs_to_decorators
in_non_ext = False
class_name = None
if cdef:
ir = self.mapper.type_to_ir[cdef.info]
in_non_ext = not ir.is_ext_class
class_name = cdef.name
self.enter(FuncInfo(fitem, name, class_name, self.gen_func_ns(),
is_nested, contains_nested, is_decorated, in_non_ext))
# Functions that contain nested functions need an environment class to store variables that
# are free in their nested functions. Generator functions need an environment class to
# store a variable denoting the next instruction to be executed when the __next__ function
# is called, along with all the variables inside the function itself.
if self.fn_info.contains_nested or self.fn_info.is_generator:
self.setup_env_class()
if self.fn_info.is_nested or self.fn_info.in_non_ext:
self.setup_callable_class()
if self.fn_info.is_generator:
# Do a first-pass and generate a function that just returns a generator object.
self.gen_generator_func()
blocks, env, ret_type, fn_info = self.leave()
func_ir, func_reg = self.gen_func_ir(blocks, sig, env, fn_info, cdef)
# Re-enter the FuncItem and visit the body of the function this time.
self.enter(fn_info)
self.setup_env_for_generator_class()
self.load_outer_envs(self.fn_info.generator_class)
if self.fn_info.is_nested and isinstance(fitem, FuncDef):
self.setup_func_for_recursive_call(fitem, self.fn_info.generator_class)
self.create_switch_for_generator_class()
self.add_raise_exception_blocks_to_generator_class(fitem.line)
else:
self.load_env_registers()
self.gen_arg_defaults()
if self.fn_info.contains_nested and not self.fn_info.is_generator:
self.finalize_env_class()
self.ret_types[-1] = sig.ret_type
# Add all variables and functions that are declared/defined within this
# function and are referenced in functions nested within this one to this
# function's environment class so the nested functions can reference
# them even if they are declared after the nested function's definition.
# Note that this is done before visiting the body of this function.
env_for_func = self.fn_info # type: Union[FuncInfo, ImplicitClass]
if self.fn_info.is_generator:
env_for_func = self.fn_info.generator_class
elif self.fn_info.is_nested or self.fn_info.in_non_ext:
env_for_func = self.fn_info.callable_class
if self.fn_info.fitem in self.free_variables:
# Sort the variables to keep things deterministic
for var in sorted(self.free_variables[self.fn_info.fitem], key=lambda x: x.name()):
if isinstance(var, Var):
rtype = self.type_to_rtype(var.type)
self.add_var_to_env_class(var, rtype, env_for_func, reassign=False)
if self.fn_info.fitem in self.encapsulating_funcs:
for nested_fn in self.encapsulating_funcs[self.fn_info.fitem]:
if isinstance(nested_fn, FuncDef):
# The return type is 'object' instead of an RInstance of the
# callable class because differently defined functions with
# the same name and signature across conditional blocks
# will generate different callable classes, so the callable
# class that gets instantiated must be generic.
self.add_var_to_env_class(nested_fn, object_rprimitive,
env_for_func, reassign=False)
self.accept(fitem.body)
self.maybe_add_implicit_return()
if self.fn_info.is_generator:
self.populate_switch_for_generator_class()
blocks, env, ret_type, fn_info = self.leave()
if fn_info.is_generator:
helper_fn_decl = self.add_helper_to_generator_class(blocks, sig, env, fn_info)
self.add_next_to_generator_class(fn_info, helper_fn_decl, sig)
self.add_send_to_generator_class(fn_info, helper_fn_decl, sig)
self.add_iter_to_generator_class(fn_info)
self.add_throw_to_generator_class(fn_info, helper_fn_decl, sig)
self.add_close_to_generator_class(fn_info)
if fitem.is_coroutine:
self.add_await_to_generator_class(fn_info)
else:
func_ir, func_reg = self.gen_func_ir(blocks, sig, env, fn_info, cdef)
self.calculate_arg_defaults(fn_info, env, func_reg)
return (func_ir, func_reg)
def gen_func_ir(self,
blocks: List[BasicBlock],
sig: FuncSignature,
env: Environment,
fn_info: FuncInfo,
cdef: Optional[ClassDef]) -> Tuple[FuncIR, Optional[Value]]:
"""Generates the FuncIR for a function given the blocks, environment, and function info of
a particular function and returns it. If the function is nested, also returns the register
containing the instance of the corresponding callable class.
"""
func_reg = None # type: Optional[Value]
if fn_info.is_nested or fn_info.in_non_ext:
func_ir = self.add_call_to_callable_class(blocks, sig, env, fn_info)
self.add_get_to_callable_class(fn_info)
func_reg = self.instantiate_callable_class(fn_info)
else:
assert isinstance(fn_info.fitem, FuncDef)
if fn_info.is_decorated:
class_name = None if cdef is None else cdef.name
func_decl = FuncDecl(fn_info.name, class_name, self.module_name, sig)
func_ir = FuncIR(func_decl, blocks, env, fn_info.fitem.line,
traceback_name=fn_info.fitem.name())
else:
func_ir = FuncIR(self.mapper.func_to_decl[fn_info.fitem], blocks, env,
fn_info.fitem.line, traceback_name=fn_info.fitem.name())
return (func_ir, func_reg)
def load_decorated_func(self, fdef: FuncDef, orig_func_reg: Value) -> Value:
"""
Given a decorated FuncDef and the register containing an instance of the callable class
representing that FuncDef, applies the corresponding decorator functions on that decorated
FuncDef and returns a register containing an instance of the callable class representing
the decorated function.
"""
if not self.is_decorated(fdef):
# If there are no decorators associated with the function, then just return the
# original function.
return orig_func_reg
decorators = self.fdefs_to_decorators[fdef]
func_reg = orig_func_reg
for d in reversed(decorators):
decorator = d.accept(self)
assert isinstance(decorator, Value)
func_reg = self.py_call(decorator, [func_reg], func_reg.line)
return func_reg
def maybe_add_implicit_return(self) -> None:
if is_none_rprimitive(self.ret_types[-1]) or is_object_rprimitive(self.ret_types[-1]):
self.add_implicit_return()
else:
self.add_implicit_unreachable()
def visit_func_def(self, fdef: FuncDef) -> None:
func_ir, func_reg = self.gen_func_item(fdef, fdef.name(), self.mapper.fdef_to_sig(fdef))
# If the function that was visited was a nested function, then either look it up in our
# current environment or define it if it was not already defined.
if func_reg:
self.assign(self.get_func_target(fdef), func_reg, fdef.line)
self.functions.append(func_ir)
def visit_overloaded_func_def(self, o: OverloadedFuncDef) -> None:
# Handle regular overload case
assert o.impl
self.accept(o.impl)
def add_implicit_return(self) -> None:
block = self.blocks[-1][-1]
if not block.ops or not isinstance(block.ops[-1], ControlOp):
retval = self.coerce(self.none(), self.ret_types[-1], -1)
self.nonlocal_control[-1].gen_return(self, retval, self.fn_info.fitem.line)
def add_implicit_unreachable(self) -> None:
block = self.blocks[-1][-1]
if not block.ops or not isinstance(block.ops[-1], ControlOp):
self.add(Unreachable())
def visit_block(self, block: Block) -> None:
if not block.is_unreachable:
for stmt in block.body:
self.accept(stmt)
# Raise a RuntimeError if we hit a non-empty unreachable block.
# Don't complain about empty unreachable blocks, since mypy inserts
# those after `if MYPY`.
elif block.body:
self.add(RaiseStandardError(RaiseStandardError.RUNTIME_ERROR,
'Reached allegedly unreachable code!',
block.line))
self.add(Unreachable())
def visit_expression_stmt(self, stmt: ExpressionStmt) -> None:
self.accept(stmt.expr)
def visit_return_stmt(self, stmt: ReturnStmt) -> None:
if stmt.expr:
retval = self.accept(stmt.expr)
else:
retval = self.none()
retval = self.coerce(retval, self.ret_types[-1], stmt.line)
self.nonlocal_control[-1].gen_return(self, retval, stmt.line)
def disallow_class_assignments(self, lvalues: List[Lvalue], line: int) -> None:
# Some best-effort attempts to disallow assigning to class
# variables that aren't marked ClassVar, since we blatantly
# miscompile the interaction between instance and class
# variables.
for lvalue in lvalues:
if (isinstance(lvalue, MemberExpr)
and isinstance(lvalue.expr, RefExpr)
and isinstance(lvalue.expr.node, TypeInfo)):
var = lvalue.expr.node[lvalue.name].node
if isinstance(var, Var) and not var.is_classvar:
self.error(
"Only class variables defined as ClassVar can be assigned to",
line)
def non_function_scope(self) -> bool:
# Currently the stack always has at least two items: dummy and top-level.
return len(self.fn_infos) <= 2
def init_final_static(self, lvalue: Lvalue, rvalue_reg: Value,
class_name: Optional[str] = None) -> None:
assert isinstance(lvalue, NameExpr)
assert isinstance(lvalue.node, Var)
if lvalue.node.final_value is None:
if class_name is None:
name = lvalue.name
else:
name = '{}.{}'.format(class_name, lvalue.name)
assert name is not None, "Full name not set for variable"
self.final_names.append((name, rvalue_reg.type))
self.add(InitStatic(rvalue_reg, name, self.module_name))
def load_final_static(self, fullname: str, typ: RType, line: int,
error_name: Optional[str] = None) -> Value:
if error_name is None:
error_name = fullname
ok_block, error_block = BasicBlock(), BasicBlock()
split_name = split_target(self.graph, fullname)
assert split_name is not None
value = self.add(LoadStatic(typ, split_name[1], split_name[0], line=line))
self.add(Branch(value, error_block, ok_block, Branch.IS_ERROR, rare=True))
self.activate_block(error_block)
self.add(RaiseStandardError(RaiseStandardError.VALUE_ERROR,
'value for final name "{}" was not set'.format(error_name),
line))
self.add(Unreachable())
self.activate_block(ok_block)
return value
def load_final_literal_value(self, val: Union[int, str, bytes, float, bool],
line: int) -> Value:
"""Load value of a final name or class-level attribute."""
if isinstance(val, bool):
if val:
return self.primitive_op(true_op, [], line)
else:
return self.primitive_op(false_op, [], line)
elif isinstance(val, int):
# TODO: take care of negative integer initializers
# (probably easier to fix this in mypy itself).
if val > MAX_LITERAL_SHORT_INT:
return self.load_static_int(val)
return self.add(LoadInt(val))
elif isinstance(val, float):
return self.load_static_float(val)
elif isinstance(val, str):
return self.load_static_unicode(val)
elif isinstance(val, bytes):
return self.load_static_bytes(val)
else:
assert False, "Unsupported final literal value"
def visit_assignment_stmt(self, stmt: AssignmentStmt) -> None:
assert len(stmt.lvalues) >= 1
self.disallow_class_assignments(stmt.lvalues, stmt.line)
lvalue = stmt.lvalues[0]
if stmt.type and isinstance(stmt.rvalue, TempNode):
# This is actually a variable annotation without an initializer. Don't
# generate an assignment, but we still need to call get_assignment_target,
# since it adds a name binding as a side effect.
self.get_assignment_target(lvalue, stmt.line)
return
line = stmt.rvalue.line
rvalue_reg = self.accept(stmt.rvalue)
if self.non_function_scope() and stmt.is_final_def:
self.init_final_static(lvalue, rvalue_reg)
for lvalue in stmt.lvalues:
target = self.get_assignment_target(lvalue)
self.assign(target, rvalue_reg, line)
def visit_operator_assignment_stmt(self, stmt: OperatorAssignmentStmt) -> None:
"""Operator assignment statement such as x += 1"""
self.disallow_class_assignments([stmt.lvalue], stmt.line)
target = self.get_assignment_target(stmt.lvalue)
target_value = self.read(target, stmt.line)
rreg = self.accept(stmt.rvalue)
# the Python parser strips the '=' from operator assignment statements, so re-add it
op = stmt.op + '='
res = self.binary_op(target_value, rreg, op, stmt.line)
# Usually operator assignments are done in-place,
# but when the target doesn't support that we need to assign manually.
self.assign(target, res, res.line)
def get_assignment_target(self, lvalue: Lvalue,
line: int = -1) -> AssignmentTarget:
if isinstance(lvalue, NameExpr):
# If we are visiting a decorator, then the SymbolNode we really want to be looking at
# is the function that is decorated, not the entire Decorator node itself.
symbol = lvalue.node
if isinstance(symbol, Decorator):
symbol = symbol.func
if symbol is None:
# New semantic analyzer doesn't create ad-hoc Vars for special forms.
assert lvalue.is_special_form
symbol = Var(lvalue.name)
if lvalue.kind == LDEF:
if symbol not in self.environment.symtable:
# If the function is a generator function, then first define a new variable
# in the current function's environment class. Next, define a target that
# refers to the newly defined variable in that environment class. Add the
# target to the table containing class environment variables, as well as the
# current environment.
if self.fn_info.is_generator:
return self.add_var_to_env_class(symbol, self.node_type(lvalue),
self.fn_info.generator_class,
reassign=False)
# Otherwise define a new local variable.
return self.environment.add_local_reg(symbol, self.node_type(lvalue))
else:
# Assign to a previously defined variable.
return self.environment.lookup(symbol)
elif lvalue.kind == GDEF:
globals_dict = self.load_globals_dict()
name = self.load_static_unicode(lvalue.name)
return AssignmentTargetIndex(globals_dict, name)
else:
assert False, lvalue.kind
elif isinstance(lvalue, IndexExpr):
# Indexed assignment x[y] = e
base = self.accept(lvalue.base)
index = self.accept(lvalue.index)
return AssignmentTargetIndex(base, index)
elif isinstance(lvalue, MemberExpr):
# Attribute assignment x.y = e
obj = self.accept(lvalue.expr)
return AssignmentTargetAttr(obj, lvalue.name)
elif isinstance(lvalue, TupleExpr):
# Multiple assignment a, ..., b = e
star_idx = None # type: Optional[int]
lvalues = []
for idx, item in enumerate(lvalue.items):
targ = self.get_assignment_target(item)
lvalues.append(targ)
if isinstance(item, StarExpr):
if star_idx is not None:
self.error("Two starred expressions in assignment", line)
star_idx = idx
return AssignmentTargetTuple(lvalues, star_idx)
elif isinstance(lvalue, StarExpr):
return self.get_assignment_target(lvalue.expr)
assert False, 'Unsupported lvalue: %r' % lvalue
def read(self, target: Union[Value, AssignmentTarget], line: int = -1) -> Value:
if isinstance(target, Value):
return target
if isinstance(target, AssignmentTargetRegister):
return target.register
if isinstance(target, AssignmentTargetIndex):
reg = self.gen_method_call(
target.base, '__getitem__', [target.index], target.type, line)
if reg is not None:
return reg
assert False, target.base.type
if isinstance(target, AssignmentTargetAttr):
if isinstance(target.obj.type, RInstance) and target.obj.type.class_ir.is_ext_class:
return self.add(GetAttr(target.obj, target.attr, line))
else:
return self.py_get_attr(target.obj, target.attr, line)
assert False, 'Unsupported lvalue: %r' % target
def assign(self, target: Union[Register, AssignmentTarget],
rvalue_reg: Value, line: int) -> None:
if isinstance(target, Register):
self.add(Assign(target, rvalue_reg))
elif isinstance(target, AssignmentTargetRegister):
rvalue_reg = self.coerce(rvalue_reg, target.type, line)
self.add(Assign(target.register, rvalue_reg))
elif isinstance(target, AssignmentTargetAttr):
if isinstance(target.obj_type, RInstance):
rvalue_reg = self.coerce(rvalue_reg, target.type, line)
self.add(SetAttr(target.obj, target.attr, rvalue_reg, line))
else:
key = self.load_static_unicode(target.attr)
boxed_reg = self.box(rvalue_reg)
self.add(PrimitiveOp([target.obj, key, boxed_reg], py_setattr_op, line))
elif isinstance(target, AssignmentTargetIndex):
target_reg2 = self.gen_method_call(
target.base, '__setitem__', [target.index, rvalue_reg], None, line)
assert target_reg2 is not None, target.base.type
elif isinstance(target, AssignmentTargetTuple):
if isinstance(rvalue_reg.type, RTuple) and target.star_idx is None:
rtypes = rvalue_reg.type.types
assert len(rtypes) == len(target.items)
for i in range(len(rtypes)):
item_value = self.add(TupleGet(rvalue_reg, i, line))
self.assign(target.items[i], item_value, line)
else:
self.process_iterator_tuple_assignment(target, rvalue_reg, line)
else:
assert False, 'Unsupported assignment target'
def process_iterator_tuple_assignment_helper(self,
litem: AssignmentTarget,
ritem: Value, line: int) -> None:
error_block, ok_block = BasicBlock(), BasicBlock()
self.add(Branch(ritem, error_block, ok_block, Branch.IS_ERROR))
self.activate_block(error_block)
self.add(RaiseStandardError(RaiseStandardError.VALUE_ERROR,
'not enough values to unpack', line))
self.add(Unreachable())
self.activate_block(ok_block)
self.assign(litem, ritem, line)
def process_iterator_tuple_assignment(self,
target: AssignmentTargetTuple,
rvalue_reg: Value,
line: int) -> None:
iterator = self.primitive_op(iter_op, [rvalue_reg], line)
# This may be the whole lvalue list if there is no starred value
split_idx = target.star_idx if target.star_idx is not None else len(target.items)
# Assign values before the first starred value
for litem in target.items[:split_idx]:
ritem = self.primitive_op(next_op, [iterator], line)
error_block, ok_block = BasicBlock(), BasicBlock()
self.add(Branch(ritem, error_block, ok_block, Branch.IS_ERROR))
self.activate_block(error_block)
self.add(RaiseStandardError(RaiseStandardError.VALUE_ERROR,
'not enough values to unpack', line))
self.add(Unreachable())
self.activate_block(ok_block)
self.assign(litem, ritem, line)
# Assign the starred value and all values after it
if target.star_idx is not None:
post_star_vals = target.items[split_idx + 1:]
iter_list = self.primitive_op(to_list, [iterator], line)
iter_list_len = self.primitive_op(list_len_op, [iter_list], line)
post_star_len = self.add(LoadInt(len(post_star_vals)))
condition = self.binary_op(post_star_len, iter_list_len, '<=', line)
error_block, ok_block = BasicBlock(), BasicBlock()
self.add(Branch(condition, ok_block, error_block, Branch.BOOL_EXPR))
self.activate_block(error_block)
self.add(RaiseStandardError(RaiseStandardError.VALUE_ERROR,
'not enough values to unpack', line))
self.add(Unreachable())
self.activate_block(ok_block)
for litem in reversed(post_star_vals):
ritem = self.primitive_op(list_pop_last, [iter_list], line)
self.assign(litem, ritem, line)
# Assign the starred value
self.assign(target.items[target.star_idx], iter_list, line)
# There is no starred value, so check if there are extra values in rhs that
# have not been assigned.
else:
extra = self.primitive_op(next_op, [iterator], line)
error_block, ok_block = BasicBlock(), BasicBlock()
self.add(Branch(extra, ok_block, error_block, Branch.IS_ERROR))
self.activate_block(error_block)
self.add(RaiseStandardError(RaiseStandardError.VALUE_ERROR,
'too many values to unpack', line))
self.add(Unreachable())
self.activate_block(ok_block)
def visit_if_stmt(self, stmt: IfStmt) -> None:
if_body, next = BasicBlock(), BasicBlock()
else_body = BasicBlock() if stmt.else_body else next
# If statements are normalized
assert len(stmt.expr) == 1
self.process_conditional(stmt.expr[0], if_body, else_body)
self.activate_block(if_body)
self.accept(stmt.body[0])
self.goto(next)
if stmt.else_body:
self.activate_block(else_body)
self.accept(stmt.else_body)
self.goto(next)
self.activate_block(next)
def push_loop_stack(self, continue_block: BasicBlock, break_block: BasicBlock) -> None:
self.nonlocal_control.append(
LoopNonlocalControl(self.nonlocal_control[-1], continue_block, break_block))
def pop_loop_stack(self) -> None:
self.nonlocal_control.pop()
def visit_while_stmt(self, s: WhileStmt) -> None:
body, next, top, else_block = BasicBlock(), BasicBlock(), BasicBlock(), BasicBlock()
normal_loop_exit = else_block if s.else_body is not None else next
self.push_loop_stack(top, next)
# Split block so that we get a handle to the top of the loop.
self.goto_and_activate(top)
self.process_conditional(s.expr, body, normal_loop_exit)
self.activate_block(body)
self.accept(s.body)
# Add branch to the top at the end of the body.
self.goto(top)
self.pop_loop_stack()
if s.else_body is not None:
self.activate_block(else_block)
self.accept(s.else_body)
self.goto(next)
self.activate_block(next)
def visit_for_stmt(self, s: ForStmt) -> None:
def body() -> None:
self.accept(s.body)
def else_block() -> None:
assert s.else_body is not None
self.accept(s.else_body)
self.for_loop_helper(s.index, s.expr, body,
else_block if s.else_body else None, s.line)
def spill(self, value: Value) -> AssignmentTarget:
"""Moves a given Value instance into the generator class' environment class."""
name = '{}{}'.format(TEMP_ATTR_NAME, self.temp_counter)
self.temp_counter += 1
target = self.add_var_to_env_class(Var(name), value.type, self.fn_info.generator_class)
# Shouldn't be able to fail, so -1 for line
self.assign(target, value, -1)
return target
def maybe_spill(self, value: Value) -> Union[Value, AssignmentTarget]:
"""
Moves a given Value instance into the environment class for generator functions. For
non-generator functions, leaves the Value instance as it is.
Returns an AssignmentTarget associated with the Value for generator functions and the
original Value itself for non-generator functions.
"""
if self.fn_info.is_generator:
return self.spill(value)
return value
def maybe_spill_assignable(self, value: Value) -> Union[Register, AssignmentTarget]:
"""
Moves a given Value instance into the environment class for generator functions. For
non-generator functions, allocate a temporary Register.
Returns an AssignmentTarget associated with the Value for generator functions and an
assignable Register for non-generator functions.
"""
if self.fn_info.is_generator:
return self.spill(value)
if isinstance(value, Register):
return value
# Allocate a temporary register for the assignable value.
reg = self.alloc_temp(value.type)
self.assign(reg, value, -1)
return reg
def for_loop_helper(self, index: Lvalue, expr: Expression,
body_insts: GenFunc, else_insts: Optional[GenFunc],
line: int) -> None:
"""Generate IR for a loop.
Args:
index: the loop index Lvalue
expr: the expression to iterate over
body_insts: a function that generates the body of the loop
else_insts: a function that generates the else block instructions
"""
# Body of the loop
body_block = BasicBlock()
# Block that steps to the next item
step_block = BasicBlock()
# Block for the else clause, if we need it
else_block = BasicBlock()
# Block executed after the loop
exit_block = BasicBlock()
# Determine where we want to exit, if our condition check fails.
normal_loop_exit = else_block if else_insts is not None else exit_block
for_gen = self.make_for_loop_generator(index, expr, body_block, normal_loop_exit, line)
self.push_loop_stack(step_block, exit_block)
condition_block = self.goto_new_block()
# Add loop condition check.
for_gen.gen_condition()
# Generate loop body.
self.activate_block(body_block)
for_gen.begin_body()
body_insts()
# We generate a separate step block (which might be empty).
self.goto_and_activate(step_block)
for_gen.gen_step()
# Go back to loop condition.
self.goto(condition_block)
for_gen.add_cleanup(normal_loop_exit)
self.pop_loop_stack()
if else_insts is not None:
self.activate_block(else_block)
else_insts()
self.goto(exit_block)
self.activate_block(exit_block)
def extract_int(self, e: Expression) -> Optional[int]:
if isinstance(e, IntExpr):
return e.value
elif isinstance(e, UnaryExpr) and e.op == '-' and isinstance(e.expr, IntExpr):
return -e.expr.value
else:
return None
def make_for_loop_generator(self,
index: Lvalue,
expr: Expression,
body_block: BasicBlock,
loop_exit: BasicBlock,
line: int,
nested: bool = False) -> ForGenerator:
"""Return helper object for generating a for loop over an iterable.
If "nested" is True, this is a nested iterator such as "e" in "enumerate(e)".
"""
if is_list_rprimitive(self.node_type(expr)):
# Special case "for x in <list>".
expr_reg = self.accept(expr)
target_list_type = get_proper_type(self.types[expr])
assert isinstance(target_list_type, Instance)
target_type = self.type_to_rtype(target_list_type.args[0])
for_list = ForList(self, index, body_block, loop_exit, line, nested)
for_list.init(expr_reg, target_type, reverse=False)
return for_list
if (isinstance(expr, CallExpr)
and isinstance(expr.callee, RefExpr)):
if (expr.callee.fullname == 'builtins.range'
and (len(expr.args) <= 2
or (len(expr.args) == 3
and self.extract_int(expr.args[2]) is not None))
and set(expr.arg_kinds) == {ARG_POS}):
# Special case "for x in range(...)".
# We support the 3 arg form but only for int literals, since it doesn't
# seem worth the hassle of supporting dynamically determining which
# direction of comparison to do.
if len(expr.args) == 1:
start_reg = self.add(LoadInt(0))
end_reg = self.accept(expr.args[0])
else:
start_reg = self.accept(expr.args[0])
end_reg = self.accept(expr.args[1])
if len(expr.args) == 3:
step = self.extract_int(expr.args[2])
assert step is not None
if step == 0:
self.error("range() step can't be zero", expr.args[2].line)
else:
step = 1
for_range = ForRange(self, index, body_block, loop_exit, line, nested)
for_range.init(start_reg, end_reg, step)
return for_range
elif (expr.callee.fullname == 'builtins.enumerate'
and len(expr.args) == 1
and expr.arg_kinds == [ARG_POS]
and isinstance(index, TupleExpr)
and len(index.items) == 2):
# Special case "for i, x in enumerate(y)".
lvalue1 = index.items[0]
lvalue2 = index.items[1]
for_enumerate = ForEnumerate(self, index, body_block, loop_exit, line,
nested)
for_enumerate.init(lvalue1, lvalue2, expr.args[0])
return for_enumerate
elif (expr.callee.fullname == 'builtins.zip'
and len(expr.args) >= 2
and set(expr.arg_kinds) == {ARG_POS}
and isinstance(index, TupleExpr)
and len(index.items) == len(expr.args)):
# Special case "for x, y in zip(a, b)".
for_zip = ForZip(self, index, body_block, loop_exit, line, nested)
for_zip.init(index.items, expr.args)
return for_zip
if (expr.callee.fullname == 'builtins.reversed'
and len(expr.args) == 1
and expr.arg_kinds == [ARG_POS]
and is_list_rprimitive(self.node_type(expr.args[0]))):
# Special case "for x in reversed(<list>)".
expr_reg = self.accept(expr.args[0])
target_list_type = get_proper_type(self.types[expr.args[0]])
assert isinstance(target_list_type, Instance)
target_type = self.type_to_rtype(target_list_type.args[0])
for_list = ForList(self, index, body_block, loop_exit, line, nested)
for_list.init(expr_reg, target_type, reverse=True)
return for_list
# Default to a generic for loop.
expr_reg = self.accept(expr)
for_obj = ForIterable(self, index, body_block, loop_exit, line, nested)
item_type = self._analyze_iterable_item_type(expr)
item_rtype = self.type_to_rtype(item_type)
for_obj.init(expr_reg, item_rtype)
return for_obj
def _analyze_iterable_item_type(self, expr: Expression) -> Type:
"""Return the item type given by 'expr' in an iterable context."""
# This logic is copied from mypy's TypeChecker.analyze_iterable_item_type.
iterable = get_proper_type(self.types[expr])
echk = self.graph[self.module_name].type_checker().expr_checker
iterator = echk.check_method_call_by_name('__iter__', iterable, [], [], expr)[0]
from mypy.join import join_types
if isinstance(iterable, TupleType):
joined = UninhabitedType() # type: Type
for item in iterable.items:
joined = join_types(joined, item)
return joined
else:
# Non-tuple iterable.
return echk.check_method_call_by_name('__next__', iterator, [], [], expr)[0]
def visit_break_stmt(self, node: BreakStmt) -> None:
self.nonlocal_control[-1].gen_break(self, node.line)
def visit_continue_stmt(self, node: ContinueStmt) -> None:
self.nonlocal_control[-1].gen_continue(self, node.line)
def visit_unary_expr(self, expr: UnaryExpr) -> Value:
return self.unary_op(self.accept(expr.expr), expr.op, expr.line)
def visit_op_expr(self, expr: OpExpr) -> Value:
if expr.op in ('and', 'or'):
return self.shortcircuit_expr(expr)
return self.binary_op(self.accept(expr.left), self.accept(expr.right), expr.op, expr.line)
def translate_eq_cmp(self,
lreg: Value,
rreg: Value,
expr_op: str,
line: int) -> Optional[Value]:
ltype = lreg.type
rtype = rreg.type
if not (isinstance(ltype, RInstance) and ltype == rtype):
return None
class_ir = ltype.class_ir
# Check whether any subclass of the operand redefines __eq__, or
# whether it might be redefined in a Python parent class or by
# dataclasses
cmp_varies_at_runtime = (
not class_ir.is_method_final('__eq__')
or not class_ir.is_method_final('__ne__')
or class_ir.inherits_python
or class_ir.is_augmented
)
if cmp_varies_at_runtime:
# We might need to call left.__eq__(right) or right.__eq__(left)
# depending on which is the more specific type.
return None
if not class_ir.has_method('__eq__'):
# There's no __eq__ defined, so just use object identity.
identity_ref_op = 'is' if expr_op == '==' else 'is not'
return self.binary_op(lreg, rreg, identity_ref_op, line)
return self.gen_method_call(
lreg,
op_methods[expr_op],
[rreg],
ltype,
line
)
def matching_primitive_op(self,
candidates: List[OpDescription],
args: List[Value],
line: int,
result_type: Optional[RType] = None) -> Optional[Value]:
# Find the highest-priority primitive op that matches.
matching = None # type: Optional[OpDescription]
for desc in candidates:
if len(desc.arg_types) != len(args):
continue
if all(is_subtype(actual.type, formal)
for actual, formal in zip(args, desc.arg_types)):
if matching:
assert matching.priority != desc.priority, 'Ambiguous:\n1) %s\n2) %s' % (
matching, desc)
if desc.priority > matching.priority:
matching = desc
else:
matching = desc
if matching:
target = self.primitive_op(matching, args, line)
if result_type and not is_runtime_subtype(target.type, result_type):
if is_none_rprimitive(result_type):
# Special case None return. The actual result may be a bool,
# so we can't just coerce it.
target = self.none()
else:
target = self.coerce(target, result_type, line)
return target
return None
def binary_op(self,
lreg: Value,
rreg: Value,
expr_op: str,
line: int) -> Value:
# Special case == and != when we can resolve the method call statically.
value = None
if expr_op in ('==', '!='):
value = self.translate_eq_cmp(lreg, rreg, expr_op, line)
if value is not None:
return value
ops = binary_ops.get(expr_op, [])
target = self.matching_primitive_op(ops, [lreg, rreg], line)
assert target, 'Unsupported binary operation: %s' % expr_op
return target
def unary_op(self,
lreg: Value,
expr_op: str,
line: int) -> Value:
ops = unary_ops.get(expr_op, [])
target = self.matching_primitive_op(ops, [lreg], line)
assert target, 'Unsupported unary operation: %s' % expr_op
return target
def visit_index_expr(self, expr: IndexExpr) -> Value:
base = self.accept(expr.base)
if isinstance(base.type, RTuple) and isinstance(expr.index, IntExpr):
return self.add(TupleGet(base, expr.index.value, expr.line))
index_reg = self.accept(expr.index)
return self.gen_method_call(
base, '__getitem__', [index_reg], self.node_type(expr), expr.line)
def visit_int_expr(self, expr: IntExpr) -> Value:
if expr.value > MAX_LITERAL_SHORT_INT:
return self.load_static_int(expr.value)
return self.add(LoadInt(expr.value))
def visit_float_expr(self, expr: FloatExpr) -> Value:
return self.load_static_float(expr.value)
def visit_complex_expr(self, expr: ComplexExpr) -> Value:
return self.load_static_complex(expr.value)
def visit_bytes_expr(self, expr: BytesExpr) -> Value:
value = bytes(expr.value, 'utf8').decode('unicode-escape').encode('raw-unicode-escape')
return self.load_static_bytes(value)
def is_native_ref_expr(self, expr: RefExpr) -> bool:
if expr.node is None:
return False
if '.' in expr.node.fullname():
return expr.node.fullname().rpartition('.')[0] in self.modules
return True
def is_native_module_ref_expr(self, expr: RefExpr) -> bool:
return self.is_native_ref_expr(expr) and expr.kind == GDEF
def is_synthetic_type(self, typ: TypeInfo) -> bool:
"""Is a type something other than just a class we've created?"""
return typ.is_named_tuple or typ.is_newtype or typ.typeddict_type is not None
def is_decorated(self, fdef: FuncDef) -> bool:
return fdef in self.fdefs_to_decorators
def is_free_variable(self, symbol: SymbolNode) -> bool:
fitem = self.fn_info.fitem
return fitem in self.free_variables and symbol in self.free_variables[fitem]
def get_final_ref(self, expr: MemberExpr) -> Optional[Tuple[str, Var, bool]]:
"""Check if `expr` is a final attribute.
This needs to be done differently for class and module attributes to
correctly determine fully qualified name. Return a tuple that consists of
the qualified name, the corresponding Var node, and a flag indicating whether
the final name was defined in a compiled module. Return None if `expr` does not
refer to a final attribute.
"""
final_var = None
if isinstance(expr.expr, RefExpr) and isinstance(expr.expr.node, TypeInfo):
# a class attribute
sym = expr.expr.node.get(expr.name)
if sym and isinstance(sym.node, Var):
# Enum attributes are treated as final since they are added to the global cache
expr_fullname = expr.expr.node.bases[0].type.fullname()
is_final = sym.node.is_final or expr_fullname == 'enum.Enum'
if is_final:
final_var = sym.node
fullname = '{}.{}'.format(sym.node.info.fullname(), final_var.name())
native = expr.expr.node.module_name in self.modules
elif self.is_module_member_expr(expr):
# a module attribute
if isinstance(expr.node, Var) and expr.node.is_final:
final_var = expr.node
fullname = expr.node.fullname()
native = self.is_native_ref_expr(expr)
if final_var is not None:
return fullname, final_var, native
return None
def emit_load_final(self, final_var: Var, fullname: str,
name: str, native: bool, typ: Type, line: int) -> Optional[Value]:
"""Emit code for loading value of a final name (if possible).
Args:
final_var: Var corresponding to the final name
fullname: its qualified name
name: shorter name to show in errors
native: whether the name was defined in a compiled module
typ: its type
line: line number where loading occurs
"""
if final_var.final_value is not None: # this is safe even for non-native names
return self.load_final_literal_value(final_var.final_value, line)
elif native:
return self.load_final_static(fullname, self.mapper.type_to_rtype(typ),
line, name)
else:
return None
def visit_name_expr(self, expr: NameExpr) -> Value:
assert expr.node, "RefExpr not resolved"
fullname = expr.node.fullname()
if fullname in name_ref_ops:
# Use special access op for this particular name.
desc = name_ref_ops[fullname]
assert desc.result_type is not None
return self.add(PrimitiveOp([], desc, expr.line))
if isinstance(expr.node, Var) and expr.node.is_final:
value = self.emit_load_final(expr.node, fullname, expr.name,
self.is_native_ref_expr(expr), self.types[expr],
expr.line)
if value is not None:
return value
if isinstance(expr.node, MypyFile) and expr.node.fullname() in self.imports:
return self.load_module(expr.node.fullname())
# If the expression is locally defined, then read the result from the corresponding
# assignment target and return it. Otherwise if the expression is a global, load it from
# the globals dictionary.
# Except for imports, that currently always happens in the global namespace.
if expr.kind == LDEF and not (isinstance(expr.node, Var)
and expr.node.is_suppressed_import):
# Try to detect and error when we hit the irritating mypy bug
# where a local variable is cast to None. (#5423)
if (isinstance(expr.node, Var) and is_none_rprimitive(self.node_type(expr))
and expr.node.is_inferred):
self.error(
"Local variable '{}' has inferred type None; add an annotation".format(
expr.node.name()),
expr.node.line)
# TODO: Behavior currently only defined for Var and FuncDef node types.
return self.read(self.get_assignment_target(expr), expr.line)
return self.load_global(expr)
def is_module_member_expr(self, expr: MemberExpr) -> bool:
return isinstance(expr.expr, RefExpr) and isinstance(expr.expr.node, MypyFile)
def visit_member_expr(self, expr: MemberExpr) -> Value:
# First check if this is maybe a final attribute.
final = self.get_final_ref(expr)
if final is not None:
fullname, final_var, native = final
value = self.emit_load_final(final_var, fullname, final_var.name(), native,
self.types[expr], expr.line)
if value is not None:
return value
if isinstance(expr.node, MypyFile) and expr.node.fullname() in self.imports:
return self.load_module(expr.node.fullname())
obj = self.accept(expr.expr)
return self.get_attr(obj, expr.name, self.node_type(expr), expr.line)
def get_attr(self, obj: Value, attr: str, result_type: RType, line: int) -> Value:
if (isinstance(obj.type, RInstance) and obj.type.class_ir.is_ext_class
and obj.type.class_ir.has_attr(attr)):
return self.add(GetAttr(obj, attr, line))
elif isinstance(obj.type, RUnion):
return self.union_get_attr(obj, obj.type, attr, result_type, line)
else:
return self.py_get_attr(obj, attr, line)
def union_get_attr(self,
obj: Value,
rtype: RUnion,
attr: str,
result_type: RType,
line: int) -> Value:
def get_item_attr(value: Value) -> Value:
return self.get_attr(value, attr, result_type, line)
return self.decompose_union_helper(obj, rtype, result_type, get_item_attr, line)
def decompose_union_helper(self,
obj: Value,
rtype: RUnion,
result_type: RType,
process_item: Callable[[Value], Value],
line: int) -> Value:
"""Generate isinstance() + specialized operations for union items.
Say, for Union[A, B] generate ops resembling this (pseudocode):
if isinstance(obj, A):
result = <result of process_item(cast(A, obj))>
else:
result = <result of process_item(cast(B, obj))>
Args:
obj: value with a union type
rtype: the union type
result_type: result of the operation
process_item: callback to generate op for a single union item (arg is coerced
to union item type)
line: line number
"""
# TODO: Optimize cases where a single operation can handle multiple union items
# (say a method is implemented in a common base class)
fast_items = []
rest_items = []
for item in rtype.items:
if isinstance(item, RInstance):
fast_items.append(item)
else:
# For everything but RInstance we fall back to C API
rest_items.append(item)
exit_block = BasicBlock()
result = self.alloc_temp(result_type)
for i, item in enumerate(fast_items):
more_types = i < len(fast_items) - 1 or rest_items
if more_types:
# We are not at the final item so we need one more branch
op = self.isinstance_native(obj, item.class_ir, line)
true_block, false_block = BasicBlock(), BasicBlock()
self.add_bool_branch(op, true_block, false_block)
self.activate_block(true_block)
coerced = self.coerce(obj, item, line)
temp = process_item(coerced)
temp2 = self.coerce(temp, result_type, line)
self.add(Assign(result, temp2))
self.goto(exit_block)
if more_types:
self.activate_block(false_block)
if rest_items:
# For everything else we use a generic operation. Use force=True to drop the
# union type.
coerced = self.coerce(obj, object_rprimitive, line, force=True)
temp = process_item(coerced)
temp2 = self.coerce(temp, result_type, line)
self.add(Assign(result, temp2))
self.goto(exit_block)
self.activate_block(exit_block)
return result
def isinstance_helper(self, obj: Value, class_irs: List[ClassIR], line: int) -> Value:
"""Fast path for isinstance() that checks against a list of native classes."""
if not class_irs:
return self.primitive_op(false_op, [], line)
ret = self.isinstance_native(obj, class_irs[0], line)
for class_ir in class_irs[1:]:
def other() -> Value:
return self.isinstance_native(obj, class_ir, line)
ret = self.shortcircuit_helper('or', bool_rprimitive, lambda: ret, other, line)
return ret
def isinstance_native(self, obj: Value, class_ir: ClassIR, line: int) -> Value:
"""Fast isinstance() check for a native class.
If there are three or fewer concrete (non-trait) classes among the class and
all of its children, use even faster type comparison checks `type(obj) is typ`.
"""
concrete = all_concrete_classes(class_ir)
if concrete is None or len(concrete) > FAST_ISINSTANCE_MAX_SUBCLASSES + 1:
return self.primitive_op(fast_isinstance_op,
[obj, self.get_native_type(class_ir)],
line)
if not concrete:
# There can't be any concrete instance that matches this.
return self.primitive_op(false_op, [], line)
type_obj = self.get_native_type(concrete[0])
ret = self.primitive_op(type_is_op, [obj, type_obj], line)
for c in concrete[1:]:
def other() -> Value:
return self.primitive_op(type_is_op, [obj, self.get_native_type(c)], line)
ret = self.shortcircuit_helper('or', bool_rprimitive, lambda: ret, other, line)
return ret
def get_native_type(self, cls: ClassIR) -> Value:
fullname = '%s.%s' % (cls.module_name, cls.name)
return self.load_native_type_object(fullname)
def py_get_attr(self, obj: Value, attr: str, line: int) -> Value:
key = self.load_static_unicode(attr)
return self.add(PrimitiveOp([obj, key], py_getattr_op, line))
def py_call(self,
function: Value,
arg_values: List[Value],
line: int,
arg_kinds: Optional[List[int]] = None,
arg_names: Optional[List[Optional[str]]] = None) -> Value:
"""Use py_call_op or py_call_with_kwargs_op for function call."""
# If all arguments are positional, we can use py_call_op.
if (arg_kinds is None) or all(kind == ARG_POS for kind in arg_kinds):
return self.primitive_op(py_call_op, [function] + arg_values, line)
# Otherwise fall back to py_call_with_kwargs_op.
assert arg_names is not None
pos_arg_values = []
kw_arg_key_value_pairs = [] # type: List[DictEntry]
star_arg_values = []
for value, kind, name in zip(arg_values, arg_kinds, arg_names):
if kind == ARG_POS:
pos_arg_values.append(value)
elif kind == ARG_NAMED:
assert name is not None
key = self.load_static_unicode(name)
kw_arg_key_value_pairs.append((key, value))
elif kind == ARG_STAR:
star_arg_values.append(value)
elif kind == ARG_STAR2:
# NOTE: mypy currently only supports a single ** arg, but Python supports multiple.
# This code supports multiple primarily to make the logic easier to follow.
kw_arg_key_value_pairs.append((None, value))
else:
assert False, ("Argument kind should not be possible:", kind)
if len(star_arg_values) == 0:
# We can directly construct a tuple if there are no star args.
pos_args_tuple = self.primitive_op(new_tuple_op, pos_arg_values, line)
else:
# Otherwise we construct a list and extend it with the star args, since tuples
# don't have an extend method.
pos_args_list = self.primitive_op(new_list_op, pos_arg_values, line)
for star_arg_value in star_arg_values:
self.primitive_op(list_extend_op, [pos_args_list, star_arg_value], line)
pos_args_tuple = self.primitive_op(list_tuple_op, [pos_args_list], line)
kw_args_dict = self.make_dict(kw_arg_key_value_pairs, line)
return self.primitive_op(
py_call_with_kwargs_op, [function, pos_args_tuple, kw_args_dict], line)
def py_method_call(self,
obj: Value,
method_name: str,
arg_values: List[Value],
line: int,
arg_kinds: Optional[List[int]] = None,
arg_names: Optional[List[Optional[str]]] = None) -> Value:
if (arg_kinds is None) or all(kind == ARG_POS for kind in arg_kinds):
method_name_reg = self.load_static_unicode(method_name)
return self.primitive_op(py_method_call_op, [obj, method_name_reg] + arg_values, line)
else:
method = self.py_get_attr(obj, method_name, line)
return self.py_call(method, arg_values, line, arg_kinds=arg_kinds, arg_names=arg_names)
def call(self, decl: FuncDecl, args: Sequence[Value],
arg_kinds: List[int],
arg_names: Sequence[Optional[str]],
line: int) -> Value:
# Normalize args to positionals.
args = self.native_args_to_positional(
args, arg_kinds, arg_names, decl.sig, line)
return self.add(Call(decl, args, line))
def visit_call_expr(self, expr: CallExpr) -> Value:
if isinstance(expr.analyzed, CastExpr):
return self.translate_cast_expr(expr.analyzed)
callee = expr.callee
if isinstance(callee, IndexExpr) and isinstance(callee.analyzed, TypeApplication):
callee = callee.analyzed.expr # Unwrap type application
if isinstance(callee, MemberExpr):
return self.translate_method_call(expr, callee)
elif isinstance(callee, SuperExpr):
return self.translate_super_method_call(expr, callee)
else:
return self.translate_call(expr, callee)
def translate_call(self, expr: CallExpr, callee: Expression) -> Value:
# The common case of calls is refexprs
if isinstance(callee, RefExpr):
return self.translate_refexpr_call(expr, callee)
function = self.accept(callee)
args = [self.accept(arg) for arg in expr.args]
return self.py_call(function, args, expr.line,
arg_kinds=expr.arg_kinds, arg_names=expr.arg_names)
def translate_refexpr_call(self, expr: CallExpr, callee: RefExpr) -> Value:
"""Translate a non-method call."""
# TODO: Allow special cases to have default args or named args. Currently they don't since
# they check that everything in arg_kinds is ARG_POS.
# If there is a specializer for this function, try calling it.
if callee.fullname and (callee.fullname, None) in specializers:
val = specializers[callee.fullname, None](self, expr, callee)
if val is not None:
return val
# Generate the argument values
arg_values = [self.accept(arg) for arg in expr.args]
return self.call_refexpr_with_args(expr, callee, arg_values)
def call_refexpr_with_args(
self, expr: CallExpr, callee: RefExpr, arg_values: List[Value]) -> Value:
# Handle data-driven special-cased primitive call ops.
if callee.fullname is not None and expr.arg_kinds == [ARG_POS] * len(arg_values):
ops = func_ops.get(callee.fullname, [])
target = self.matching_primitive_op(ops, arg_values, expr.line, self.node_type(expr))
if target:
return target
# Standard native call if signature and fullname are good and all arguments are positional
# or named.
callee_node = callee.node
if isinstance(callee_node, OverloadedFuncDef):
callee_node = callee_node.impl
if (callee_node is not None
and callee.fullname is not None
and callee_node in self.mapper.func_to_decl
and all(kind in (ARG_POS, ARG_NAMED) for kind in expr.arg_kinds)):
decl = self.mapper.func_to_decl[callee_node]
return self.call(decl, arg_values, expr.arg_kinds, expr.arg_names, expr.line)
# Fall back to a Python call
function = self.accept(callee)
return self.py_call(function, arg_values, expr.line,
arg_kinds=expr.arg_kinds, arg_names=expr.arg_names)
def translate_method_call(self, expr: CallExpr, callee: MemberExpr) -> Value:
"""Generate IR for an arbitrary call of form e.m(...).
This can also deal with calls to module-level functions.
"""
if self.is_native_ref_expr(callee):
# Call to module-level native function or such
return self.translate_call(expr, callee)
elif isinstance(callee.expr, RefExpr) and callee.expr.node in self.mapper.type_to_ir:
# Call a method via the *class*
assert isinstance(callee.expr.node, TypeInfo)
ir = self.mapper.type_to_ir[callee.expr.node]
decl = ir.method_decl(callee.name)
args = []
arg_kinds, arg_names = expr.arg_kinds[:], expr.arg_names[:]
# Add the class argument for class methods in extension classes
if decl.kind == FUNC_CLASSMETHOD and ir.is_ext_class:
args.append(self.load_native_type_object(callee.expr.node.fullname()))
arg_kinds.insert(0, ARG_POS)
arg_names.insert(0, None)
args += [self.accept(arg) for arg in expr.args]
if ir.is_ext_class:
return self.call(decl, args, arg_kinds, arg_names, expr.line)
else:
obj = self.accept(callee.expr)
return self.gen_method_call(obj,
callee.name,
args,
self.node_type(expr),
expr.line,
expr.arg_kinds,
expr.arg_names)
elif self.is_module_member_expr(callee):
# Fall back to a PyCall for non-native module calls
function = self.accept(callee)
args = [self.accept(arg) for arg in expr.args]
return self.py_call(function, args, expr.line,
arg_kinds=expr.arg_kinds, arg_names=expr.arg_names)
else:
receiver_typ = self.node_type(callee.expr)
# If there is a specializer for this method name/type, try calling it.
if (callee.name, receiver_typ) in specializers:
val = specializers[callee.name, receiver_typ](self, expr, callee)
if val is not None:
return val
obj = self.accept(callee.expr)
args = [self.accept(arg) for arg in expr.args]
return self.gen_method_call(obj,
callee.name,
args,
self.node_type(expr),
expr.line,
expr.arg_kinds,
expr.arg_names)
def translate_super_method_call(self, expr: CallExpr, callee: SuperExpr) -> Value:
if callee.info is None or callee.call.args:
return self.translate_call(expr, callee)
ir = self.mapper.type_to_ir[callee.info]
# Search for the method in the mro, skipping ourselves.
for base in ir.mro[1:]:
if callee.name in base.method_decls:
break
else:
return self.translate_call(expr, callee)
decl = base.method_decl(callee.name)
arg_values = [self.accept(arg) for arg in expr.args]
arg_kinds, arg_names = expr.arg_kinds[:], expr.arg_names[:]
if decl.kind != FUNC_STATICMETHOD:
vself = next(iter(self.environment.indexes)) # grab first argument
if decl.kind == FUNC_CLASSMETHOD:
vself = self.primitive_op(type_op, [vself], expr.line)
elif self.fn_info.is_generator:
# For generator classes, the self target is the value at index 6
# in the symbol table (which is an ordered dict). This is sort
# of ugly, but we can't search by name since the 'self' parameter
# could be named anything, and it doesn't get added to the
# environment indexes.
self_targ = list(self.environment.symtable.values())[6]
vself = self.read(self_targ, self.fn_info.fitem.line)
arg_values.insert(0, vself)
arg_kinds.insert(0, ARG_POS)
arg_names.insert(0, None)
return self.call(decl, arg_values, arg_kinds, arg_names, expr.line)
def gen_method_call(self,
base: Value,
name: str,
arg_values: List[Value],
return_rtype: Optional[RType],
line: int,
arg_kinds: Optional[List[int]] = None,
arg_names: Optional[List[Optional[str]]] = None) -> Value:
# If arg_kinds contains values other than ARG_POS and ARG_NAMED, then fall back to
# a Python method call.
if (arg_kinds is not None
and not all(kind in (ARG_POS, ARG_NAMED) for kind in arg_kinds)):
return self.py_method_call(base, name, arg_values, base.line, arg_kinds, arg_names)
# If the base type is one of ours, do a MethodCall
if (isinstance(base.type, RInstance) and base.type.class_ir.is_ext_class
and not base.type.class_ir.builtin_base):
if base.type.class_ir.has_method(name):
decl = base.type.class_ir.method_decl(name)
if arg_kinds is None:
assert arg_names is None, "arg_kinds not present but arg_names is"
arg_kinds = [ARG_POS for _ in arg_values]
arg_names = [None for _ in arg_values]
else:
assert arg_names is not None, "arg_kinds present but arg_names is not"
# Normalize args to positionals.
assert decl.bound_sig
arg_values = self.native_args_to_positional(
arg_values, arg_kinds, arg_names, decl.bound_sig, line)
return self.add(MethodCall(base, name, arg_values, line))
elif base.type.class_ir.has_attr(name):
function = self.add(GetAttr(base, name, line))
return self.py_call(function, arg_values, line,
arg_kinds=arg_kinds, arg_names=arg_names)
elif isinstance(base.type, RUnion):
return self.union_method_call(base, base.type, name, arg_values, return_rtype, line,
arg_kinds, arg_names)
# Try to do a special-cased method call
if not arg_kinds or arg_kinds == [ARG_POS] * len(arg_values):
target = self.translate_special_method_call(base, name, arg_values, return_rtype, line)
if target:
return target
# Fall back to Python method call
return self.py_method_call(base, name, arg_values, line, arg_kinds, arg_names)
def union_method_call(self,
base: Value,
obj_type: RUnion,
name: str,
arg_values: List[Value],
return_rtype: Optional[RType],
line: int,
arg_kinds: Optional[List[int]],
arg_names: Optional[List[Optional[str]]]) -> Value:
# Union method call needs a return_rtype for the type of the output register.
# If we don't have one, use object_rprimitive.
return_rtype = return_rtype or object_rprimitive
def call_union_item(value: Value) -> Value:
return self.gen_method_call(value, name, arg_values, return_rtype, line,
arg_kinds, arg_names)
return self.decompose_union_helper(base, obj_type, return_rtype, call_union_item, line)
def translate_cast_expr(self, expr: CastExpr) -> Value:
src = self.accept(expr.expr)
target_type = self.type_to_rtype(expr.type)
return self.coerce(src, target_type, expr.line)
def shortcircuit_helper(self, op: str,
expr_type: RType,
left: Callable[[], Value],
right: Callable[[], Value], line: int) -> Value:
# Having actual Phi nodes would be really nice here!
target = self.alloc_temp(expr_type)
# left_body takes the value of the left side, right_body the right
left_body, right_body, next = BasicBlock(), BasicBlock(), BasicBlock()
# true_body is taken if the left is true, false_body if it is false.
# For 'and' the value is the right side if the left is true, and for 'or'
# it is the right side if the left is false.
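# Illustrative sketch (not the exact IR) for 'x and y':
#     r0 = x
#     branch r0 ? right_body : left_body
#   left_body:  target = r0 (coerced); goto next
#   right_body: target = y (coerced);  goto next
#   next:       result is target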
true_body, false_body = (
(right_body, left_body) if op == 'and' else (left_body, right_body))
left_value = left()
self.add_bool_branch(left_value, true_body, false_body)
self.activate_block(left_body)
left_coerced = self.coerce(left_value, expr_type, line)
self.add(Assign(target, left_coerced))
self.goto(next)
self.activate_block(right_body)
right_value = right()
right_coerced = self.coerce(right_value, expr_type, line)
self.add(Assign(target, right_coerced))
self.goto(next)
self.activate_block(next)
return target
def shortcircuit_expr(self, expr: OpExpr) -> Value:
return self.shortcircuit_helper(
expr.op, self.node_type(expr),
lambda: self.accept(expr.left),
lambda: self.accept(expr.right),
expr.line
)
def visit_conditional_expr(self, expr: ConditionalExpr) -> Value:
if_body, else_body, next = BasicBlock(), BasicBlock(), BasicBlock()
self.process_conditional(expr.cond, if_body, else_body)
expr_type = self.node_type(expr)
# Having actual Phi nodes would be really nice here!
target = self.alloc_temp(expr_type)
self.activate_block(if_body)
true_value = self.accept(expr.if_expr)
true_value = self.coerce(true_value, expr_type, expr.line)
self.add(Assign(target, true_value))
self.goto(next)
self.activate_block(else_body)
false_value = self.accept(expr.else_expr)
false_value = self.coerce(false_value, expr_type, expr.line)
self.add(Assign(target, false_value))
self.goto(next)
self.activate_block(next)
return target
def translate_special_method_call(self,
base_reg: Value,
name: str,
args: List[Value],
result_type: Optional[RType],
line: int) -> Optional[Value]:
"""Translate a method call which is handled nongenerically.
These are special in the sense that we have code generated specifically for them.
They tend to be method calls which have equivalents in C that are more direct
than calling with the PyObject API.
Return None if no translation found; otherwise return the target register.
"""
ops = method_ops.get(name, [])
return self.matching_primitive_op(ops, [base_reg] + args, line, result_type=result_type)
def visit_list_expr(self, expr: ListExpr) -> Value:
return self._visit_list_display(expr.items, expr.line)
def _visit_list_display(self, items: List[Expression], line: int) -> Value:
return self._visit_display(
items,
new_list_op,
list_append_op,
list_extend_op,
line
)
def _visit_display(self,
items: List[Expression],
constructor_op: OpDescription,
append_op: OpDescription,
extend_op: OpDescription,
line: int
) -> Value:
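# For example, [1, 2, *a, 3] (with the list ops) roughly gathers [1, 2] as initial
# items for the var-arg constructor, then extends the result with a and appends 3.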
accepted_items = []
for item in items:
if isinstance(item, StarExpr):
accepted_items.append((True, self.accept(item.expr)))
else:
accepted_items.append((False, self.accept(item)))
result = None # type: Union[Value, None]
initial_items = []
for starred, value in accepted_items:
if result is None and not starred and constructor_op.is_var_arg:
initial_items.append(value)
continue
if result is None:
result = self.primitive_op(constructor_op, initial_items, line)
self.primitive_op(extend_op if starred else append_op, [result, value], line)
if result is None:
result = self.primitive_op(constructor_op, initial_items, line)
return result
def visit_tuple_expr(self, expr: TupleExpr) -> Value:
if any(isinstance(item, StarExpr) for item in expr.items):
# create a tuple of unknown length
return self._visit_tuple_display(expr)
# create a tuple of fixed length (RTuple)
tuple_type = self.node_type(expr)
# When handling NamedTuple et al. we might not have proper type info,
# so make some up if we need it.
types = (tuple_type.types if isinstance(tuple_type, RTuple)
else [object_rprimitive] * len(expr.items))
items = []
for item_expr, item_type in zip(expr.items, types):
reg = self.accept(item_expr)
items.append(self.coerce(reg, item_type, item_expr.line))
return self.add(TupleSet(items, expr.line))
def _visit_tuple_display(self, expr: TupleExpr) -> Value:
"""Create a list, then turn it into a tuple."""
val_as_list = self._visit_list_display(expr.items, expr.line)
return self.primitive_op(list_tuple_op, [val_as_list], expr.line)
def visit_dict_expr(self, expr: DictExpr) -> Value:
"""First accepts all keys and values, then makes a dict out of them."""
key_value_pairs = []
for key_expr, value_expr in expr.items:
key = self.accept(key_expr) if key_expr is not None else None
value = self.accept(value_expr)
key_value_pairs.append((key, value))
return self.make_dict(key_value_pairs, expr.line)
def visit_set_expr(self, expr: SetExpr) -> Value:
return self._visit_display(
expr.items,
new_set_op,
set_add_op,
set_update_op,
expr.line
)
def visit_str_expr(self, expr: StrExpr) -> Value:
return self.load_static_unicode(expr.value)
# Conditional expressions
def process_conditional(self, e: Expression, true: BasicBlock, false: BasicBlock) -> None:
if isinstance(e, OpExpr) and e.op in ['and', 'or']:
if e.op == 'and':
# Short circuit 'and' in a conditional context.
new = BasicBlock()
self.process_conditional(e.left, new, false)
self.activate_block(new)
self.process_conditional(e.right, true, false)
else:
# Short circuit 'or' in a conditional context.
new = BasicBlock()
self.process_conditional(e.left, true, new)
self.activate_block(new)
self.process_conditional(e.right, true, false)
elif isinstance(e, UnaryExpr) and e.op == 'not':
self.process_conditional(e.expr, false, true)
# Catch-all for arbitrary expressions.
else:
reg = self.accept(e)
self.add_bool_branch(reg, true, false)
def visit_basic_comparison(self, op: str, left: Value, right: Value, line: int) -> Value:
negate = False
if op == 'is not':
op, negate = 'is', True
elif op == 'not in':
op, negate = 'in', True
target = self.binary_op(left, right, op, line)
if negate:
target = self.unary_op(target, 'not', line)
return target
def visit_comparison_expr(self, e: ComparisonExpr) -> Value:
# TODO: Don't produce an expression when used in conditional context
# All of the trickiness here is due to support for chained comparisons
# (`e1 < e2 > e3`, etc). `e1 < e2 > e3` is approximately equivalent to
# `e1 < e2 and e2 > e3` except that `e2` is only evaluated once.
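# For example, 'a < b < c' roughly becomes (a < b) and (b < c), with b evaluated
# only once and the 'and' implemented via shortcircuit_helper below.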
expr_type = self.node_type(e)
# go(i, prev) generates code for `ei opi e{i+1} op{i+1} ... en`,
# assuming that prev contains the value of `ei`.
def go(i: int, prev: Value) -> Value:
if i == len(e.operators) - 1:
return self.visit_basic_comparison(
e.operators[i], prev, self.accept(e.operands[i + 1]), e.line)
next = self.accept(e.operands[i + 1])
return self.shortcircuit_helper(
'and', expr_type,
lambda: self.visit_basic_comparison(
e.operators[i], prev, next, e.line),
lambda: go(i + 1, next),
e.line)
return go(0, self.accept(e.operands[0]))
def add_bool_branch(self, value: Value, true: BasicBlock, false: BasicBlock) -> None:
if is_runtime_subtype(value.type, int_rprimitive):
zero = self.add(LoadInt(0))
value = self.binary_op(value, zero, '!=', value.line)
elif is_same_type(value.type, list_rprimitive):
length = self.primitive_op(list_len_op, [value], value.line)
zero = self.add(LoadInt(0))
value = self.binary_op(length, zero, '!=', value.line)
elif (isinstance(value.type, RInstance) and value.type.class_ir.is_ext_class
and value.type.class_ir.has_method('__bool__')):
# Directly call the __bool__ method on classes that have it.
value = self.gen_method_call(value, '__bool__', [], bool_rprimitive, value.line)
else:
value_type = optional_value_type(value.type)
if value_type is not None:
is_none = self.binary_op(value, self.none_object(), 'is not', value.line)
branch = Branch(is_none, true, false, Branch.BOOL_EXPR)
self.add(branch)
always_truthy = False
if isinstance(value_type, RInstance):
# check whether X.__bool__ is always just the default (object.__bool__)
if (not value_type.class_ir.has_method('__bool__')
and value_type.class_ir.is_method_final('__bool__')):
always_truthy = True
if not always_truthy:
# Optional[X] where X may be falsy and requires a check
branch.true = self.new_block()
# unbox_or_cast instead of coerce because we want the
# type to change even if it is a subtype.
remaining = self.unbox_or_cast(value, value_type, value.line)
self.add_bool_branch(remaining, true, false)
return
elif not is_same_type(value.type, bool_rprimitive):
value = self.primitive_op(bool_op, [value], value.line)
self.add(Branch(value, true, false, Branch.BOOL_EXPR))
def visit_nonlocal_decl(self, o: NonlocalDecl) -> None:
pass
def visit_slice_expr(self, expr: SliceExpr) -> Value:
def get_arg(arg: Optional[Expression]) -> Value:
if arg is None:
return self.none_object()
else:
return self.accept(arg)
args = [get_arg(expr.begin_index),
get_arg(expr.end_index),
get_arg(expr.stride)]
return self.primitive_op(new_slice_op, args, expr.line)
def visit_raise_stmt(self, s: RaiseStmt) -> None:
if s.expr is None:
self.primitive_op(reraise_exception_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
return
exc = self.accept(s.expr)
self.primitive_op(raise_exception_op, [exc], s.line)
self.add(Unreachable())
def visit_try_except(self,
body: GenFunc,
handlers: Sequence[
Tuple[Optional[Expression], Optional[Expression], GenFunc]],
else_body: Optional[GenFunc],
line: int) -> None:
"""Generalized try/except/else handling that takes functions to gen the bodies.
The point of this is to also be able to support 'with' statements."""
assert handlers, "try needs except"
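# Rough block layout generated below (illustrative):
#   try body ------------------------> else_block (if any) --> exit_block
#   except_entry: save exc_info, test the handlers in order, run the matching body
#   cleanup_block: restore exc_info after a handler exits normally --> exit_block
#   double_except_block: restore exc_info and keep propagating if a handler raises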
except_entry, exit_block, cleanup_block = BasicBlock(), BasicBlock(), BasicBlock()
double_except_block = BasicBlock()
# If there is an else block, jump there after the try, otherwise just leave
else_block = BasicBlock() if else_body else exit_block
# Compile the try block with an error handler
self.error_handlers.append(except_entry)
self.goto_and_activate(BasicBlock())
body()
self.goto(else_block)
self.error_handlers.pop()
# The error handler catches the error and then checks it
# against the except clauses. We compile the error handler
# itself with an error handler so that it can properly restore
# the *old* exc_info if an exception occurs.
# The exception chaining will be done automatically when the
# exception is raised, based on the exception in exc_info.
self.error_handlers.append(double_except_block)
self.activate_block(except_entry)
old_exc = self.maybe_spill(self.primitive_op(error_catch_op, [], line))
# Compile the except blocks with the nonlocal control flow overridden to clear exc_info
self.nonlocal_control.append(
ExceptNonlocalControl(self.nonlocal_control[-1], old_exc))
# Process the bodies
for type, var, handler_body in handlers:
next_block = None
if type:
next_block, body_block = BasicBlock(), BasicBlock()
matches = self.primitive_op(exc_matches_op, [self.accept(type)], type.line)
self.add(Branch(matches, body_block, next_block, Branch.BOOL_EXPR))
self.activate_block(body_block)
if var:
target = self.get_assignment_target(var)
self.assign(target, self.primitive_op(get_exc_value_op, [], var.line), var.line)
handler_body()
self.goto(cleanup_block)
if next_block:
self.activate_block(next_block)
# Reraise the exception if needed
if next_block:
self.primitive_op(reraise_exception_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
self.nonlocal_control.pop()
self.error_handlers.pop()
# Cleanup for if we leave except through normal control flow:
# restore the saved exc_info information and continue propagating
# the exception if it exists.
self.activate_block(cleanup_block)
self.primitive_op(restore_exc_info_op, [self.read(old_exc)], line)
self.goto(exit_block)
# Cleanup for if we leave except through a raised exception:
# restore the saved exc_info information and continue propagating
# the exception.
self.activate_block(double_except_block)
self.primitive_op(restore_exc_info_op, [self.read(old_exc)], line)
self.primitive_op(keep_propagating_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
# If present, compile the else body in the obvious way
if else_body:
self.activate_block(else_block)
else_body()
self.goto(exit_block)
self.activate_block(exit_block)
def visit_try_except_stmt(self, t: TryStmt) -> None:
def body() -> None:
self.accept(t.body)
# Work around scoping woes
def make_handler(body: Block) -> GenFunc:
return lambda: self.accept(body)
handlers = [(type, var, make_handler(body)) for type, var, body in
zip(t.types, t.vars, t.handlers)]
else_body = (lambda: self.accept(t.else_body)) if t.else_body else None
self.visit_try_except(body, handlers, else_body, t.line)
def try_finally_try(self, err_handler: BasicBlock, return_entry: BasicBlock,
main_entry: BasicBlock, try_body: GenFunc) -> Optional[Register]:
# Compile the try block with an error handler
control = TryFinallyNonlocalControl(return_entry)
self.error_handlers.append(err_handler)
self.nonlocal_control.append(control)
self.goto_and_activate(BasicBlock())
try_body()
self.goto(main_entry)
self.nonlocal_control.pop()
self.error_handlers.pop()
return control.ret_reg
def try_finally_entry_blocks(self,
err_handler: BasicBlock, return_entry: BasicBlock,
main_entry: BasicBlock, finally_block: BasicBlock,
ret_reg: Optional[Register]) -> Value:
old_exc = self.alloc_temp(exc_rtuple)
# Entry block for non-exceptional flow
self.activate_block(main_entry)
if ret_reg:
self.add(Assign(ret_reg, self.add(LoadErrorValue(self.ret_types[-1]))))
self.goto(return_entry)
self.activate_block(return_entry)
self.add(Assign(old_exc, self.add(LoadErrorValue(exc_rtuple))))
self.goto(finally_block)
# Entry block for errors
self.activate_block(err_handler)
if ret_reg:
self.add(Assign(ret_reg, self.add(LoadErrorValue(self.ret_types[-1]))))
self.add(Assign(old_exc, self.primitive_op(error_catch_op, [], -1)))
self.goto(finally_block)
return old_exc
def try_finally_body(
self, finally_block: BasicBlock, finally_body: GenFunc,
ret_reg: Optional[Value], old_exc: Value) -> Tuple[BasicBlock,
'FinallyNonlocalControl']:
cleanup_block = BasicBlock()
# Compile the finally block with the nonlocal control flow overridden to restore exc_info
self.error_handlers.append(cleanup_block)
finally_control = FinallyNonlocalControl(
self.nonlocal_control[-1], ret_reg, old_exc)
self.nonlocal_control.append(finally_control)
self.activate_block(finally_block)
finally_body()
self.nonlocal_control.pop()
return cleanup_block, finally_control
def try_finally_resolve_control(self, cleanup_block: BasicBlock,
finally_control: FinallyNonlocalControl,
old_exc: Value, ret_reg: Optional[Value]) -> BasicBlock:
"""Resolve the control flow out of a finally block.
This means returning if there was a return, propagating
exceptions, break/continue (soon), or just continuing on.
"""
reraise, rest = BasicBlock(), BasicBlock()
self.add(Branch(old_exc, rest, reraise, Branch.IS_ERROR))
# Reraise the exception if there was one
self.activate_block(reraise)
self.primitive_op(reraise_exception_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
self.error_handlers.pop()
# If there was a return, keep returning
if ret_reg:
self.activate_block(rest)
return_block, rest = BasicBlock(), BasicBlock()
self.add(Branch(ret_reg, rest, return_block, Branch.IS_ERROR))
self.activate_block(return_block)
self.nonlocal_control[-1].gen_return(self, ret_reg, -1)
# TODO: handle break/continue
self.activate_block(rest)
out_block = BasicBlock()
self.goto(out_block)
# If there was an exception, restore again
self.activate_block(cleanup_block)
finally_control.gen_cleanup(self, -1)
self.primitive_op(keep_propagating_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
return out_block
def visit_try_finally_stmt(self, try_body: GenFunc, finally_body: GenFunc) -> None:
"""Generalized try/finally handling that takes functions to gen the bodies.
The point of this is to also be able to support 'with' statements."""
# Finally is a big pain, because there are so many ways that
# exits can occur. We emit 10+ basic blocks for every finally!
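# Rough outline of the helpers used below: try_finally_try compiles the body with an
# error handler installed; try_finally_entry_blocks wires up the normal/return/error
# entries into the finally block; try_finally_body compiles the finally body itself;
# try_finally_resolve_control then returns, re-raises, or falls through as needed.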
err_handler, main_entry, return_entry, finally_block = (
BasicBlock(), BasicBlock(), BasicBlock(), BasicBlock())
# Compile the body of the try
ret_reg = self.try_finally_try(
err_handler, return_entry, main_entry, try_body)
# Set up the entry blocks for the finally statement
old_exc = self.try_finally_entry_blocks(
err_handler, return_entry, main_entry, finally_block, ret_reg)
# Compile the body of the finally
cleanup_block, finally_control = self.try_finally_body(
finally_block, finally_body, ret_reg, old_exc)
# Resolve the control flow out of the finally block
out_block = self.try_finally_resolve_control(
cleanup_block, finally_control, old_exc, ret_reg)
self.activate_block(out_block)
def visit_try_stmt(self, t: TryStmt) -> None:
# Our compilation strategy for try/except/else/finally is to
# treat try/except/else and try/finally as separate language
# constructs that we compile separately. When we have a
# try/except/else/finally, we treat the try/except/else as the
# body of a try/finally block.
if t.finally_body:
def visit_try_body() -> None:
if t.handlers:
self.visit_try_except_stmt(t)
else:
self.accept(t.body)
body = t.finally_body
self.visit_try_finally_stmt(visit_try_body, lambda: self.accept(body))
else:
self.visit_try_except_stmt(t)
def get_sys_exc_info(self) -> List[Value]:
exc_info = self.primitive_op(get_exc_info_op, [], -1)
return [self.add(TupleGet(exc_info, i, -1)) for i in range(3)]
def visit_with(self, expr: Expression, target: Optional[Lvalue],
body: GenFunc, line: int) -> None:
# This is basically a straight transcription of the Python code in PEP 343.
# I don't actually understand why a bunch of it is the way it is.
# We could probably optimize the case where the manager is compiled by us,
# but that is not our common case at all, so we don't.
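# Roughly the expansion being generated (simplified sketch of PEP 343):
#   mgr = EXPR
#   exit = type(mgr).__exit__
#   value = type(mgr).__enter__(mgr)
#   exc = True
#   try:
#       TARGET = value
#       BODY
#   except:
#       exc = False
#       if not exit(mgr, *sys.exc_info()):
#           raise
#   finally:
#       if exc:
#           exit(mgr, None, None, None)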
mgr_v = self.accept(expr)
typ = self.primitive_op(type_op, [mgr_v], line)
exit_ = self.maybe_spill(self.py_get_attr(typ, '__exit__', line))
value = self.py_call(self.py_get_attr(typ, '__enter__', line), [mgr_v], line)
mgr = self.maybe_spill(mgr_v)
exc = self.maybe_spill_assignable(self.primitive_op(true_op, [], -1))
def try_body() -> None:
if target:
self.assign(self.get_assignment_target(target), value, line)
body()
def except_body() -> None:
self.assign(exc, self.primitive_op(false_op, [], -1), line)
out_block, reraise_block = BasicBlock(), BasicBlock()
self.add_bool_branch(self.py_call(self.read(exit_),
[self.read(mgr)] + self.get_sys_exc_info(), line),
out_block, reraise_block)
self.activate_block(reraise_block)
self.primitive_op(reraise_exception_op, [], NO_TRACEBACK_LINE_NO)
self.add(Unreachable())
self.activate_block(out_block)
def finally_body() -> None:
out_block, exit_block = BasicBlock(), BasicBlock()
self.add(Branch(self.read(exc), exit_block, out_block, Branch.BOOL_EXPR))
self.activate_block(exit_block)
none = self.none_object()
self.py_call(self.read(exit_), [self.read(mgr), none, none, none], line)
self.goto_and_activate(out_block)
self.visit_try_finally_stmt(
lambda: self.visit_try_except(try_body, [(None, None, except_body)], None, line),
finally_body)
def visit_with_stmt(self, o: WithStmt) -> None:
# Generate separate logic for each expr in it, left to right
def generate(i: int) -> None:
if i >= len(o.expr):
self.accept(o.body)
else:
self.visit_with(o.expr[i], o.target[i], lambda: generate(i + 1), o.line)
generate(0)
def visit_lambda_expr(self, expr: LambdaExpr) -> Value:
typ = get_proper_type(self.types[expr])
assert isinstance(typ, CallableType)
runtime_args = []
for arg, arg_type in zip(expr.arguments, typ.arg_types):
arg.variable.type = arg_type
runtime_args.append(
RuntimeArg(arg.variable.name(), self.type_to_rtype(arg_type), arg.kind))
ret_type = self.type_to_rtype(typ.ret_type)
fsig = FuncSignature(runtime_args, ret_type)
fname = '{}{}'.format(LAMBDA_NAME, self.lambda_counter)
func_ir, func_reg = self.gen_func_item(expr, fname, fsig)
assert func_reg is not None
self.functions.append(func_ir)
return func_reg
def visit_pass_stmt(self, o: PassStmt) -> None:
pass
def visit_global_decl(self, o: GlobalDecl) -> None:
# Pure declaration -- no runtime effect
pass
def visit_assert_stmt(self, a: AssertStmt) -> None:
if self.options.strip_asserts:
return
cond = self.accept(a.expr)
ok_block, error_block = BasicBlock(), BasicBlock()
self.add_bool_branch(cond, ok_block, error_block)
self.activate_block(error_block)
if a.msg is None:
# Special case (for simpler generated code)
self.add(RaiseStandardError(RaiseStandardError.ASSERTION_ERROR, None, a.line))
elif isinstance(a.msg, StrExpr):
# Another special case
self.add(RaiseStandardError(RaiseStandardError.ASSERTION_ERROR, a.msg.value,
a.line))
else:
# The general case -- explicitly construct an exception instance
message = self.accept(a.msg)
exc_type = self.load_module_attr_by_fullname('builtins.AssertionError', a.line)
exc = self.py_call(exc_type, [message], a.line)
self.primitive_op(raise_exception_op, [exc], a.line)
self.add(Unreachable())
self.activate_block(ok_block)
def translate_list_comprehension(self, gen: GeneratorExpr) -> Value:
list_ops = self.primitive_op(new_list_op, [], gen.line)
loop_params = list(zip(gen.indices, gen.sequences, gen.condlists))
def gen_inner_stmts() -> None:
e = self.accept(gen.left_expr)
self.primitive_op(list_append_op, [list_ops, e], gen.line)
self.comprehension_helper(loop_params, gen_inner_stmts, gen.line)
return list_ops
def visit_list_comprehension(self, o: ListComprehension) -> Value:
return self.translate_list_comprehension(o.generator)
def visit_set_comprehension(self, o: SetComprehension) -> Value:
gen = o.generator
set_ops = self.primitive_op(new_set_op, [], o.line)
loop_params = list(zip(gen.indices, gen.sequences, gen.condlists))
def gen_inner_stmts() -> None:
e = self.accept(gen.left_expr)
self.primitive_op(set_add_op, [set_ops, e], o.line)
self.comprehension_helper(loop_params, gen_inner_stmts, o.line)
return set_ops
def visit_dictionary_comprehension(self, o: DictionaryComprehension) -> Value:
d = self.primitive_op(new_dict_op, [], o.line)
loop_params = list(zip(o.indices, o.sequences, o.condlists))
def gen_inner_stmts() -> None:
k = self.accept(o.key)
v = self.accept(o.value)
self.primitive_op(dict_set_item_op, [d, k, v], o.line)
self.comprehension_helper(loop_params, gen_inner_stmts, o.line)
return d
def visit_generator_expr(self, o: GeneratorExpr) -> Value:
self.warning('Treating generator comprehension as list', o.line)
return self.primitive_op(iter_op, [self.translate_list_comprehension(o)], o.line)
def comprehension_helper(self,
loop_params: List[Tuple[Lvalue, Expression, List[Expression]]],
gen_inner_stmts: Callable[[], None],
line: int) -> None:
"""Helper function for list comprehensions.
"loop_params" is a list of (index, expr, [conditions]) tuples defining nested loops:
- "index" is the Lvalue indexing that loop;
- "expr" is the expression for the object to be iterated over;
- "conditions" is a list of conditions, evaluated in order with short-circuiting,
that must all be true for the loop body to be executed
"gen_inner_stmts" is a function to generate the IR for the body of the innermost loop
"""
def handle_loop(loop_params: List[Tuple[Lvalue, Expression, List[Expression]]]) -> None:
"""Generate IR for a loop.
Given a list of (index, expression, [conditions]) tuples, generate IR
for the nested loops the list defines.
"""
index, expr, conds = loop_params[0]
self.for_loop_helper(index, expr,
lambda: loop_contents(conds, loop_params[1:]),
None, line)
def loop_contents(
conds: List[Expression],
remaining_loop_params: List[Tuple[Lvalue, Expression, List[Expression]]],
) -> None:
"""Generate the body of the loop.
"conds" is a list of conditions to be evaluated (in order, with short circuiting)
to gate the body of the loop.
"remaining_loop_params" is the parameters for any further nested loops; if it's empty
we'll instead evaluate the "gen_inner_stmts" function.
"""
# Check conditions, in order, short circuiting them.
for cond in conds:
cond_val = self.accept(cond)
cont_block, rest_block = BasicBlock(), BasicBlock()
# If the condition is true we'll skip the continue.
self.add_bool_branch(cond_val, rest_block, cont_block)
self.activate_block(cont_block)
self.nonlocal_control[-1].gen_continue(self, cond.line)
self.goto_and_activate(rest_block)
if remaining_loop_params:
# There's another nested level, so the body of this loop is another loop.
return handle_loop(remaining_loop_params)
else:
# We finally reached the actual body of the generator.
# Generate the IR for the inner loop body.
gen_inner_stmts()
handle_loop(loop_params)
def visit_decorator(self, dec: Decorator) -> None:
func_ir, func_reg = self.gen_func_item(dec.func, dec.func.name(),
self.mapper.fdef_to_sig(dec.func))
if dec.func in self.nested_fitems:
assert func_reg is not None
decorated_func = self.load_decorated_func(dec.func, func_reg)
self.assign(self.get_func_target(dec.func), decorated_func, dec.func.line)
func_reg = decorated_func
else:
# Obtain the function name in order to construct the name of the helper function.
name = dec.func.fullname().split('.')[-1]
helper_name = decorator_helper_name(name)
# Load the callable object representing the non-decorated function, and decorate it.
orig_func = self.load_global_str(helper_name, dec.line)
decorated_func = self.load_decorated_func(dec.func, orig_func)
# Set the callable object representing the decorated function as a global.
self.primitive_op(dict_set_item_op,
[self.load_globals_dict(),
self.load_static_unicode(dec.func.name()), decorated_func],
decorated_func.line)
self.functions.append(func_ir)
def visit_del_stmt(self, o: DelStmt) -> None:
self.visit_del_item(self.get_assignment_target(o.expr), o.line)
def visit_del_item(self, target: AssignmentTarget, line: int) -> None:
if isinstance(target, AssignmentTargetIndex):
self.translate_special_method_call(
target.base,
'__delitem__',
[target.index],
result_type=None,
line=line
)
elif isinstance(target, AssignmentTargetAttr):
key = self.load_static_unicode(target.attr)
self.add(PrimitiveOp([target.obj, key], py_delattr_op, line))
elif isinstance(target, AssignmentTargetRegister):
# Delete a local by assigning an error value to it, which will
# prompt the insertion of uninit checks.
self.add(Assign(target.register,
self.add(LoadErrorValue(target.type, undefines=True))))
elif isinstance(target, AssignmentTargetTuple):
for subtarget in target.items:
self.visit_del_item(subtarget, line)
def visit_super_expr(self, o: SuperExpr) -> Value:
# self.warning('can not optimize super() expression', o.line)
sup_val = self.load_module_attr_by_fullname('builtins.super', o.line)
if o.call.args:
args = [self.accept(arg) for arg in o.call.args]
else:
assert o.info is not None
typ = self.load_native_type_object(o.info.fullname())
ir = self.mapper.type_to_ir[o.info]
iter_env = iter(self.environment.indexes)
vself = next(iter_env) # grab first argument
if self.fn_info.is_generator:
# grab the self target (see comment in translate_super_method_call)
self_targ = list(self.environment.symtable.values())[6]
vself = self.read(self_targ, self.fn_info.fitem.line)
elif not ir.is_ext_class:
vself = next(iter_env) # second argument is self if non_extension class
args = [typ, vself]
res = self.py_call(sup_val, args, o.line)
return self.py_get_attr(res, o.name, o.line)
def visit_yield_expr(self, expr: YieldExpr) -> Value:
if expr.expr:
retval = self.accept(expr.expr)
else:
retval = self.none()
return self.emit_yield(retval, expr.line)
def emit_yield(self, val: Value, line: int) -> Value:
retval = self.coerce(val, self.ret_types[-1], line)
cls = self.fn_info.generator_class
# Create a new block for the instructions immediately following the yield expression, and
# set the next label so that the next time '__next__' is called on the generator object,
# the function continues at the new block.
next_block = BasicBlock()
next_label = len(cls.blocks)
cls.blocks.append(next_block)
self.assign(cls.next_label_target, self.add(LoadInt(next_label)), line)
self.add(Return(retval))
self.activate_block(next_block)
self.add_raise_exception_blocks_to_generator_class(line)
assert cls.send_arg_reg is not None
return cls.send_arg_reg
def handle_yield_from_and_await(self, o: Union[YieldFromExpr, AwaitExpr]) -> Value:
# This is basically an implementation of the code in PEP 380.
# TODO: do we want to use the right types here?
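# Roughly the logic being generated (simplified sketch of PEP 380):
#   it = iter(EXPR)                    # coro_op instead of iter for 'await'
#   try:
#       to_yield = next(it)
#   except StopIteration as e:
#       result = e.value
#   else:
#       while True:
#           received = yield to_yield  # exceptions delegated via yield_from_except_op
#           try:
#               to_yield = it.send(received)
#           except StopIteration as e:
#               result = e.value
#               break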
result = self.alloc_temp(object_rprimitive)
to_yield_reg = self.alloc_temp(object_rprimitive)
received_reg = self.alloc_temp(object_rprimitive)
if isinstance(o, YieldFromExpr):
iter_val = self.primitive_op(iter_op, [self.accept(o.expr)], o.line)
else:
iter_val = self.primitive_op(coro_op, [self.accept(o.expr)], o.line)
iter_reg = self.maybe_spill_assignable(iter_val)
stop_block, main_block, done_block = BasicBlock(), BasicBlock(), BasicBlock()
_y_init = self.primitive_op(next_raw_op, [self.read(iter_reg)], o.line)
self.add(Branch(_y_init, stop_block, main_block, Branch.IS_ERROR))
# Try extracting a return value from a StopIteration and return it.
# If it wasn't, this reraises the exception.
self.activate_block(stop_block)
self.assign(result, self.primitive_op(check_stop_op, [], o.line), o.line)
self.goto(done_block)
self.activate_block(main_block)
self.assign(to_yield_reg, _y_init, o.line)
# OK, now the main loop!
loop_block = BasicBlock()
self.goto_and_activate(loop_block)
def try_body() -> None:
self.assign(received_reg, self.emit_yield(self.read(to_yield_reg), o.line), o.line)
def except_body() -> None:
# The body of the except is all implemented in a C function to
# reduce how much code we need to generate. It returns a value
# indicating whether to break or yield (or raise an exception).
res = self.primitive_op(yield_from_except_op, [self.read(iter_reg)], o.line)
to_stop = self.add(TupleGet(res, 0, o.line))
val = self.add(TupleGet(res, 1, o.line))
ok, stop = BasicBlock(), BasicBlock()
self.add(Branch(to_stop, stop, ok, Branch.BOOL_EXPR))
# The exception got swallowed. Continue, yielding the returned value
self.activate_block(ok)
self.assign(to_yield_reg, val, o.line)
self.nonlocal_control[-1].gen_continue(self, o.line)
# The exception was a StopIteration. Stop iterating.
self.activate_block(stop)
self.assign(result, val, o.line)
self.nonlocal_control[-1].gen_break(self, o.line)
def else_body() -> None:
# Do a next() or a .send(). It will return NULL on exception
# but it won't automatically propagate.
_y = self.primitive_op(send_op, [self.read(iter_reg), self.read(received_reg)], o.line)
ok, stop = BasicBlock(), BasicBlock()
self.add(Branch(_y, stop, ok, Branch.IS_ERROR))
# Everything's fine. Yield it.
self.activate_block(ok)
self.assign(to_yield_reg, _y, o.line)
self.nonlocal_control[-1].gen_continue(self, o.line)
# Try extracting a return value from a StopIteration and return it.
# If it wasn't, this reraises the exception.
self.activate_block(stop)
self.assign(result, self.primitive_op(check_stop_op, [], o.line), o.line)
self.nonlocal_control[-1].gen_break(self, o.line)
self.push_loop_stack(loop_block, done_block)
self.visit_try_except(try_body, [(None, None, except_body)], else_body, o.line)
self.pop_loop_stack()
self.goto_and_activate(done_block)
return self.read(result)
def visit_yield_from_expr(self, o: YieldFromExpr) -> Value:
return self.handle_yield_from_and_await(o)
def visit_ellipsis(self, o: EllipsisExpr) -> Value:
return self.primitive_op(ellipsis_op, [], o.line)
# Builtin function special cases
@specialize_function('builtins.globals')
def translate_globals(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
# Special case builtins.globals
if len(expr.args) == 0:
return self.load_globals_dict()
return None
@specialize_function('builtins.len')
def translate_len(
self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
# Special case builtins.len
if (len(expr.args) == 1
and expr.arg_kinds == [ARG_POS]):
expr_rtype = self.node_type(expr.args[0])
if isinstance(expr_rtype, RTuple):
# len() of fixed-length tuple can be trivially determined statically,
# though we still need to evaluate it.
self.accept(expr.args[0])
return self.add(LoadInt(len(expr_rtype.types)))
return None
# Special cases for things that consume iterators where we know we
# can safely compile a generator into a list.
@specialize_function('builtins.tuple')
@specialize_function('builtins.set')
@specialize_function('builtins.dict')
@specialize_function('builtins.sum')
@specialize_function('builtins.min')
@specialize_function('builtins.max')
@specialize_function('builtins.sorted')
@specialize_function('collections.OrderedDict')
@specialize_function('join', str_rprimitive)
@specialize_function('extend', list_rprimitive)
@specialize_function('update', dict_rprimitive)
@specialize_function('update', set_rprimitive)
def translate_safe_generator_call(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
if (len(expr.args) > 0
and expr.arg_kinds[0] == ARG_POS
and isinstance(expr.args[0], GeneratorExpr)):
if isinstance(callee, MemberExpr):
return self.gen_method_call(
self.accept(callee.expr), callee.name,
([self.translate_list_comprehension(expr.args[0])]
+ [self.accept(arg) for arg in expr.args[1:]]),
self.node_type(expr), expr.line, expr.arg_kinds, expr.arg_names)
else:
return self.call_refexpr_with_args(
expr, callee,
([self.translate_list_comprehension(expr.args[0])]
+ [self.accept(arg) for arg in expr.args[1:]]))
return None
@specialize_function('builtins.any')
def translate_any_call(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
if (len(expr.args) == 1
and expr.arg_kinds == [ARG_POS]
and isinstance(expr.args[0], GeneratorExpr)):
return self.any_all_helper(expr.args[0], false_op, lambda x: x, true_op)
return None
@specialize_function('builtins.all')
def translate_all_call(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
if (len(expr.args) == 1
and expr.arg_kinds == [ARG_POS]
and isinstance(expr.args[0], GeneratorExpr)):
return self.any_all_helper(expr.args[0],
true_op,
lambda x: self.unary_op(x, 'not', expr.line),
false_op)
return None
# Special case for 'dataclasses.field' and 'attr.Factory' function calls
# because the results of such calls are typechecked by mypy using the types
# of the arguments to their respective functions, resulting in attempted
# coercions by mypyc that throw a runtime error.
@specialize_function('dataclasses.field')
@specialize_function('attr.Factory')
def translate_dataclasses_field_call(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
self.types[expr] = AnyType(TypeOfAny.from_error)
return None
def any_all_helper(self,
gen: GeneratorExpr,
initial_value_op: OpDescription,
modify: Callable[[Value], Value],
new_value_op: OpDescription) -> Value:
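# For any(): the caller passes (false_op, identity, true_op), so retval starts False
# and flips to True on the first truthy element. For all(): (true_op, not, false_op),
# so retval starts True and flips to False on the first falsy element.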
retval = self.alloc_temp(bool_rprimitive)
self.assign(retval, self.primitive_op(initial_value_op, [], -1), -1)
loop_params = list(zip(gen.indices, gen.sequences, gen.condlists))
true_block, false_block, exit_block = BasicBlock(), BasicBlock(), BasicBlock()
def gen_inner_stmts() -> None:
comparison = modify(self.accept(gen.left_expr))
self.add_bool_branch(comparison, true_block, false_block)
self.activate_block(true_block)
self.assign(retval, self.primitive_op(new_value_op, [], -1), -1)
self.goto(exit_block)
self.activate_block(false_block)
self.comprehension_helper(loop_params, gen_inner_stmts, gen.line)
self.goto_and_activate(exit_block)
return retval
# Special case for calling next() on a generator expression, an
# idiom that shows up fairly often in mypy.
#
# For example, next(x for x in l if x.id == 12, None) will
# generate code that searches l for an element where x.id == 12
# and produce the first such object, or None if no such element
# exists.
@specialize_function('builtins.next')
def translate_next_call(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
if not (expr.arg_kinds in ([ARG_POS], [ARG_POS, ARG_POS])
and isinstance(expr.args[0], GeneratorExpr)):
return None
gen = expr.args[0]
retval = self.alloc_temp(self.node_type(expr))
default_val = None
if len(expr.args) > 1:
default_val = self.accept(expr.args[1])
exit_block = BasicBlock()
def gen_inner_stmts() -> None:
# next takes the first element of the generator, so if
# something gets produced, we are done.
self.assign(retval, self.accept(gen.left_expr), gen.left_expr.line)
self.goto(exit_block)
loop_params = list(zip(gen.indices, gen.sequences, gen.condlists))
self.comprehension_helper(loop_params, gen_inner_stmts, gen.line)
# Now we need the case for when nothing got hit. If there was
# a default value, we produce it, and otherwise we raise
# StopIteration.
if default_val:
self.assign(retval, default_val, gen.left_expr.line)
self.goto(exit_block)
else:
self.add(RaiseStandardError(RaiseStandardError.STOP_ITERATION, None, expr.line))
self.add(Unreachable())
self.activate_block(exit_block)
return retval
@specialize_function('builtins.isinstance')
def translate_isinstance(self, expr: CallExpr, callee: RefExpr) -> Optional[Value]:
# Special case builtins.isinstance
if (len(expr.args) == 2
and expr.arg_kinds == [ARG_POS, ARG_POS]
and isinstance(expr.args[1], (RefExpr, TupleExpr))):
irs = self.flatten_classes(expr.args[1])
if irs is not None:
return self.isinstance_helper(self.accept(expr.args[0]), irs, expr.line)
return None
def flatten_classes(self, arg: Union[RefExpr, TupleExpr]) -> Optional[List[ClassIR]]:
"""Flatten classes in isinstance(obj, (A, (B, C))).
If at least one item is not a reference to a native class, return None.
"""
if isinstance(arg, RefExpr):
if isinstance(arg.node, TypeInfo) and self.is_native_module_ref_expr(arg):
ir = self.mapper.type_to_ir.get(arg.node)
if ir:
return [ir]
return None
else:
res = [] # type: List[ClassIR]
for item in arg.items:
if isinstance(item, (RefExpr, TupleExpr)):
item_part = self.flatten_classes(item)
if item_part is None:
return None
res.extend(item_part)
else:
return None
return res
def visit_await_expr(self, o: AwaitExpr) -> Value:
return self.handle_yield_from_and_await(o)
# Unimplemented constructs
def visit_assignment_expr(self, o: AssignmentExpr) -> Value:
self.bail("I Am The Walrus (unimplemented)", o.line)
# Unimplemented constructs that shouldn't come up because they are py2 only
def visit_backquote_expr(self, o: BackquoteExpr) -> Value:
self.bail("Python 2 features are unsupported", o.line)
def visit_exec_stmt(self, o: ExecStmt) -> None:
self.bail("Python 2 features are unsupported", o.line)
def visit_print_stmt(self, o: PrintStmt) -> None:
self.bail("Python 2 features are unsupported", o.line)
def visit_unicode_expr(self, o: UnicodeExpr) -> Value:
self.bail("Python 2 features are unsupported", o.line)
# Constructs that shouldn't ever show up
def visit_enum_call_expr(self, o: EnumCallExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit__promote_expr(self, o: PromoteExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_namedtuple_expr(self, o: NamedTupleExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_newtype_expr(self, o: NewTypeExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_temp_node(self, o: TempNode) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_type_alias_expr(self, o: TypeAliasExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_type_application(self, o: TypeApplication) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_type_var_expr(self, o: TypeVarExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_typeddict_expr(self, o: TypedDictExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_reveal_expr(self, o: RevealExpr) -> Value:
assert False, "can't compile analysis-only expressions"
def visit_var(self, o: Var) -> None:
assert False, "can't compile Var; should have been handled already?"
def visit_cast_expr(self, o: CastExpr) -> Value:
assert False, "CastExpr should have been handled in CallExpr"
def visit_star_expr(self, o: StarExpr) -> Value:
assert False, "should have been handled in Tuple/List/Set/DictExpr or CallExpr"
# Helpers
def enter(self, fn_info: FuncInfo) -> None:
self.environment = Environment(fn_info.name)
self.environments.append(self.environment)
self.fn_info = fn_info
self.fn_infos.append(self.fn_info)
self.ret_types.append(none_rprimitive)
self.error_handlers.append(None)
if fn_info.is_generator:
self.nonlocal_control.append(GeneratorNonlocalControl())
else:
self.nonlocal_control.append(BaseNonlocalControl())
self.blocks.append([])
self.new_block()
def activate_block(self, block: BasicBlock) -> None:
if self.blocks[-1]:
assert isinstance(self.blocks[-1][-1].ops[-1], ControlOp)
block.error_handler = self.error_handlers[-1]
self.blocks[-1].append(block)
def goto_and_activate(self, block: BasicBlock) -> None:
self.goto(block)
self.activate_block(block)
def new_block(self) -> BasicBlock:
block = BasicBlock()
self.activate_block(block)
return block
def goto_new_block(self) -> BasicBlock:
block = BasicBlock()
self.goto_and_activate(block)
return block
def leave(self) -> Tuple[List[BasicBlock], Environment, RType, FuncInfo]:
blocks = self.blocks.pop()
env = self.environments.pop()
ret_type = self.ret_types.pop()
fn_info = self.fn_infos.pop()
self.error_handlers.pop()
self.nonlocal_control.pop()
self.environment = self.environments[-1]
self.fn_info = self.fn_infos[-1]
return blocks, env, ret_type, fn_info
def add(self, op: Op) -> Value:
if self.blocks[-1][-1].ops:
assert not isinstance(self.blocks[-1][-1].ops[-1], ControlOp), (
"Can't add to finished block")
self.blocks[-1][-1].ops.append(op)
if isinstance(op, RegisterOp):
self.environment.add_op(op)
return op
def goto(self, target: BasicBlock) -> None:
if not self.blocks[-1][-1].ops or not isinstance(self.blocks[-1][-1].ops[-1], ControlOp):
self.add(Goto(target))
def primitive_op(self, desc: OpDescription, args: List[Value], line: int) -> Value:
assert desc.result_type is not None
coerced = []
for i, arg in enumerate(args):
formal_type = self.op_arg_type(desc, i)
arg = self.coerce(arg, formal_type, line)
coerced.append(arg)
target = self.add(PrimitiveOp(coerced, desc, line))
return target
def op_arg_type(self, desc: OpDescription, n: int) -> RType:
if n >= len(desc.arg_types):
assert desc.is_var_arg
return desc.arg_types[-1]
return desc.arg_types[n]
@overload
def accept(self, node: Expression) -> Value: ...
@overload
def accept(self, node: Statement) -> None: ...
def accept(self, node: Union[Statement, Expression]) -> Optional[Value]:
with self.catch_errors(node.line):
if isinstance(node, Expression):
try:
res = node.accept(self)
res = self.coerce(res, self.node_type(node), node.line)
# If we hit an error during compilation, we want to
# keep trying, so we can produce more error
# messages. Generate a temp of the right type to keep
# from causing more downstream trouble.
except UnsupportedException:
res = self.alloc_temp(self.node_type(node))
return res
else:
try:
node.accept(self)
except UnsupportedException:
pass
return None
def alloc_temp(self, type: RType) -> Register:
return self.environment.add_temp(type)
def type_to_rtype(self, typ: Optional[Type]) -> RType:
return self.mapper.type_to_rtype(typ)
def node_type(self, node: Expression) -> RType:
if isinstance(node, IntExpr):
# TODO: Don't special case IntExpr
return int_rprimitive
if node not in self.types:
return object_rprimitive
mypy_type = self.types[node]
return self.type_to_rtype(mypy_type)
def box(self, src: Value) -> Value:
if src.type.is_unboxed:
return self.add(Box(src))
else:
return src
def unbox_or_cast(self, src: Value, target_type: RType, line: int) -> Value:
if target_type.is_unboxed:
return self.add(Unbox(src, target_type, line))
else:
return self.add(Cast(src, target_type, line))
def box_expr(self, expr: Expression) -> Value:
return self.box(self.accept(expr))
def make_dict(self, key_value_pairs: Sequence[DictEntry], line: int) -> Value:
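# For example, {1: 2, **d, 3: 4} roughly gathers (1, 2) as initial items for
# new_dict_op, then applies dict_update_in_display_op for **d and a '__setitem__'
# call for the trailing (3, 4) pair.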
result = None # type: Union[Value, None]
initial_items = [] # type: List[Value]
for key, value in key_value_pairs:
if key is not None:
# key:value
if result is None:
initial_items.extend((key, value))
continue
self.translate_special_method_call(
result,
'__setitem__',
[key, value],
result_type=None,
line=line)
else:
# **value
if result is None:
result = self.primitive_op(new_dict_op, initial_items, line)
self.primitive_op(
dict_update_in_display_op,
[result, value],
line=line
)
if result is None:
result = self.primitive_op(new_dict_op, initial_items, line)
return result
def none(self) -> Value:
return self.add(PrimitiveOp([], none_op, line=-1))
def none_object(self) -> Value:
return self.add(PrimitiveOp([], none_object_op, line=-1))
def load_outer_env(self, base: Value, outer_env: Environment) -> Value:
"""Loads the environment class for a given base into a register.
Additionally, iterates through all of the SymbolNode and AssignmentTarget instances in the
given outer environment's symtable, and adds targets for them to the current environment.
This is done so that the current environment can access outer environment variables
without having to reload all of the environment registers.
Returns the register where the environment class was loaded.
"""
env = self.add(GetAttr(base, ENV_ATTR_NAME, self.fn_info.fitem.line))
assert isinstance(env.type, RInstance), '{} must be of type RInstance'.format(env)
for symbol, target in outer_env.symtable.items():
env.type.class_ir.attributes[symbol.name()] = target.type
symbol_target = AssignmentTargetAttr(env, symbol.name())
self.environment.add_target(symbol, symbol_target)
return env
def load_outer_envs(self, base: ImplicitClass) -> None:
index = len(self.environments) - 2
# Load the first outer environment. This one is special because it gets saved in the
# prev_env_reg field of the callable (or generator) class.
if index > 1:
# outer_env = self.fn_infos[index].environment
outer_env = self.environments[index]
if isinstance(base, GeneratorClass):
base.prev_env_reg = self.load_outer_env(base.curr_env_reg, outer_env)
else:
base.prev_env_reg = self.load_outer_env(base.self_reg, outer_env)
env_reg = base.prev_env_reg
index -= 1
# Load the remaining outer environments into registers.
while index > 1:
# outer_env = self.fn_infos[index].environment
outer_env = self.environments[index]
env_reg = self.load_outer_env(env_reg, outer_env)
index -= 1
def load_env_registers(self) -> None:
"""Loads the registers for the current FuncItem being visited.
Adds the arguments of the FuncItem to the environment. If the FuncItem is nested inside of
another function, then this also loads all of the outer environments of the FuncItem into
registers so that they can be used when accessing free variables.
"""
self.add_args_to_env(local=True)
fn_info = self.fn_info
fitem = fn_info.fitem
if fn_info.is_nested:
self.load_outer_envs(fn_info.callable_class)
# If this is a FuncDef, then make sure to load the FuncDef into its own environment
# class so that the function can be called recursively.
if isinstance(fitem, FuncDef):
self.setup_func_for_recursive_call(fitem, fn_info.callable_class)
def add_var_to_env_class(self,
var: SymbolNode,
rtype: RType,
base: Union[FuncInfo, ImplicitClass],
reassign: bool = False) -> AssignmentTarget:
# First, define the variable name as an attribute of the environment class, and then
# construct a target for that attribute.
self.fn_info.env_class.attributes[var.name()] = rtype
attr_target = AssignmentTargetAttr(base.curr_env_reg, var.name())
if reassign:
# Read the local definition of the variable, and set the corresponding attribute of
# the environment class to that value.
reg = self.read(self.environment.lookup(var), self.fn_info.fitem.line)
self.add(SetAttr(base.curr_env_reg, var.name(), reg, self.fn_info.fitem.line))
# Override the local definition of the variable to instead point at the variable in
# the environment class.
return self.environment.add_target(var, attr_target)
def setup_func_for_recursive_call(self, fdef: FuncDef, base: ImplicitClass) -> None:
"""
Adds the instance of the callable class representing the given FuncDef to a register in the
environment so that the function can be called recursively. Note that this needs to be done
only for nested functions.
"""
# First, set the attribute of the environment class so that GetAttr can be called on it.
prev_env = self.fn_infos[-2].env_class
prev_env.attributes[fdef.name()] = self.type_to_rtype(fdef.type)
if isinstance(base, GeneratorClass):
# If we are dealing with a generator class, then we need to first get the register
# holding the current environment class, and load the previous environment class from
# there.
prev_env_reg = self.add(GetAttr(base.curr_env_reg, ENV_ATTR_NAME, -1))
else:
prev_env_reg = base.prev_env_reg
# Obtain the instance of the callable class representing the FuncDef, and add it to the
# current environment.
val = self.add(GetAttr(prev_env_reg, fdef.name(), -1))
target = self.environment.add_local_reg(fdef, object_rprimitive)
self.assign(target, val, -1)
def setup_env_for_generator_class(self) -> None:
"""Populates the environment for a generator class."""
fitem = self.fn_info.fitem
cls = self.fn_info.generator_class
self_target = self.add_self_to_env(cls.ir)
# Add the type, value, and traceback variables to the environment.
exc_type = self.environment.add_local(Var('type'), object_rprimitive, is_arg=True)
exc_val = self.environment.add_local(Var('value'), object_rprimitive, is_arg=True)
exc_tb = self.environment.add_local(Var('traceback'), object_rprimitive, is_arg=True)
# TODO: Use the right type here instead of object?
exc_arg = self.environment.add_local(Var('arg'), object_rprimitive, is_arg=True)
cls.exc_regs = (exc_type, exc_val, exc_tb)
cls.send_arg_reg = exc_arg
cls.self_reg = self.read(self_target, fitem.line)
cls.curr_env_reg = self.load_outer_env(cls.self_reg, self.environment)
# Define a variable representing the label to go to the next time the '__next__' function
# of the generator is called, and add it as an attribute to the environment class.
cls.next_label_target = self.add_var_to_env_class(Var(NEXT_LABEL_ATTR_NAME),
int_rprimitive,
cls,
reassign=False)
# Add arguments from the original generator function to the generator class' environment.
self.add_args_to_env(local=False, base=cls, reassign=False)
# Set the next label register for the generator class.
cls.next_label_reg = self.read(cls.next_label_target, fitem.line)
def add_args_to_env(self,
local: bool = True,
base: Optional[Union[FuncInfo, ImplicitClass]] = None,
reassign: bool = True) -> None:
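        """Adds the arguments of the current FuncItem to the environment.
        If 'local' is True, each argument is added to the environment as a local register.
        Otherwise, each argument that is a free variable (or every argument, for generators) is
        added as an attribute of the environment class given by 'base'.
        """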
fn_info = self.fn_info
if local:
for arg in fn_info.fitem.arguments:
rtype = self.type_to_rtype(arg.variable.type)
self.environment.add_local_reg(arg.variable, rtype, is_arg=True)
else:
for arg in fn_info.fitem.arguments:
if self.is_free_variable(arg.variable) or fn_info.is_generator:
rtype = self.type_to_rtype(arg.variable.type)
assert base is not None, 'base cannot be None for adding nonlocal args'
self.add_var_to_env_class(arg.variable, rtype, base, reassign=reassign)
def gen_func_ns(self) -> str:
"""Generates a namespace for a nested function using its outer function names."""
return '_'.join(info.name + ('' if not info.class_name else '_' + info.class_name)
for info in self.fn_infos
if info.name and info.name != '<top level>')
def setup_callable_class(self) -> None:
"""Generates a callable class representing a nested function or a function within a
non-extension class and sets up the 'self' variable for that class.
        This takes the most recently visited function and creates a ClassIR to represent that
        function. Each callable class contains an environment attribute which points to another
        ClassIR representing the environment class whose variables can be accessed through it.
        Note that its '__call__' method is not implemented here; it is added later by
        add_call_to_callable_class.
        The newly constructed ClassIR is wrapped in an ImplicitClass and stored in
        self.fn_info.callable_class rather than returned.
"""
        # Check whether the name has already been taken. If so, rename the class. We allow
# multiple uses of the same function name because this is valid in if-else blocks. Example:
# if True:
# def foo(): ----> foo_obj()
# return True
# else:
# def foo(): ----> foo_obj_0()
# return False
name = base_name = '{}_obj'.format(self.fn_info.namespaced_name())
count = 0
while name in self.callable_class_names:
name = base_name + '_' + str(count)
count += 1
self.callable_class_names.add(name)
# Define the actual callable class ClassIR, and set its environment to point at the
# previously defined environment class.
callable_class_ir = ClassIR(name, self.module_name, is_generated=True)
# The functools @wraps decorator attempts to call setattr on nested functions, so
# we create a dict for these nested functions.
# https://github.com/python/cpython/blob/3.7/Lib/functools.py#L58
if self.fn_info.is_nested:
callable_class_ir.has_dict = True
        # If the enclosing function doesn't contain any nested functions (which will happen
        # if this is a top-level lambda), don't set up an environment.
if self.fn_infos[-2].contains_nested:
callable_class_ir.attributes[ENV_ATTR_NAME] = RInstance(self.fn_infos[-2].env_class)
callable_class_ir.mro = [callable_class_ir]
self.fn_info.callable_class = ImplicitClass(callable_class_ir)
self.classes.append(callable_class_ir)
# Add a 'self' variable to the callable class' environment, and store that variable in a
# register to be accessed later.
self_target = self.add_self_to_env(callable_class_ir)
self.fn_info.callable_class.self_reg = self.read(self_target, self.fn_info.fitem.line)
def add_call_to_callable_class(self,
blocks: List[BasicBlock],
sig: FuncSignature,
env: Environment,
fn_info: FuncInfo) -> FuncIR:
"""Generates a '__call__' method for a callable class representing a nested function.
This takes the blocks, signature, and environment associated with a function definition and
uses those to build the '__call__' method of a given callable class, used to represent that
        function. Note that a 'self' parameter is added to its list of arguments, as the nested
        function becomes a method of the callable class.
"""
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),) + sig.args, sig.ret_type)
call_fn_decl = FuncDecl('__call__', fn_info.callable_class.ir.name, self.module_name, sig)
call_fn_ir = FuncIR(call_fn_decl, blocks, env,
fn_info.fitem.line, traceback_name=fn_info.fitem.name())
fn_info.callable_class.ir.methods['__call__'] = call_fn_ir
return call_fn_ir
def add_get_to_callable_class(self, fn_info: FuncInfo) -> None:
"""Generates the '__get__' method for a callable class."""
line = fn_info.fitem.line
self.enter(fn_info)
vself = self.read(self.environment.add_local_reg(Var(SELF_NAME), object_rprimitive, True))
instance = self.environment.add_local_reg(Var('instance'), object_rprimitive, True)
self.environment.add_local_reg(Var('owner'), object_rprimitive, True)
# If accessed through the class, just return the callable
# object. If accessed through an object, create a new bound
# instance method object.
instance_block, class_block = BasicBlock(), BasicBlock()
comparison = self.binary_op(self.read(instance), self.none_object(), 'is', line)
self.add_bool_branch(comparison, class_block, instance_block)
self.activate_block(class_block)
self.add(Return(vself))
self.activate_block(instance_block)
self.add(Return(self.primitive_op(method_new_op, [vself, self.read(instance)], line)))
blocks, env, _, fn_info = self.leave()
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),
RuntimeArg('instance', object_rprimitive),
RuntimeArg('owner', object_rprimitive)),
object_rprimitive)
get_fn_decl = FuncDecl('__get__', fn_info.callable_class.ir.name, self.module_name, sig)
get_fn_ir = FuncIR(get_fn_decl, blocks, env)
fn_info.callable_class.ir.methods['__get__'] = get_fn_ir
self.functions.append(get_fn_ir)
def instantiate_callable_class(self, fn_info: FuncInfo) -> Value:
"""
Assigns a callable class to a register named after the given function definition. Note
that fn_info refers to the function being assigned, whereas self.fn_info refers to the
function encapsulating the function being turned into a callable class.
"""
fitem = fn_info.fitem
func_reg = self.add(Call(fn_info.callable_class.ir.ctor, [], fitem.line))
# Set the callable class' environment attribute to point at the environment class
# defined in the callable class' immediate outer scope. Note that there are three possible
# environment class registers we may use. If the encapsulating function is:
# - a generator function, then the callable class is instantiated from the generator class'
        # '__next__' function, and hence the generator class' environment register is used.
# - a nested function, then the callable class is instantiated from the current callable
# class' '__call__' function, and hence the callable class' environment register is used.
# - neither, then we use the environment register of the original function.
curr_env_reg = None
if self.fn_info.is_generator:
curr_env_reg = self.fn_info.generator_class.curr_env_reg
elif self.fn_info.is_nested:
curr_env_reg = self.fn_info.callable_class.curr_env_reg
elif self.fn_info.contains_nested:
curr_env_reg = self.fn_info.curr_env_reg
if curr_env_reg:
self.add(SetAttr(func_reg, ENV_ATTR_NAME, curr_env_reg, fitem.line))
return func_reg
def setup_env_class(self) -> ClassIR:
"""Generates a class representing a function environment.
Note that the variables in the function environment are not actually populated here. This
is because when the environment class is generated, the function environment has not yet
        been visited. This is done so that when the compiler visits nested functions, it can use
        the returned ClassIR instance to figure out which free variables it needs to access.
The remaining attributes of the environment class are populated when the environment
registers are loaded.
Returns a ClassIR representing an environment for a function containing a nested function.
"""
env_class = ClassIR('{}_env'.format(self.fn_info.namespaced_name()),
self.module_name, is_generated=True)
env_class.attributes[SELF_NAME] = RInstance(env_class)
if self.fn_info.is_nested:
            # If the function is nested, its environment class must contain an environment
            # attribute pointing to its encapsulating function's environment class.
env_class.attributes[ENV_ATTR_NAME] = RInstance(self.fn_infos[-2].env_class)
env_class.mro = [env_class]
self.fn_info.env_class = env_class
self.classes.append(env_class)
return env_class
def finalize_env_class(self) -> None:
"""Generates, instantiates, and sets up the environment of an environment class."""
self.instantiate_env_class()
# Iterate through the function arguments and replace local definitions (using registers)
# that were previously added to the environment with references to the function's
# environment class.
if self.fn_info.is_nested:
self.add_args_to_env(local=False, base=self.fn_info.callable_class)
else:
self.add_args_to_env(local=False, base=self.fn_info)
def instantiate_env_class(self) -> Value:
"""Assigns an environment class to a register named after the given function definition."""
curr_env_reg = self.add(Call(self.fn_info.env_class.ctor, [], self.fn_info.fitem.line))
if self.fn_info.is_nested:
self.fn_info.callable_class._curr_env_reg = curr_env_reg
self.add(SetAttr(curr_env_reg,
ENV_ATTR_NAME,
self.fn_info.callable_class.prev_env_reg,
self.fn_info.fitem.line))
else:
self.fn_info._curr_env_reg = curr_env_reg
return curr_env_reg
def gen_generator_func(self) -> None:
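        """Generates the body for a generator function.
        The generated code only sets up the environment and returns a new instance of the
        generator class; the original function body is compiled separately into methods of the
        generator class.
        """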
self.setup_generator_class()
self.load_env_registers()
self.gen_arg_defaults()
self.finalize_env_class()
self.add(Return(self.instantiate_generator_class()))
def setup_generator_class(self) -> ClassIR:
name = '{}_gen'.format(self.fn_info.namespaced_name())
generator_class_ir = ClassIR(name, self.module_name, is_generated=True)
generator_class_ir.attributes[ENV_ATTR_NAME] = RInstance(self.fn_info.env_class)
generator_class_ir.mro = [generator_class_ir]
self.classes.append(generator_class_ir)
self.fn_info.generator_class = GeneratorClass(generator_class_ir)
return generator_class_ir
def add_helper_to_generator_class(self,
blocks: List[BasicBlock],
sig: FuncSignature,
env: Environment,
fn_info: FuncInfo) -> FuncDecl:
"""Generates a helper method for a generator class, called by '__next__' and 'throw'."""
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),
RuntimeArg('type', object_rprimitive),
RuntimeArg('value', object_rprimitive),
RuntimeArg('traceback', object_rprimitive),
RuntimeArg('arg', object_rprimitive)
), sig.ret_type)
helper_fn_decl = FuncDecl('__mypyc_generator_helper__', fn_info.generator_class.ir.name,
self.module_name, sig)
helper_fn_ir = FuncIR(helper_fn_decl, blocks, env,
fn_info.fitem.line, traceback_name=fn_info.fitem.name())
fn_info.generator_class.ir.methods['__mypyc_generator_helper__'] = helper_fn_ir
self.functions.append(helper_fn_ir)
return helper_fn_decl
def add_iter_to_generator_class(self, fn_info: FuncInfo) -> None:
"""Generates the '__iter__' method for a generator class."""
self.enter(fn_info)
self_target = self.add_self_to_env(fn_info.generator_class.ir)
self.add(Return(self.read(self_target, fn_info.fitem.line)))
blocks, env, _, fn_info = self.leave()
# Next, add the actual function as a method of the generator class.
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),), object_rprimitive)
iter_fn_decl = FuncDecl('__iter__', fn_info.generator_class.ir.name, self.module_name, sig)
iter_fn_ir = FuncIR(iter_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['__iter__'] = iter_fn_ir
self.functions.append(iter_fn_ir)
def add_next_to_generator_class(self,
fn_info: FuncInfo,
fn_decl: FuncDecl,
sig: FuncSignature) -> None:
"""Generates the '__next__' method for a generator class."""
self.enter(fn_info)
self_reg = self.read(self.add_self_to_env(fn_info.generator_class.ir))
none_reg = self.none_object()
# Call the helper function with error flags set to Py_None, and return that result.
result = self.add(Call(fn_decl, [self_reg, none_reg, none_reg, none_reg, none_reg],
fn_info.fitem.line))
self.add(Return(result))
blocks, env, _, fn_info = self.leave()
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),), sig.ret_type)
next_fn_decl = FuncDecl('__next__', fn_info.generator_class.ir.name, self.module_name, sig)
next_fn_ir = FuncIR(next_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['__next__'] = next_fn_ir
self.functions.append(next_fn_ir)
def add_send_to_generator_class(self,
fn_info: FuncInfo,
fn_decl: FuncDecl,
sig: FuncSignature) -> None:
"""Generates the 'send' method for a generator class."""
# FIXME: this is basically the same as add_next...
self.enter(fn_info)
self_reg = self.read(self.add_self_to_env(fn_info.generator_class.ir))
arg = self.environment.add_local_reg(Var('arg'), object_rprimitive, True)
none_reg = self.none_object()
# Call the helper function with error flags set to Py_None, and return that result.
result = self.add(Call(fn_decl, [self_reg, none_reg, none_reg, none_reg, self.read(arg)],
fn_info.fitem.line))
self.add(Return(result))
blocks, env, _, fn_info = self.leave()
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),
RuntimeArg('arg', object_rprimitive),), sig.ret_type)
next_fn_decl = FuncDecl('send', fn_info.generator_class.ir.name, self.module_name, sig)
next_fn_ir = FuncIR(next_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['send'] = next_fn_ir
self.functions.append(next_fn_ir)
def add_throw_to_generator_class(self,
fn_info: FuncInfo,
fn_decl: FuncDecl,
sig: FuncSignature) -> None:
"""Generates the 'throw' method for a generator class."""
self.enter(fn_info)
self_reg = self.read(self.add_self_to_env(fn_info.generator_class.ir))
# Add the type, value, and traceback variables to the environment.
typ = self.environment.add_local_reg(Var('type'), object_rprimitive, True)
val = self.environment.add_local_reg(Var('value'), object_rprimitive, True)
tb = self.environment.add_local_reg(Var('traceback'), object_rprimitive, True)
        # Because the value and traceback arguments are optional and hence can be NULL if not
        # passed in, we have to assign them Py_None in that case.
none_reg = self.none_object()
self.assign_if_null(val, lambda: none_reg, self.fn_info.fitem.line)
self.assign_if_null(tb, lambda: none_reg, self.fn_info.fitem.line)
# Call the helper function using the arguments passed in, and return that result.
result = self.add(Call(fn_decl,
[self_reg, self.read(typ), self.read(val), self.read(tb), none_reg],
fn_info.fitem.line))
self.add(Return(result))
blocks, env, _, fn_info = self.leave()
        # Create the FuncSignature for the throw function. Note that the value and traceback
        # fields are optional, and are assigned default values inside the body of the throw
        # function if they are not passed in.
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),
RuntimeArg('type', object_rprimitive),
RuntimeArg('value', object_rprimitive, ARG_OPT),
RuntimeArg('traceback', object_rprimitive, ARG_OPT)),
sig.ret_type)
throw_fn_decl = FuncDecl('throw', fn_info.generator_class.ir.name, self.module_name, sig)
throw_fn_ir = FuncIR(throw_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['throw'] = throw_fn_ir
self.functions.append(throw_fn_ir)
def add_close_to_generator_class(self, fn_info: FuncInfo) -> None:
"""Generates the '__close__' method for a generator class."""
# TODO: Currently this method just triggers a runtime error,
# we should fill this out eventually.
self.enter(fn_info)
self.add_self_to_env(fn_info.generator_class.ir)
self.add(RaiseStandardError(RaiseStandardError.RUNTIME_ERROR,
                                    'close method on generator classes unimplemented',
fn_info.fitem.line))
self.add(Unreachable())
blocks, env, _, fn_info = self.leave()
# Next, add the actual function as a method of the generator class.
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),), object_rprimitive)
close_fn_decl = FuncDecl('close', fn_info.generator_class.ir.name, self.module_name, sig)
close_fn_ir = FuncIR(close_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['close'] = close_fn_ir
self.functions.append(close_fn_ir)
def add_await_to_generator_class(self, fn_info: FuncInfo) -> None:
"""Generates the '__await__' method for a generator class."""
self.enter(fn_info)
self_target = self.add_self_to_env(fn_info.generator_class.ir)
self.add(Return(self.read(self_target, fn_info.fitem.line)))
blocks, env, _, fn_info = self.leave()
# Next, add the actual function as a method of the generator class.
sig = FuncSignature((RuntimeArg(SELF_NAME, object_rprimitive),), object_rprimitive)
await_fn_decl = FuncDecl('__await__', fn_info.generator_class.ir.name,
self.module_name, sig)
await_fn_ir = FuncIR(await_fn_decl, blocks, env)
fn_info.generator_class.ir.methods['__await__'] = await_fn_ir
self.functions.append(await_fn_ir)
def create_switch_for_generator_class(self) -> None:
self.add(Goto(self.fn_info.generator_class.switch_block))
self.fn_info.generator_class.blocks.append(self.new_block())
def populate_switch_for_generator_class(self) -> None:
cls = self.fn_info.generator_class
line = self.fn_info.fitem.line
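        # The switch block conceptually dispatches on the stored next label, roughly:
        #
        #     if next_label == 0: goto block 0
        #     elif next_label == 1: goto block 1
        #     ...
        #     else: raise StopIteration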
self.activate_block(cls.switch_block)
for label, true_block in enumerate(cls.blocks):
false_block = BasicBlock()
comparison = self.binary_op(cls.next_label_reg, self.add(LoadInt(label)), '==', line)
self.add_bool_branch(comparison, true_block, false_block)
self.activate_block(false_block)
self.add(RaiseStandardError(RaiseStandardError.STOP_ITERATION, None, line))
self.add(Unreachable())
def instantiate_generator_class(self) -> Value:
fitem = self.fn_info.fitem
generator_reg = self.add(Call(self.fn_info.generator_class.ir.ctor, [], fitem.line))
# Get the current environment register. If the current function is nested, then the
# generator class gets instantiated from the callable class' '__call__' method, and hence
# we use the callable class' environment register. Otherwise, we use the original
# function's environment register.
if self.fn_info.is_nested:
curr_env_reg = self.fn_info.callable_class.curr_env_reg
else:
curr_env_reg = self.fn_info.curr_env_reg
# Set the generator class' environment attribute to point at the environment class
# defined in the current scope.
self.add(SetAttr(generator_reg, ENV_ATTR_NAME, curr_env_reg, fitem.line))
# Set the generator class' environment class' NEXT_LABEL_ATTR_NAME attribute to 0.
zero_reg = self.add(LoadInt(0))
self.add(SetAttr(curr_env_reg, NEXT_LABEL_ATTR_NAME, zero_reg, fitem.line))
return generator_reg
def add_raise_exception_blocks_to_generator_class(self, line: int) -> None:
"""
Generates blocks to check if error flags are set while calling the helper method for
generator functions, and raises an exception if those flags are set.
"""
cls = self.fn_info.generator_class
assert cls.exc_regs is not None
exc_type, exc_val, exc_tb = cls.exc_regs
# Check to see if an exception was raised.
error_block = BasicBlock()
ok_block = BasicBlock()
comparison = self.binary_op(exc_type, self.none_object(), 'is not', line)
self.add_bool_branch(comparison, error_block, ok_block)
self.activate_block(error_block)
self.primitive_op(raise_exception_with_tb_op, [exc_type, exc_val, exc_tb], line)
self.add(Unreachable())
self.goto_and_activate(ok_block)
def add_self_to_env(self, cls: ClassIR) -> AssignmentTargetRegister:
return self.environment.add_local_reg(Var(SELF_NAME),
RInstance(cls),
is_arg=True)
def is_builtin_ref_expr(self, expr: RefExpr) -> bool:
assert expr.node, "RefExpr not resolved"
return '.' in expr.node.fullname() and expr.node.fullname().split('.')[0] == 'builtins'
def load_global(self, expr: NameExpr) -> Value:
"""Loads a Python-level global.
This takes a NameExpr and uses its name as a key to retrieve the corresponding PyObject *
from the _globals dictionary in the C-generated code.
"""
# If the global is from 'builtins', turn it into a module attr load instead
if self.is_builtin_ref_expr(expr):
assert expr.node, "RefExpr not resolved"
return self.load_module_attr_by_fullname(expr.node.fullname(), expr.line)
if (self.is_native_module_ref_expr(expr) and isinstance(expr.node, TypeInfo)
and not self.is_synthetic_type(expr.node)):
assert expr.fullname is not None
return self.load_native_type_object(expr.fullname)
return self.load_global_str(expr.name, expr.line)
def load_global_str(self, name: str, line: int) -> Value:
_globals = self.load_globals_dict()
reg = self.load_static_unicode(name)
return self.primitive_op(dict_get_item_op, [_globals, reg], line)
def load_globals_dict(self) -> Value:
return self.add(LoadStatic(dict_rprimitive, 'globals', self.module_name))
def literal_static_name(self, value: Union[int, float, complex, str, bytes]) -> str:
return self.mapper.literal_static_name(self.current_module, value)
def load_static_int(self, value: int) -> Value:
"""Loads a static integer Python 'int' object into a register."""
static_symbol = self.literal_static_name(value)
return self.add(LoadStatic(int_rprimitive, static_symbol, ann=value))
def load_static_float(self, value: float) -> Value:
"""Loads a static float value into a register."""
static_symbol = self.literal_static_name(value)
return self.add(LoadStatic(float_rprimitive, static_symbol, ann=value))
def load_static_bytes(self, value: bytes) -> Value:
"""Loads a static bytes value into a register."""
static_symbol = self.literal_static_name(value)
return self.add(LoadStatic(object_rprimitive, static_symbol, ann=value))
def load_static_complex(self, value: complex) -> Value:
"""Loads a static complex value into a register."""
static_symbol = self.literal_static_name(value)
return self.add(LoadStatic(object_rprimitive, static_symbol, ann=value))
def load_static_unicode(self, value: str) -> Value:
"""Loads a static unicode value into a register.
This is useful for more than just unicode literals; for example, method calls
also require a PyObject * form for the name of the method.
"""
static_symbol = self.literal_static_name(value)
return self.add(LoadStatic(str_rprimitive, static_symbol, ann=value))
def load_module(self, name: str) -> Value:
return self.add(LoadStatic(object_rprimitive, name, namespace=NAMESPACE_MODULE))
def load_module_attr_by_fullname(self, fullname: str, line: int) -> Value:
module, _, name = fullname.rpartition('.')
left = self.load_module(module)
return self.py_get_attr(left, name, line)
def load_native_type_object(self, fullname: str) -> Value:
module, name = fullname.rsplit('.', 1)
return self.add(LoadStatic(object_rprimitive, name, module, NAMESPACE_TYPE))
def coerce(self, src: Value, target_type: RType, line: int, force: bool = False) -> Value:
"""Generate a coercion/cast from one type to other (only if needed).
For example, int -> object boxes the source int; int -> int emits nothing;
object -> int unboxes the object. All conversions preserve object value.
If force is true, always generate an op (even if it is just an assignment) so
that the result will have exactly target_type as the type.
Returns the register with the converted value (may be same as src).
"""
if src.type.is_unboxed and not target_type.is_unboxed:
return self.box(src)
if ((src.type.is_unboxed and target_type.is_unboxed)
and not is_runtime_subtype(src.type, target_type)):
# To go from one unboxed type to another, we go through a boxed
# in-between value, for simplicity.
tmp = self.box(src)
return self.unbox_or_cast(tmp, target_type, line)
if ((not src.type.is_unboxed and target_type.is_unboxed)
or not is_subtype(src.type, target_type)):
return self.unbox_or_cast(src, target_type, line)
elif force:
tmp = self.alloc_temp(target_type)
self.add(Assign(tmp, src))
return tmp
return src
def native_args_to_positional(self,
args: Sequence[Value],
arg_kinds: List[int],
arg_names: Sequence[Optional[str]],
sig: FuncSignature,
line: int) -> List[Value]:
"""Prepare arguments for a native call.
Given args/kinds/names and a target signature for a native call, map
keyword arguments to their appropriate place in the argument list,
fill in error values for unspecified default arguments,
package arguments that will go into *args/**kwargs into a tuple/dict,
and coerce arguments to the appropriate type.
"""
sig_arg_kinds = [arg.kind for arg in sig.args]
sig_arg_names = [arg.name for arg in sig.args]
formal_to_actual = map_actuals_to_formals(arg_kinds,
arg_names,
sig_arg_kinds,
sig_arg_names,
lambda n: AnyType(TypeOfAny.special_form))
# Flatten out the arguments, loading error values for default
# arguments, constructing tuples/dicts for star args, and
# coercing everything to the expected type.
output_args = []
for lst, arg in zip(formal_to_actual, sig.args):
output_arg = None
if arg.kind == ARG_STAR:
output_arg = self.primitive_op(new_tuple_op, [args[i] for i in lst], line)
elif arg.kind == ARG_STAR2:
dict_entries = [(self.load_static_unicode(cast(str, arg_names[i])), args[i])
for i in lst]
output_arg = self.make_dict(dict_entries, line)
elif not lst:
output_arg = self.add(LoadErrorValue(arg.type, is_borrowed=True))
else:
output_arg = args[lst[0]]
output_args.append(self.coerce(output_arg, arg.type, line))
return output_args
def get_func_target(self, fdef: FuncDef) -> AssignmentTarget:
"""
        Given a FuncDef, return the target associated with the instance of its callable class.
        If the function was not already defined somewhere, then define it and add it to the
        current environment.
"""
if fdef.original_def:
# Get the target associated with the previously defined FuncDef.
return self.environment.lookup(fdef.original_def)
if self.fn_info.is_generator or self.fn_info.contains_nested:
return self.environment.lookup(fdef)
return self.environment.add_local_reg(fdef, object_rprimitive)
# Lacks a good type because there wasn't a reasonable type in 3.5 :(
def catch_errors(self, line: int) -> Any:
return catch_errors(self.module_path, line)
def warning(self, msg: str, line: int) -> None:
self.errors.warning(msg, self.module_path, line)
def error(self, msg: str, line: int) -> None:
self.errors.error(msg, self.module_path, line)
def bail(self, msg: str, line: int) -> 'NoReturn':
"""Reports an error and aborts compilation up until the last accept() call
(accept() catches the UnsupportedException and keeps on
processing. This allows errors to be non-blocking without always
needing to write handling for them.
"""
self.error(msg, line)
raise UnsupportedException()