| # Copyright 2023 The Fuchsia Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from dataclasses import dataclass |
| from enum import IntEnum |
| import inspect |
| import re |
| import sys |
| import typing |
| from typing import List, Tuple |
| |
| import fuchsia_controller_py as fc |
| |
| # These can be updated to use TypeAlias when python is updated to 3.10+ |
| TXID_Type = int |
| Ordinal = int |
| |
| FidlMessage = Tuple[bytearray, List[fc.Channel]] |
| |
| # The number of bytes in a FIDL header. |
| FIDL_HEADER_SIZE = 8 |
| # The number of bytes in a FIDL ordinal. |
| FIDL_ORDINAL_SIZE = 8 |
| FIDL_EPITAPH_ORDINAL = 0xFFFFFFFFFFFFFFFF |
| |
| |
| @dataclass |
| class MethodInfo: |
| name: str |
| request_ident: str |
| requires_response: bool |
| empty_response: bool |
| has_result: bool |
| response_identifier: str |
| |
| |
| class StopServer(Exception): |
| """An exception used to stop a server loop from continuing. |
| |
| This closes the underlying channel as well. |
| """ |
| |
| |
| @dataclass |
| class DomainError: |
| """A class used to wrap returning an error from a two-way method.""" |
| |
| error: typing.Any |
| |
| |
| class StopEventHandler(Exception): |
| """An exception used to stop an event handler from continuing.""" |
| |
| |
| def internal_kind_to_type(internal_kind: str): |
| # TODO(https://fxbug.dev/109789): Remove "transport_error". |
| if internal_kind == "framework_error" or internal_kind == "transport_error": |
| return FrameworkError |
| raise RuntimeError(f"Unrecognized internal type: {internal_kind}") |
| |
| |
| class EpitaphError(fc.ZxStatus): |
| """An exception received when an epitaph has been sent on the channel.""" |
| |
| |
| class FrameworkError(IntEnum): |
| UNKNOWN_METHOD = -2 |
| |
| |
| @dataclass |
| class GenericResult: |
| fidl_type: str |
| response: typing.Optional[object] = None |
| err: typing.Optional[object] = None |
| framework_err: FrameworkError | None = None |
| |
| @property |
| def __fidl_type__(self): |
| return self.fidl_type |
| |
| |
| def parse_txid(msg: FidlMessage): |
| (b, _) = msg |
| return int.from_bytes(b[0:4], sys.byteorder) |
| |
| |
| def parse_ordinal(msg: FidlMessage): |
| (b, _) = msg |
| start = FIDL_HEADER_SIZE |
| end = FIDL_HEADER_SIZE + FIDL_ORDINAL_SIZE |
| return int.from_bytes(b[start:end], sys.byteorder) |
| |
| |
| def parse_epitaph_value(msg: FidlMessage): |
| (b, _) = msg |
| start = FIDL_HEADER_SIZE + FIDL_ORDINAL_SIZE |
| end = start + 4 |
| res = int.from_bytes(b[start:end], sys.byteorder) |
| |
| # Do two's complement here to get the true value if it's negative. |
| if res & (1 << 31) != 0: |
| return res - (1 << 32) |
| return res |
| |
| |
| def camel_case_to_snake_case(s: str): |
| return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower() |
| |
| |
| def make_default_obj_from_ident(ident): |
| """Takes a FIDL identifier, e.g. foo.bar/Baz, returns the default object (all fields None). |
| |
| Args: |
| ident: The FIDL identifier. |
| |
| Returns: |
| The default object construction (all fields None). |
| """ |
| # If there is not identifier then this is for a two way method that returns (). |
| if not ident: |
| return None |
| split = ident.split("/") |
| library = "fidl." + split[0].replace(".", "_") |
| ty = split[1] |
| mod = sys.modules[library] |
| obj_ty = getattr(mod, ty) |
| return make_default_obj(obj_ty) |
| |
| |
| def construct_response_object( |
| response_ident: str | None, response_obj: typing.Any | None |
| ): |
| obj = make_default_obj_from_ident(response_ident) |
| if obj is not None: |
| construct_result(obj, response_obj) |
| return obj |
| |
| |
| def unwrap_type(ty): |
| """Takes a type `ty`, then removes the meta-typing surrounding it. |
| |
| Args: |
| ty: a Python type. |
| |
| Returns: |
| The Python type after removing indirection. |
| |
| This is because when a user imports fidl.[foo_library], they may import a recursive type, which |
| cannot be defined at runtime. This will then return an actual type (since everything will be |
| resolvable at this point). |
| """ |
| while True: |
| try: |
| ty = typing.get_args(ty)[0] |
| except IndexError: |
| if ty.__class__ is typing.ForwardRef: |
| return ty.__forward_arg__ |
| return ty |
| |
| |
| def get_type_from_import(i): |
| """Takes an import and returns the Python type. |
| |
| Args: |
| i: The python FIDL import string, e.g. "fidl.foo_bar_baz.Mumble" and returns the type class |
| Mumble. |
| |
| Returns: |
| The Python type class of the object. |
| """ |
| module_path = i.split(".") |
| fidl_import_path = f"{module_path[0]}.{module_path[1]}" |
| mod = sys.modules[fidl_import_path] |
| obj = mod |
| for attr in module_path[2:]: |
| obj = getattr(obj, attr) |
| return obj |
| |
| |
| def construct_from_name_and_type(constructed_obj, sub_parsed_obj, name, ty): |
| unwrapped_ty = unwrap_type(ty) |
| unwrapped_module = unwrapped_ty.__module__ |
| if unwrapped_module.startswith("fidl."): |
| obj = get_type_from_import( |
| f"{unwrapped_module}.{unwrapped_ty.__name__}" |
| ) |
| if isinstance(sub_parsed_obj, dict): |
| sub_obj = make_default_obj(obj) |
| construct_result(sub_obj, sub_parsed_obj) |
| setattr(constructed_obj, name, sub_obj) |
| elif isinstance(sub_parsed_obj, list): |
| results = [] |
| for item in sub_parsed_obj: |
| sub_obj = make_default_obj(obj) |
| construct_result(sub_obj, item) |
| results.append(sub_obj) |
| setattr(constructed_obj, name, results) |
| else: |
| setattr(constructed_obj, name, sub_parsed_obj) |
| else: |
| setattr(constructed_obj, name, sub_parsed_obj) |
| |
| |
| def construct_result(constructed_obj, parsed_obj): |
| if constructed_obj.__fidl_kind__ == "union": |
| key = next(iter(parsed_obj.keys())) |
| sub_obj_type = getattr(constructed_obj, f"{key}_type") |
| sub_parsed_obj = parsed_obj[key] |
| setattr(constructed_obj, key, None) |
| construct_from_name_and_type( |
| constructed_obj, sub_parsed_obj, key, sub_obj_type |
| ) |
| # Since there is only one item for the union, no need to continue with iterating |
| # elements. |
| return |
| elements = type(constructed_obj).__annotations__ |
| for name, ty in elements.items(): |
| if parsed_obj.get(name) is None: |
| setattr(constructed_obj, name, None) |
| continue |
| sub_parsed_obj = parsed_obj[name] |
| construct_from_name_and_type(constructed_obj, sub_parsed_obj, name, ty) |
| |
| |
| def make_default_obj(object_ty): |
| """Takes a type `object_ty` and creates the default __init__ implementation of the object. |
| |
| Args: |
| object_ty: The type of object which is being constructed (this is also the type of the |
| return value). |
| |
| Returns: |
| The default (all fields None) object created from object_ty. |
| |
| For example, if the object is a struct, it will return the "default" version of the struct, |
| where all fields are set to None, regardless what the field type is. |
| """ |
| sig = inspect.signature(object_ty.__init__) |
| args: typing.Dict[str, typing.Any | None] = {} |
| for arg in sig.parameters: |
| if str(arg) == "self": |
| continue |
| args[str(arg)] = None |
| if not args: |
| return object_ty() |
| try: |
| return object_ty(**args) |
| except TypeError: |
| # Object might accept *args/**kwargs, so use empty constructor. |
| return object_ty() |