blob: c18741cce249a03eab839f0ac0c3f1ecdf6946df [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic telephony utility functions. Cloned from test_utils.tel."""
import re
import struct
import time
from queue import Empty
from antlion.logger import epoch_to_log_line_timestamp
from antlion.controllers.adb_lib.error import AdbCommandError
INCALL_UI_DISPLAY_FOREGROUND = "foreground"
INCALL_UI_DISPLAY_BACKGROUND = "background"
INCALL_UI_DISPLAY_DEFAULT = "default"
# Max time to wait after caller make a call and before
# callee start ringing
MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT = 30
# Max time to wait after toggle airplane mode and before
# get expected event
MAX_WAIT_TIME_AIRPLANEMODE_EVENT = 90
# Wait time between state check retry
WAIT_TIME_BETWEEN_STATE_CHECK = 5
# Constant for Data Roaming State
DATA_ROAMING_ENABLE = 1
DATA_ROAMING_DISABLE = 0
# Constant for Telephony Manager Call State
TELEPHONY_STATE_RINGING = "RINGING"
TELEPHONY_STATE_IDLE = "IDLE"
TELEPHONY_STATE_OFFHOOK = "OFFHOOK"
TELEPHONY_STATE_UNKNOWN = "UNKNOWN"
# Constant for Service State
SERVICE_STATE_EMERGENCY_ONLY = "EMERGENCY_ONLY"
SERVICE_STATE_IN_SERVICE = "IN_SERVICE"
SERVICE_STATE_OUT_OF_SERVICE = "OUT_OF_SERVICE"
SERVICE_STATE_POWER_OFF = "POWER_OFF"
SERVICE_STATE_UNKNOWN = "UNKNOWN"
# Constant for Network Mode
NETWORK_MODE_GSM_ONLY = "NETWORK_MODE_GSM_ONLY"
NETWORK_MODE_WCDMA_ONLY = "NETWORK_MODE_WCDMA_ONLY"
NETWORK_MODE_LTE_ONLY = "NETWORK_MODE_LTE_ONLY"
# Constant for Events
EVENT_CALL_STATE_CHANGED = "CallStateChanged"
EVENT_SERVICE_STATE_CHANGED = "ServiceStateChanged"
class CallStateContainer:
INCOMING_NUMBER = "incomingNumber"
SUBSCRIPTION_ID = "subscriptionId"
CALL_STATE = "callState"
class ServiceStateContainer:
VOICE_REG_STATE = "voiceRegState"
VOICE_NETWORK_TYPE = "voiceNetworkType"
DATA_REG_STATE = "dataRegState"
DATA_NETWORK_TYPE = "dataNetworkType"
OPERATOR_NAME = "operatorName"
OPERATOR_ID = "operatorId"
IS_MANUAL_NW_SELECTION = "isManualNwSelection"
ROAMING = "roaming"
IS_EMERGENCY_ONLY = "isEmergencyOnly"
NETWORK_ID = "networkId"
SYSTEM_ID = "systemId"
SUBSCRIPTION_ID = "subscriptionId"
SERVICE_STATE = "serviceState"
def dumpsys_last_call_info(ad):
"""Get call information by dumpsys telecom."""
num = dumpsys_last_call_number(ad)
output = ad.adb.shell("dumpsys telecom")
result = re.search(r"Call TC@%s: {(.*?)}" % num, output, re.DOTALL)
call_info = {"TC": num}
if result:
result = result.group(1)
for attr in (
"startTime",
"endTime",
"direction",
"isInterrupted",
"callTechnologies",
"callTerminationsReason",
"isVideoCall",
"callProperties",
):
match = re.search(r"%s: (.*)" % attr, result)
if match:
if attr in ("startTime", "endTime"):
call_info[attr] = epoch_to_log_line_timestamp(int(match.group(1)))
else:
call_info[attr] = match.group(1)
ad.log.debug("call_info = %s", call_info)
return call_info
def dumpsys_last_call_number(ad):
output = ad.adb.shell("dumpsys telecom")
call_nums = re.findall("Call TC@(\d+):", output)
if not call_nums:
return 0
else:
return int(call_nums[-1])
def get_device_epoch_time(ad):
return int(1000 * float(ad.adb.shell("date +%s.%N")))
def get_outgoing_voice_sub_id(ad):
"""Get outgoing voice subscription id"""
if hasattr(ad, "outgoing_voice_sub_id"):
return ad.outgoing_voice_sub_id
else:
return ad.droid.subscriptionGetDefaultVoiceSubId()
def get_rx_tx_power_levels(log, ad):
"""Obtains Rx and Tx power levels from the MDS application.
The method requires the MDS app to be installed in the DUT.
Args:
log: logger object
ad: an android device
Return:
A tuple where the first element is an array array with the RSRP value
in Rx chain, and the second element is the transmitted power in dBm.
Values for invalid Rx / Tx chains are set to None.
"""
cmd = (
'am instrument -w -e request "80 00 e8 03 00 08 00 00 00" -e '
'response wait "com.google.mdstest/com.google.mdstest.instrument.'
'ModemCommandInstrumentation"'
)
try:
output = ad.adb.shell(cmd)
except AdbCommandError as e:
log.error(e)
output = None
if not output or "result=SUCCESS" not in output:
raise RuntimeError(
"Could not obtain Tx/Rx power levels from MDS. Is " "the MDS app installed?"
)
response = re.search(r"(?<=response=).+", output)
if not response:
raise RuntimeError("Invalid response from the MDS app:\n" + output)
# Obtain a list of bytes in hex format from the response string
response_hex = response.group(0).split(" ")
def get_bool(pos):
"""Obtain a boolean variable from the byte array."""
return response_hex[pos] == "01"
def get_int32(pos):
"""Obtain an int from the byte array. Bytes are printed in
little endian format."""
return struct.unpack(
"<i", bytearray.fromhex("".join(response_hex[pos : pos + 4]))
)[0]
rx_power = []
RX_CHAINS = 4
for i in range(RX_CHAINS):
# Calculate starting position for the Rx chain data structure
start = 12 + i * 22
# The first byte in the data structure indicates if the rx chain is
# valid.
if get_bool(start):
rx_power.append(get_int32(start + 2) / 10)
else:
rx_power.append(None)
# Calculate the position for the tx chain data structure
tx_pos = 12 + RX_CHAINS * 22
tx_valid = get_bool(tx_pos)
if tx_valid:
tx_power = get_int32(tx_pos + 2) / -10
else:
tx_power = None
return rx_power, tx_power
def get_telephony_signal_strength(ad):
# {'evdoEcio': -1, 'asuLevel': 28, 'lteSignalStrength': 14, 'gsmLevel': 0,
# 'cdmaAsuLevel': 99, 'evdoDbm': -120, 'gsmDbm': -1, 'cdmaEcio': -160,
# 'level': 2, 'lteLevel': 2, 'cdmaDbm': -120, 'dbm': -112, 'cdmaLevel': 0,
# 'lteAsuLevel': 28, 'gsmAsuLevel': 99, 'gsmBitErrorRate': 0,
# 'lteDbm': -112, 'gsmSignalStrength': 99}
try:
signal_strength = ad.droid.telephonyGetSignalStrength()
if not signal_strength:
signal_strength = {}
except Exception as e:
ad.log.error(e)
signal_strength = {}
return signal_strength
def initiate_call(
log,
ad,
callee_number,
emergency=False,
incall_ui_display=INCALL_UI_DISPLAY_FOREGROUND,
video=False,
):
"""Make phone call from caller to callee.
Args:
log: log object.
ad: Caller android device object.
callee_number: Callee phone number.
emergency : specify the call is emergency.
Optional. Default value is False.
incall_ui_display: show the dialer UI foreground or background
video: whether to initiate as video call
Returns:
result: if phone call is placed successfully.
"""
ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
sub_id = get_outgoing_voice_sub_id(ad)
begin_time = get_device_epoch_time(ad)
ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
try:
# Make a Call
ad.log.info("Make a phone call to %s", callee_number)
if emergency:
ad.droid.telecomCallEmergencyNumber(callee_number)
else:
ad.droid.telecomCallNumber(callee_number, video)
# Verify OFFHOOK state
if not wait_for_call_offhook_for_subscription(
log, ad, sub_id, event_tracking_started=True
):
ad.log.info("sub_id %s not in call offhook state", sub_id)
last_call_drop_reason(ad, begin_time=begin_time)
return False
else:
return True
finally:
if hasattr(ad, "sdm_log") and getattr(ad, "sdm_log"):
ad.adb.shell("i2cset -fy 3 64 6 1 b", ignore_status=True)
ad.adb.shell("i2cset -fy 3 65 6 1 b", ignore_status=True)
ad.droid.telephonyStopTrackingCallStateChangeForSubscription(sub_id)
if incall_ui_display == INCALL_UI_DISPLAY_FOREGROUND:
ad.droid.telecomShowInCallScreen()
elif incall_ui_display == INCALL_UI_DISPLAY_BACKGROUND:
ad.droid.showHomeScreen()
def is_event_match(event, field, value):
"""Return if <field> in "event" match <value> or not.
Args:
event: event to test. This event need to have <field>.
field: field to match.
value: value to match.
Returns:
True if <field> in "event" match <value>.
False otherwise.
"""
return is_event_match_for_list(event, field, [value])
def is_event_match_for_list(event, field, value_list):
"""Return if <field> in "event" match any one of the value
in "value_list" or not.
Args:
event: event to test. This event need to have <field>.
field: field to match.
value_list: a list of value to match.
Returns:
True if <field> in "event" match one of the value in "value_list".
False otherwise.
"""
try:
value_in_event = event["data"][field]
except KeyError:
return False
for value in value_list:
if value_in_event == value:
return True
return False
def is_phone_in_call(log, ad):
"""Return True if phone in call.
Args:
log: log object.
ad: android device.
"""
try:
return ad.droid.telecomIsInCall()
except:
return "mCallState=2" in ad.adb.shell(
"dumpsys telephony.registry | grep mCallState"
)
def last_call_drop_reason(ad, begin_time=None):
reasons = ad.search_logcat(
"qcril_qmi_voice_map_qmi_to_ril_last_call_failure_cause", begin_time
)
reason_string = ""
if reasons:
log_msg = "Logcat call drop reasons:"
for reason in reasons:
log_msg = "%s\n\t%s" % (log_msg, reason["log_message"])
if "ril reason str" in reason["log_message"]:
reason_string = reason["log_message"].split(":")[-1].strip()
ad.log.info(log_msg)
reasons = ad.search_logcat("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION", begin_time)
if reasons:
ad.log.warning("ACTION_FORBIDDEN_NO_SERVICE_AUTHORIZATION is seen")
ad.log.info("last call dumpsys: %s", sorted(dumpsys_last_call_info(ad).items()))
return reason_string
def toggle_airplane_mode(log, ad, new_state=None, strict_checking=True):
"""Toggle the state of airplane mode.
Args:
log: log handler.
ad: android_device object.
new_state: Airplane mode state to set to.
If None, opposite of the current state.
strict_checking: Whether to turn on strict checking that checks all features.
Returns:
result: True if operation succeed. False if error happens.
"""
if ad.skip_sl4a:
return toggle_airplane_mode_by_adb(log, ad, new_state)
else:
return toggle_airplane_mode_msim(
log, ad, new_state, strict_checking=strict_checking
)
def toggle_airplane_mode_by_adb(log, ad, new_state=None):
"""Toggle the state of airplane mode.
Args:
log: log handler.
ad: android_device object.
new_state: Airplane mode state to set to.
If None, opposite of the current state.
Returns:
result: True if operation succeed. False if error happens.
"""
cur_state = bool(int(ad.adb.shell("settings get global airplane_mode_on")))
if new_state == cur_state:
ad.log.info("Airplane mode already in %s", new_state)
return True
elif new_state is None:
new_state = not cur_state
ad.log.info("Change airplane mode from %s to %s", cur_state, new_state)
try:
ad.adb.shell("settings put global airplane_mode_on %s" % int(new_state))
ad.adb.shell("am broadcast -a android.intent.action.AIRPLANE_MODE")
except Exception as e:
ad.log.error(e)
return False
changed_state = bool(int(ad.adb.shell("settings get global airplane_mode_on")))
return changed_state == new_state
def toggle_airplane_mode_msim(log, ad, new_state=None, strict_checking=True):
"""Toggle the state of airplane mode.
Args:
log: log handler.
ad: android_device object.
new_state: Airplane mode state to set to.
If None, opposite of the current state.
strict_checking: Whether to turn on strict checking that checks all features.
Returns:
result: True if operation succeed. False if error happens.
"""
cur_state = ad.droid.connectivityCheckAirplaneMode()
if cur_state == new_state:
ad.log.info("Airplane mode already in %s", new_state)
return True
elif new_state is None:
new_state = not cur_state
ad.log.info("Toggle APM mode, from current tate %s to %s", cur_state, new_state)
sub_id_list = []
active_sub_info = ad.droid.subscriptionGetAllSubInfoList()
if active_sub_info:
for info in active_sub_info:
sub_id_list.append(info["subscriptionId"])
ad.ed.clear_all_events()
time.sleep(0.1)
service_state_list = []
if new_state:
service_state_list.append(SERVICE_STATE_POWER_OFF)
ad.log.info("Turn on airplane mode")
else:
# If either one of these 3 events show up, it should be OK.
# Normal SIM, phone in service
service_state_list.append(SERVICE_STATE_IN_SERVICE)
# NO SIM, or Dead SIM, or no Roaming coverage.
service_state_list.append(SERVICE_STATE_OUT_OF_SERVICE)
service_state_list.append(SERVICE_STATE_EMERGENCY_ONLY)
ad.log.info("Turn off airplane mode")
for sub_id in sub_id_list:
ad.droid.telephonyStartTrackingServiceStateChangeForSubscription(sub_id)
timeout_time = time.time() + MAX_WAIT_TIME_AIRPLANEMODE_EVENT
ad.droid.connectivityToggleAirplaneMode(new_state)
try:
try:
event = ad.ed.wait_for_event(
EVENT_SERVICE_STATE_CHANGED,
is_event_match_for_list,
timeout=MAX_WAIT_TIME_AIRPLANEMODE_EVENT,
field=ServiceStateContainer.SERVICE_STATE,
value_list=service_state_list,
)
ad.log.info("Got event %s", event)
except Empty:
ad.log.warning(
"Did not get expected service state change to %s", service_state_list
)
finally:
for sub_id in sub_id_list:
ad.droid.telephonyStopTrackingServiceStateChangeForSubscription(sub_id)
except Exception as e:
ad.log.error(e)
# APM on (new_state=True) will turn off bluetooth but may not turn it on
try:
if new_state and not _wait_for_bluetooth_in_state(
log, ad, False, timeout_time - time.time()
):
ad.log.error("Failed waiting for bluetooth during airplane mode toggle")
if strict_checking:
return False
except Exception as e:
ad.log.error("Failed to check bluetooth state due to %s", e)
if strict_checking:
raise
# APM on (new_state=True) will turn off wifi but may not turn it on
if new_state and not _wait_for_wifi_in_state(
log, ad, False, timeout_time - time.time()
):
ad.log.error("Failed waiting for wifi during airplane mode toggle on")
if strict_checking:
return False
if ad.droid.connectivityCheckAirplaneMode() != new_state:
ad.log.error("Set airplane mode to %s failed", new_state)
return False
return True
def toggle_cell_data_roaming(ad, state):
"""Enable cell data roaming for default data subscription.
Wait for the data roaming status to be DATA_STATE_CONNECTED
or DATA_STATE_DISCONNECTED.
Args:
ad: Android Device Object.
state: True or False for enable or disable cell data roaming.
Returns:
True if success.
False if failed.
"""
state_int = {True: DATA_ROAMING_ENABLE, False: DATA_ROAMING_DISABLE}[state]
action_str = {True: "Enable", False: "Disable"}[state]
if ad.droid.connectivityCheckDataRoamingMode() == state:
ad.log.info("Data roaming is already in state %s", state)
return True
if not ad.droid.connectivitySetDataRoaming(state_int):
ad.error.info("Fail to config data roaming into state %s", state)
return False
if ad.droid.connectivityCheckDataRoamingMode() == state:
ad.log.info("Data roaming is configured into state %s", state)
return True
else:
ad.log.error("Data roaming is not configured into state %s", state)
return False
def wait_for_call_offhook_event(
log,
ad,
sub_id,
event_tracking_started=False,
timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT,
):
"""Wait for an incoming call on specified subscription.
Args:
log: log object.
ad: android device object.
event_tracking_started: True if event tracking already state outside
timeout: time to wait for event
Returns:
True: if call offhook event is received.
False: if call offhook event is not received.
"""
if not event_tracking_started:
ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
try:
ad.ed.wait_for_event(
EVENT_CALL_STATE_CHANGED,
is_event_match,
timeout=timeout,
field=CallStateContainer.CALL_STATE,
value=TELEPHONY_STATE_OFFHOOK,
)
ad.log.info("Got event %s", TELEPHONY_STATE_OFFHOOK)
except Empty:
ad.log.info("No event for call state change to OFFHOOK")
return False
finally:
if not event_tracking_started:
ad.droid.telephonyStopTrackingCallStateChangeForSubscription(sub_id)
return True
def wait_for_call_offhook_for_subscription(
log,
ad,
sub_id,
event_tracking_started=False,
timeout=MAX_WAIT_TIME_ACCEPT_CALL_TO_OFFHOOK_EVENT,
interval=WAIT_TIME_BETWEEN_STATE_CHECK,
):
"""Wait for an incoming call on specified subscription.
Args:
log: log object.
ad: android device object.
sub_id: subscription ID
timeout: time to wait for ring
interval: checking interval
Returns:
True: if incoming call is received and answered successfully.
False: for errors
"""
if not event_tracking_started:
ad.ed.clear_events(EVENT_CALL_STATE_CHANGED)
ad.droid.telephonyStartTrackingCallStateForSubscription(sub_id)
offhook_event_received = False
end_time = time.time() + timeout
try:
while time.time() < end_time:
if not offhook_event_received:
if wait_for_call_offhook_event(log, ad, sub_id, True, interval):
offhook_event_received = True
telephony_state = ad.droid.telephonyGetCallStateForSubscription(sub_id)
telecom_state = ad.droid.telecomGetCallState()
if telephony_state == TELEPHONY_STATE_OFFHOOK and (
telecom_state == TELEPHONY_STATE_OFFHOOK
):
ad.log.info("telephony and telecom are in OFFHOOK state")
return True
else:
ad.log.info(
"telephony in %s, telecom in %s, expecting OFFHOOK state",
telephony_state,
telecom_state,
)
if offhook_event_received:
time.sleep(interval)
finally:
if not event_tracking_started:
ad.droid.telephonyStopTrackingCallStateChangeForSubscription(sub_id)
def _wait_for_bluetooth_in_state(log, ad, state, max_wait):
# FIXME: These event names should be defined in a common location
_BLUETOOTH_STATE_ON_EVENT = "BluetoothStateChangedOn"
_BLUETOOTH_STATE_OFF_EVENT = "BluetoothStateChangedOff"
ad.ed.clear_events(_BLUETOOTH_STATE_ON_EVENT)
ad.ed.clear_events(_BLUETOOTH_STATE_OFF_EVENT)
ad.droid.bluetoothStartListeningForAdapterStateChange()
try:
bt_state = ad.droid.bluetoothCheckState()
if bt_state == state:
return True
if max_wait <= 0:
ad.log.error(
"Time out: bluetooth state still %s, expecting %s", bt_state, state
)
return False
event = {False: _BLUETOOTH_STATE_OFF_EVENT, True: _BLUETOOTH_STATE_ON_EVENT}[
state
]
event = ad.ed.pop_event(event, max_wait)
ad.log.info("Got event %s", event["name"])
return True
except Empty:
ad.log.error(
"Time out: bluetooth state still in %s, expecting %s", bt_state, state
)
return False
finally:
ad.droid.bluetoothStopListeningForAdapterStateChange()
def wait_for_droid_in_call(log, ad, max_time):
"""Wait for android to be in call state.
Args:
log: log object.
ad: android device.
max_time: maximal wait time.
Returns:
If phone become in call state within max_time, return True.
Return False if timeout.
"""
return _wait_for_droid_in_state(log, ad, max_time, is_phone_in_call)
def _wait_for_droid_in_state(log, ad, max_time, state_check_func, *args, **kwargs):
while max_time >= 0:
if state_check_func(log, ad, *args, **kwargs):
return True
time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK)
max_time -= WAIT_TIME_BETWEEN_STATE_CHECK
return False
# TODO: replace this with an event-based function
def _wait_for_wifi_in_state(log, ad, state, max_wait):
return _wait_for_droid_in_state(
log,
ad,
max_wait,
lambda log, ad, state: (True if ad.droid.wifiCheckState() == state else False),
state,
)