blob: c0d225f338fc753fa217f785b55149717f2a0653 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This is base class for tests that exercises different GATT procedures between two connected devices.
Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.
"""
from queue import Empty
from antlion_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from antlion_contrib.test_utils.bt.bt_constants import gatt_characteristic
from antlion_contrib.test_utils.bt.bt_constants import gatt_descriptor
from antlion_contrib.test_utils.bt.bt_constants import gatt_service_types
from antlion_contrib.test_utils.bt.bt_constants import gatt_event
from antlion_contrib.test_utils.bt.bt_constants import gatt_cb_err
from antlion_contrib.test_utils.bt.bt_constants import gatt_cb_strings
from antlion_contrib.test_utils.bt.bt_constants import gatt_mtu_size
from antlion_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
from antlion_contrib.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
from antlion_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics
from antlion_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors
from antlion_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids
from antlion_contrib.test_utils.bt.bt_constants import bt_default_timeout
class GattConnectedBaseTest(BluetoothBaseTest):
TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"
def setup_class(self):
super().setup_class()
self.cen_ad = self.android_devices[0]
self.per_ad = self.android_devices[1]
def setup_test(self):
super(GattConnectedBaseTest, self).setup_test()
self.gatt_server_callback, self.gatt_server = self._setup_multiple_services(
)
if not self.gatt_server_callback or not self.gatt_server:
raise AssertionError('Service setup failed')
self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
orchestrate_gatt_connection(self.cen_ad, self.per_ad))
self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)
self.mtu = gatt_mtu_size['min']
if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
event = self._client_wait(gatt_event['gatt_serv_disc'])
self.discovered_services_index = event['data']['ServicesIndex']
services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
self.discovered_services_index)
self.test_service_index = None
for i in range(services_count):
disc_service_uuid = (
self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
self.discovered_services_index, i).upper())
if disc_service_uuid == self.TEST_SERVICE_UUID:
self.test_service_index = i
break
if not self.test_service_index:
print("Service not found")
return False
connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices(
self.gatt_server)
if len(connected_device_list) == 0:
self.log.info("No devices connected from peripheral.")
return False
return True
def teardown_test(self):
self.per_ad.droid.gattServerClearServices(self.gatt_server)
self.per_ad.droid.gattServerClose(self.gatt_server)
del self.gatt_server_callback
del self.gatt_server
self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
self.gatt_callback)
return super(GattConnectedBaseTest, self).teardown_test()
def _server_wait(self, gatt_event):
return self._timed_pop(gatt_event, self.per_ad,
self.gatt_server_callback)
def _client_wait(self, gatt_event):
return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)
def _timed_pop(self, gatt_event, droid, gatt_callback):
expected_event = gatt_event["evt"].format(gatt_callback)
try:
return droid.ed.pop_event(expected_event, bt_default_timeout)
except Empty as emp:
raise AssertionError(gatt_event["err"].format(expected_event))
def _setup_characteristics_and_descriptors(self, droid):
characteristic_input = [
{
'uuid': self.WRITABLE_CHAR_UUID,
'property': gatt_characteristic['property_write'] |
gatt_characteristic['property_write_no_response'],
'permission': gatt_characteristic['permission_write']
},
{
'uuid': self.READABLE_CHAR_UUID,
'property': gatt_characteristic['property_read'],
'permission': gatt_characteristic['permission_read']
},
{
'uuid': self.NOTIFIABLE_CHAR_UUID,
'property': gatt_characteristic['property_notify'] |
gatt_characteristic['property_indicate'],
'permission': gatt_characteristic['permission_read']
},
]
descriptor_input = [{
'uuid': self.WRITABLE_DESC_UUID,
'property': gatt_descriptor['permission_read'] |
gatt_characteristic['permission_write'],
}, {
'uuid': self.READABLE_DESC_UUID,
'property': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
}, {
'uuid': gatt_char_desc_uuids['client_char_cfg'],
'property': gatt_descriptor['permission_read'] |
gatt_descriptor['permission_write'],
}]
characteristic_list = setup_gatt_characteristics(droid,
characteristic_input)
self.notifiable_char_index = characteristic_list[2]
descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
return characteristic_list, descriptor_list
def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
self.log.info("Disconnecting from peripheral device.")
try:
disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
gatt_callback)
except GattTestUtilsError as err:
log.error(err)
return False
self.cen_ad.droid.gattClientClose(bluetooth_gatt)
return True
def _find_service_added_event(self, gatt_server_callback, uuid):
expected_event = gatt_cb_strings['serv_added'].format(
gatt_server_callback)
try:
event = self.per_ad.ed.pop_event(expected_event,
bt_default_timeout)
except Empty:
self.log.error(gatt_cb_err['serv_added_err'].format(
expected_event))
return False
if event['data']['serviceUuid'].lower() != uuid.lower():
self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
event['data']['serviceUuid'], uuid))
return False
return True
def _setup_multiple_services(self):
gatt_server_callback = (
self.per_ad.droid.gattServerCreateGattServerCallback())
gatt_server = self.per_ad.droid.gattServerOpenGattServer(
gatt_server_callback)
characteristic_list, descriptor_list = (
self._setup_characteristics_and_descriptors(self.per_ad.droid))
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[0], descriptor_list[0])
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[1], descriptor_list[1])
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[2], descriptor_list[2])
gatt_service3 = self.per_ad.droid.gattServerCreateService(
self.TEST_SERVICE_UUID, gatt_service_types['primary'])
for characteristic in characteristic_list:
self.per_ad.droid.gattServerAddCharacteristicToService(
gatt_service3, characteristic)
self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3)
result = self._find_service_added_event(gatt_server_callback,
self.TEST_SERVICE_UUID)
if not result:
return False, False
return gatt_server_callback, gatt_server
def assertEqual(self, first, second, msg=None):
if not first == second:
if not msg:
raise AssertionError('%r != %r' % (first, second))
else:
raise AssertionError(msg + ' %r != %r' % (first, second))