blob: 5861951e74e4c5b91d777e761f439a8704710de2 [file] [log] [blame]
#!/usr/bin/python3
#
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import subprocess
from datetime import datetime
from utils import CHPP_PARSER_INCLUDE_PATH
from utils import CHPP_PARSER_SOURCE_PATH
from utils import LICENSE_HEADER
from utils import android_build_top_abs_path
from utils import system_chre_abs_path
class CodeGenerator:
"""Given an ApiParser object, generates a header file with structure definitions in CHPP format.
"""
def __init__(self, api, commit_hash):
"""
:param api: ApiParser object
"""
self.api = api
self.json = api.json
# Turn "chre_api/include/chre_api/chre/wwan.h" into "wwan"
self.service_name = self.json['filename'].split('/')[-1].split('.')[0]
self.capitalized_service_name = self.service_name.capitalize()
self.commit_hash = commit_hash
# ----------------------------------------------------------------------------------------------
# Header generation methods (plus some methods shared with encoder generation)
# ----------------------------------------------------------------------------------------------
def _autogen_notice(self):
out = []
out.append('// This file was automatically generated by {}\n'.format(
os.path.basename(__file__)))
out.append(
'// Date: {} UTC\n'.format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')))
out.append(
'// Source: {} @ commit {}\n\n'.format(self.json['filename'], self.commit_hash))
out.append(
'// DO NOT modify this file directly, as those changes will be lost the next\n')
out.append('// time the script is executed\n\n')
return out
def _dump_to_file(self, output_filename, contents, dry_run, skip_clang_fomat):
"""Outputs contents to output_filename, or prints contents if dry_run is True"""
if dry_run:
print('---- {} ----'.format(output_filename))
print(contents)
print('---- end of {} ----\n'.format(output_filename))
else:
with open(output_filename, 'w') as f:
f.write(contents)
if not skip_clang_fomat:
clang_format_path = (android_build_top_abs_path() +
'/prebuilts/clang/host/linux-x86/clang-stable/bin/clang-format')
args = [clang_format_path, '-i', output_filename]
result = subprocess.run(args)
result.check_returncode()
def _is_array_type(self, type_info):
# If this is an array type, declarators will be a tuple containing a list of
# a single int element giving the size of the array
return len(type_info.declarators) == 1 and isinstance(type_info.declarators[0], list)
def _get_array_len(self, type_info):
return type_info.declarators[0][0]
def _get_chpp_type_from_chre(self, chre_type):
"""Returns 'struct ChppWwanCellInfo', etc. given 'chreWwanCellInfo'"""
prefix = self._get_struct_or_union_prefix(chre_type)
# First see if we have an explicit name override (e.g. for anonymous types)
for annotation in self.api.annotations[chre_type]['.']:
if annotation['annotation'] == 'rename_type':
return prefix + annotation['type_override']
# Otherwise, use the existing type name, just replace the "chre" prefix with "Chpp"
if chre_type.startswith('chre'):
return prefix + 'Chpp' + chre_type[4:]
else:
raise RuntimeError(
"Couldn't figure out new type name for {}".format(chre_type))
def _get_chre_type_with_prefix(self, chre_type):
"""Returns 'struct chreWwanCellInfo', etc. given 'chreWwanCellInfo'"""
return self._get_struct_or_union_prefix(chre_type) + chre_type
def _get_chpp_header_type_from_chre(self, chre_type):
"""Returns 'struct ChppWwanCellInfoWithHeader', etc. given 'chreWwanCellInfo'"""
return self._get_chpp_type_from_chre(chre_type) + 'WithHeader'
def _get_member_comment(self, member_info):
for annotation in member_info['annotations']:
if annotation['annotation'] == 'fixed_value':
return ' // Input ignored; always set to {}'.format(annotation['value'])
elif annotation['annotation'] == 'var_len_array':
return ' // References {} instances of {}'.format(
annotation['length_field'], self._get_member_type(member_info))
return ''
def _get_member_type(self, member_info, underlying_vla_type=False):
"""Gets the CHPP type specification prefix for a struct/union member.
:param member_info: a dict element from self.api.structs_and_unions[struct]['members']
:param underlying_vla_type: (used only for var-len array types) False to output
'struct ChppOffset', and True to output the type that ChppOffset references
:return: type specification string that prefixes the field name, e.g. 'uint8_t'
"""
# 4 cases to handle:
# 1) Annotation gives explicit type that we should use
# 2) Annotation says this is a variable length array (so use ChppOffset if
# underlying_vla_type is False)
# 3) This is a struct/union type, so use the renamed (CHPP) type name
# 4) Regular type, e.g. uint32_t, so just use the type spec as-is
for annotation in member_info['annotations']:
if annotation['annotation'] == 'rewrite_type':
return annotation['type_override']
elif not underlying_vla_type and annotation['annotation'] in ['var_len_array', 'string']:
return 'struct ChppOffset'
if not underlying_vla_type and len(member_info['type'].declarators) > 0 and \
member_info['type'].declarators[0] == '*':
# This case should either be handled by rewrite_type (e.g. to uint32_t as
# opaque/ignored), or var_len_array
raise RuntimeError('Pointer types require annotation\n{}'.format(
member_info))
if member_info['is_nested_type']:
return self._get_chpp_type_from_chre(member_info['nested_type_name'])
return member_info['type'].type_spec
def _get_member_type_suffix(self, member_info):
if self._is_array_type(member_info['type']):
return '[{}]'.format(self._get_array_len(member_info['type']))
return ''
def _get_struct_or_union_prefix(self, chre_type):
return 'struct ' if not self.api.structs_and_unions[chre_type]['is_union'] else 'union '
def _gen_header_includes(self):
"""Generates #include directives for use in <service>_types.h"""
out = ['#include <stdbool.h>\n#include <stddef.h>\n#include <stdint.h>\n\n']
includes = ['chpp/app.h', 'chpp/macros.h', 'chre_api/chre/version.h']
includes.extend(self.json['output_includes'])
for incl in sorted(includes):
out.append('#include "{}"\n'.format(incl))
out.append('\n')
return out
def _gen_struct_or_union(self, name):
"""Generates the definition for a single struct/union type."""
out = []
if not name.startswith('anon'):
out.append('//! See {{@link {}}} for details\n'.format(name))
out.append('{} {{\n'.format(self._get_chpp_type_from_chre(name)))
for member_info in self.api.structs_and_unions[name]['members']:
out.append(' {} {}{};{}\n'.format(self._get_member_type(member_info),
member_info['name'],
self._get_member_type_suffix(
member_info),
self._get_member_comment(member_info)))
out.append('} CHPP_PACKED_ATTR;\n\n')
return out
def _gen_header_struct(self, chre_type):
"""Generates the definition for the type with header (WithHeader)."""
out = []
out.append('//! CHPP app header plus {}\n'.format(
self._get_chpp_header_type_from_chre(chre_type)))
out.append('{} {{\n'.format(
self._get_chpp_header_type_from_chre(chre_type)))
out.append(' struct ChppAppHeader header;\n')
out.append(' {} payload;\n'.format(
self._get_chpp_type_from_chre(chre_type)))
out.append('} CHPP_PACKED_ATTR;\n\n')
return out
def _gen_structs_and_unions(self):
"""Generates definitions for all struct/union types required for the root structs."""
out = []
out.append('CHPP_PACKED_START\n\n')
sorted_structs = self._sorted_structs(self.json['root_structs'])
for type_name in sorted_structs:
out.extend(self._gen_struct_or_union(type_name))
for chre_type in self.json['root_structs']:
out.extend(self._gen_header_struct(chre_type))
out.append('CHPP_PACKED_END\n\n')
return out
def _sorted_structs(self, root_nodes):
"""Implements a topological sort on self.api.structs_and_unions.
Elements are ordered by definition dependency, i.e. if A includes a field of type B,
then B will appear before A in the returned list.
:return: list of keys in self.api.structs_and_unions, sorted by dependency order
"""
result = []
visited = set()
def sort_helper(collection, key):
for dep in sorted(collection[key]['dependencies']):
if dep not in visited:
visited.add(dep)
sort_helper(collection, dep)
result.append(key)
for node in sorted(root_nodes):
sort_helper(self.api.structs_and_unions, node)
return result
# ----------------------------------------------------------------------------------------------
# Encoder function generation methods (CHRE --> CHPP)
# ----------------------------------------------------------------------------------------------
def _get_chpp_member_sizeof_call(self, member_info):
"""Returns invocation used to determine the size of the provided member when encoded.
Will be either sizeof(<type in CHPP struct>) or a function call if the member contains a VLA
:param member_info: a dict element from self.api.structs_and_unions[struct]['members']
:return: string
"""
type_name = None
if member_info['is_nested_type']:
chre_type = member_info['nested_type_name']
if self.api.structs_and_unions[chre_type]['has_vla_member']:
return '{}(in->{})'.format(self._get_chpp_sizeof_function_name(chre_type),
member_info['name'])
else:
type_name = self._get_chpp_type_from_chre(chre_type)
else:
type_name = member_info['type'].type_spec
return 'sizeof({})'.format(type_name)
def _gen_chpp_sizeof_function(self, chre_type):
"""Generates a function to determine the encoded size of the CHRE struct, if necessary."""
out = []
# Note that this function *should* work with unions as well, but at the time of writing
# it'll only be used with structs, so names, etc. are written with that in mind
struct_info = self.api.structs_and_unions[chre_type]
if not struct_info['has_vla_member']:
# No codegen necessary, just sizeof on the CHPP structure name is sufficient
return out
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_type)
parameter_name = core_type_name[0].lower() + core_type_name[1:]
chpp_type_name = self._get_chpp_header_type_from_chre(chre_type)
out.append('//! @return number of bytes required to represent the given\n'
'//! {} along with the CHPP header as\n'
'//! {}\n'
.format(chre_type, chpp_type_name))
out.append('static size_t {}(\n const {}{} *{}) {{\n'
.format(self._get_chpp_sizeof_function_name(chre_type),
self._get_struct_or_union_prefix(
chre_type), chre_type,
parameter_name))
# sizeof(this struct)
out.append(' size_t encodedSize = sizeof({});\n'.format(chpp_type_name))
# Plus count * sizeof(type) for each var-len array included in this struct
for member_info in self.api.structs_and_unions[chre_type]['members']:
for annotation in member_info['annotations']:
if annotation['annotation'] == 'var_len_array':
# If the VLA field itself contains a VLA, then we'd need to generate a for
# loop to calculate the size of each element individually - I don't think we
# have any of these in the CHRE API today, so leaving this functionality out.
# Also note that to support that case we'd also want to recursively call this
# function to generate sizeof functions for nested fields.
if member_info['is_nested_type'] and \
self.api.structs_and_unions[member_info['nested_type_name']][
'has_vla_member']:
raise RuntimeError(
'Nested variable-length arrays is not currently supported ({} '
'in {})'.format(member_info['name'], chre_type))
out.append(' encodedSize += {}->{} * sizeof({});\n'.format(
parameter_name, annotation['length_field'],
self._get_member_type(member_info, True)))
elif annotation['annotation'] == 'string':
out.append(' if ({}->{} != NULL) {{'.format(
parameter_name, annotation['field']))
out.append(' encodedSize += strlen({}->{}) + 1;\n'.format(
parameter_name, annotation['field']))
out.append(' }\n')
out.append(' return encodedSize;\n}\n\n')
return out
def _gen_chpp_sizeof_functions(self):
"""For each root struct, generate necessary functions to determine their encoded size."""
out = []
for struct in self.json['root_structs']:
out.extend(self._gen_chpp_sizeof_function(struct))
return out
def _gen_conversion_includes(self):
"""Generates #include directives for the conversion source file."""
out = ['#include "chpp/macros.h"\n'
'#include "chpp/memory.h"\n'
'#include "chpp/common/{}_types.h"\n\n'.format(self.service_name)]
out.append(
'#include <stddef.h>\n#include <stdint.h>\n#include <string.h>\n\n')
return out
def _get_chpp_sizeof_function_name(self, chre_struct):
"""Returns the function name used to compute the encoded size of the given struct at
runtime.
"""
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_struct)
return 'chpp{}SizeOf{}FromChre'.format(self.capitalized_service_name, core_type_name)
def _get_encoding_function_name(self, chre_type):
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_type)
return 'chpp{}Convert{}FromChre'.format(self.capitalized_service_name, core_type_name)
def _gen_encoding_function_signature(self, chre_type):
out = []
out.append(
'void {}(\n'.format(self._get_encoding_function_name(chre_type)))
out.append(' const {}{} *in,\n'.format(
self._get_struct_or_union_prefix(chre_type), chre_type))
out.append(' {} *out'.format(self._get_chpp_type_from_chre(chre_type)))
if self.api.structs_and_unions[chre_type]['has_vla_member']:
out.append(',\n')
out.append(' uint8_t *payload,\n')
out.append(' size_t payloadSize,\n')
out.append(' uint16_t *vlaOffset')
out.append(')')
return out
def _gen_string_encoding(self, member_info, annotation):
out = []
# Might want to revisit this if we ever end up supporting NULL strings
# in our API. We can assert here since there's currently no API that
# does so.
member_name = member_info['name']
out.append(' if (in->{} != NULL) {{\n'.format(member_name))
out.append(
' size_t strSize = strlen(in->{}) + 1;\n'.format(member_name))
out.append(' memcpy(&payload[*vlaOffset], in->{}, strSize);\n'.format(
member_name))
out.append(' out->{}.length = (uint16_t)(strSize);\n'.format(
member_name))
out.append(' out->{}.offset = *vlaOffset;\n'.format(member_name))
out.append(' *vlaOffset += out->{}.length;\n'.format(member_name))
out.append(' } else {\n')
out.append(' out->{}.length = 0;\n'.format(member_name))
out.append(' out->{}.offset = 0;\n'.format(member_name))
out.append(' }\n\n')
return out
def _gen_vla_encoding(self, member_info, annotation):
out = []
variable_name = member_info['name']
chpp_type = self._get_member_type(member_info, True)
if member_info['is_nested_type']:
out.append('\n {} *{} = ({} *) &payload[*vlaOffset];\n'.format(
chpp_type, variable_name, chpp_type))
out.append(' out->{}.length = (uint16_t)(in->{} * {});\n'.format(
member_info['name'], annotation['length_field'],
self._get_chpp_member_sizeof_call(member_info)))
out.append(' CHPP_ASSERT((size_t)(*vlaOffset + out->{}.length) <= payloadSize);\n'.format(
member_info['name']))
out.append(' if (out->{}.length > 0 &&\n'
' *vlaOffset + out->{}.length <= payloadSize) {{\n'.format(
member_info['name'], member_info['name']))
if member_info['is_nested_type']:
out.append(' for (size_t i = 0; i < in->{}; i++) {{\n'.format(
annotation['length_field']))
out.append(' {}'.format(
self._get_assignment_statement_for_field(member_info, in_vla_loop=True)))
out.append(' }\n')
else:
out.append('memcpy(&payload[*vlaOffset], in->{}, in->{} * sizeof({}));\n'.format(
member_info['name'], annotation['length_field'], chpp_type))
out.append(
' out->{}.offset = *vlaOffset;\n'.format(member_info['name']))
out.append(
' *vlaOffset += out->{}.length;\n'.format(member_info['name']))
out.append(' } else {\n')
out.append(' out->{}.offset = 0;\n'.format(member_info['name']))
out.append(' }\n')
return out
# ----------------------------------------------------------------------------------------------
# Encoder / decoder function generation methods (CHRE <--> CHPP)
# ----------------------------------------------------------------------------------------------
def _get_assignment_statement_for_field(self, member_info,
in_vla_loop=False,
containing_field_name=None,
decode_mode=False):
"""Returns a statement to assign the provided member
:param member_info:
:param in_vla_loop: True if we're currently inside a loop and should append [i]
:param containing_field_name: Additional member name to use to access the target field, or
None; for example the normal case is "out->field = in->field", but if we're generating
assignments in the parent conversion function (e.g. as used for union variants), we need to
do "out->nested_field.field = in->nested_field.field"
:param decode_mode: True converts from CHPP to CHRE. False from CHRE to CHPP
:return: assignment statement as a string
"""
array_index = '[i]' if in_vla_loop else ''
output_accessor = '' if in_vla_loop else 'out->'
containing_field = containing_field_name + \
'.' if containing_field_name is not None else ''
output_variable = '{}{}{}{}'.format(output_accessor, containing_field, member_info['name'],
array_index)
input_variable = 'in->{}{}{}'.format(containing_field,
member_info['name'], array_index)
if decode_mode and in_vla_loop:
output_variable = '{}Out{}'.format(
member_info['name'], array_index)
input_variable = '{}In{}'.format(member_info['name'], array_index)
if member_info['is_nested_type']:
chre_type = member_info['nested_type_name']
has_vla_member = self.api.structs_and_unions[chre_type]['has_vla_member']
if decode_mode:
# Use decoding function
vla_params = ', inSize' if has_vla_member else ''
out = 'if (!{}(&{}, &{}{})) {{\n'.format(
self._get_decoding_function_name(
chre_type), input_variable,
output_variable, vla_params)
if has_vla_member:
out += ' CHPP_FREE_AND_NULLIFY({}Out);\n'.format(
member_info['name'])
out += ' return false;\n'
out += '}\n'
return out
else:
# Use encoding function
vla_params = ', payload, payloadSize, vlaOffset' if has_vla_member else ''
return '{}(&{}, &{}{});\n'.format(
self._get_encoding_function_name(
chre_type), input_variable, output_variable,
vla_params)
elif self._is_array_type(member_info['type']):
# Array of primitive type (e.g. uint32_t[8]) - use memcpy
return 'memcpy({}, {}, sizeof({}));\n'.format(output_variable, input_variable,
output_variable)
else:
# Regular assignment
return '{} = {};\n'.format(output_variable, input_variable)
def _gen_union_variant_conversion_code(self, member_info, annotation, decode_mode):
"""Generates a switch statement to encode the "active"/"used" field within a union.
Handles cases where a union has multiple types, but there's another peer/adjacent field
which tells you which field in the union is to be used. Outputs code like this:
switch (in->{discriminator field}) {
case {first discriminator value associated with a fields}:
{conversion code for the field associated with this discriminator value}
...
:param chre_type: CHRE type of the union
:param annotation: Reference to JSON annotation data with the discriminator mapping data
:param decode_mode: False encodes from CHRE to CHPP. True decodes from CHPP to CHRE
:return: list of strings
"""
out = []
chre_type = member_info['nested_type_name']
struct_info = self.api.structs_and_unions[chre_type]
# Start off by zeroing out the union field so any padding is set to a consistent value
out.append(' memset(&out->{}, 0, sizeof(out->{}));\n'.format(member_info['name'],
member_info['name']))
# Next, generate the switch statement that will copy over the proper values
out.append(
' switch (in->{}) {{\n'.format(annotation['discriminator']))
for value, field_name in annotation['mapping']:
out.append(' case {}:\n'.format(value))
found = False
for nested_member_info in struct_info['members']:
if nested_member_info['name'] == field_name:
out.append(' {}'.format(
self._get_assignment_statement_for_field(
nested_member_info,
containing_field_name=member_info['name'],
decode_mode=decode_mode)))
found = True
break
if not found:
raise RuntimeError("Invalid mapping - couldn't find target field {} in struct {}"
.format(field_name, chre_type))
out.append(' break;\n')
out.append(' default:\n'
' CHPP_ASSERT(false);\n'
' }\n')
return out
def _gen_conversion_function(self, chre_type, already_generated, decode_mode):
out = []
struct_info = self.api.structs_and_unions[chre_type]
for dependency in sorted(struct_info['dependencies']):
if dependency not in already_generated:
out.extend(
self._gen_conversion_function(dependency, already_generated, decode_mode))
# Skip if we've already generated code for this type, or if it's a union (in which case we
# handle the assignment in the parent structure to enable support for discrimination of
# which field in the union to use)
if chre_type in already_generated or struct_info['is_union']:
return out
already_generated.add(chre_type)
out.append('static ')
if decode_mode:
out.extend(self._gen_decoding_function_signature(chre_type))
else:
out.extend(self._gen_encoding_function_signature(chre_type))
out.append(' {\n')
for member_info in self.api.structs_and_unions[chre_type]['members']:
generated_by_annotation = False
for annotation in member_info['annotations']:
if annotation['annotation'] == 'fixed_value':
if self._is_array_type(member_info['type']):
out.append(' memset(&out->{}, {}, sizeof(out->{}));\n'.format(
member_info['name'], annotation['value'], member_info['name']))
else:
out.append(' out->{} = {};\n'.format(member_info['name'],
annotation['value']))
generated_by_annotation = True
break
elif annotation['annotation'] == 'enum':
# Note: We could generate range verification code here, but it has not
# been considered necessary thus far.
pass
elif annotation['annotation'] == 'var_len_array':
if decode_mode:
out.extend(self._gen_vla_decoding(
member_info, annotation))
else:
out.extend(self._gen_vla_encoding(
member_info, annotation))
generated_by_annotation = True
break
elif annotation['annotation'] == 'string':
if decode_mode:
out.extend(self._gen_string_decoding(
member_info, annotation))
else:
out.extend(self._gen_string_encoding(
member_info, annotation))
generated_by_annotation = True
break
elif annotation['annotation'] == 'union_variant':
out.extend(self._gen_union_variant_conversion_code(
member_info, annotation, decode_mode))
generated_by_annotation = True
break
if not generated_by_annotation:
out.append(' {}'.format(
self._get_assignment_statement_for_field(member_info, decode_mode=decode_mode)))
if decode_mode:
out.append('\n return true;\n')
out.append('}\n\n')
return out
def _gen_conversion_functions(self, decode_mode):
out = []
already_generated = set()
for struct in self.json['root_structs']:
out.extend(self._gen_conversion_function(
struct, already_generated, decode_mode))
return out
def _strip_prefix_and_service_from_chre_struct_name(self, struct):
"""Strips 'chre' and service prefix, e.g. 'chreWwanCellResultInfo' -> 'CellResultInfo'."""
chre_stripped = struct[4:]
upcased_service_name = self.service_name[0].upper(
) + self.service_name[1:]
if not struct.startswith('chre') or not chre_stripped.startswith(upcased_service_name):
# If this happens, we need to update the script to handle it. Right we assume struct
# naming follows the pattern "chre<Service_name><Thing_name>"
raise RuntimeError('Unexpected structure name {}'.format(struct))
return chre_stripped[len(self.service_name):]
# ----------------------------------------------------------------------------------------------
# Memory allocation generation methods
# ----------------------------------------------------------------------------------------------
def _get_chpp_sizeof_call(self, chre_type):
"""Returns invocation used to determine the size of the provided CHRE struct (with the CHPP
app header) after encoding.
Like _get_chpp_member_sizeof_call(), except for a top-level type assigned to the variable
"in" rather than a member within a structure (e.g. a VLA field)
:param chre_type: CHRE type name
:return: string
"""
if self.api.structs_and_unions[chre_type]['has_vla_member']:
return '{}(in)'.format(self._get_chpp_sizeof_function_name(chre_type))
else:
return 'sizeof({})'.format(self._get_chpp_header_type_from_chre(chre_type))
def _get_encode_allocation_function_name(self, chre_type):
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_type)
return 'chpp{}{}FromChre'.format(self.capitalized_service_name, core_type_name)
def _gen_encode_allocation_function_signature(self, chre_type, gen_docs=False):
out = []
if gen_docs:
out.append('/**\n'
' * Converts from given CHRE structure to serialized CHPP type.\n'
' *\n'
' * @param in Fully-formed CHRE structure.\n'
' * @param out Upon success, will point to a buffer allocated with '
'chppMalloc().\n'
' * It is the responsibility of the caller to set the values of the CHPP '
'app layer header, and to free the buffer when it is no longer needed via '
'chppFree() or CHPP_FREE_AND_NULLIFY().\n'
' * @param outSize Upon success, will be set to the size of the output '
'buffer, in bytes.\n'
' *\n'
' * @return true on success, false if memory allocation failed.\n'
' */\n')
out.append(
'bool {}(\n'.format(self._get_encode_allocation_function_name(chre_type)))
out.append(' const {}{} *in,\n'.format(
self._get_struct_or_union_prefix(chre_type), chre_type))
out.append(
' {} **out,\n'.format(self._get_chpp_header_type_from_chre(chre_type)))
out.append(' size_t *outSize)')
return out
def _gen_encode_allocation_function(self, chre_type):
out = []
out.extend(self._gen_encode_allocation_function_signature(chre_type))
out.append(' {\n')
out.append(' CHPP_NOT_NULL(out);\n')
out.append(' CHPP_NOT_NULL(outSize);\n\n')
out.append(' size_t payloadSize = {};\n'.format(
self._get_chpp_sizeof_call(chre_type)))
out.append(' *out = chppMalloc(payloadSize);\n')
out.append(' if (*out != NULL) {\n')
struct_info = self.api.structs_and_unions[chre_type]
if struct_info['has_vla_member']:
out.append(' uint8_t *payload = (uint8_t *) &(*out)->payload;\n')
out.append(' uint16_t vlaOffset = sizeof({});\n'.format(
self._get_chpp_type_from_chre(chre_type)))
out.append(' {}(in, &(*out)->payload'.format(
self._get_encoding_function_name(chre_type)))
if struct_info['has_vla_member']:
out.append(', payload, payloadSize, &vlaOffset')
out.append(');\n')
out.append(' *outSize = payloadSize;\n')
out.append(' return true;\n')
out.append(' }\n')
out.append(' return false;\n}\n\n')
return out
def _gen_encode_allocation_functions(self):
out = []
for chre_type in self.json['root_structs']:
out.extend(self._gen_encode_allocation_function(chre_type))
return out
def _gen_encode_allocation_function_signatures(self):
out = []
for chre_type in self.json['root_structs']:
out.extend(
self._gen_encode_allocation_function_signature(chre_type, True))
out.append(';\n\n')
return out
# ----------------------------------------------------------------------------------------------
# Decoder function generation methods (CHPP --> CHRE)
# ----------------------------------------------------------------------------------------------
def _get_decoding_function_name(self, chre_type):
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_type)
return 'chpp{}Convert{}ToChre'.format(self.capitalized_service_name, core_type_name)
def _gen_decoding_function_signature(self, chre_type):
out = []
out.append(
'bool {}(\n'.format(self._get_decoding_function_name(chre_type)))
out.append(
' const {} *in,\n'.format(self._get_chpp_type_from_chre(chre_type)))
out.append(
' {} *out'.format(self._get_chre_type_with_prefix(chre_type)))
if self.api.structs_and_unions[chre_type]['has_vla_member']:
out.append(',\n')
out.append(' size_t inSize')
out.append(')')
return out
def _gen_string_decoding(self, member_info, annotation):
out = []
variable_name = member_info['name']
out.append('\n')
out.append(' if (in->{}.length == 0) {{\n'.format(variable_name))
out.append(' out->{} = NULL;\n'.format(variable_name))
out.append(' } else {\n')
out.append(' char *{}Out = chppMalloc(in->{}.length);\n'.format(
variable_name, variable_name))
out.append(' if ({}Out == NULL) {{\n'.format(variable_name))
out.append(' return false;\n')
out.append(' }\n\n')
out.append(' memcpy({}Out, &((const uint8_t *)in)[in->{}.offset],\n'.format(
variable_name, variable_name))
out.append(' in->{}.length);\n'.format(variable_name))
out.append(' out->{} = {}Out;\n'.format(variable_name, variable_name))
out.append(' }\n')
return out
def _gen_vla_decoding(self, member_info, annotation):
out = []
variable_name = member_info['name']
chpp_type = self._get_member_type(member_info, True)
if member_info['is_nested_type']:
chre_type = self._get_chre_type_with_prefix(
member_info['nested_type_name'])
else:
chre_type = chpp_type
out.append('\n')
out.append(' if (in->{}.length == 0) {{\n'.format(variable_name))
out.append(' out->{} = NULL;\n'.format(variable_name))
out.append(' }\n')
out.append(' else {\n')
out.append(' if (in->{}.offset + in->{}.length > inSize ||\n'.format(
variable_name, variable_name))
out.append(' in->{}.length != in->{} * sizeof({})) {{\n'.format(
variable_name, annotation['length_field'], chpp_type))
out.append(' return false;\n')
out.append(' }\n\n')
if member_info['is_nested_type']:
out.append(
' const {} *{}In =\n'.format(chpp_type, variable_name))
out.append(' (const {} *) &((const uint8_t *)in)[in->{}.offset];\n\n'.format(
chpp_type, variable_name))
out.append(' {} *{}Out = chppMalloc(in->{} * sizeof({}));\n'.format(
chre_type, variable_name, annotation['length_field'], chre_type))
out.append(' if ({}Out == NULL) {{\n'.format(variable_name))
out.append(' return false;\n')
out.append(' }\n\n')
if member_info['is_nested_type']:
out.append(' for (size_t i = 0; i < in->{}; i++) {{\n'.format(
annotation['length_field'], variable_name))
out.append(' {}'.format(self._get_assignment_statement_for_field(
member_info, in_vla_loop=True, decode_mode=True)))
out.append(' }\n')
else:
out.append(' memcpy({}Out, &((const uint8_t *)in)[in->{}.offset],\n'.format(
variable_name, variable_name))
out.append(' in->{} * sizeof({}));\n'.format(
annotation['length_field'], chre_type))
out.append(' out->{} = {}Out;\n'.format(variable_name, variable_name))
out.append(' }\n\n')
return out
def _get_decode_allocation_function_name(self, chre_type):
core_type_name = self._strip_prefix_and_service_from_chre_struct_name(
chre_type)
return 'chpp{}{}ToChre'.format(self.capitalized_service_name, core_type_name)
def _gen_decode_allocation_function_signature(self, chre_type, gen_docs=False):
out = []
if gen_docs:
out.append('/**\n'
' * Converts from serialized CHPP structure to a CHRE type.\n'
' *\n'
' * @param in Fully-formed CHPP structure.\n'
' * @param in Size of the CHPP structure in bytes.\n'
' *\n'
' * @return If successful, a pointer to a CHRE structure allocated with '
'chppMalloc(). If unsuccessful, null.\n'
' * It is the responsibility of the caller to free the buffer when it is no '
'longer needed via chppFree() or CHPP_FREE_AND_NULLIFY().\n'
' */\n')
out.append('{} *{}(\n'.format(
self._get_chre_type_with_prefix(chre_type),
self._get_decode_allocation_function_name(chre_type)))
out.append(
' const {} *in,\n'.format(self._get_chpp_type_from_chre(chre_type)))
out.append(' size_t inSize)')
return out
def _gen_decode_allocation_function(self, chre_type):
out = []
out.extend(self._gen_decode_allocation_function_signature(chre_type))
out.append(' {\n')
out.append(' {} *out = NULL;\n\n'.format(
self._get_chre_type_with_prefix(chre_type)))
out.append(' if (inSize >= sizeof({})) {{\n'.format(
self._get_chpp_type_from_chre(chre_type)))
out.append(' out = chppMalloc(sizeof({}));\n'.format(
self._get_chre_type_with_prefix(chre_type)))
out.append(' if (out != NULL) {\n')
struct_info = self.api.structs_and_unions[chre_type]
out.append(' if (!{}(in, out'.format(
self._get_decoding_function_name(chre_type)))
if struct_info['has_vla_member']:
out.append(', inSize')
out.append(')) {')
out.append(' CHPP_FREE_AND_NULLIFY(out);\n')
out.append(' }\n')
out.append(' }\n')
out.append(' }\n\n')
out.append(' return out;\n')
out.append('}\n')
return out
def _gen_decode_allocation_functions(self):
out = []
for chre_type in self.json['root_structs']:
out.extend(self._gen_decode_allocation_function(chre_type))
return out
def _gen_decode_allocation_function_signatures(self):
out = []
for chre_type in self.json['root_structs']:
out.extend(
self._gen_decode_allocation_function_signature(chre_type, True))
out.append(';\n\n')
return out
# ----------------------------------------------------------------------------------------------
# Public methods
# ----------------------------------------------------------------------------------------------
def generate_header_file(self, dry_run=False, skip_clang_format=False):
"""Creates a C header file for this API and writes it to the file indicated in the JSON."""
filename = self.service_name + '_types.h'
if not dry_run:
print('Generating {} ... '.format(filename), end='', flush=True)
output_file = os.path.join(
system_chre_abs_path(), CHPP_PARSER_INCLUDE_PATH, filename)
header = self.generate_header_string()
self._dump_to_file(output_file, header, dry_run, skip_clang_format)
if not dry_run:
print('done')
def generate_header_string(self):
"""Returns a C header with structure definitions for this API."""
# To defer concatenation (speed things up), build the file as a list of strings then only
# concatenate once at the end
out = [LICENSE_HEADER]
header_guard = 'CHPP_{}_TYPES_H_'.format(self.service_name.upper())
out.append('#ifndef {}\n#define {}\n\n'.format(
header_guard, header_guard))
out.extend(self._autogen_notice())
out.extend(self._gen_header_includes())
out.append('#ifdef __cplusplus\nextern "C" {\n#endif\n\n')
out.extend(self._gen_structs_and_unions())
out.append('\n// Encoding functions (CHRE --> CHPP)\n\n')
out.extend(self._gen_encode_allocation_function_signatures())
out.append('\n// Decoding functions (CHPP --> CHRE)\n\n')
out.extend(self._gen_decode_allocation_function_signatures())
out.append('#ifdef __cplusplus\n}\n#endif\n\n')
out.append('#endif // {}\n'.format(header_guard))
return ''.join(out)
def generate_conversion_file(self, dry_run=False, skip_clang_format=False):
"""Generates a .c file with functions for encoding CHRE structs into CHPP and vice versa."""
filename = self.service_name + '_convert.c'
if not dry_run:
print('Generating {} ... '.format(filename), end='', flush=True)
contents = self.generate_conversion_string()
output_file = os.path.join(
system_chre_abs_path(), CHPP_PARSER_SOURCE_PATH, filename)
self._dump_to_file(output_file, contents, dry_run, skip_clang_format)
if not dry_run:
print('done')
def generate_conversion_string(self):
"""Returns C code for encoding CHRE structs into CHPP and vice versa."""
out = [LICENSE_HEADER, '\n']
out.extend(self._autogen_notice())
out.extend(self._gen_conversion_includes())
out.append('\n// Encoding (CHRE --> CHPP) size functions\n\n')
out.extend(self._gen_chpp_sizeof_functions())
out.append('\n// Encoding (CHRE --> CHPP) conversion functions\n\n')
out.extend(self._gen_conversion_functions(False))
out.append('\n// Encoding (CHRE --> CHPP) top-level functions\n\n')
out.extend(self._gen_encode_allocation_functions())
out.append('\n// Decoding (CHPP --> CHRE) conversion functions\n\n')
out.extend(self._gen_conversion_functions(True))
out.append('\n// Decoding (CHPP --> CHRE) top-level functions\n\n')
out.extend(self._gen_decode_allocation_functions())
return ''.join(out)