blob: 02cd24fb2ebca4ea9e1cf6949593874c7aa6e387 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import time
import logging
import pandas as pd
from antlion import asserts
from antlion.libs.proc import job
from antlion.base_test import BaseTestClass
from antlion_contrib.test_utils.bt.bt_power_test_utils import MediaControl
from antlion_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput_and_read_rssi
from antlion_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
import antlion_contrib.test_utils.bt.bt_test_utils as bt_utils
import antlion_contrib.test_utils.wifi.wifi_performance_test_utils as wifi_utils
PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
FORCE_SAR_ADB_COMMAND = ('am broadcast -n'
'com.google.android.apps.scone/.coex.TestReceiver -a '
'com.google.android.apps.scone.coex.SIMULATE_STATE ')
SLEEP_DURATION = 2
DEFAULT_DURATION = 5
DEFAULT_MAX_ERROR_THRESHOLD = 2
DEFAULT_AGG_MAX_ERROR_THRESHOLD = 2
FIXED_ATTENUATION = 36
class BtSarBaseTest(BaseTestClass):
""" Base class for all BT SAR Test classes.
This class implements functions common to BT SAR test Classes.
"""
BACKUP_BT_SAR_TABLE_NAME = 'backup_bt_sar_table.csv'
def __init__(self, controllers):
BaseTestClass.__init__(self, controllers)
self.power_file_paths = [
'/vendor/etc/bluetooth_power_limits.csv',
'/data/vendor/radio/bluetooth_power_limits.csv'
]
self.sar_file_name = os.path.basename(self.power_file_paths[0])
self.power_column = 'BluetoothPower'
self.REG_DOMAIN_DICT = {
('us', 'ca', 'in'): 'US',
('uk', 'fr', 'es', 'de', 'it', 'ie', 'sg', 'au', 'tw'): 'EU',
('jp', ): 'JP'
}
def setup_class(self):
"""Initializes common test hardware and parameters.
This function initializes hardware and compiles parameters that are
common to all tests in this class and derived classes.
"""
super().setup_class()
self.test_params = self.user_params.get('bt_sar_test_params', {})
if not self.test_params:
self.log.warning(
'bt_sar_test_params was not found in the config file.')
self.user_params.update(self.test_params)
req_params = ['bt_devices', 'calibration_params', 'custom_files']
self.unpack_userparams(
req_params,
country_code='us',
duration=DEFAULT_DURATION,
sort_order=None,
max_error_threshold=DEFAULT_MAX_ERROR_THRESHOLD,
agg_error_threshold=DEFAULT_AGG_MAX_ERROR_THRESHOLD,
tpc_threshold=[2, 8],
sar_margin={
'BDR': 0,
'EDR': 0,
'BLE': 0
})
self.attenuator = self.attenuators[0]
self.dut = self.android_devices[0]
for key in self.REG_DOMAIN_DICT.keys():
if self.country_code.lower() in key:
self.reg_domain = self.REG_DOMAIN_DICT[key]
self.sar_version_2 = False
if 'Error' not in self.dut.adb.shell('bluetooth_sar_test -r'):
#Flag for SAR version 2
self.sar_version_2 = True
self.power_column = 'BluetoothEDRPower'
self.power_file_paths[0] = os.path.join(
os.path.dirname(self.power_file_paths[0]),
'bluetooth_power_limits_{}.csv'.format(self.reg_domain))
self.sar_file_name = os.path.basename(self.power_file_paths[0])
if self.sar_version_2:
custom_file_suffix = 'version2'
else:
custom_file_suffix = 'version1'
for file in self.custom_files:
if 'custom_sar_table_{}.csv'.format(custom_file_suffix) in file:
self.custom_sar_path = file
break
else:
raise RuntimeError('Custom Sar File is missing')
self.sar_file_path = self.power_file_paths[0]
self.atten_min = 0
self.atten_max = int(self.attenuator.get_max_atten())
# Get music file and push it to the phone and initialize Media controller
music_files = self.user_params.get('music_files', [])
if music_files:
music_src = music_files[0]
music_dest = PHONE_MUSIC_FILE_DIRECTORY
success = self.dut.push_system_file(music_src, music_dest)
if success:
self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
os.path.basename(music_src))
# Initialize media_control class
self.media = MediaControl(self.dut, self.music_file)
#Initializing BT device controller
if self.bt_devices:
attr, idx = self.bt_devices.split(':')
self.bt_device_controller = getattr(self, attr)[int(idx)]
self.bt_device = bt_factory().generate(self.bt_device_controller)
else:
self.log.error('No BT devices config is provided!')
bt_utils.enable_bqr(self.android_devices)
self.log_path = os.path.join(logging.log_path, 'results')
os.makedirs(self.log_path, exist_ok=True)
# Reading BT SAR table from the phone
self.bt_sar_df = self.read_sar_table(self.dut)
def setup_test(self):
super().setup_test()
# Starting BT on the master
self.dut.droid.bluetoothFactoryReset()
bt_utils.enable_bluetooth(self.dut.droid, self.dut.ed)
# Starting BT on the slave
self.bt_device.reset()
self.bt_device.power_on()
# Connect master and slave
bt_utils.connect_phone_to_headset(self.dut, self.bt_device, 60)
# Playing music
self.media.play()
# Find and set PL10 level for the DUT
self.pl10_atten = self.set_PL10_atten_level(self.dut)
self.attenuator.set_atten(self.pl10_atten)
def teardown_test(self):
#Stopping Music
if hasattr(self, 'media'):
self.media.stop()
# Stopping BT on slave
self.bt_device.reset()
self.bt_device.power_off()
#Stopping BT on master
bt_utils.disable_bluetooth(self.dut.droid)
#Resetting the atten to initial levels
self.attenuator.set_atten(self.atten_min)
self.log.info('Attenuation set to {} dB'.format(self.atten_min))
def teardown_class(self):
super().teardown_class()
self.dut.droid.bluetoothFactoryReset()
# Stopping BT on slave
self.bt_device.reset()
self.bt_device.power_off()
#Stopping BT on master
bt_utils.disable_bluetooth(self.dut.droid)
def save_sar_plot(self, df):
""" Saves SAR plot to the path given.
Args:
df: Processed SAR table sweep results
"""
self.plot.add_line(
df.index,
df['expected_tx_power'],
legend='expected',
marker='circle')
self.plot.add_line(
df.index,
df['measured_tx_power'],
legend='measured',
marker='circle')
self.plot.add_line(
df.index, df['delta'], legend='delta', marker='circle')
results_file_path = os.path.join(self.log_path, '{}.html'.format(
self.current_test_name))
self.plot.generate_figure()
wifi_utils.BokehFigure.save_figures([self.plot], results_file_path)
def sweep_power_cap(self):
sar_df = self.bt_sar_df
sar_df['BDR_power_cap'] = -128
sar_df['EDR_power_cap'] = -128
sar_df['BLE_power_cap'] = -128
if self.sar_version_2:
power_column_dict = {
'BDR': 'BluetoothBDRPower',
'EDR': 'BluetoothEDRPower',
'BLE': 'BluetoothLEPower'
}
else:
power_column_dict = {'EDR': self.power_column}
power_cap_error = False
for type, column_name in power_column_dict.items():
self.log.info("Performing sanity test on {}".format(type))
# Iterating through the BT SAR scenarios
for scenario in range(0, self.bt_sar_df.shape[0]):
# Reading BT SAR table row into dict
read_scenario = sar_df.loc[scenario].to_dict()
start_time = self.dut.adb.shell('date +%s.%m')
time.sleep(SLEEP_DURATION)
# Setting SAR state to the read BT SAR row
self.set_sar_state(self.dut, read_scenario, self.country_code)
# Reading device power cap from logcat after forcing SAR State
scenario_power_cap = self.get_current_power_cap(
self.dut, start_time, type=type)
sar_df.loc[scenario, '{}_power_cap'.format(
type)] = scenario_power_cap
self.log.info(
'scenario: {}, '
'sar_power: {}, power_cap:{}'.format(
scenario, sar_df.loc[scenario, column_name],
sar_df.loc[scenario, '{}_power_cap'.format(type)]))
if not sar_df['{}_power_cap'.format(type)].equals(sar_df[column_name]):
power_cap_error = True
results_file_path = os.path.join(self.log_path, '{}.csv'.format(
self.current_test_name))
sar_df.to_csv(results_file_path)
return power_cap_error
def sweep_table(self,
client_ad=None,
server_ad=None,
client_conn_id=None,
gatt_server=None,
gatt_callback=None,
isBLE=False):
"""Iterates over the BT SAR table and forces signal states.
Iterates over BT SAR table and forces signal states,
measuring RSSI and power level for each state.
Args:
client_ad: the Android device performing the connection.
server_ad: the Android device accepting the connection.
client_conn_id: the client connection ID.
gatt_server: the gatt server
gatt_callback: Gatt callback objec
isBLE : boolean variable for BLE connection
Returns:
sar_df : SAR table sweep results in pandas dataframe
"""
sar_df = self.bt_sar_df.copy()
sar_df['power_cap'] = -128
sar_df['slave_rssi'] = -128
sar_df['master_rssi'] = -128
sar_df['ble_rssi'] = -128
sar_df['pwlv'] = -1
# Sorts the table
if self.sort_order:
if self.sort_order.lower() == 'ascending':
sar_df = sar_df.sort_values(
by=[self.power_column], ascending=True)
else:
sar_df = sar_df.sort_values(
by=[self.power_column], ascending=False)
sar_df = sar_df.reset_index(drop=True)
# Sweeping BT SAR table
for scenario in range(sar_df.shape[0]):
# Reading BT SAR Scenario from the table
read_scenario = sar_df.loc[scenario].to_dict()
start_time = self.dut.adb.shell('date +%s.%m')
time.sleep(SLEEP_DURATION)
#Setting SAR State
self.set_sar_state(self.dut, read_scenario, self.country_code)
if isBLE:
sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap(
self.dut, start_time, type='BLE')
sar_df.loc[
scenario, 'ble_rssi'] = run_ble_throughput_and_read_rssi(
client_ad, server_ad, client_conn_id, gatt_server,
gatt_callback)
self.log.info('scenario:{}, power_cap:{}, ble_rssi:{}'.format(
scenario, sar_df.loc[scenario, 'power_cap'],
sar_df.loc[scenario, 'ble_rssi']))
else:
sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap(
self.dut, start_time)
processed_bqr_results = bt_utils.get_bt_metric(
self.android_devices, self.duration)
sar_df.loc[scenario, 'slave_rssi'] = processed_bqr_results[
'rssi'][self.bt_device_controller.serial]
sar_df.loc[scenario, 'master_rssi'] = processed_bqr_results[
'rssi'][self.dut.serial]
sar_df.loc[scenario, 'pwlv'] = processed_bqr_results['pwlv'][
self.dut.serial]
self.log.info(
'scenario:{}, power_cap:{}, s_rssi:{}, m_rssi:{}, m_pwlv:{}'
.format(scenario, sar_df.loc[scenario, 'power_cap'],
sar_df.loc[scenario, 'slave_rssi'],
sar_df.loc[scenario, 'master_rssi'],
sar_df.loc[scenario, 'pwlv']))
self.log.info('BT SAR Table swept')
return sar_df
def process_table(self, sar_df):
"""Processes the results of sweep_table and computes BT TX power.
Processes the results of sweep_table and computes BT TX power
after factoring in the path loss and FTM offsets.
Args:
sar_df: BT SAR table after the sweep
Returns:
sar_df: processed BT SAR table
"""
sar_df['pathloss'] = self.calibration_params['pathloss']
if hasattr(self, 'pl10_atten'):
sar_df['atten'] = self.pl10_atten
else:
sar_df['atten'] = FIXED_ATTENUATION
# BT SAR Backoff for each scenario
if self.sar_version_2:
#Reads OTP values from the phone
self.otp = bt_utils.read_otp(self.dut)
#OTP backoff
edr_otp = min(0, float(self.otp['EDR']['10']))
bdr_otp = min(0, float(self.otp['BR']['10']))
ble_otp = min(0, float(self.otp['BLE']['10']))
# EDR TX Power for PL10
edr_tx_power_pl10 = self.calibration_params['target_power']['EDR']['10'] - edr_otp
# BDR TX Power for PL10
bdr_tx_power_pl10 = self.calibration_params['target_power']['BDR']['10'] - bdr_otp
# RSSI being measured is BDR
offset = bdr_tx_power_pl10 - edr_tx_power_pl10
# BDR-EDR offset
sar_df['offset'] = offset
# Max TX power permissible
sar_df['max_power'] = self.calibration_params['max_power']
# Adding a target power column
if 'ble_rssi' in sar_df.columns:
sar_df[
'target_power'] = self.calibration_params['target_power']['BLE']['10'] - ble_otp
else:
sar_df['target_power'] = sar_df['pwlv'].astype(str).map(
self.calibration_params['target_power']['EDR']) - edr_otp
#Translates power_cap values to expected TX power level
sar_df['cap_tx_power'] = sar_df['power_cap'] / 4.0
sar_df['expected_tx_power'] = sar_df[[
'cap_tx_power', 'target_power', 'max_power'
]].min(axis=1)
if hasattr(self, 'pl10_atten'):
sar_df[
'measured_tx_power'] = sar_df['slave_rssi'] + sar_df['pathloss'] + self.pl10_atten - offset
else:
sar_df[
'measured_tx_power'] = sar_df['ble_rssi'] + sar_df['pathloss'] + FIXED_ATTENUATION
else:
# Adding a target power column
sar_df['target_power'] = sar_df['pwlv'].astype(str).map(
self.calibration_params['target_power']['EDR']['10'])
# Adding a ftm power column
sar_df['ftm_power'] = sar_df['pwlv'].astype(str).map(
self.calibration_params['ftm_power']['EDR'])
sar_df[
'backoff'] = sar_df['target_power'] - sar_df['power_cap'] / 4.0
sar_df[
'expected_tx_power'] = sar_df['ftm_power'] - sar_df['backoff']
sar_df[
'measured_tx_power'] = sar_df['slave_rssi'] + sar_df['pathloss'] + self.pl10_atten
sar_df[
'delta'] = sar_df['expected_tx_power'] - sar_df['measured_tx_power']
self.log.info('Sweep results processed')
results_file_path = os.path.join(self.log_path, self.current_test_name)
sar_df.to_csv('{}.csv'.format(results_file_path))
self.save_sar_plot(sar_df)
return sar_df
def process_results(self, sar_df, type='EDR'):
"""Determines the test results of the sweep.
Parses the processed table with computed BT TX power values
to return pass or fail.
Args:
sar_df: processed BT SAR table
"""
if self.sar_version_2:
breach_error_result = (
sar_df['expected_tx_power'] + self.sar_margin[type] >
sar_df['measured_tx_power']).all()
if not breach_error_result:
asserts.fail('Measured TX power exceeds expected')
else:
# checks for errors at particular points in the sweep
max_error_result = abs(
sar_df['delta']) > self.max_error_threshold[type]
if max_error_result:
asserts.fail('Maximum Error Threshold Exceeded')
# checks for error accumulation across the sweep
if sar_df['delta'].sum() > self.agg_error_threshold[type]:
asserts.fail(
'Aggregate Error Threshold Exceeded. Error: {} Threshold: {}'.
format(sar_df['delta'].sum(), self.agg_error_threshold))
asserts.explicit_pass('Measured and Expected Power Values in line')
def set_sar_state(self, ad, signal_dict, country_code='us'):
"""Sets the SAR state corresponding to the BT SAR signal.
The SAR state is forced using an adb command that takes
device signals as input.
Args:
ad: android_device object.
signal_dict: dict of BT SAR signals read from the SAR file.
Returns:
enforced_state: dict of device signals.
"""
signal_dict = {k: max(int(v), 0) for (k, v) in signal_dict.items()}
signal_dict["Wifi"] = signal_dict['WIFI5Ghz']
signal_dict['WIFI2Ghz'] = 0 if signal_dict['WIFI5Ghz'] else 1
device_state_dict = {
('Earpiece', 'earpiece'): signal_dict['Head'],
('Wifi', 'wifi'): signal_dict['WIFI5Ghz'],
('Wifi 2.4G', 'wifi_24g'): signal_dict['WIFI2Ghz'],
('Voice', 'voice'): 0,
('Wifi AP', 'wifi_ap'): signal_dict['HotspotVoice'],
('Bluetooth', 'bluetooth'): 1,
('Bluetooth media', 'bt_media'): signal_dict['BTMedia'],
('Radio', 'radio_power'): signal_dict['Cell'],
('Motion', 'motion'): signal_dict['IMU'],
('Bluetooth connected', 'bt_connected'): 1
}
if 'BTHotspot' in signal_dict.keys():
device_state_dict[('Bluetooth tethering',
'bt_tethering')] = signal_dict['BTHotspot']
enforced_state = {}
sar_state_command = FORCE_SAR_ADB_COMMAND
for key in device_state_dict:
enforced_state[key[0]] = device_state_dict[key]
sar_state_command = '{} --ei {} {}'.format(
sar_state_command, key[1], device_state_dict[key])
if self.sar_version_2:
sar_state_command = '{} --es country_iso "{}"'.format(
sar_state_command, country_code.lower())
#Forcing the SAR state
adb_output = ad.adb.shell(sar_state_command)
# Checking if command was successfully enforced
if 'result=0' in adb_output:
self.log.info('Requested BT SAR state successfully enforced.')
return enforced_state
else:
self.log.error("Couldn't force BT SAR state.")
def parse_bt_logs(self, ad, begin_time, regex=''):
"""Returns bt software stats by parsing logcat since begin_time.
The quantity to be fetched is dictated by the regex provided.
Args:
ad: android_device object.
begin_time: time stamp to start the logcat parsing.
regex: regex for fetching the required BT software stats.
Returns:
stat: the desired BT stat.
"""
# Waiting for logcat to update
time.sleep(SLEEP_DURATION)
bt_adb_log = ad.adb.logcat('-b all -t %s' % begin_time)
for line in bt_adb_log.splitlines():
if re.findall(regex, line):
stat = re.findall(regex, line)[0]
return stat
def set_country_code(self, ad, cc):
"""Sets the SAR regulatory domain as per given country code
The SAR regulatory domain is forced using an adb command that takes
country code as input.
Args:
ad: android_device object.
cc: country code
"""
ad.adb.shell("{} --es country_iso {}".format(FORCE_SAR_ADB_COMMAND,
cc))
self.log.info("Country Code set to {}".format(cc))
def get_country_code(self, ad, begin_time):
"""Returns the enforced regulatory domain since begin_time
Returns enforced regulatory domain since begin_time by parsing logcat.
Function should follow a function call to set a country code
Args:
ad : android_device obj
begin_time: time stamp to start
Returns:
read enforced regulatory domain
"""
reg_domain_regex = "updateRegulatoryDomain:\s+(\S+)"
reg_domain = self.parse_bt_logs(ad, begin_time, reg_domain_regex)
return reg_domain
def get_current_power_cap(self, ad, begin_time, type='EDR'):
""" Returns the enforced software EDR power cap since begin_time.
Returns the enforced EDR power cap since begin_time by parsing logcat.
Function should follow a function call that forces a SAR state
Args:
ad: android_device obj.
begin_time: time stamp to start.
Returns:
read enforced power cap
"""
power_cap_regex_dict = {
'BDR': [
'Bluetooth powers: BR:\s+(\d+), EDR:\s+\d+',
'Bluetooth Tx Power Cap\s+(\d+)'
],
'EDR': [
'Bluetooth powers: BR:\s+\d+, EDR:\s+(\d+)',
'Bluetooth Tx Power Cap\s+(\d+)'
],
'BLE': [
'Bluetooth powers: BR:\s+\d+, EDR:\s+\d+, BLE:\s+(\d+)',
'Bluetooth Tx Power Cap\s+(\d+)'
]
}
power_cap_regex_list = power_cap_regex_dict[type]
for power_cap_regex in power_cap_regex_list:
power_cap = self.parse_bt_logs(ad, begin_time, power_cap_regex)
if power_cap:
return int(power_cap)
raise ValueError('Failed to get TX power cap')
def get_current_device_state(self, ad, begin_time):
""" Returns the device state of the android dut since begin_time.
Returns the device state of the android dut by parsing logcat since
begin_time. Function should follow a function call that forces
a SAR state.
Args:
ad: android_device obj.
begin_time: time stamp to start.
Returns:
device_state: device state of the android device.
"""
device_state_regex = 'updateDeviceState: DeviceState: ([\s*\S+\s]+)'
time.sleep(SLEEP_DURATION)
device_state = self.parse_bt_logs(ad, begin_time, device_state_regex)
if device_state:
return device_state
raise ValueError("Couldn't fetch device state")
def read_sar_table(self, ad, output_path=''):
"""Extracts the BT SAR table from the phone.
Extracts the BT SAR table from the phone into the android device
log path directory.
Args:
ad: android_device object.
output_path: path to custom sar table
Returns:
df : BT SAR table (as pandas DataFrame).
"""
if not output_path:
output_path = os.path.join(ad.device_log_path, self.sar_file_name)
ad.adb.pull('{} {}'.format(self.sar_file_path, output_path))
df = pd.read_csv(output_path)
self.log.info('BT SAR table read from the phone')
return df
def push_table(self, ad, src_path, dest_path=''):
"""Pushes a BT SAR table to the phone.
Pushes a BT SAR table to the android device and reboots the device.
Also creates a backup file if backup flag is True.
Args:
ad: android_device object.
src_path: path to the BT SAR table.
"""
#Copying the to-be-pushed file for logging
if os.path.dirname(src_path) != ad.device_log_path:
job.run('cp {} {}'.format(src_path, ad.device_log_path))
#Pushing the file provided in the config
if dest_path:
ad.push_system_file(src_path, dest_path)
else:
ad.push_system_file(src_path, self.sar_file_path)
self.log.info('BT SAR table pushed')
ad.reboot()
self.bt_sar_df = self.read_sar_table(self.dut, src_path)
def set_PL10_atten_level(self, ad):
"""Finds the attenuation level at which the phone is at PL10
Finds PL10 attenuation level by sweeping the attenuation range.
If the power level is not achieved during sweep,
returns the max atten level
Args:
ad: android object class
Returns:
atten : attenuation level when the phone is at PL10
"""
BT_SAR_ATTEN_STEP = 3
for atten in range(self.atten_min, self.atten_max, BT_SAR_ATTEN_STEP):
self.attenuator.set_atten(atten)
# Sleep required for BQR to reflect the change in parameters
time.sleep(SLEEP_DURATION)
metrics = bt_utils.get_bt_metric(ad)
if metrics['pwlv'][ad.serial] == 10:
self.log.info(
'PL10 located at {}'.format(atten + BT_SAR_ATTEN_STEP))
return atten + BT_SAR_ATTEN_STEP
self.log.warn(
"PL10 couldn't be located in the given attenuation range")