blob: 7459f73e587a253f41644eaa92d918d1a66c4a0e [file]
# Copyright 2023 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The library module handles creating Python classes and types based on FIDL IR rules."""
from __future__ import annotations
import dataclasses
import enum
import inspect
import json
import keyword
import os
import sys
import types
import typing
from typing import (
Any,
Callable,
Dict,
ForwardRef,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
)
from fidl_codec import add_ir_path
from fidl_codec import encode_fidl_object
from fuchsia_controller_py import Context
from ._client import EventHandlerBase
from ._client import FidlClient
from ._fidl_common import camel_case_to_snake_case
from ._fidl_common import internal_kind_to_type
from ._fidl_common import MethodInfo
from ._server import ServerBase
FIDL_IR_PATH_ENV: str = "FIDL_IR_PATH"
LIB_MAP: Dict[str, str] = {}
MAP_INIT = False
# Defines a mapping from import names, e.g. "fidl.foo_bar_baz" to IR representations.
#
# load_ir_from_import should be the only function that touches the IR_MAP.
IR_MAP: Dict[str, IR] = {}
class Method(dict):
"""A light wrapper around a dict that represents a FIDL method."""
def __init__(self, parent_ir, json_dict):
super().__init__(json_dict)
self.parent_ir = parent_ir
def __getitem__(self, key) -> typing.Any:
res = super().__getitem__(key)
if key == "identifier":
return normalize_identifier(res)
return res
def has_response(self) -> bool:
"""Returns True if the method has a response."""
return bool(self["has_response"])
def has_request(self) -> bool:
"""Returns True if the method has a request."""
return bool(self["has_request"])
def has_result(self) -> bool:
"""Returns True if the method has a result.
This is different from whether or not a method has a response, because a result is something
that can return an error (technically it's a union with two different values).
"""
return bool(self["has_error"])
def request_payload_identifier(self) -> str | None:
"""Attempts to lookup the payload identifier if it exists.
Returns:
None if there is no identifier, else an identifier string.
"""
assert "maybe_request_payload" in self
payload = self.maybe_request_payload()
if not payload:
return None
return payload.identifier()
def response_payload_raw_identifier(self) -> str | None:
"""Attempts to lookup the response payload identifier if it exists.
Returns:
None if there is no identifier, else an identifier string.
"""
if not "maybe_response_payload" in self:
return None
payload = self.maybe_response_payload()
return payload.raw_identifier() if payload is not None else None
def maybe_response_payload(self) -> IR | None:
if not "maybe_response_payload" in self:
return None
return IR(self.parent_ir, self["maybe_response_payload"])
def maybe_request_payload(self) -> IR | None:
if not "maybe_request_payload" in self:
return None
return IR(self.parent_ir, self["maybe_request_payload"])
def ordinal(self) -> int:
return self["ordinal"]
def name(self) -> str:
return self["name"]
class IR(dict):
"""A light wrapper around a dict that contains some convenience lookup methods."""
def __init__(self, path, json_dict):
super().__init__(json_dict)
self.path = path
if "library_dependencies" in self:
# The names of these fields are specific to how they are declared in the IR. This is so
# they can be programmatically looked up through reflection.
#
# See _sorted_type_declarations for an example of looking these fields up.
for decl in [
"bits",
"enum",
"struct",
"table",
"union",
"const",
"alias",
"protocol",
"experimental_resource",
]:
setattr(self, f"{decl}_decls", self._decl_dict(decl))
def __getitem__(self, key):
res = super().__getitem__(key)
if key == "identifier":
return normalize_identifier(res)
if type(res) == dict:
return IR(self.path, res)
if type(res) == list and res and type(res[0]) == dict:
return [IR(self.path, x) for x in res]
return res
def _decl_dict(self, ty: str) -> Dict[str, IR]:
return {x["name"]: IR(self.path, x) for x in self[f"{ty}_declarations"]}
def name(self) -> str:
return normalize_identifier(self["name"])
def identifier(self) -> str:
return normalize_identifier(self["identifier"])
def raw_identifier(self) -> str:
return super().__getitem__("identifier")
def methods(self) -> List[Method]:
return [Method(self, x) for x in self["methods"]]
def declaration(self, identifier: str) -> Optional[str]:
"""Returns the declaration from the set of 'declarations,' None if not in the set.
Args:
identifier: The FIDL identifier, e.g. foo.bar/Baz to denote the Baz struct from library
foo.bar. This expects a raw_identifier (which may contain underscores like _Result at
the end of the name).
Returns:
The identifier's declaration type, or None if not found. The declaration type is a FIDL
type declaration, e.g. "const," "struct," "table," etc.
"""
return self["declarations"].get(identifier)
def _sorted_type_declarations(self, ty: str) -> List[IR]:
"""Returns type declarations in their IR sorted order.
Args:
ty: The type of the declaration as a string, e.g. "const," "table," "union," etc.
Returns:
The declarations within this IR library, sorted by their dependency order (in order of
least dependencies to most dependencies. An increase in dependencies means that a type
is composited with more elements).
This ensures that declarations, when being exported as types for the given module, are
constructed in the correct dependency order.
"""
return [
getattr(self, f"{ty}_decls")[x]
for x in self["declaration_order"]
if self.declaration(x) == ty
]
def protocol_declarations(self) -> List[IR]:
return self._sorted_type_declarations("protocol")
def union_declarations(self) -> List[IR]:
return self._sorted_type_declarations("union")
def const_declarations(self) -> List[IR]:
return self._sorted_type_declarations("const")
def bits_declarations(self) -> List[IR]:
return self._sorted_type_declarations("bits")
def enum_declarations(self) -> List[IR]:
return self._sorted_type_declarations("enum")
def struct_declarations(self) -> List[IR]:
return self._sorted_type_declarations("struct")
def table_declarations(self) -> List[IR]:
return self._sorted_type_declarations("table")
def alias_declarations(self) -> List[IR]:
return self._sorted_type_declarations("alias")
def experimental_resource_declarations(self) -> List[IR]:
return self._sorted_type_declarations("experimental_resource")
def resolve_kind(self, key) -> Tuple[str, str]:
"""Iteratively attempts to resolve the passed kind.
Returns not only the kind, but the IR in which the kind was found.
"""
kind = self.declaration(key)
ir = self
# If the kind is none, then it is declared in a separate library and must be imported and
# further unwrapped.
while kind is None:
library = fidl_ident_to_py_import(key)
ir = load_ir_from_import(library)
kind = ir.declaration(key)
return (
kind,
next(d for d in ir[f"{kind}_declarations"] if d["name"] == key),
)
def get_fidl_ir_map() -> Mapping[str, str]:
"""Returns a singleton mapping of library names to FIDL files."""
global MAP_INIT
if MAP_INIT:
return LIB_MAP
ctx = Context()
# TODO(b/308723467): Handle multiple paths.
default_ir_path = ctx.config_get_string("fidl.ir.path")
if not default_ir_path:
if FIDL_IR_PATH_ENV in os.environ:
default_ir_path = os.environ[FIDL_IR_PATH_ENV]
else:
# TODO(b/311250297): Remove last resort backstop for unconfigured
# in-tree build config
default_ir_path = "fidling/gen/ir_root"
if not os.path.isdir(default_ir_path):
raise RuntimeError(
f"Unable to find IR path root dir at '{default_ir_path}'"
)
for _, dirs, _ in os.walk(default_ir_path):
for d in dirs:
LIB_MAP[d] = os.path.join(default_ir_path, d, f"{d}.fidl.json")
MAP_INIT = True
return LIB_MAP
def string_to_basetype(t: str) -> type:
"""Takes a base type like int32, bool, etc, and returns a Python type encapsulating it.
Examples:
"int32" would return the type `int`.
"float64" would return the type `bool`.
Returns:
The type represented by the string.
"""
if t.startswith("int") or t.startswith("uint"):
return int
elif t.startswith("float"):
return float
elif t == "bool":
return bool
else:
raise Exception(f"Unsupported subtype: {t}")
def fidl_import_to_fidl_library(name: str) -> str:
"""Converts a fidl import, e.g. fidl.foo_bar_baz, to a fidl library: 'foo.bar.baz'"""
assert name.startswith("fidl.")
short_name = name[len("fidl.") :]
short_name = short_name.replace("_", ".")
return short_name
def fidl_import_to_library_path(name: str) -> str:
"""Returns a fidl IR path based on the fidl import name."""
try:
return get_fidl_ir_map()[fidl_import_to_fidl_library(name)]
except KeyError:
raise ImportError(
f"Unable to import library {name}."
+ " Please ensure that the FIDL IR for this library has been created."
)
def type_annotation(type_ir, root_ir, recurse_guard=None) -> type:
"""Attempts to turn a type's IR representation into a type annotation for class constructions."""
def wrap_optional(annotation):
try:
if type(annotation) == str:
annotation = ForwardRef(annotation)
if type_ir["nullable"]:
return Optional[annotation]
except KeyError:
pass
finally:
return annotation
kind = type_ir["kind"]
if kind == "identifier":
ident = type_ir.raw_identifier()
ident_kind = get_kind_by_identifier(ident, root_ir)
ty = get_type_by_identifier(ident, root_ir, recurse_guard)
if ident_kind == "bits":
ty = Union[ty, Set[ty]]
return wrap_optional(ty)
elif kind == "primitive":
return string_to_basetype(type_ir["subtype"])
elif kind == "handle":
return wrap_optional(f"zx.{type_ir['subtype']}")
elif kind == "string":
return wrap_optional(str)
elif kind == "vector" or kind == "array":
element_type = type_ir["element_type"]
if (
element_type["kind"] == "primitive"
and element_type["subtype"] == "uint8"
):
return wrap_optional(bytes)
else:
ty = type_annotation(element_type, root_ir, recurse_guard)
return wrap_optional(Sequence[ty])
elif kind == "request":
return wrap_optional(
fidl_ident_to_py_library_member(type_ir["subtype"]) + ".Server"
)
elif kind == "internal":
internal_kind = type_ir["subtype"]
return internal_kind_to_type(internal_kind)
raise TypeError(
f"As yet unsupported type in library {root_ir['name']}: {kind}"
)
def fidl_library_to_py_module_path(name: str) -> str:
"""Converts a fidl library, e.g. foo.bar.baz into a Python-friendly import: fidl.foo_bar_baz"""
return "fidl." + name.replace(".", "_")
def fidl_ident_to_library(name: str) -> str:
"""Takes a fidl identifier and returns the library: foo.bar.baz/Foo would return foo.bar.baz"""
return name.split("/")[0]
def fidl_ident_to_py_import(name: str) -> str:
"""Python import from fidl identifier: foo.bar.baz/Foo would return fidl.foo_bar_baz"""
fidl_lib = fidl_ident_to_library(name)
return fidl_library_to_py_module_path(fidl_lib)
def fidl_ident_to_marker(name: str) -> str:
"""Changes a fidl library member to a marker used for protocol lookup.
Returns: foo.bar.baz/Foo returns foo.bar.baz.Foo
"""
name = normalize_identifier(name)
return name.replace("/", ".")
def fidl_ident_to_py_library_member(name: str) -> str:
"""Returns fidl library member name from identifier: foo.bar.baz/Foo would return Foo"""
name = normalize_identifier(name)
return name.split("/")[1]
def docstring(decl, default: Optional[str] = None) -> Optional[str]:
"""Constructs docstring from a fidl's IR documentation declaration if it exists."""
doc_attr = next(
(
attr
for attr in decl.get("maybe_attributes", [])
if attr["name"] == "doc"
),
None,
)
if doc_attr is None:
return default
return doc_attr["arguments"][0]["value"]["value"].strip()
def bits_or_enum_root_type(ir, type_name: str) -> enum.EnumMeta:
"""Constructs a Python type from either bits or enums (they are quite similar so they bottom out
on this function)."""
name = fidl_ident_to_py_library_member(ir.name())
members = {
member["name"]: int(member["value"]["value"])
for member in ir["members"]
}
ty = enum.IntFlag(name, members)
setattr(ty, "__fidl_kind__", type_name)
setattr(ty, "__doc__", docstring(ir))
setattr(ty, "__members_for_aliasing__", members)
return ty
def experimental_resource_type(ir, root_ir, recurse_guard=None) -> type:
name = fidl_ident_to_py_library_member(ir.name())
ty = type(
name,
(int,),
{
"__doc__": docstring(ir),
"__fidl_kind__": "experimental_resource",
"__fidl_type__": ir.name(),
},
)
return ty
def bits_type(ir) -> enum.EnumMeta:
"""Constructs a Python type from a bits declaration in IR."""
return bits_or_enum_root_type(ir, "bits")
def enum_type(ir) -> enum.EnumMeta:
"""Constructs a Python type from a bits declaration in IR."""
return bits_or_enum_root_type(ir, "enum")
def _union_get_value(self):
"""Helper function that attempts to get the union value."""
items = [
m[0].replace("_type", "")
for m in inspect.getmembers(self)
if m[0].endswith("_type")
]
got = None
item = None
for i in items:
got = getattr(self, i)
item = i
if got is not None:
break
return item, got
def union_repr(self) -> str:
"""Returns the union repr in the format <'foo.bar.baz/FooUnion' object({value})>
If {value} is not set, will write None."""
key, value = _union_get_value(self)
string = f"{key}={repr(value)}"
if key is None and value is None:
string = "None"
return f"<'{self.__fidl_type__}' object({string})>"
def union_str(self) -> str:
"""Returns the union string representation, e.g. whatever the union type has been set to."""
key, value = _union_get_value(self)
string = f"{key}={str(value)}"
if key is None and value is None:
string = "None"
return f"{type(self).__name__}({string})"
def union_eq(self, other) -> bool:
if not isinstance(other, type(self)):
return False
items = [
m[0].replace("_type", "")
for m in inspect.getmembers(self)
if m[0].endswith("_type")
]
for item in items:
if getattr(self, item) == getattr(other, item):
return True
return False
def union_type(ir, root_ir, recurse_guard=None) -> type:
"""Constructs a Python type from a FIDL IR Union declaration."""
name = fidl_ident_to_py_library_member(ir.name())
base = type(
name,
(),
{
"__doc__": docstring(ir),
"__fidl_kind__": "union",
"__repr__": union_repr,
"__str__": union_str,
"__eq__": union_eq,
"__fidl_type__": ir.name(),
},
)
# TODO(https://fxbug.dev/127787): Prevent unions from having more than one value set at the same time.
# This is silently allowed during encoding, but should either be prevented during encoding, or
# prevented from within the object itself. If it cannot be prevented there should be some hook
# that sets the previous union variants to None.
for member in ir["members"]:
if member["reserved"]:
continue
member_name = member["name"]
member_type_name = member_name + "_type"
member_constructor_name = member_name + "_variant"
@classmethod
def ctor(cls, value, member_name=member_name):
res = cls()
setattr(res, member_name, value)
return res
setattr(
base,
member_type_name,
type_annotation(member["type"], root_ir, recurse_guard),
)
setattr(ctor, "__doc__", docstring(member))
setattr(base, member_constructor_name, ctor)
setattr(base, member_name, None)
return base
def normalize_member_name(name) -> str:
"""Prevents use of names for struct or table members that are already keywords"""
if name in keyword.kwlist:
return name + "_"
return name
def struct_and_table_subscript(self, item: str):
if not isinstance(item, str):
raise TypeError("Subscripted item must be a string")
return getattr(self, item)
def struct_type(ir, root_ir, recurse_guard=None) -> type:
"""Constructs a Python type from a FIDL IR struct declaration."""
name = fidl_ident_to_py_library_member(ir.name())
members = [
(
normalize_member_name(member["name"]),
type_annotation(member["type"], root_ir, recurse_guard),
)
for member in ir["members"]
]
ty = dataclasses.make_dataclass(name, members)
setattr(ty, "__fidl_kind__", "struct")
setattr(ty, "__fidl_type__", ir.name())
setattr(ty, "__doc__", docstring(ir))
setattr(ty, "__getitem__", struct_and_table_subscript)
return ty
def table_type(ir, root_ir, recurse_guard=None) -> type:
"""Constructs a Python type from a FIDL IR table declaration."""
name = fidl_ident_to_py_library_member(ir.name())
members = []
for member in ir["members"]:
if member["reserved"]:
continue
optional_ty = type_annotation(member["type"], root_ir, recurse_guard)
new_member = (
normalize_member_name(member["name"]),
Optional[optional_ty],
dataclasses.field(default=None),
)
members.append(new_member)
it: Iterable[Tuple[str, type, Any]] = members
ty = dataclasses.make_dataclass(name, it)
setattr(ty, "__fidl_kind__", "table")
setattr(ty, "__fidl_type__", ir.name())
setattr(ty, "__doc__", docstring(ir))
setattr(ty, "__getitem__", struct_and_table_subscript)
return ty
class FIDLConstant(object):
def __init__(self, name, value):
self.name = name
self.value = value
def primitive_converter(subtype: str) -> type:
if "int" in subtype:
return int
elif subtype == "bool":
return bool
elif "float" in subtype:
return float
raise TypeError(f"Unrecognized type: {subtype}")
def const_declaration(ir, root_ir, recurse_guard=None) -> FIDLConstant:
"""Constructs a Python type from a FIDL IR const declaration."""
name = fidl_ident_to_py_library_member(ir.name())
kind = ir["type"]["kind"]
if kind == "primitive":
converter = primitive_converter(ir["type"]["subtype"])
return FIDLConstant(name, converter(ir["value"]["value"]))
elif kind == "identifier":
ident = ir["type"].identifier()
ty = get_type_by_identifier(ident, root_ir, recurse_guard)
if type(ty) == str:
return FIDLConstant(name, ty(ir["value"]["value"]))
elif ty.__class__ == enum.EnumMeta:
return FIDLConstant(name, ty(int(ir["value"]["value"])))
raise TypeError(
f"As yet unsupported identifier type in lib '{root_ir['name']}': {type(ty)}"
)
elif kind == "string":
return FIDLConstant(name, ir["value"]["value"])
raise TypeError(
f"As yet unsupported type in library '{root_ir['name']}': {kind}"
)
def alias_declaration(ir, root_ir, recurse_guard=None) -> type:
"""Constructs a Python type from a FIDL IR alias declaration."""
name = fidl_ident_to_py_library_member(ir.name())
ctor = ir.get("partial_type_ctor")
if ctor:
ctor_type = ctor["name"]
try:
base_type = string_to_basetype(ctor_type)
except Exception:
if ctor_type == "string":
base_type = str
elif ctor_type == "vector":
# This can likely be annotated better, like constraining types.
# There is a doc explaining some of the limitations here at go/fidl-ir-aliases
# So for the time being this is just a generic list rather than anything specific
# Like the struct-creating code that builds vector annotations in a more rigorous
# way.
base_type = list
else:
base_type = get_type_by_identifier(ctor_type, root_ir)
if type(base_type) == enum.EnumType:
# This is a bit of a special case. Enum cannot be used as a base type when using the
# `type` operator.
ty = enum.IntFlag(name, base_type.__members_for_aliasing__)
setattr(ty, "__doc__", docstring(ir))
setattr(ty, "__fidl_kind__", "alias")
setattr(ty, "__fidl_type__", ir.name())
setattr(
ty,
"__members_for_aliasing__",
base_type.__members_for_aliasing__,
)
return ty
base_params = {
"__doc__": docstring(ir),
"__fidl_kind__": "alias",
"__fidl_type__": ir.name(),
}
return type(name, (base_type,), base_params)
def protocol_type(ir, root_ir, recurse_guard=None) -> type:
"""Constructs a Python type from a FIDL IR protocol declaration."""
name = fidl_ident_to_py_library_member(ir.name())
return type(
name,
(object,),
{
"__doc__": docstring(ir),
"__fidl_kind__": "protocol",
"Client": protocol_client_type(ir, root_ir),
"Server": protocol_server_type(ir, root_ir),
"EventHandler": protocol_event_handler_type(ir, root_ir),
"MARKER": fidl_ident_to_marker(ir.name()),
},
)
def protocol_event_handler_type(ir: IR, root_ir) -> type:
name = fidl_ident_to_py_library_member(ir.name())
properties = {
"__doc__": docstring(ir),
"__fidl_kind__": "event_handler",
"library": root_ir.name(),
"method_map": {},
}
for method in ir.methods():
# Methods without a request are event methods.
if method.has_request():
continue
method_snake_case = camel_case_to_snake_case(method.name())
properties[method_snake_case] = event_method(
method, root_ir, get_fidl_request_server_lambda
)
ident = ""
# The IR uses direction-based terminology, so an event is a method where
# "has_request" is false and "has_response" is true (server -> client).
# We are moving towards sequence-based terminology, where "request"
# always means the initiating message. That's why the generated type is
# named as *Request, but we use the "response" IR fields.
# TODO(https://fxbug.dev/7660): Remove this comment when the IR is updated.
if "maybe_response_payload" in method:
ident = method.response_payload_raw_identifier()
properties["method_map"][method.ordinal()] = MethodInfo(
name=method_snake_case,
request_ident=ident,
requires_response=False,
empty_response=False,
has_result=False,
response_identifier=None,
)
return type(name, (EventHandlerBase,), properties)
def protocol_server_type(ir: IR, root_ir) -> type:
name = fidl_ident_to_py_library_member(ir.name())
properties = {
"__doc__": docstring(ir),
"__fidl_kind__": "server",
"library": root_ir.name(),
"method_map": {},
}
for method in ir.methods():
method_snake_case = camel_case_to_snake_case(method.name())
if not method.has_request():
# This is an event. It is callable as a one-way method.
properties[method_snake_case] = event_method(
method, root_ir, send_event_lambda
)
continue
properties[method_snake_case] = protocol_method(
method, root_ir, get_fidl_request_server_lambda
)
ident = ""
if "maybe_request_payload" in method:
ident = method.request_payload_identifier()
properties["method_map"][method.ordinal()] = MethodInfo(
name=method_snake_case,
request_ident=ident,
requires_response=method.has_response()
and "maybe_response_payload" in method,
empty_response=method.has_response()
and "maybe_response_payload" not in method,
has_result=method.has_result(),
response_identifier=method.response_payload_raw_identifier(),
)
return type(name, (ServerBase,), properties)
def protocol_client_type(ir: IR, root_ir) -> type:
name = fidl_ident_to_py_library_member(ir.name())
properties = {
"__doc__": docstring(ir),
"__fidl_kind__": "client",
}
for method in ir.methods():
if not method.has_request():
# This is an event. This needs to be handled on its own.
continue
method_snake_case = camel_case_to_snake_case(method.name())
properties[method_snake_case] = protocol_method(
method, root_ir, get_fidl_request_client_lambda
)
return type(name, (FidlClient,), properties)
def get_fidl_method_response_payload_ident(ir: Method, root_ir) -> str:
assert ir.has_response()
response_ident = ""
if ir.get("maybe_response_payload"):
response_kind = ir.maybe_response_payload()["kind"]
if response_kind == "identifier":
ident = ir.maybe_response_payload().raw_identifier()
# Just ensures the module for this is going to be imported.
get_kind_by_identifier(ident, root_ir)
response_ident = normalize_identifier(ident)
else:
response_ident = response_kind
return response_ident
def get_fidl_request_client_lambda(ir: Method, root_ir, msg) -> Callable:
if ir.has_response():
response_ident = get_fidl_method_response_payload_ident(ir, root_ir)
if msg:
return lambda self, **args: self._send_two_way_fidl_request(
ir["ordinal"], root_ir.name(), msg(**args), response_ident
)
return lambda self: self._send_two_way_fidl_request(
ir["ordinal"], root_ir.name(), msg, response_ident
)
if msg:
return lambda self, **args: self._send_one_way_fidl_request(
0, ir["ordinal"], root_ir.name(), msg(**args)
)
return lambda self: self._send_one_way_fidl_request(
0, ir["ordinal"], root_ir.name(), msg
)
def send_event_lambda(method: Method, root_ir: IR, msg) -> Callable:
assert not method.has_request()
if msg:
return lambda self, *args, **kwargs: self._send_event(
method["ordinal"], root_ir.name(), msg(*args, **kwargs)
)
return lambda self: self._send_event(method["ordinal"], root_ir.name(), msg)
def get_fidl_request_server_lambda(ir: Method, root_ir, msg) -> Callable:
snake_case_name = camel_case_to_snake_case(ir.name())
if msg:
def server_lambda(self, request):
raise NotImplementedError(
f"Method {snake_case_name} not implemented"
)
return lambda self, request: server_lambda(self, request)
else:
def server_lambda(self):
raise NotImplementedError(
f"Method {snake_case_name} not implemented"
)
return lambda self: server_lamdba(self)
def normalize_identifier(identifier: str) -> str:
"""Takes an identifier and attempts to normalize it.
For the average identifier this shouldn't do anything. This only applies to result types
that have underscores in their names.
Returns: The normalized identifier string (sans-underscores).
"""
if identifier.endswith("_Result") or identifier.endswith("_Response"):
return identifier.replace("_", "")
return identifier
def event_method(
method: Method,
root_ir: IR,
lambda_constructor: Callable,
recurse_guard=None,
) -> Callable:
assert not method.has_request()
if "maybe_response_payload" in method:
payload_id = method.response_payload_raw_identifier()
(payload_kind, payload_ir) = root_ir.resolve_kind(payload_id)
else:
payload_id = None
payload_kind = None
payload_ir = None
return create_method(
method,
root_ir,
payload_id,
payload_kind,
payload_ir,
lambda_constructor,
recurse_guard,
)
def protocol_method(
method: Method, root_ir, lambda_constructor: Callable, recurse_guard=None
) -> Callable:
assert method.has_request()
if "maybe_request_payload" in method:
payload_id = method.request_payload_identifier()
(payload_kind, payload_ir) = root_ir.resolve_kind(payload_id)
else:
payload_id = None
payload_kind = None
payload_ir = None
return create_method(
method,
root_ir,
payload_id,
payload_kind,
payload_ir,
lambda_constructor,
recurse_guard,
)
def create_method(
method: Method,
root_ir: IR,
payload_id: str,
payload_kind: str,
payload_ir: IR,
lambda_constructor: Callable,
recurse_guard=None,
):
if payload_kind == "struct":
params = [
inspect.Parameter(
normalize_member_name(member["name"]),
inspect.Parameter.KEYWORD_ONLY,
annotation=type_annotation(
member["type"], root_ir, recurse_guard
),
)
for member in payload_ir["members"]
]
method_impl = lambda_constructor(
method, root_ir, get_type_by_identifier(payload_id, root_ir)
)
elif payload_kind == "table":
params = [
inspect.Parameter(
member["name"],
inspect.Parameter.KEYWORD_ONLY,
default=None,
annotation=type_annotation(
member["type"], root_ir, recurse_guard
),
)
for member in payload_ir["members"]
if not member["reserved"]
]
method_impl = lambda_constructor(
method, root_ir, get_type_by_identifier(payload_id, root_ir)
)
elif payload_kind == "union":
params = [
inspect.Parameter(
member["name"],
inspect.Parameter.POSITIONAL_ONLY,
annotation=type_annotation(
member["type"], root_ir, recurse_guard
),
)
for member in payload_ir["members"]
if not member["reserved"]
]
method_impl = lambda_constructor(
method, root_ir, get_type_by_identifier(payload_id, root_ir)
)
elif payload_kind == None:
params = []
method_impl = lambda_constructor(method, root_ir, None)
else:
raise RuntimeError(
f"Unrecognized method parameter kind: {payload_kind}"
)
setattr(method_impl, "__signature__", inspect.Signature(params))
setattr(method_impl, "__doc__", docstring(method))
setattr(method_impl, "__fidl_type__", method.name())
setattr(method_impl, "__name__", method.name())
return method_impl
def load_ir_from_import(import_name: str) -> IR:
"""Takes an import name, loads/caches the IR, and returns it."""
lib = fidl_import_to_library_path(import_name)
if lib not in IR_MAP:
with open(lib, "r", encoding="UTF-8") as f:
IR_MAP[lib] = IR(lib, json.load(f))
return IR_MAP[lib]
def get_kind_by_identifier(ident: str, loader_ir) -> str:
"""Takes a fidl identifier, e.g. foo.bar.baz/Foo and returns its 'kind'.
This expects a raw identifier (e.g. not one that has been normalized).
e.g. "struct," "table," etc."""
res = loader_ir.declaration(ident)
if res is not None:
return res
return get_type_by_identifier(ident, loader_ir).__fidl_kind__
def get_type_by_identifier(ident: str, loader_ir, recurse_guard=None) -> type:
"""Takes a identifier, e.g. foo.bar.baz/Foo and returns its Python type."""
member_name = fidl_ident_to_py_library_member(ident)
mod = load_module(fidl_ident_to_py_import(ident))
if not hasattr(mod, member_name):
if recurse_guard is not None:
return ForwardRef(member_name)
# library.name/Ident for example.
ident = f"{fidl_import_to_fidl_library(mod.__fullname__)}/{member_name}"
# e.g. protocol, struct, union, etc.
ty = mod.__ir__.declaration(ident)
# IR for library.name/Ident
ty_decl = getattr(mod.__ir__, f"{ty}_decls")[ident]
ty_definition = globals()[f"{ty}_type"](
ty_decl, mod.__ir__, recurse_guard=True
)
if ty == "const":
# This line might not actually be possible to hit.
mod.export_const(ty_definition)
else:
mod.export_type(ty_definition)
return getattr(mod, member_name)
def load_module(fullname: str) -> types.ModuleType:
if fullname not in sys.modules:
sys.modules[fullname] = FIDLLibraryModule(fullname)
return sys.modules[fullname]
class FIDLLibraryModule(types.ModuleType):
def __init__(self, fullname: str):
# Shove ourselves into the import map so that composite types can be looked up as they are
# exported.
sys.modules[fullname] = self
self.fullname = fullname
ir_path = fidl_import_to_library_path(fullname)
add_ir_path(ir_path)
self.__ir__ = load_ir_from_import(fullname)
self.__file__ = f"<FIDL JSON:{ir_path}>"
self.__fullname__ = fullname
super().__init__(
fullname,
docstring(self.__ir__, f"FIDL library {self.__ir__.name()}"),
)
self.__all__: List[str] = []
self.export_bits()
self.export_experimental_resources()
self.export_enums()
self.export_structs()
self.export_tables()
self.export_unions()
self.export_consts()
self.export_aliases()
self.export_protocols()
def export_protocols(self):
for decl in self.__ir__.protocol_declarations():
if decl.name() not in self.__all__:
self.export_type(protocol_type(decl, self.__ir__))
def export_structs(self):
for decl in self.__ir__.struct_declarations():
if decl.name() not in self.__all__:
self.export_type(struct_type(decl, self.__ir__))
def export_tables(self):
for decl in self.__ir__.table_declarations():
if decl.name() not in self.__all__:
self.export_type(table_type(decl, self.__ir__))
def export_experimental_resources(self):
for decl in self.__ir__.experimental_resource_declarations():
if decl.name() not in self.__all__:
self.export_type(experimental_resource_type(decl, self.__ir__))
def export_bits(self):
for decl in self.__ir__.bits_declarations():
if decl.name() not in self.__all__:
self.export_type(bits_type(decl))
def export_enums(self):
for decl in self.__ir__.enum_declarations():
if decl.name() not in self.__all__:
self.export_type(enum_type(decl))
def export_consts(self):
for decl in self.__ir__.const_declarations():
if decl.name() not in self.__all__:
self.export_fidl_const(const_declaration(decl, self.__ir__))
def export_aliases(self):
for decl in self.__ir__.alias_declarations():
if decl.name() not in self.__all__:
self.export_type(alias_declaration(decl, self.__ir__))
def export_unions(self):
for decl in self.__ir__.union_declarations():
if decl.name() not in self.__all__:
self.export_type(union_type(decl, self.__ir__))
def export_fidl_const(self, c):
setattr(self, c.name, c.value)
self.__all__.append(c.name)
def export_type(self, t):
def encode_func(obj):
library = obj.__module__
library = library.removeprefix("fidl.")
library = library.replace("_", ".")
type_name = f"{library}/{type(obj).__name__}"
return encode_fidl_object(obj, library, type_name)
setattr(t, "__module__", self.fullname)
setattr(t, "encode", encode_func)
setattr(self, t.__name__, t)
self.__all__.append(t.__name__)