| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| This is the base class for tests that exercise different GATT procedures between two connected devices. |
| The setup/teardown methods take care of establishing the connection and performing GATT DB initialization/discovery. |
| """ |
| |
| from queue import Empty |
| |
| from antlion_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_characteristic |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_descriptor |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_service_types |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_event |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_cb_err |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_cb_strings |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_mtu_size |
| from antlion_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection |
| from antlion_contrib.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection |
| from antlion_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics |
| from antlion_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors |
| from antlion_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids |
| from antlion_contrib.test_utils.bt.bt_constants import bt_default_timeout |
| |
| |
class GattConnectedBaseTest(BluetoothBaseTest):
    """Base class for GATT tests that run between two connected devices.

    ``setup_test`` builds a GATT server exposing one test service on the
    peripheral device, connects the central device to it and runs service
    discovery. ``teardown_test`` closes the server and disconnects.
    """

    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
    # NOTE(review): identical to WRITABLE_CHAR_UUID below -- confirm that
    # sharing one UUID between a descriptor and a characteristic is intended.
    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"

    def setup_class(self):
        """Bind the first two Android devices as central and peripheral."""
        super().setup_class()
        self.cen_ad = self.android_devices[0]
        self.per_ad = self.android_devices[1]

    def setup_test(self):
        """Create the GATT server, connect the devices and discover services.

        Returns:
            True when the test service was discovered and the peripheral
            reports at least one connected device, False otherwise.

        Raises:
            AssertionError: if the GATT server or its service could not be
                set up, or an expected event did not arrive in time.
        """
        super().setup_test()

        self.gatt_server_callback, self.gatt_server = (
            self._setup_multiple_services())
        if not self.gatt_server_callback or not self.gatt_server:
            raise AssertionError('Service setup failed')

        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
            orchestrate_gatt_connection(self.cen_ad, self.per_ad))
        self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)

        self.mtu = gatt_mtu_size['min']

        # Initialise up front so a discovery request that fails to start is
        # reported as "service not found" instead of an AttributeError.
        self.test_service_index = None
        if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
            event = self._client_wait(gatt_event['gatt_serv_disc'])
            self.discovered_services_index = event['data']['ServicesIndex']
            services_count = (
                self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
                    self.discovered_services_index))
            for i in range(services_count):
                disc_service_uuid = (
                    self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
                        self.discovered_services_index, i).upper())
                if disc_service_uuid == self.TEST_SERVICE_UUID:
                    self.test_service_index = i
                    break

        # Compare against None explicitly: index 0 is a valid match.
        if self.test_service_index is None:
            self.log.error("Service not found")
            return False

        connected_device_list = (
            self.per_ad.droid.gattServerGetConnectedDevices(
                self.gatt_server))
        if not connected_device_list:
            self.log.info("No devices connected from peripheral.")
            return False

        return True

    def teardown_test(self):
        """Close the GATT server, disconnect the client and tear down."""
        self.per_ad.droid.gattServerClearServices(self.gatt_server)
        self.per_ad.droid.gattServerClose(self.gatt_server)

        del self.gatt_server_callback
        del self.gatt_server

        self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
                                             self.gatt_callback)

        return super().teardown_test()

    def _server_wait(self, gatt_event):
        """Wait for ``gatt_event`` on the peripheral's server callback."""
        return self._timed_pop(gatt_event, self.per_ad,
                               self.gatt_server_callback)

    def _client_wait(self, gatt_event):
        """Wait for ``gatt_event`` on the central's client callback."""
        return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)

    def _timed_pop(self, gatt_event, droid, gatt_callback):
        """Pop one event from ``droid.ed``, waiting ``bt_default_timeout``.

        Args:
            gatt_event: mapping with an ``"evt"`` event-name template and an
                ``"err"`` error-message template.
            droid: object whose ``ed`` attribute provides ``pop_event``.
            gatt_callback: value formatted into the event-name template.

        Returns:
            The popped event.

        Raises:
            AssertionError: if no matching event arrives before the timeout.
        """
        expected_event = gatt_event["evt"].format(gatt_callback)
        try:
            return droid.ed.pop_event(expected_event, bt_default_timeout)
        except Empty:
            raise AssertionError(gatt_event["err"].format(expected_event))

    def _setup_characteristics_and_descriptors(self, droid):
        """Create the test characteristics and descriptors on ``droid``.

        Also stores the third created characteristic (the notifiable one)
        in ``self.notifiable_char_index``.

        Returns:
            A ``(characteristic_list, descriptor_list)`` tuple as returned by
            ``setup_gatt_characteristics`` and ``setup_gatt_descriptors``.
        """
        characteristic_input = [
            {
                'uuid': self.WRITABLE_CHAR_UUID,
                'property': gatt_characteristic['property_write'] |
                gatt_characteristic['property_write_no_response'],
                'permission': gatt_characteristic['permission_write']
            },
            {
                'uuid': self.READABLE_CHAR_UUID,
                'property': gatt_characteristic['property_read'],
                'permission': gatt_characteristic['permission_read']
            },
            {
                'uuid': self.NOTIFIABLE_CHAR_UUID,
                'property': gatt_characteristic['property_notify'] |
                gatt_characteristic['property_indicate'],
                'permission': gatt_characteristic['permission_read']
            },
        ]
        # Every descriptor is readable and writable. The mask is built from
        # the descriptor constants for all three entries.
        descriptor_permissions = (gatt_descriptor['permission_read'] |
                                  gatt_descriptor['permission_write'])
        descriptor_input = [{
            'uuid': self.WRITABLE_DESC_UUID,
            'property': descriptor_permissions,
        }, {
            'uuid': self.READABLE_DESC_UUID,
            'property': descriptor_permissions,
        }, {
            'uuid': gatt_char_desc_uuids['client_char_cfg'],
            'property': descriptor_permissions,
        }]
        characteristic_list = setup_gatt_characteristics(
            droid, characteristic_input)
        self.notifiable_char_index = characteristic_list[2]
        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
        return characteristic_list, descriptor_list

    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
        """Disconnect the central from the peripheral and close the client.

        Returns:
            True on success, False if ``disconnect_gatt_connection`` raised
            ``GattTestUtilsError`` (the client is not closed in that case).
        """
        # Imported locally: the module-level imports do not bring this
        # exception class into scope.
        from antlion_contrib.test_utils.bt.bt_gatt_utils import (
            GattTestUtilsError)

        self.log.info("Disconnecting from peripheral device.")
        try:
            disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
                                       gatt_callback)
        except GattTestUtilsError as err:
            self.log.error(err)
            return False
        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
        return True

    def _find_service_added_event(self, gatt_server_callback, uuid):
        """Wait for the peripheral's service-added event and check its UUID.

        Returns:
            True if the event arrived and its ``serviceUuid`` matches
            ``uuid`` case-insensitively, False otherwise.
        """
        expected_event = gatt_cb_strings['serv_added'].format(
            gatt_server_callback)
        try:
            event = self.per_ad.ed.pop_event(expected_event,
                                             bt_default_timeout)
        except Empty:
            self.log.error(gatt_cb_err['serv_added_err'].format(
                expected_event))
            return False
        found_uuid = event['data']['serviceUuid']
        if found_uuid.lower() != uuid.lower():
            self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
                found_uuid, uuid))
            return False
        return True

    def _setup_multiple_services(self):
        """Open a GATT server on the peripheral and add the test service.

        Returns:
            ``(gatt_server_callback, gatt_server)`` on success, or
            ``(False, False)`` if the service-added event was not confirmed.
        """
        per_droid = self.per_ad.droid
        gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
        gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
        characteristic_list, descriptor_list = (
            self._setup_characteristics_and_descriptors(per_droid))
        # Attach each descriptor to the characteristic at the same position.
        for characteristic, descriptor in zip(characteristic_list,
                                              descriptor_list):
            per_droid.gattServerCharacteristicAddDescriptor(
                characteristic, descriptor)
        gatt_service = per_droid.gattServerCreateService(
            self.TEST_SERVICE_UUID, gatt_service_types['primary'])
        for characteristic in characteristic_list:
            per_droid.gattServerAddCharacteristicToService(
                gatt_service, characteristic)
        per_droid.gattServerAddService(gatt_server, gatt_service)
        if not self._find_service_added_event(gatt_server_callback,
                                              self.TEST_SERVICE_UUID):
            return False, False
        return gatt_server_callback, gatt_server

    def assertEqual(self, first, second, msg=None):
        """Raise AssertionError unless ``first == second``.

        Args:
            first: left-hand value of the comparison.
            second: right-hand value of the comparison.
            msg: optional text prepended to the ``'%r != %r'`` detail.
        """
        if first == second:
            return
        detail = '%r != %r' % (first, second)
        if not msg:
            raise AssertionError(detail)
        raise AssertionError(msg + ' ' + detail)