| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import re |
| import time |
| import logging |
| import pandas as pd |
| |
| from antlion import asserts |
| from antlion.libs.proc import job |
| from antlion.base_test import BaseTestClass |
| |
| from antlion_contrib.test_utils.bt.bt_power_test_utils import MediaControl |
| from antlion_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput_and_read_rssi |
| from antlion_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory |
| |
| import antlion_contrib.test_utils.bt.bt_test_utils as bt_utils |
| import antlion_contrib.test_utils.wifi.wifi_performance_test_utils as wifi_utils |
| |
| PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' |
| |
| FORCE_SAR_ADB_COMMAND = ('am broadcast -n' |
| 'com.google.android.apps.scone/.coex.TestReceiver -a ' |
| 'com.google.android.apps.scone.coex.SIMULATE_STATE ') |
| |
| SLEEP_DURATION = 2 |
| |
| DEFAULT_DURATION = 5 |
| DEFAULT_MAX_ERROR_THRESHOLD = 2 |
| DEFAULT_AGG_MAX_ERROR_THRESHOLD = 2 |
| FIXED_ATTENUATION = 36 |
| |
| |
| class BtSarBaseTest(BaseTestClass): |
| """ Base class for all BT SAR Test classes. |
| |
| This class implements functions common to BT SAR test Classes. |
| """ |
| BACKUP_BT_SAR_TABLE_NAME = 'backup_bt_sar_table.csv' |
| |
| def __init__(self, controllers): |
| BaseTestClass.__init__(self, controllers) |
| self.power_file_paths = [ |
| '/vendor/etc/bluetooth_power_limits.csv', |
| '/data/vendor/radio/bluetooth_power_limits.csv' |
| ] |
| self.sar_file_name = os.path.basename(self.power_file_paths[0]) |
| self.power_column = 'BluetoothPower' |
| self.REG_DOMAIN_DICT = { |
| ('us', 'ca', 'in'): 'US', |
| ('uk', 'fr', 'es', 'de', 'it', 'ie', 'sg', 'au', 'tw'): 'EU', |
| ('jp', ): 'JP' |
| } |
| |
| def setup_class(self): |
| """Initializes common test hardware and parameters. |
| |
| This function initializes hardware and compiles parameters that are |
| common to all tests in this class and derived classes. |
| """ |
| super().setup_class() |
| |
| self.test_params = self.user_params.get('bt_sar_test_params', {}) |
| if not self.test_params: |
| self.log.warning( |
| 'bt_sar_test_params was not found in the config file.') |
| |
| self.user_params.update(self.test_params) |
| req_params = ['bt_devices', 'calibration_params', 'custom_files'] |
| |
| self.unpack_userparams( |
| req_params, |
| country_code='us', |
| duration=DEFAULT_DURATION, |
| sort_order=None, |
| max_error_threshold=DEFAULT_MAX_ERROR_THRESHOLD, |
| agg_error_threshold=DEFAULT_AGG_MAX_ERROR_THRESHOLD, |
| tpc_threshold=[2, 8], |
| sar_margin={ |
| 'BDR': 0, |
| 'EDR': 0, |
| 'BLE': 0 |
| }) |
| |
| self.attenuator = self.attenuators[0] |
| self.dut = self.android_devices[0] |
| |
| for key in self.REG_DOMAIN_DICT.keys(): |
| if self.country_code.lower() in key: |
| self.reg_domain = self.REG_DOMAIN_DICT[key] |
| |
| self.sar_version_2 = False |
| |
| if 'Error' not in self.dut.adb.shell('bluetooth_sar_test -r'): |
| #Flag for SAR version 2 |
| self.sar_version_2 = True |
| self.power_column = 'BluetoothEDRPower' |
| self.power_file_paths[0] = os.path.join( |
| os.path.dirname(self.power_file_paths[0]), |
| 'bluetooth_power_limits_{}.csv'.format(self.reg_domain)) |
| self.sar_file_name = os.path.basename(self.power_file_paths[0]) |
| |
| if self.sar_version_2: |
| custom_file_suffix = 'version2' |
| else: |
| custom_file_suffix = 'version1' |
| |
| for file in self.custom_files: |
| if 'custom_sar_table_{}.csv'.format(custom_file_suffix) in file: |
| self.custom_sar_path = file |
| break |
| else: |
| raise RuntimeError('Custom Sar File is missing') |
| |
| self.sar_file_path = self.power_file_paths[0] |
| self.atten_min = 0 |
| self.atten_max = int(self.attenuator.get_max_atten()) |
| |
| # Get music file and push it to the phone and initialize Media controller |
| music_files = self.user_params.get('music_files', []) |
| if music_files: |
| music_src = music_files[0] |
| music_dest = PHONE_MUSIC_FILE_DIRECTORY |
| success = self.dut.push_system_file(music_src, music_dest) |
| if success: |
| self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, |
| os.path.basename(music_src)) |
| # Initialize media_control class |
| self.media = MediaControl(self.dut, self.music_file) |
| |
| #Initializing BT device controller |
| if self.bt_devices: |
| attr, idx = self.bt_devices.split(':') |
| self.bt_device_controller = getattr(self, attr)[int(idx)] |
| self.bt_device = bt_factory().generate(self.bt_device_controller) |
| else: |
| self.log.error('No BT devices config is provided!') |
| |
| bt_utils.enable_bqr(self.android_devices) |
| |
| self.log_path = os.path.join(logging.log_path, 'results') |
| os.makedirs(self.log_path, exist_ok=True) |
| |
| # Reading BT SAR table from the phone |
| self.bt_sar_df = self.read_sar_table(self.dut) |
| |
| def setup_test(self): |
| super().setup_test() |
| |
| # Starting BT on the master |
| self.dut.droid.bluetoothFactoryReset() |
| bt_utils.enable_bluetooth(self.dut.droid, self.dut.ed) |
| |
| # Starting BT on the slave |
| self.bt_device.reset() |
| self.bt_device.power_on() |
| |
| # Connect master and slave |
| bt_utils.connect_phone_to_headset(self.dut, self.bt_device, 60) |
| |
| # Playing music |
| self.media.play() |
| |
| # Find and set PL10 level for the DUT |
| self.pl10_atten = self.set_PL10_atten_level(self.dut) |
| self.attenuator.set_atten(self.pl10_atten) |
| |
| def teardown_test(self): |
| #Stopping Music |
| if hasattr(self, 'media'): |
| self.media.stop() |
| |
| # Stopping BT on slave |
| self.bt_device.reset() |
| self.bt_device.power_off() |
| |
| #Stopping BT on master |
| bt_utils.disable_bluetooth(self.dut.droid) |
| |
| #Resetting the atten to initial levels |
| self.attenuator.set_atten(self.atten_min) |
| self.log.info('Attenuation set to {} dB'.format(self.atten_min)) |
| |
| def teardown_class(self): |
| |
| super().teardown_class() |
| self.dut.droid.bluetoothFactoryReset() |
| |
| # Stopping BT on slave |
| self.bt_device.reset() |
| self.bt_device.power_off() |
| |
| #Stopping BT on master |
| bt_utils.disable_bluetooth(self.dut.droid) |
| |
| def save_sar_plot(self, df): |
| """ Saves SAR plot to the path given. |
| |
| Args: |
| df: Processed SAR table sweep results |
| """ |
| self.plot.add_line( |
| df.index, |
| df['expected_tx_power'], |
| legend='expected', |
| marker='circle') |
| self.plot.add_line( |
| df.index, |
| df['measured_tx_power'], |
| legend='measured', |
| marker='circle') |
| self.plot.add_line( |
| df.index, df['delta'], legend='delta', marker='circle') |
| |
| results_file_path = os.path.join(self.log_path, '{}.html'.format( |
| self.current_test_name)) |
| self.plot.generate_figure() |
| wifi_utils.BokehFigure.save_figures([self.plot], results_file_path) |
| |
| def sweep_power_cap(self): |
| sar_df = self.bt_sar_df |
| sar_df['BDR_power_cap'] = -128 |
| sar_df['EDR_power_cap'] = -128 |
| sar_df['BLE_power_cap'] = -128 |
| |
| if self.sar_version_2: |
| power_column_dict = { |
| 'BDR': 'BluetoothBDRPower', |
| 'EDR': 'BluetoothEDRPower', |
| 'BLE': 'BluetoothLEPower' |
| } |
| else: |
| power_column_dict = {'EDR': self.power_column} |
| |
| power_cap_error = False |
| |
| for type, column_name in power_column_dict.items(): |
| |
| self.log.info("Performing sanity test on {}".format(type)) |
| # Iterating through the BT SAR scenarios |
| for scenario in range(0, self.bt_sar_df.shape[0]): |
| # Reading BT SAR table row into dict |
| read_scenario = sar_df.loc[scenario].to_dict() |
| start_time = self.dut.adb.shell('date +%s.%m') |
| time.sleep(SLEEP_DURATION) |
| |
| # Setting SAR state to the read BT SAR row |
| self.set_sar_state(self.dut, read_scenario, self.country_code) |
| |
| # Reading device power cap from logcat after forcing SAR State |
| scenario_power_cap = self.get_current_power_cap( |
| self.dut, start_time, type=type) |
| sar_df.loc[scenario, '{}_power_cap'.format( |
| type)] = scenario_power_cap |
| self.log.info( |
| 'scenario: {}, ' |
| 'sar_power: {}, power_cap:{}'.format( |
| scenario, sar_df.loc[scenario, column_name], |
| sar_df.loc[scenario, '{}_power_cap'.format(type)])) |
| |
| if not sar_df['{}_power_cap'.format(type)].equals(sar_df[column_name]): |
| power_cap_error = True |
| |
| results_file_path = os.path.join(self.log_path, '{}.csv'.format( |
| self.current_test_name)) |
| sar_df.to_csv(results_file_path) |
| |
| return power_cap_error |
| |
| def sweep_table(self, |
| client_ad=None, |
| server_ad=None, |
| client_conn_id=None, |
| gatt_server=None, |
| gatt_callback=None, |
| isBLE=False): |
| """Iterates over the BT SAR table and forces signal states. |
| |
| Iterates over BT SAR table and forces signal states, |
| measuring RSSI and power level for each state. |
| |
| Args: |
| client_ad: the Android device performing the connection. |
| server_ad: the Android device accepting the connection. |
| client_conn_id: the client connection ID. |
| gatt_server: the gatt server |
| gatt_callback: Gatt callback objec |
| isBLE : boolean variable for BLE connection |
| Returns: |
| sar_df : SAR table sweep results in pandas dataframe |
| """ |
| |
| sar_df = self.bt_sar_df.copy() |
| sar_df['power_cap'] = -128 |
| sar_df['slave_rssi'] = -128 |
| sar_df['master_rssi'] = -128 |
| sar_df['ble_rssi'] = -128 |
| sar_df['pwlv'] = -1 |
| |
| # Sorts the table |
| if self.sort_order: |
| if self.sort_order.lower() == 'ascending': |
| sar_df = sar_df.sort_values( |
| by=[self.power_column], ascending=True) |
| else: |
| sar_df = sar_df.sort_values( |
| by=[self.power_column], ascending=False) |
| sar_df = sar_df.reset_index(drop=True) |
| |
| # Sweeping BT SAR table |
| for scenario in range(sar_df.shape[0]): |
| # Reading BT SAR Scenario from the table |
| read_scenario = sar_df.loc[scenario].to_dict() |
| |
| start_time = self.dut.adb.shell('date +%s.%m') |
| time.sleep(SLEEP_DURATION) |
| |
| #Setting SAR State |
| self.set_sar_state(self.dut, read_scenario, self.country_code) |
| |
| if isBLE: |
| sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap( |
| self.dut, start_time, type='BLE') |
| |
| sar_df.loc[ |
| scenario, 'ble_rssi'] = run_ble_throughput_and_read_rssi( |
| client_ad, server_ad, client_conn_id, gatt_server, |
| gatt_callback) |
| |
| self.log.info('scenario:{}, power_cap:{}, ble_rssi:{}'.format( |
| scenario, sar_df.loc[scenario, 'power_cap'], |
| sar_df.loc[scenario, 'ble_rssi'])) |
| else: |
| sar_df.loc[scenario, 'power_cap'] = self.get_current_power_cap( |
| self.dut, start_time) |
| |
| processed_bqr_results = bt_utils.get_bt_metric( |
| self.android_devices, self.duration) |
| sar_df.loc[scenario, 'slave_rssi'] = processed_bqr_results[ |
| 'rssi'][self.bt_device_controller.serial] |
| sar_df.loc[scenario, 'master_rssi'] = processed_bqr_results[ |
| 'rssi'][self.dut.serial] |
| sar_df.loc[scenario, 'pwlv'] = processed_bqr_results['pwlv'][ |
| self.dut.serial] |
| self.log.info( |
| 'scenario:{}, power_cap:{}, s_rssi:{}, m_rssi:{}, m_pwlv:{}' |
| .format(scenario, sar_df.loc[scenario, 'power_cap'], |
| sar_df.loc[scenario, 'slave_rssi'], |
| sar_df.loc[scenario, 'master_rssi'], |
| sar_df.loc[scenario, 'pwlv'])) |
| |
| self.log.info('BT SAR Table swept') |
| |
| return sar_df |
| |
| def process_table(self, sar_df): |
| """Processes the results of sweep_table and computes BT TX power. |
| |
| Processes the results of sweep_table and computes BT TX power |
| after factoring in the path loss and FTM offsets. |
| |
| Args: |
| sar_df: BT SAR table after the sweep |
| |
| Returns: |
| sar_df: processed BT SAR table |
| """ |
| |
| sar_df['pathloss'] = self.calibration_params['pathloss'] |
| |
| if hasattr(self, 'pl10_atten'): |
| sar_df['atten'] = self.pl10_atten |
| else: |
| sar_df['atten'] = FIXED_ATTENUATION |
| |
| # BT SAR Backoff for each scenario |
| if self.sar_version_2: |
| #Reads OTP values from the phone |
| self.otp = bt_utils.read_otp(self.dut) |
| |
| #OTP backoff |
| edr_otp = min(0, float(self.otp['EDR']['10'])) |
| bdr_otp = min(0, float(self.otp['BR']['10'])) |
| ble_otp = min(0, float(self.otp['BLE']['10'])) |
| |
| # EDR TX Power for PL10 |
| edr_tx_power_pl10 = self.calibration_params['target_power']['EDR']['10'] - edr_otp |
| |
| # BDR TX Power for PL10 |
| bdr_tx_power_pl10 = self.calibration_params['target_power']['BDR']['10'] - bdr_otp |
| |
| # RSSI being measured is BDR |
| offset = bdr_tx_power_pl10 - edr_tx_power_pl10 |
| |
| # BDR-EDR offset |
| sar_df['offset'] = offset |
| |
| # Max TX power permissible |
| sar_df['max_power'] = self.calibration_params['max_power'] |
| |
| # Adding a target power column |
| if 'ble_rssi' in sar_df.columns: |
| sar_df[ |
| 'target_power'] = self.calibration_params['target_power']['BLE']['10'] - ble_otp |
| else: |
| sar_df['target_power'] = sar_df['pwlv'].astype(str).map( |
| self.calibration_params['target_power']['EDR']) - edr_otp |
| |
| #Translates power_cap values to expected TX power level |
| sar_df['cap_tx_power'] = sar_df['power_cap'] / 4.0 |
| |
| sar_df['expected_tx_power'] = sar_df[[ |
| 'cap_tx_power', 'target_power', 'max_power' |
| ]].min(axis=1) |
| |
| if hasattr(self, 'pl10_atten'): |
| sar_df[ |
| 'measured_tx_power'] = sar_df['slave_rssi'] + sar_df['pathloss'] + self.pl10_atten - offset |
| else: |
| sar_df[ |
| 'measured_tx_power'] = sar_df['ble_rssi'] + sar_df['pathloss'] + FIXED_ATTENUATION |
| |
| else: |
| |
| # Adding a target power column |
| sar_df['target_power'] = sar_df['pwlv'].astype(str).map( |
| self.calibration_params['target_power']['EDR']['10']) |
| |
| # Adding a ftm power column |
| sar_df['ftm_power'] = sar_df['pwlv'].astype(str).map( |
| self.calibration_params['ftm_power']['EDR']) |
| sar_df[ |
| 'backoff'] = sar_df['target_power'] - sar_df['power_cap'] / 4.0 |
| |
| sar_df[ |
| 'expected_tx_power'] = sar_df['ftm_power'] - sar_df['backoff'] |
| sar_df[ |
| 'measured_tx_power'] = sar_df['slave_rssi'] + sar_df['pathloss'] + self.pl10_atten |
| |
| sar_df[ |
| 'delta'] = sar_df['expected_tx_power'] - sar_df['measured_tx_power'] |
| |
| self.log.info('Sweep results processed') |
| |
| results_file_path = os.path.join(self.log_path, self.current_test_name) |
| sar_df.to_csv('{}.csv'.format(results_file_path)) |
| self.save_sar_plot(sar_df) |
| |
| return sar_df |
| |
| def process_results(self, sar_df, type='EDR'): |
| """Determines the test results of the sweep. |
| |
| Parses the processed table with computed BT TX power values |
| to return pass or fail. |
| |
| Args: |
| sar_df: processed BT SAR table |
| """ |
| if self.sar_version_2: |
| breach_error_result = ( |
| sar_df['expected_tx_power'] + self.sar_margin[type] > |
| sar_df['measured_tx_power']).all() |
| if not breach_error_result: |
| asserts.fail('Measured TX power exceeds expected') |
| |
| else: |
| # checks for errors at particular points in the sweep |
| max_error_result = abs( |
| sar_df['delta']) > self.max_error_threshold[type] |
| if max_error_result: |
| asserts.fail('Maximum Error Threshold Exceeded') |
| |
| # checks for error accumulation across the sweep |
| if sar_df['delta'].sum() > self.agg_error_threshold[type]: |
| asserts.fail( |
| 'Aggregate Error Threshold Exceeded. Error: {} Threshold: {}'. |
| format(sar_df['delta'].sum(), self.agg_error_threshold)) |
| |
| asserts.explicit_pass('Measured and Expected Power Values in line') |
| |
| def set_sar_state(self, ad, signal_dict, country_code='us'): |
| """Sets the SAR state corresponding to the BT SAR signal. |
| |
| The SAR state is forced using an adb command that takes |
| device signals as input. |
| |
| Args: |
| ad: android_device object. |
| signal_dict: dict of BT SAR signals read from the SAR file. |
| Returns: |
| enforced_state: dict of device signals. |
| """ |
| signal_dict = {k: max(int(v), 0) for (k, v) in signal_dict.items()} |
| signal_dict["Wifi"] = signal_dict['WIFI5Ghz'] |
| signal_dict['WIFI2Ghz'] = 0 if signal_dict['WIFI5Ghz'] else 1 |
| |
| device_state_dict = { |
| ('Earpiece', 'earpiece'): signal_dict['Head'], |
| ('Wifi', 'wifi'): signal_dict['WIFI5Ghz'], |
| ('Wifi 2.4G', 'wifi_24g'): signal_dict['WIFI2Ghz'], |
| ('Voice', 'voice'): 0, |
| ('Wifi AP', 'wifi_ap'): signal_dict['HotspotVoice'], |
| ('Bluetooth', 'bluetooth'): 1, |
| ('Bluetooth media', 'bt_media'): signal_dict['BTMedia'], |
| ('Radio', 'radio_power'): signal_dict['Cell'], |
| ('Motion', 'motion'): signal_dict['IMU'], |
| ('Bluetooth connected', 'bt_connected'): 1 |
| } |
| |
| if 'BTHotspot' in signal_dict.keys(): |
| device_state_dict[('Bluetooth tethering', |
| 'bt_tethering')] = signal_dict['BTHotspot'] |
| |
| enforced_state = {} |
| sar_state_command = FORCE_SAR_ADB_COMMAND |
| for key in device_state_dict: |
| enforced_state[key[0]] = device_state_dict[key] |
| sar_state_command = '{} --ei {} {}'.format( |
| sar_state_command, key[1], device_state_dict[key]) |
| if self.sar_version_2: |
| sar_state_command = '{} --es country_iso "{}"'.format( |
| sar_state_command, country_code.lower()) |
| |
| #Forcing the SAR state |
| adb_output = ad.adb.shell(sar_state_command) |
| |
| # Checking if command was successfully enforced |
| if 'result=0' in adb_output: |
| self.log.info('Requested BT SAR state successfully enforced.') |
| return enforced_state |
| else: |
| self.log.error("Couldn't force BT SAR state.") |
| |
| def parse_bt_logs(self, ad, begin_time, regex=''): |
| """Returns bt software stats by parsing logcat since begin_time. |
| |
| The quantity to be fetched is dictated by the regex provided. |
| |
| Args: |
| ad: android_device object. |
| begin_time: time stamp to start the logcat parsing. |
| regex: regex for fetching the required BT software stats. |
| |
| Returns: |
| stat: the desired BT stat. |
| """ |
| # Waiting for logcat to update |
| time.sleep(SLEEP_DURATION) |
| bt_adb_log = ad.adb.logcat('-b all -t %s' % begin_time) |
| for line in bt_adb_log.splitlines(): |
| if re.findall(regex, line): |
| stat = re.findall(regex, line)[0] |
| return stat |
| |
| def set_country_code(self, ad, cc): |
| """Sets the SAR regulatory domain as per given country code |
| |
| The SAR regulatory domain is forced using an adb command that takes |
| country code as input. |
| |
| Args: |
| ad: android_device object. |
| cc: country code |
| """ |
| |
| ad.adb.shell("{} --es country_iso {}".format(FORCE_SAR_ADB_COMMAND, |
| cc)) |
| self.log.info("Country Code set to {}".format(cc)) |
| |
| def get_country_code(self, ad, begin_time): |
| """Returns the enforced regulatory domain since begin_time |
| |
| Returns enforced regulatory domain since begin_time by parsing logcat. |
| Function should follow a function call to set a country code |
| |
| Args: |
| ad : android_device obj |
| begin_time: time stamp to start |
| |
| Returns: |
| read enforced regulatory domain |
| """ |
| |
| reg_domain_regex = "updateRegulatoryDomain:\s+(\S+)" |
| reg_domain = self.parse_bt_logs(ad, begin_time, reg_domain_regex) |
| return reg_domain |
| |
| def get_current_power_cap(self, ad, begin_time, type='EDR'): |
| """ Returns the enforced software EDR power cap since begin_time. |
| |
| Returns the enforced EDR power cap since begin_time by parsing logcat. |
| Function should follow a function call that forces a SAR state |
| |
| Args: |
| ad: android_device obj. |
| begin_time: time stamp to start. |
| |
| Returns: |
| read enforced power cap |
| """ |
| power_cap_regex_dict = { |
| 'BDR': [ |
| 'Bluetooth powers: BR:\s+(\d+), EDR:\s+\d+', |
| 'Bluetooth Tx Power Cap\s+(\d+)' |
| ], |
| 'EDR': [ |
| 'Bluetooth powers: BR:\s+\d+, EDR:\s+(\d+)', |
| 'Bluetooth Tx Power Cap\s+(\d+)' |
| ], |
| 'BLE': [ |
| 'Bluetooth powers: BR:\s+\d+, EDR:\s+\d+, BLE:\s+(\d+)', |
| 'Bluetooth Tx Power Cap\s+(\d+)' |
| ] |
| } |
| |
| power_cap_regex_list = power_cap_regex_dict[type] |
| |
| for power_cap_regex in power_cap_regex_list: |
| power_cap = self.parse_bt_logs(ad, begin_time, power_cap_regex) |
| if power_cap: |
| return int(power_cap) |
| |
| raise ValueError('Failed to get TX power cap') |
| |
| def get_current_device_state(self, ad, begin_time): |
| """ Returns the device state of the android dut since begin_time. |
| |
| Returns the device state of the android dut by parsing logcat since |
| begin_time. Function should follow a function call that forces |
| a SAR state. |
| |
| Args: |
| ad: android_device obj. |
| begin_time: time stamp to start. |
| |
| Returns: |
| device_state: device state of the android device. |
| """ |
| |
| device_state_regex = 'updateDeviceState: DeviceState: ([\s*\S+\s]+)' |
| time.sleep(SLEEP_DURATION) |
| device_state = self.parse_bt_logs(ad, begin_time, device_state_regex) |
| if device_state: |
| return device_state |
| |
| raise ValueError("Couldn't fetch device state") |
| |
| def read_sar_table(self, ad, output_path=''): |
| """Extracts the BT SAR table from the phone. |
| |
| Extracts the BT SAR table from the phone into the android device |
| log path directory. |
| |
| Args: |
| ad: android_device object. |
| output_path: path to custom sar table |
| Returns: |
| df : BT SAR table (as pandas DataFrame). |
| """ |
| if not output_path: |
| output_path = os.path.join(ad.device_log_path, self.sar_file_name) |
| ad.adb.pull('{} {}'.format(self.sar_file_path, output_path)) |
| |
| df = pd.read_csv(output_path) |
| self.log.info('BT SAR table read from the phone') |
| return df |
| |
| def push_table(self, ad, src_path, dest_path=''): |
| """Pushes a BT SAR table to the phone. |
| |
| Pushes a BT SAR table to the android device and reboots the device. |
| Also creates a backup file if backup flag is True. |
| |
| Args: |
| ad: android_device object. |
| src_path: path to the BT SAR table. |
| """ |
| #Copying the to-be-pushed file for logging |
| if os.path.dirname(src_path) != ad.device_log_path: |
| job.run('cp {} {}'.format(src_path, ad.device_log_path)) |
| |
| #Pushing the file provided in the config |
| if dest_path: |
| ad.push_system_file(src_path, dest_path) |
| else: |
| ad.push_system_file(src_path, self.sar_file_path) |
| self.log.info('BT SAR table pushed') |
| ad.reboot() |
| |
| self.bt_sar_df = self.read_sar_table(self.dut, src_path) |
| |
| def set_PL10_atten_level(self, ad): |
| """Finds the attenuation level at which the phone is at PL10 |
| |
| Finds PL10 attenuation level by sweeping the attenuation range. |
| If the power level is not achieved during sweep, |
| returns the max atten level |
| |
| Args: |
| ad: android object class |
| Returns: |
| atten : attenuation level when the phone is at PL10 |
| """ |
| BT_SAR_ATTEN_STEP = 3 |
| |
| for atten in range(self.atten_min, self.atten_max, BT_SAR_ATTEN_STEP): |
| self.attenuator.set_atten(atten) |
| # Sleep required for BQR to reflect the change in parameters |
| time.sleep(SLEEP_DURATION) |
| metrics = bt_utils.get_bt_metric(ad) |
| if metrics['pwlv'][ad.serial] == 10: |
| self.log.info( |
| 'PL10 located at {}'.format(atten + BT_SAR_ATTEN_STEP)) |
| return atten + BT_SAR_ATTEN_STEP |
| |
| self.log.warn( |
| "PL10 couldn't be located in the given attenuation range") |