blob: d0297f17b80d0ca5229ed3b991e085188bbab001 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Stream music through connected device from phone test implementation."""
import antlion
import os
import pandas as pd
import shutil
import time
import antlion_contrib.test_utils.coex.audio_test_utils as atu
import antlion_contrib.test_utils.bt.bt_test_utils as btutils
from antlion import asserts
from antlion_contrib.test_utils.bt import bt_constants
from antlion_contrib.test_utils.bt import BtEnum
from antlion_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
from antlion_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from antlion_contrib.test_utils.bt.ble_performance_test_utils import plot_graph
from antlion_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
from antlion_contrib.test_utils.bt.loggers import bluetooth_metric_logger as log
from antlion.signals import TestPass, TestError
PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
INIT_ATTEN = 0
WAIT_TIME = 1
class A2dpBaseTest(BluetoothBaseTest):
"""Stream audio file over desired Bluetooth codec configurations.
Audio file should be a sine wave. Other audio files will not work for the
test analysis metrics.
Device under test is Android phone, connected to headset with a controller
that can generate a BluetoothHandsfreeAbstractDevice from test_utils.
abstract_devices.bluetooth_handsfree_abstract_device.
BuetoothHandsfreeAbstractDeviceFactory.
"""
def setup_class(self):
super().setup_class()
self.bt_logger = log.BluetoothMetricLogger.for_test_case()
self.dut = self.android_devices[0]
req_params = ['audio_params', 'music_files', 'system_path_loss']
opt_params = ['bugreport']
#'audio_params' is a dict, contains the audio device type, audio streaming
#settings such as volumn, duration, audio recording parameters such as
#channel, sampling rate/width, and thdn parameters for audio processing
self.unpack_userparams(req_params)
self.unpack_userparams(opt_params, bugreport=None)
# Find music file and push it to the dut
music_src = self.music_files[0]
music_dest = PHONE_MUSIC_FILE_DIRECTORY
success = self.dut.push_system_file(music_src, music_dest)
if success:
self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
os.path.basename(music_src))
# Initialize media_control class
self.media = btutils.MediaControlOverSl4a(self.dut, self.music_file)
# Set attenuator to minimum attenuation
if hasattr(self, 'attenuators'):
self.attenuator = self.attenuators[0]
self.attenuator.set_atten(INIT_ATTEN)
# Create the BTOE(Bluetooth-Other-End) device object
bt_devices = self.user_params.get('bt_devices', [])
if bt_devices:
attr, idx = bt_devices.split(':')
self.bt_device_controller = getattr(self, attr)[int(idx)]
self.bt_device = bt_factory().generate(self.bt_device_controller)
else:
self.log.error('No BT devices config is provided!')
def teardown_class(self):
super().teardown_class()
if hasattr(self, 'media'):
self.media.stop()
if hasattr(self, 'attenuator'):
self.attenuator.set_atten(INIT_ATTEN)
self.dut.droid.bluetoothFactoryReset()
self.bt_device.reset()
self.bt_device.power_off()
btutils.disable_bluetooth(self.dut.droid)
def setup_test(self):
super().setup_test()
# Initialize audio capture devices
self.audio_device = atu.get_audio_capture_device(
self.bt_device_controller, self.audio_params)
# Reset BT to factory defaults
self.dut.droid.bluetoothFactoryReset()
self.bt_device.reset()
self.bt_device.power_on()
btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
vol = self.dut.droid.getMaxMediaVolume() * self.audio_params['volume']
self.dut.droid.setMediaVolume(0)
time.sleep(1)
self.dut.droid.setMediaVolume(int(vol))
def teardown_test(self):
super().teardown_test()
self.dut.droid.bluetoothFactoryReset()
self.media.stop()
# Set Attenuator to the initial attenuation
if hasattr(self, 'attenuator'):
self.attenuator.set_atten(INIT_ATTEN)
self.bt_device.reset()
self.bt_device.power_off()
btutils.disable_bluetooth(self.dut.droid)
def on_pass(self, test_name, begin_time):
if hasattr(self, 'bugreport') and self.bugreport == 1:
self._take_bug_report(test_name, begin_time)
def play_and_record_audio(self, duration):
"""Play and record audio for a set duration.
Args:
duration: duration in seconds for music playing
Returns:
audio_captured: captured audio file path
"""
self.log.info('Play and record audio for {} second'.format(duration))
self.media.play()
proc = self.audio_device.start()
time.sleep(duration + WAIT_TIME)
proc.kill()
time.sleep(WAIT_TIME)
proc.kill()
audio_captured = self.audio_device.stop()
self.media.stop()
self.log.info('Audio play and record stopped')
asserts.assert_true(audio_captured, 'Audio not recorded')
return audio_captured
def _get_bt_link_metrics(self, tag=''):
"""Get bt link metrics such as rssi and tx pwls.
Returns:
master_metrics_list: list of metrics of central device
slave_metrics_list: list of metric of peripheral device
"""
self.raw_bt_metrics_path = os.path.join(self.log_path,
'BT_Raw_Metrics')
self.media.play()
# Get master rssi and power level
process_data_dict = btutils.get_bt_metric(
self.dut, tag=tag, log_path=self.raw_bt_metrics_path)
rssi_master = process_data_dict.get('rssi')
pwl_master = process_data_dict.get('pwlv')
rssi_c0_master = process_data_dict.get('rssi_c0')
rssi_c1_master = process_data_dict.get('rssi_c1')
txpw_c0_master = process_data_dict.get('txpw_c0')
txpw_c1_master = process_data_dict.get('txpw_c1')
bftx_master = process_data_dict.get('bftx')
divtx_master = process_data_dict.get('divtx')
if isinstance(self.bt_device_controller,
antlion.controllers.android_device.AndroidDevice):
rssi_slave = btutils.get_bt_rssi(self.bt_device_controller,
tag=tag,
log_path=self.raw_bt_metrics_path)
else:
rssi_slave = None
self.media.stop()
master_metrics_list = [
rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
]
slave_metrics_list = [rssi_slave]
return master_metrics_list, slave_metrics_list
def run_thdn_analysis(self, audio_captured, tag):
"""Calculate Total Harmonic Distortion plus Noise for latest recording.
Store result in self.metrics.
Args:
audio_captured: the captured audio file
Returns:
thdn: thdn value in a list
"""
# Calculate Total Harmonic Distortion + Noise
audio_result = atu.AudioCaptureResult(audio_captured,
self.audio_params)
thdn = audio_result.THDN(**self.audio_params['thdn_params'])
file_name = tag + os.path.basename(audio_result.path)
file_new = os.path.join(os.path.dirname(audio_result.path), file_name)
shutil.copyfile(audio_result.path, file_new)
for ch_no, t in enumerate(thdn):
self.log.info('THD+N for channel %s: %.4f%%' % (ch_no, t * 100))
return thdn
def run_anomaly_detection(self, audio_captured):
"""Detect anomalies in latest recording.
Store result in self.metrics.
Args:
audio_captured: the captured audio file
Returns:
anom: anom detected in the captured file
"""
# Detect Anomalies
audio_result = atu.AudioCaptureResult(audio_captured)
anom = audio_result.detect_anomalies(
**self.audio_params['anomaly_params'])
num_anom = 0
for ch_no, anomalies in enumerate(anom):
if anomalies:
for anomaly in anomalies:
num_anom += 1
start, end = anomaly
self.log.warning(
'Anomaly on channel {} at {}:{}. Duration '
'{} sec'.format(ch_no, start // 60, start % 60,
end - start))
else:
self.log.info('%i anomalies detected.' % num_anom)
return anom
def generate_proto(self, data_points, codec_type, sample_rate,
bits_per_sample, channel_mode):
"""Generate a results protobuf.
Args:
data_points: list of dicts representing info to go into
AudioTestDataPoint protobuffer message.
codec_type: The codec type config to store in the proto.
sample_rate: The sample rate config to store in the proto.
bits_per_sample: The bits per sample config to store in the proto.
channel_mode: The channel mode config to store in the proto.
Returns:
dict: Dictionary with key 'proto' mapping to serialized protobuf,
'proto_ascii' mapping to human readable protobuf info, and 'test'
mapping to the test class name that generated the results.
"""
# Populate protobuf
test_case_proto = self.bt_logger.proto_module.BluetoothAudioTestResult(
)
for data_point in data_points:
audio_data_proto = test_case_proto.data_points.add()
log.recursive_assign(audio_data_proto, data_point)
codec_proto = test_case_proto.a2dp_codec_config
codec_proto.codec_type = bt_constants.codec_types[codec_type]
codec_proto.sample_rate = int(sample_rate)
codec_proto.bits_per_sample = int(bits_per_sample)
codec_proto.channel_mode = bt_constants.channel_modes[channel_mode]
self.bt_logger.add_config_data_to_proto(test_case_proto, self.dut,
self.bt_device)
self.bt_logger.add_proto_to_results(test_case_proto,
self.__class__.__name__)
proto_dict = self.bt_logger.get_proto_dict(self.__class__.__name__,
test_case_proto)
del proto_dict["proto_ascii"]
return proto_dict
def set_test_atten(self, atten):
"""Set the attenuation(s) for current test condition.
"""
if hasattr(self, 'dual_chain') and self.dual_chain == 1:
ramp_attenuation(self.atten_c0,
atten,
attenuation_step_max=2,
time_wait_in_between=1)
self.log.info('Set Chain 0 attenuation to %d dB', atten)
ramp_attenuation(self.atten_c1,
atten + self.gain_mismatch,
attenuation_step_max=2,
time_wait_in_between=1)
self.log.info('Set Chain 1 attenuation to %d dB',
atten + self.gain_mismatch)
else:
ramp_attenuation(self.attenuator, atten)
self.log.info('Set attenuation to %d dB', atten)
def run_a2dp_to_max_range(self, codec_config):
attenuation_range = range(self.attenuation_vector['start'],
self.attenuation_vector['stop'] + 1,
self.attenuation_vector['step'])
data_points = []
self.file_output = os.path.join(
self.log_path, '{}.csv'.format(self.current_test_name))
# Set Codec if needed
current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig()
current_codec_type = BtEnum.BluetoothA2dpCodecType(
current_codec['codecType']).name
if current_codec_type != codec_config['codec_type']:
codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config)
asserts.assert_true(codec_set, 'Codec configuration failed.')
else:
self.log.info('Current codec is {}, no need to change'.format(
current_codec_type))
#loop RSSI with the same codec setting
for atten in attenuation_range:
self.media.play()
self.set_test_atten(atten)
tag = 'codec_{}_attenuation_{}dB_'.format(
codec_config['codec_type'], atten)
recorded_file = self.play_and_record_audio(
self.audio_params['duration'])
thdns = self.run_thdn_analysis(recorded_file, tag)
# Collect Metrics for dashboard
[
rssi_master, pwl_master, rssi_c0_master, rssi_c1_master,
txpw_c0_master, txpw_c1_master, bftx_master, divtx_master
], [rssi_slave] = self._get_bt_link_metrics(tag)
data_point = {
'attenuation_db':
int(self.attenuator.get_atten()),
'pathloss':
atten + self.system_path_loss,
'rssi_primary':
rssi_master.get(self.dut.serial, -127),
'tx_power_level_master':
pwl_master.get(self.dut.serial, -127),
'rssi_secondary':
rssi_slave.get(self.bt_device_controller.serial, -127),
'rssi_c0_dut':
rssi_c0_master.get(self.dut.serial, -127),
'rssi_c1_dut':
rssi_c1_master.get(self.dut.serial, -127),
'txpw_c0_dut':
txpw_c0_master.get(self.dut.serial, -127),
'txpw_c1_dut':
txpw_c1_master.get(self.dut.serial, -127),
'bftx_state':
bftx_master.get(self.dut.serial, -127),
'divtx_state':
divtx_master.get(self.dut.serial, -127),
'total_harmonic_distortion_plus_noise_percent':
thdns[0] * 100
}
self.log.info(data_point)
# bokeh data for generating BokehFigure
bokeh_data = {
'x_label': 'Pathloss (dBm)',
'primary_y_label': 'RSSI (dBm)',
'log_path': self.log_path,
'current_test_name': self.current_test_name
}
#plot_data for adding line to existing BokehFigure
plot_data = {
'line_one': {
'x_label': 'Pathloss (dBm)',
'primary_y_label': 'RSSI (dBm)',
'x_column': 'pathloss',
'y_column': 'rssi_primary',
'legend': 'DUT RSSI (dBm)',
'marker': 'circle_x',
'y_axis': 'default'
},
'line_two': {
'x_column': 'pathloss',
'y_column': 'rssi_secondary',
'legend': 'Remote device RSSI (dBm)',
'marker': 'hex',
'y_axis': 'default'
},
'line_three': {
'x_column': 'pathloss',
'y_column': 'tx_power_level_master',
'legend': 'DUT TX Power (dBm)',
'marker': 'hex',
'y_axis': 'secondary'
}
}
# Check thdn for glitches, stop if max range reached
if thdns[0] == 0:
proto_dict = self.generate_proto(data_points, **codec_config)
A2dpRange_df = pd.DataFrame(data_points)
A2dpRange_df.to_csv(self.file_output, index=False)
plot_graph(A2dpRange_df,
plot_data,
bokeh_data,
secondary_y_label='DUT TX Power')
raise TestError(
'Music play/recording is not working properly or Connection has lost'
)
data_points.append(data_point)
A2dpRange_df = pd.DataFrame(data_points)
for thdn in thdns:
if thdn >= self.audio_params['thdn_threshold']:
self.log.info(
'Max range at attenuation {} dB'.format(atten))
self.log.info('DUT rssi {} dBm, DUT tx power level {}, '
'Remote rssi {} dBm'.format(
rssi_master, pwl_master, rssi_slave))
proto_dict = self.generate_proto(data_points,
**codec_config)
A2dpRange_df.to_csv(self.file_output, index=False)
plot_graph(A2dpRange_df,
plot_data,
bokeh_data,
secondary_y_label='DUT TX Power')
return True
raise TestPass('Max range reached and move to next codec',
extras=proto_dict)
# Save Data points to csv
A2dpRange_df.to_csv(self.file_output, index=False)
# Plot graph
plot_graph(A2dpRange_df,
plot_data,
bokeh_data,
secondary_y_label='DUT TX Power')
proto_dict = self.generate_proto(data_points, **codec_config)
return True
raise TestPass('Could not reach max range, need extra attenuation.',
extras=proto_dict)