blob: ac2fa6d102592255a97c798e2a571fbf06c3eedf [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A comprehensive interface for performing test actions on an Apollo device.
"""
import time
from antlion.controllers.android_lib.tel.tel_utils import initiate_call
from antlion.controllers.android_lib.tel.tel_utils import wait_for_droid_in_call
from antlion.controllers.buds_lib.apollo_lib import DeviceError
from antlion.controllers.buds_lib.test_actions.agsa_acts import AgsaOTAError
from antlion.controllers.buds_lib.test_actions.base_test_actions import BaseTestAction
from antlion.controllers.buds_lib.test_actions.base_test_actions import timed_action
from antlion.controllers.buds_lib.test_actions.bt_utils import BTUtils
from antlion.libs.utils.timer import TimeRecorder
from antlion.utils import wait_until
PACKAGE_NAME_AGSA = 'com.google.android.googlequicksearchbox'
PACKAGE_NAME_GMS = 'com.google.android.gms'
PACKAGE_NAME_NEARBY = 'com.google.android.gms.policy_nearby'
PACKAGE_NAME_SETTINGS = 'com.android.settings'
BISTO_MP_DETECT_HEADER = 'Pixel Buds'
BISTO_MP_DEVICE_TEXT = 'Pixel Buds'
BISTO_MP_DETECT_TEXT = BISTO_MP_DETECT_HEADER + BISTO_MP_DEVICE_TEXT
BISTO_MP_CANCEL_TEXT = 'CANCEL'
BISTO_MP_CONNECT_TEXT = 'TAP TO CONNECT'
BISTO_MP_CONNECT_FAIL_TEXT = 'Can\'t connect to'
BISTO_MP_CONNECT_RETRY_TEXT = 'TRY AGAIN'
BISTO_MP_CONNECTED_TEXT = 'Now set up your Google Assistant'
BISTO_MP_CONNECTED_EXIT_TEXT = 'NO THANKS'
BISTO_MP_EXIT_PROMPT_TEXT = 'Exit setup?'
BISTO_MP_EXIT_CONFIRM_TEXT = 'EXIT'
PROFILES_CONNECTED = {
'HFP(pri.)': 'TRUE',
'A2DP(pri)': 'TRUE',
}
PROFILES_DISCONNECTED = {
'HFP(pri.)': 'FALSE',
'A2DP(pri)': 'FALSE',
}
COMP_PROFILE_CONNECTED = {'Comp': 'TRUE'}
COMP_PROFILE_DISCONNECTED = {'Comp': 'FALSE'}
AVRCPSTATUS = 'AvrcpPlayPause'
DEFAULT_TIMEOUT = 60 # wait 60 seconds max for bond/connect.
DEFAULT_CMD_INTERVAL = 0.5 # default interval between serial commands
DEFAULT_CMD_RETRY = 5 # default retry times when a command failed.
DEFAULT_BT_PROFILES = [
'HFP Pri', 'HFP Sec', 'A2DP Pri', 'A2DP Sec', 'CTRL', 'AUDIO', 'DEBUG',
'TRANS'
]
DEFAULT_BT_STATUS = ['A2DP(pri)', 'HFP(pri.)', 'Comp']
class TestActsError(Exception):
"""Exception from Apollo Acts Error."""
class ApolloTestActions(BaseTestAction):
"""Test action class for all Apollo test actions."""
def __init__(self, apollo_dev, logger=None):
"""
Args:
apollo_dev: apollo.lib.apollo_lib.Device the Apollo device
"""
super(ApolloTestActions, self).__init__(logger)
self.dut = apollo_dev
# need a embedded timer for connection time measurements.
self.measurement_timer = TimeRecorder()
def bluetooth_get_status(self):
status = self.dut.get_bt_status()
self.logger.info(status)
def wait_for_bluetooth_disconnection(self, timeout=60):
""" Set pairing mode and disconnect.
This action will wait until the apollo profiles are false.
Args:
timeout: integer, timeout value in seconds.
"""
result = True
apollo_status = self.dut.get_bt_status()
self.logger.info('Waiting for the disconnection.')
time.sleep(1)
ini_time = time.time()
while len(apollo_status) != len(
[s for s in apollo_status.values() if s == 'FALSE']):
apollo_status = self.dut.get_bt_status()
if (time.time() - ini_time) > timeout:
self.logger.warning('Timeout waiting for the disconnection.')
return False
time.sleep(1)
return result
def pair(self, phone, companion_app=True):
"""Pairs phone with apollo and validates bluetooth profiles.
Args:
phone: android phone
apollo: apollo device
companion_app (optional): True if the phone has a companion app
installed. False otherwise.
Raises:
TestActsError: Bluetooth pairing failed/ Dut BT status check failed.
"""
bt_util = BTUtils()
target_addr = self.dut.bluetooth_address
if bt_util.android_device_in_connected_state(phone, target_addr):
self.logger.info('Already paired and connected, skipping pairing.')
else:
if bt_util.android_device_in_paired_state(phone, target_addr):
self.logger.info(
'Device is paired but not connected, unpair first.')
if not bt_util.bt_unpair(phone, self.dut):
raise TestActsError('Unable to unpair the device')
bt_util.bt_pair_and_connect(phone, self.dut)
self.logger.info('DEVICE PAIRED')
if companion_app:
profiles = PROFILES_CONNECTED.copy()
profiles.update(COMP_PROFILE_CONNECTED)
else:
profiles = PROFILES_CONNECTED
self.logger.info(profiles)
if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
raise TestActsError('Dut BT status check failed.')
else:
return True
def unpair(self, phone, companion_app=True, factory_reset_dut=True):
"""Unpairs phone from apollo and validates bluetooth profiles.
Args:
phone: android phone
apollo: apollo device
companion_app (optional): True if the phone has a companion app
installed. False otherwise.
Raises:
TestActsError: Bluetooth unpairing/Dut BT status check failed.
"""
bt_util = BTUtils()
target_addr = self.dut.bluetooth_address
if not bt_util.android_device_in_paired_state(phone, target_addr):
self.logger.info('Device is already unpaired, skipping unpairing.')
else:
result = bt_util.bt_unpair(
phone, self.dut, factory_reset_dut=factory_reset_dut)
if not result:
raise TestActsError('Bluetooth unpairing failed.')
if companion_app:
profiles = PROFILES_DISCONNECTED.copy()
profiles.update(COMP_PROFILE_DISCONNECTED)
else:
profiles = PROFILES_DISCONNECTED
if not bt_util.check_device_bt(device=self.dut, profiles=profiles):
raise TestActsError('Dut BT status check failed.')
else:
return True
def is_paired(self, phone):
"""Check if the given apollo is paired with the android device.
Args:
phone: android phone
apollo: apollo device
Returns:
Bool: True if apollo is paired with the phone.
"""
bt_util = BTUtils()
target_addr = self.dut.bluetooth_address
return bt_util.android_device_in_paired_state(phone, target_addr)
def send_music_play_event_and_validate(self):
"""Send the play event on Apollo and validate the response and DSP
Status.
Raises:
TestActsError: Error while playing the music.
"""
play_detection_timeout = 1
if self.dut.is_streaming():
self.logger.info('Music already streaming. Skipping play event..')
return
self.logger.info('Playing video...')
is_played = self.dut.music_control_events(
AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PLAY_REGEX)
if not is_played:
self.logger.error('AVRCP Played status not found')
raise TestActsError('AVRCP Played status not found.')
wait_until(
lambda: self.dut.is_streaming(),
play_detection_timeout,
sleep_s=0.25)
if not self.dut.is_streaming():
self.logger.error('Device is NOT in a deviceA2DPStreaming state')
raise TestActsError(
'Device is NOT in a deviceA2DPStreaming state.')
def send_music_pause_event_and_validate(self):
"""Send the pause event on Apollo and validate the responses and DSP
Status.
Raises:
TestActsError: Error while pausing the music.
"""
paused_detection_timeout = 10
if not self.dut.is_streaming():
self.logger.info('Music not streaming. Skipping pause event..')
return
self.logger.info("Pausing video...")
is_paused = self.dut.music_control_events(
AVRCPSTATUS, self.dut.apollo_log_regex.AVRCP_PAUSE_REGEX)
if not is_paused:
self.logger.error('AVRCP Paused statue not found')
raise TestActsError('AVRCP Paused status not found.')
wait_until(
lambda: not self.dut.is_streaming(),
paused_detection_timeout,
sleep_s=0.25)
if self.dut.is_streaming():
self.logger.error('Device is still in deviceA2DPStreaming state')
raise TestActsError(
'Device is still in deviceA2DPStreaming state.')
def vol_down_and_validate(self):
"""Send volume down twice and validate by comparing two levels
Raises:
TestActsError: Error
"""
self.logger.info('Decreasing volume')
before_vol = self.dut.volume('Down', 1)
time.sleep(2)
after_vol = self.dut.volume('Down', 1)
if not after_vol or not before_vol or after_vol >= before_vol:
self.logger.error(
'Unable to decrease the volume. Before: %s. After: %s' %
(before_vol, after_vol))
raise TestActsError('error decreasing volume')
def vol_up_and_validate(self):
"""Send volume up twice and validate by comparing two levels
Raises:
TestActsError: Error
"""
self.logger.info('Increasing volume')
before_vol = self.dut.volume('Up', 1)
time.sleep(2)
after_vol = self.dut.volume('Up', 1)
if not after_vol or not before_vol or after_vol <= before_vol:
self.logger.error(
'Unable to increase the volume. Before: %s. After: %s' %
(before_vol, after_vol))
raise TestActsError('error increasing volume')
def call_and_validate_ringing(self,
calling_phone,
number_to_call,
call_retries=10):
for i in range(call_retries):
initiate_call(self.logger, calling_phone, number_to_call)
is_calling = wait_for_droid_in_call(
self.logger, calling_phone, max_time=10)
if is_calling:
self.logger.info('Call initiated!')
break
else:
self.logger.warning('Call is not initiating.')
if i == call_retries:
self.logger.error('Call initiation retries exhausted')
raise TestActsError(
'%s retries failed to initiate the call' %
(call_retries))
self.logger.warning('Retrying call...')
# wait for offhook state and return
wait_until(
(lambda: calling_phone.droid.telecomGetCallState() == 'OFFHOOK'),
timeout_s=40,
condition=True,
sleep_s=.5)
self.logger.info('Phone call initiated on %s' % calling_phone.serial)
def answer_phone_and_validate_call_received(self, receiving_phone):
# wait until the phone rings (assumes that a call is initiated prior to
# running the command)
wait_until(
lambda: receiving_phone.droid.telecomGetCallState() == 'RINGING',
timeout_s=40,
condition=True,
sleep_s=.5)
self.logger.info('Ring detected on %s - now answering the call...' %
(receiving_phone.serial))
# answer the phone call
self.dut.tap()
# wait until OFFHOOK state
wait_until(
lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
timeout_s=40,
condition=True,
sleep_s=.5)
def hangup_phone_and_validate_call_hung(self, receiving_phone):
# wait for phone to be in OFFHOOK state (assumed that a call is answered
# and engaged)
wait_until(
lambda: receiving_phone.droid.telecomGetCallState() == 'OFFHOOK',
timeout_s=40,
condition=True,
sleep_s=.5)
# end the call (post and pre 1663 have different way of ending call)
self.logger.info(
'Hanging up the call on %s...' % receiving_phone.serial)
if self.dut.version < 1663:
self.dut.tap()
else:
self.dut.hold(duration=100)
# wait for idle state
wait_until(
lambda: receiving_phone.droid.telecomGetCallState() == 'IDLE',
timeout_s=40,
condition=True,
sleep_s=.5)
@timed_action
def factory_reset(self):
ret = False
try:
self.dut.factory_reset()
ret = True
except DeviceError as ex:
self.logger.warning('Failed to reset Apollo: %s' % ex)
return ret
@timed_action
def wait_for_magic_pairing_notification(self, android_act, timeout=60):
dut_detected = False
start_time = time.time()
self.logger.info('Waiting for MP prompt: %s' % BISTO_MP_DEVICE_TEXT)
while not dut_detected:
android_act.dut.ui_util.uia.wait.update()
self.sleep(1)
if android_act.dut.ui_util.uia(
textContains=BISTO_MP_DETECT_HEADER, enabled=True).exists:
if android_act.dut.ui_util.uia(
textContains=BISTO_MP_DEVICE_TEXT,
enabled=True).exists:
self.logger.info('DUT Apollo MP prompt detected!')
dut_detected = True
else:
self.logger.info(
'NONE DUT Apollo MP prompt detected! Cancel and RETRY!'
)
android_act.dut.ui_util.click_by_text(BISTO_MP_CANCEL_TEXT)
if time.time() - start_time > timeout:
break
if not dut_detected:
self.logger.info(
'Failed to get %s MP prompt' % BISTO_MP_DEVICE_TEXT)
return dut_detected
@timed_action
def start_magic_pairing(self, android_act, timeout=30, retries=3):
paired = False
android_act.dut.ui_util.click_by_text(
BISTO_MP_CONNECT_TEXT, timeout=timeout)
connect_start_time = time.time()
count = 0
timeout = 30
while not paired and count < retries:
android_act.dut.ui_util.uia.wait.update()
self.sleep(1)
if time.time() - connect_start_time > timeout:
self.logger.info('Time out! %s seconds' % time)
android_act.app_force_close_agsa()
self.logger.info('Timeout(s): %s' % timeout)
break
if android_act.dut.ui_util.uia(
textContains=BISTO_MP_CONNECT_FAIL_TEXT,
enabled=True).exists:
count += 1
self.logger.info('MP FAILED! Retry %s.' % count)
android_act.dut.ui_util.click_by_text(
BISTO_MP_CONNECT_RETRY_TEXT)
connect_start_time = time.time()
elif android_act.dut.ui_util.uia(
textContains=BISTO_MP_CONNECTED_TEXT, enabled=True).exists:
self.logger.info('MP SUCCESSFUL! Exiting AGSA...')
paired = True
android_act.dut.ui_util.click_by_text(
BISTO_MP_CONNECTED_EXIT_TEXT)
android_act.dut.ui_util.wait_for_text(
BISTO_MP_EXIT_PROMPT_TEXT)
android_act.dut.ui_util.click_by_text(
BISTO_MP_EXIT_CONFIRM_TEXT)
return paired
@timed_action
def turn_bluetooth_on(self):
self.dut.cmd('pow 1')
return True
@timed_action
def turn_bluetooth_off(self):
self.dut.cmd('pow 0')
return True
@timed_action
def wait_for_bluetooth_a2dp_hfp(self,
timeout=DEFAULT_TIMEOUT,
interval=DEFAULT_CMD_INTERVAL):
"""Wait for BT connection by checking if A2DP and HFP connected.
This is used for BT pair+connect test.
Args:
timeout: float, timeout value in second.
interval: float, float, interval between polling BT profiles.
timer: TimeRecorder, time recorder to save the connection time.
"""
# Need to check these two profiles
pass_profiles = ['A2DP Pri', 'HFP Pri']
# TODO(b/122730302): Change to just raise an error
ret = False
try:
ret = self._wait_for_bluetooth_profile_connection(
pass_profiles, timeout, interval, self.measurement_timer)
except DeviceError as ex:
self.logger.warning('Failed to wait for BT connection: %s' % ex)
return ret
def _wait_for_bluetooth_profile_connection(self, profiles_to_check,
timeout, interval, timer):
"""A generic method to wait for specified BT profile connection.
Args:
profiles_to_check: list, profile names (A2DP, HFP, etc.) to be
checked.
timeout: float, timeout value in second.
interval: float, interval between polling BT profiles.
timer: TimeRecorder, time recorder to save the connection time.
Returns:
bool, True if checked profiles are connected, False otherwise.
"""
timer.start_timer(profiles_to_check, force=True)
start_time = time.time()
while time.time() - start_time < timeout:
profiles = self._bluetooth_check_profile_connection()
for profile in profiles:
if profiles[profile]:
timer.stop_timer(profile)
# now check if the specified profile connected.
all_connected = True
for profile in profiles_to_check:
if not profiles[profile]:
all_connected = False
break
if all_connected:
return True
time.sleep(interval)
# make sure the profile timer are stopped.
timer.stop_timer(profiles_to_check)
return False
def _bluetooth_check_profile_connection(self):
"""Return profile connection in a boolean dict.
key=<profile name>, val = T/F
"""
profiles = dict()
output = self.dut.get_conn_devices()
# need to strip all whitespaces.
conn_devs = {}
for key in output:
conn_devs[key.strip()] = output[key].strip()
for key in conn_devs:
self.logger.info('%s:%s' % (key, conn_devs[key]))
if 'XXXXXXXX' in conn_devs[key]:
profiles[key] = conn_devs[key]
else:
profiles[key] = False
return profiles
@timed_action
def wait_for_bluetooth_status_connection_all(
self, timeout=DEFAULT_TIMEOUT, interval=DEFAULT_CMD_INTERVAL):
"""Wait for BT connection by checking if A2DP, HFP and COMP connected.
This is used for BT reconnect test.
Args:
timeout: float, timeout value in second.
interval: float, float, interval between polling BT profiles.
"""
ret = False
self.measurement_timer.start_timer(DEFAULT_BT_STATUS, force=True)
# All profile not connected by default.
connected_status = {key: False for key in DEFAULT_BT_STATUS}
start_time = time.time()
while time.time() < start_time + timeout:
try:
time.sleep(interval)
status = self.dut.get_bt_status()
for key in DEFAULT_BT_STATUS:
if (not connected_status[key] and key in status
and 'TRUE' == status[key]):
self.measurement_timer.stop_timer(key)
connected_status[key] = True
self.logger.info(
'BT status %s connected at %fs.' %
(key, self.measurement_timer.elapsed(key)))
if False not in connected_status.values():
ret = True
break
except DeviceError as ex:
self.logger.warning(
'Device exception when waiting for reconnection: %s' % ex)
self.measurement_timer.stop_timer(DEFAULT_BT_STATUS)
return ret
def initiate_ota_via_agsa_verify_transfer_completion_in_logcat(
self,
agsa_action,
dfu_path,
destination=None,
force=True,
apply_image=True,
reconnect=True):
"""
Starts an OTA by issuing an intent to AGSA after copying the dfu file to
the appropriate location on the phone
Args:
agsa_action: projects.agsa.lib.test_actions.agsa_acts
.AgsaTestActions
dfu_path: string - absolute path of dfu file
destination: string - absolute path of file on phone if not
specified will use
/storage/emulated/0/Android/data/com.google.android
.googlequicksearchbox/files/download_cache/apollo.dfu
force: value set in the intent sent to AGSA
True if success False otherwise
"""
try:
agsa_action.initiate_agsa_and_wait_until_transfer(
dfu_path, destination=destination, force=force)
if apply_image:
# set in case
self.dut.set_in_case(reconnect=reconnect)
except AgsaOTAError as ex:
self.logger.error('Failed to OTA via AGSA %s' % ex)
return False
except DeviceError as ex:
self.logger.error('Failed to bring up device %s' % ex)
return False
return True
@timed_action
def wait_for_bluetooth_a2dp_hfp_rfcomm_connect(
self, address, timeout=DEFAULT_TIMEOUT,
interval=DEFAULT_CMD_INTERVAL):
"""Wait for BT reconnection by checking if A2DP, HFP and COMP connected
to the specified address.
This is used for BT connection switch test.
Args:
address: str, MAC of the address to connect.
timeout: float, timeout value in second.
interval: float, float, interval between polling BT profiles.
Returns:
True if the specified address is connected. False otherwise.
"""
last_4_hex = address.replace(':', '')[-4:].lower()
profiles_to_check = ['HFP Pri', 'A2DP Pri', 'CTRL', 'AUDIO']
self.measurement_timer.start_timer(profiles_to_check, force=True)
end_time = time.time() + timeout
all_connected = True
while time.time() < end_time:
all_connected = True
profiles = self._bluetooth_check_profile_connection()
for profile in profiles_to_check:
if (profile in profiles and profiles[profile]
and last_4_hex in profiles[profile].lower()):
self.measurement_timer.stop_timer(profile)
else:
all_connected = False
if all_connected:
break
time.sleep(interval)
# make sure the profile timer are stopped.
self.measurement_timer.stop_timer(profiles_to_check)
return all_connected