| """Semantic analysis of TypedDict definitions.""" |
| |
| from __future__ import annotations |
| |
| from collections.abc import Collection |
| from typing import Final |
| |
| from mypy import errorcodes as codes, message_registry |
| from mypy.errorcodes import ErrorCode |
| from mypy.expandtype import expand_type |
| from mypy.exprtotype import TypeTranslationError, expr_to_unanalyzed_type |
| from mypy.message_registry import TYPEDDICT_OVERRIDE_MERGE |
| from mypy.messages import MessageBuilder |
| from mypy.nodes import ( |
| ARG_NAMED, |
| ARG_POS, |
| AssignmentStmt, |
| CallExpr, |
| ClassDef, |
| Context, |
| DictExpr, |
| EllipsisExpr, |
| Expression, |
| ExpressionStmt, |
| IndexExpr, |
| NameExpr, |
| PassStmt, |
| RefExpr, |
| Statement, |
| StrExpr, |
| TempNode, |
| TupleExpr, |
| TypeAlias, |
| TypedDictExpr, |
| TypeInfo, |
| ) |
| from mypy.options import Options |
| from mypy.semanal_shared import ( |
| SemanticAnalyzerInterface, |
| has_placeholder, |
| require_bool_literal_argument, |
| ) |
| from mypy.state import state |
| from mypy.typeanal import check_for_explicit_any, has_any_from_unimported_type |
| from mypy.types import ( |
| TPDICT_NAMES, |
| AnyType, |
| ReadOnlyType, |
| RequiredType, |
| Type, |
| TypedDictType, |
| TypeOfAny, |
| TypeVarLikeType, |
| get_proper_type, |
| ) |
| |
# Error text reported when a statement in a TypedDict class body is not a
# plain "field_name: field_type" declaration.
TPDICT_CLASS_ERROR: Final = (
    "Invalid statement in TypedDict definition; "
    'expected "field_name: field_type"'
)
| |
| |
| class TypedDictAnalyzer: |
def __init__(
    self, options: Options, api: SemanticAnalyzerInterface, msg: MessageBuilder
) -> None:
    """Remember the collaborators used by the analysis methods.

    options: stored as self.options.
    api: semantic analyzer interface, stored as self.api.
    msg: message builder, stored as self.msg.
    """
    self.options, self.api, self.msg = options, api, msg
| |
def analyze_typeddict_classdef(self, defn: ClassDef) -> tuple[bool, TypeInfo | None]:
    """Try to interpret a class definition as a TypedDict.

    Base classes are assumed to have been analyzed already.

    Unlike for normal classes, no TypeInfo is created until the whole
    TypedDict definition (body, key names and key types) is complete,
    mostly because the resulting TypedDictType is stored in the TypeInfo.

    Return a pair (is this a TypedDict, new TypeInfo):
      * (False, None) if this is not a TypedDict.
      * (True, None) if analysis could not finish because of an incomplete
        reference somewhere in the definition (the caller should defer).
      * (True, info) once the definition has been analyzed.
    """
    looks_like_typeddict = False
    for candidate in defn.base_type_exprs:
        # Peel a call and/or a subscript to reach the underlying reference.
        if isinstance(candidate, CallExpr):
            candidate = candidate.callee
        if isinstance(candidate, IndexExpr):
            candidate = candidate.base
        if not isinstance(candidate, RefExpr):
            continue
        self.api.accept(candidate)
        if candidate.fullname not in TPDICT_NAMES and not self.is_typeddict(candidate):
            continue
        looks_like_typeddict = True
        if isinstance(candidate.node, TypeInfo) and candidate.node.is_final:
            err = message_registry.CANNOT_INHERIT_FROM_FINAL
            self.fail(err.format(candidate.node.name).value, defn, code=err.code)
    if not looks_like_typeddict:
        return False, None

    # Reuse the TypeInfo from a previous pass over this definition, if any.
    existing_info = defn.analyzed.info if isinstance(defn.analyzed, TypedDictExpr) else None

    field_types: dict[str, Type]
    required_keys: set[str]
    readonly_keys: set[str]
    body: list[Statement]

    bases = defn.base_type_exprs
    if len(bases) == 1 and isinstance(bases[0], RefExpr) and bases[0].fullname in TPDICT_NAMES:
        # Building a new TypedDict: the only base is TypedDict itself.
        own_fields, body, required_keys, readonly_keys = (
            self.analyze_typeddict_classdef_fields(defn)
        )
        if own_fields is None:
            return True, None  # Defer
        if self.api.is_func_scope() and "@" not in defn.name:
            defn.name += "@" + str(defn.line)
        field_types = own_fields
    else:
        # Extending/merging existing TypedDicts.
        seen_bases: set[str] = set()
        ordered_bases: list[Expression] = []
        for expr in bases:
            ok, call_info, _ = self.check_typeddict(expr, None, False)
            if ok and call_info is not None:
                # expr is a CallExpr
                seen_bases.add(call_info.fullname)
                ordered_bases.append(expr)
            elif isinstance(expr, RefExpr) and expr.fullname in TPDICT_NAMES:
                if "TypedDict" in seen_bases:
                    self.fail('Duplicate base class "TypedDict"', defn)
                else:
                    seen_bases.add("TypedDict")
            elif (isinstance(expr, RefExpr) and self.is_typeddict(expr)) or (
                isinstance(expr, IndexExpr) and self.is_typeddict(expr.base)
            ):
                base_info = self._parse_typeddict_base(expr, defn)
                if base_info.fullname in seen_bases:
                    self.fail(f'Duplicate base class "{base_info.name}"', defn)
                else:
                    seen_bases.add(base_info.fullname)
                    ordered_bases.append(expr)
            else:
                self.fail("All bases of a new TypedDict must be TypedDict types", defn)

        field_types = {}
        required_keys = set()
        readonly_keys = set()
        # Walk the bases right-to-left so that the leftmost base's keys win.
        for base in reversed(ordered_bases):
            self.add_keys_and_types_from_base(
                base, field_types, required_keys, readonly_keys, defn
            )
        own_fields, body, own_required, own_readonly = (
            self.analyze_typeddict_classdef_fields(defn, oldfields=field_types)
        )
        if own_fields is None:
            return True, None  # Defer
        field_types.update(own_fields)
        required_keys.update(own_required)
        readonly_keys.update(own_readonly)

    info = self.build_typeddict_typeinfo(
        defn.name, field_types, required_keys, readonly_keys, defn.line, existing_info
    )
    defn.analyzed = TypedDictExpr(info)
    defn.analyzed.line = defn.line
    defn.analyzed.column = defn.column
    # Keep only the statements that are legal inside a TypedDict body.
    defn.defs.body = body
    return True, info
| |
def add_keys_and_types_from_base(
    self,
    base: Expression,
    field_types: dict[str, Type],
    required_keys: set[str],
    readonly_keys: set[str],
    ctx: Context,
) -> None:
    """Merge the items of one TypedDict base into the accumulated definition.

    field_types, required_keys and readonly_keys are updated in place.
    Nothing is merged if the type arguments of a subscripted base cannot be
    analyzed. A key that is already present in field_types is reported as
    an error at ctx and then overwritten by the base's item.
    """
    base_info = self._parse_typeddict_base(base, ctx)
    type_args: list[Type] = []
    if isinstance(base, IndexExpr):
        analyzed_args = self.analyze_base_args(base, ctx)
        if analyzed_args is None:
            return
        type_args = analyzed_args

    base_td = base_info.typeddict_type
    assert base_td is not None
    inherited_items = base_td.items
    items_to_map = inherited_items.copy()

    # Always fix invalid bases to avoid crashes.
    type_vars = base_info.defn.type_vars
    if len(type_args) != len(type_vars):
        if type_args:
            self.fail(f'Invalid number of type arguments for "{base_info.name}"', ctx)
            filler_kind = TypeOfAny.from_error
        else:
            filler_kind = TypeOfAny.from_omitted_generics
        type_args = [AnyType(filler_kind) for _ in type_vars]

    with state.strict_optional_set(self.options.strict_optional):
        mapped_items = self.map_items_to_base(items_to_map, type_vars, type_args)
    for key in inherited_items:
        if key in field_types:
            self.fail(TYPEDDICT_OVERRIDE_MERGE.format(key), ctx)

    field_types.update(mapped_items)
    required_keys.update(base_td.required_keys)
    readonly_keys.update(base_td.readonly_keys)
| |
def _parse_typeddict_base(self, base: Expression, ctx: Context) -> TypeInfo:
    """Return the TypeInfo of the TypedDict that a base class expression denotes.

    Handles a plain reference (to a class or to an alias whose target is a
    TypedDict), a subscripted reference, and a TypedDict(...) call that has
    already been analyzed. Any other shape trips an assertion.
    """
    if isinstance(base, RefExpr):
        node = base.node
        if isinstance(node, TypeInfo):
            return node
        if isinstance(node, TypeAlias):
            # Only old TypeAlias / plain assignment, PEP695 `type` stmt
            # cannot be used as a base class
            target = get_proper_type(node.target)
            assert isinstance(target, TypedDictType)
            return target.fallback.type
        assert False
    if isinstance(base, IndexExpr):
        # Generic base: resolve the expression being subscripted.
        assert isinstance(base.base, RefExpr)
        return self._parse_typeddict_base(base.base, ctx)
    assert isinstance(base, CallExpr)
    assert isinstance(base.analyzed, TypedDictExpr)
    return base.analyzed.info
| |
def analyze_base_args(self, base: IndexExpr, ctx: Context) -> list[Type] | None:
    """Analyze the subscript arguments of a base type expression as types.

    This is done here because normal base class processing happens after
    the TypedDict special-casing (and it allows a custom error message).

    Return the analyzed types, or None if an argument is not a valid type
    expression (an error is reported at ctx) or could not be analyzed yet.
    """
    index = base.index
    arg_exprs = index.items if isinstance(index, TupleExpr) else [index]

    analyzed_args = []
    for arg_expr in arg_exprs:
        try:
            unanalyzed = expr_to_unanalyzed_type(arg_expr, self.options, self.api.is_stub_file)
        except TypeTranslationError:
            self.fail("Invalid TypedDict type argument", ctx)
            return None
        analyzed = self.api.anal_type(
            unanalyzed,
            allow_typed_dict_special_forms=True,
            allow_placeholder=not self.api.is_func_scope(),
        )
        if analyzed is None:
            return None
        analyzed_args.append(analyzed)
    return analyzed_args
| |
def map_items_to_base(
    self, valid_items: dict[str, Type], tvars: list[TypeVarLikeType], base_args: list[Type]
) -> dict[str, Type]:
    """Return the items as they look in their base with type arguments applied.

    A new dict is returned in every case. Without type variables the item
    types are carried over unchanged; otherwise each item type is expanded
    with the type variables of the base mapped to base_args.

    Note it is safe to use expand_type() during semantic analysis, because it
    should never (indirectly) call is_subtype().
    """
    if not tvars:
        return dict(valid_items)
    # TODO: simple zip can't be used for variadic types.
    return {
        key: expand_type(item_type, {tvar.id: arg for tvar, arg in zip(tvars, base_args)})
        for key, item_type in valid_items.items()
    }
| |
def analyze_typeddict_classdef_fields(
    self, defn: ClassDef, oldfields: Collection[str] | None = None
) -> tuple[dict[str, Type] | None, list[Statement], set[str], set[str]]:
    """Analyze the fields declared directly in a TypedDict class body.

    Inherited fields (if any) are not considered here, except that declaring
    a name contained in oldfields is reported as an error. The "total" class
    keyword, if given, is taken into account.

    Return a tuple with these items:
     * Dict of key -> type (or None if an incomplete reference was found and
       analysis must be deferred)
     * List of statements from defn.defs.body that are legally allowed to be
       a part of a TypedDict definition
     * Set of required keys
     * Set of read-only keys
    """
    fields: dict[str, Type] = {}
    required_keys: set[str] = set()
    readonly_keys: set[str] = set()
    kept_statements: list[Statement] = []
    inherited_names = oldfields or []

    total: bool | None = True
    for keyword in defn.keywords:
        if keyword != "total":
            for_function = ' for "__init_subclass__" of "TypedDict"'
            self.msg.unexpected_keyword_argument_for_function(for_function, keyword, defn)
            continue
        total = require_bool_literal_argument(self.api, defn.keywords["total"], "total", True)

    for stmt in defn.defs.body:
        if not isinstance(stmt, AssignmentStmt):
            # Still allow pass or ... (for empty TypedDict's) and docstrings
            is_filler = isinstance(stmt, PassStmt) or (
                isinstance(stmt, ExpressionStmt)
                and isinstance(stmt.expr, (EllipsisExpr, StrExpr))
            )
            if is_filler:
                kept_statements.append(stmt)
            else:
                defn.removed_statements.append(stmt)
                self.fail(TPDICT_CLASS_ERROR, stmt)
            continue

        if len(stmt.lvalues) > 1 or not isinstance(stmt.lvalues[0], NameExpr):
            # An assignment, but an invalid one.
            defn.removed_statements.append(stmt)
            self.fail(TPDICT_CLASS_ERROR, stmt)
            continue

        name = stmt.lvalues[0].name
        if name in inherited_names:
            self.fail(f'Overwriting TypedDict field "{name}" while extending', stmt)
        if name in fields:
            self.fail(f'Duplicate TypedDict key "{name}"', stmt)
            continue
        # Keep stmt, name, and type in this case...
        kept_statements.append(stmt)

        field_type: Type
        if stmt.unanalyzed_type is None:
            field_type = AnyType(TypeOfAny.unannotated)
        else:
            analyzed = self.api.anal_type(
                stmt.unanalyzed_type,
                allow_typed_dict_special_forms=True,
                allow_placeholder=not self.api.is_func_scope(),
                prohibit_self_type="TypedDict item type",
                prohibit_special_class_field_types="TypedDict",
            )
            if analyzed is None:
                return None, [], set(), set()  # Need to defer
            field_type = analyzed
            if not has_placeholder(analyzed):
                # Passing stmt as context makes nested qualifiers get reported here.
                stmt.type = self.extract_meta_info(analyzed, stmt)[0]

        field_type, required, readonly = self.extract_meta_info(field_type)
        fields[name] = field_type

        # An explicit Required/NotRequired wins; otherwise totality decides.
        is_required = bool(total) if required is None else required
        if is_required:
            required_keys.add(name)
        if readonly:
            readonly_keys.add(name)

        # ...despite possible minor failures that allow further analysis.
        if stmt.type is None or hasattr(stmt, "new_syntax") and not stmt.new_syntax:
            self.fail(TPDICT_CLASS_ERROR, stmt)
        elif not isinstance(stmt.rvalue, TempNode):
            # x: int assigns rvalue to TempNode(AnyType())
            self.fail("Right hand side values are not supported in TypedDict", stmt)

    return fields, kept_statements, required_keys, readonly_keys
| |
def extract_meta_info(
    self, typ: Type, context: Context | None = None
) -> tuple[Type, bool | None, bool]:
    """Strip all Required/NotRequired/ReadOnly wrappers from a type.

    Return (unwrapped type, requiredness, is read-only). Requiredness is
    None when no Required/NotRequired wrapper was present. If a context is
    given, a repeated wrapper of the same kind is reported as an error there.
    """
    requiredness: bool | None = None  # default, no modification
    readonly = False  # by default all is mutable
    required_seen = False
    readonly_seen = False

    # Peel one wrapper layer per iteration until a non-wrapper type is reached.
    while True:
        if isinstance(typ, RequiredType):
            if required_seen and context is not None:
                self.fail(
                    '"{}" type cannot be nested'.format(
                        "Required[]" if typ.required else "NotRequired[]"
                    ),
                    context,
                    code=codes.VALID_TYPE,
                )
            requiredness = typ.required
            required_seen = True
        elif isinstance(typ, ReadOnlyType):
            if readonly_seen and context is not None:
                self.fail('"ReadOnly[]" type cannot be nested', context, code=codes.VALID_TYPE)
            readonly = True
            readonly_seen = True
        else:
            return typ, requiredness, readonly
        typ = typ.item
| |
def check_typeddict(
    self, node: Expression, var_name: str | None, is_func_scope: bool
) -> tuple[bool, TypeInfo | None, list[TypeVarLikeType]]:
    """Check if a call defines a TypedDict.

    The optional var_name argument is the name of the variable to
    which this is assigned, if any.

    Return a triple (is it a typed dict, corresponding TypeInfo, type
    variable definitions returned by parse_typeddict_args):
      * (False, None, []) if the node is not a TypedDict(...) call.
      * (True, None, []) if some type is not ready; the caller should defer.
      * (True, info, tvar_defs) otherwise. If the definition is invalid but
        looks like a TypedDict, errors are reported and (some) TypeInfo is
        still returned.
    """
    if not isinstance(node, CallExpr):
        return False, None, []
    call = node
    callee = call.callee
    if not isinstance(callee, RefExpr):
        return False, None, []
    fullname = callee.fullname
    if fullname not in TPDICT_NAMES:
        return False, None, []
    res = self.parse_typeddict_args(call)
    if res is None:
        # This is a valid typed dict, but some type is not ready.
        # The caller should defer this until next iteration.
        return True, None, []
    name, items, types, total, tvar_defs, ok = res
    if not ok:
        # Error. Construct dummy return value.
        if var_name:
            name = var_name
            if is_func_scope:
                name += "@" + str(call.line)
        else:
            name = var_name = "TypedDict@" + str(call.line)
        info = self.build_typeddict_typeinfo(name, {}, set(), set(), call.line, None)
    else:
        if var_name is not None and name != var_name:
            self.fail(
                'First argument "{}" to TypedDict() does not match variable name "{}"'.format(
                    name, var_name
                ),
                node,
                code=codes.NAME_MATCH,
            )
        if name != var_name or is_func_scope:
            # Give it a unique name derived from the line number.
            name += "@" + str(call.line)

        # Unwrap every layer of Required[]/NotRequired[]/ReadOnly[] with the
        # same helper the class syntax uses. Qualifiers may be nested in any
        # order (e.g. Required[ReadOnly[T]]); peeling only the outermost one
        # would drop the inner qualifier and leave a wrapper in the item type.
        unwrapped = [self.extract_meta_info(t, call) for t in types]
        required_keys: set[str] = set()
        readonly_keys: set[str] = set()
        for field, (_, required, readonly) in zip(items, unwrapped):
            # An explicit Required/NotRequired wins; otherwise totality decides.
            if (total or required is True) and required is not False:
                required_keys.add(field)
            if readonly:
                readonly_keys.add(field)
        types = [item_type for item_type, _, _ in unwrapped]

        # Perform various validations after unwrapping.
        for t in types:
            check_for_explicit_any(
                t, self.options, self.api.is_typeshed_stub_file, self.msg, context=call
            )
        if self.options.disallow_any_unimported:
            for t in types:
                if has_any_from_unimported_type(t):
                    self.msg.unimported_type_becomes_any("Type of a TypedDict key", t, call)

        # Reuse the TypeInfo from a previous pass over this call, if any.
        existing_info = None
        if isinstance(node.analyzed, TypedDictExpr):
            existing_info = node.analyzed.info
        info = self.build_typeddict_typeinfo(
            name,
            dict(zip(items, types)),
            required_keys,
            readonly_keys,
            call.line,
            existing_info,
        )
        info.line = node.line
    # Store generated TypeInfo under both names, see semanal_namedtuple for more details.
    if name != var_name or is_func_scope:
        self.api.add_symbol_skip_local(name, info)
    if var_name:
        self.api.add_symbol(var_name, info, node)
    call.analyzed = TypedDictExpr(info)
    call.analyzed.set_line(call)
    return True, info, tvar_defs
| |
def parse_typeddict_args(
    self, call: CallExpr
) -> tuple[str, list[str], list[Type], bool, list[TypeVarLikeType], bool] | None:
    """Parse the arguments of a TypedDict(...) call expression.

    Return a tuple (name, item names, item types, totality, type variable
    definitions, ok), where ok is False if an error was found during parsing.
    If some type is not ready, return None.
    """
    args = call.args
    arg_count = len(args)
    # TODO: Share code with check_argument_count in checkexpr.py?
    if arg_count < 2:
        return self.fail_typeddict_arg("Too few arguments for TypedDict()", call)
    if arg_count > 3:
        return self.fail_typeddict_arg("Too many arguments for TypedDict()", call)
    # TODO: Support keyword arguments
    if call.arg_kinds not in ([ARG_POS, ARG_POS], [ARG_POS, ARG_POS, ARG_NAMED]):
        return self.fail_typeddict_arg("Unexpected arguments to TypedDict()", call)

    has_keyword = arg_count == 3
    if has_keyword and call.arg_names[2] != "total":
        return self.fail_typeddict_arg(
            f'Unexpected keyword argument "{call.arg_names[2]}" for "TypedDict"', call
        )

    name_arg = args[0]
    if not isinstance(name_arg, StrExpr):
        return self.fail_typeddict_arg(
            "TypedDict() expects a string literal as the first argument", call
        )
    fields_arg = args[1]
    if not isinstance(fields_arg, DictExpr):
        return self.fail_typeddict_arg(
            "TypedDict() expects a dictionary literal as the second argument", call
        )

    total: bool | None = True
    if has_keyword:
        total = require_bool_literal_argument(self.api, call.args[2], "total")
        if total is None:
            # Same placeholder result that fail_typeddict_arg returns.
            return "", [], [], True, [], False

    tvar_defs = self.api.get_and_bind_all_tvars(
        [type_expr for _, type_expr in fields_arg.items]
    )
    parsed = self.parse_typeddict_fields_with_types(fields_arg.items)
    if parsed is None:
        # One of the types is not ready, defer.
        return None
    items, types, ok = parsed
    assert total is not None
    return name_arg.value, items, types, total, tvar_defs, ok
| |
def parse_typeddict_fields_with_types(
    self, dict_items: list[tuple[Expression | None, Expression]]
) -> tuple[list[str], list[Type], bool] | None:
    """Parse typed dict items given as (name expression, type expression) pairs.

    Return (names, types, ok). On an invalid field name or type expression
    an error is reported and ([], [], False) is returned. If some type is
    not ready, return None.
    """
    names: list[str] = []
    item_types: list[Type] = []
    known_keys: set[str] = set()
    for name_expr, type_expr in dict_items:
        if not isinstance(name_expr, StrExpr):
            # The key may be missing altogether; fall back to the value for context.
            self.fail_typeddict_arg("Invalid TypedDict() field name", name_expr or type_expr)
            return [], [], False

        key = name_expr.value
        names.append(key)
        if key in known_keys:
            self.fail(f'Duplicate TypedDict key "{key}"', name_expr)
        known_keys.add(key)

        try:
            unanalyzed = expr_to_unanalyzed_type(
                type_expr, self.options, self.api.is_stub_file
            )
        except TypeTranslationError:
            self.fail_typeddict_arg("Use dict literal for nested TypedDict", type_expr)
            return [], [], False
        analyzed = self.api.anal_type(
            unanalyzed,
            allow_typed_dict_special_forms=True,
            allow_placeholder=not self.api.is_func_scope(),
            prohibit_self_type="TypedDict item type",
            prohibit_special_class_field_types="TypedDict",
        )
        if analyzed is None:
            return None
        item_types.append(analyzed)
    return names, item_types, True
| |
def fail_typeddict_arg(
    self, message: str, context: Context
) -> tuple[str, list[str], list[Type], bool, list[TypeVarLikeType], bool]:
    """Report message at context and return the placeholder parse result.

    The returned tuple has an empty name, no items, no types, totality True,
    no type variable definitions, and ok set to False.
    """
    self.fail(message, context)
    failed_parse = ("", [], [], True, [], False)
    return failed_parse
| |
def build_typeddict_typeinfo(
    self,
    name: str,
    item_types: dict[str, Type],
    required_keys: set[str],
    readonly_keys: set[str],
    line: int,
    existing_info: TypeInfo | None,
) -> TypeInfo:
    """Create or update the TypeInfo that carries a TypedDict type.

    existing_info is reused when available; otherwise a basic TypeInfo named
    name is created. The TypedDictType built from the given items and key
    sets is then stored on the info, which is returned.
    """
    # Prefer typing, then typing_extensions, then mypy_extensions.
    fallback = None
    for fallback_name in (
        "typing._TypedDict",
        "typing_extensions._TypedDict",
        "mypy_extensions._TypedDict",
    ):
        fallback = self.api.named_type_or_none(fallback_name, [])
        if fallback:
            break
    assert fallback is not None

    info = existing_info or self.api.basic_new_typeinfo(name, fallback, line)
    new_type = TypedDictType(item_types, required_keys, readonly_keys, fallback)
    alias = info.special_alias
    if alias and has_placeholder(alias.target):
        # Progress is forced only if the TypedDict type actually changed.
        self.api.process_placeholder(
            None, "TypedDict item", info, force_progress=new_type != info.typeddict_type
        )
    info.update_typeddict_type(new_type)
    return info
| |
| # Helpers |
| |
def is_typeddict(self, expr: Expression) -> bool:
    """Tell whether expr is a reference to a TypedDict class or alias.

    True for a reference whose node is a TypeInfo with a typeddict_type, or
    a TypeAlias whose proper target type is a TypedDictType.
    """
    if not isinstance(expr, RefExpr):
        return False
    node = expr.node
    if isinstance(node, TypeInfo) and node.typeddict_type is not None:
        return True
    return isinstance(node, TypeAlias) and isinstance(
        get_proper_type(node.target), TypedDictType
    )
| |
def fail(self, msg: str, ctx: Context, *, code: ErrorCode | None = None) -> None:
    """Report an error at ctx by delegating to the semantic analyzer API."""
    report = self.api.fail
    report(msg, ctx, code=code)
| |
def note(self, msg: str, ctx: Context) -> None:
    """Attach a note at ctx by delegating to the semantic analyzer API."""
    emit = self.api.note
    emit(msg, ctx)