blob: 2ee35d3bb86766607ce57f9b3ca77bfd667aeef3 [file]
#!/usr/bin/env python3
#
# Copyright 2022 - Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import re
import statistics
from acts import signals
from acts_contrib.test_utils.tel.tel_defines import INVALID_SUB_ID
from acts_contrib.test_utils.tel.tel_subscription_utils import get_slot_index_from_data_sub_id
from acts_contrib.test_utils.tel.tel_subscription_utils import get_slot_index_from_voice_sub_id
from acts_contrib.test_utils.tel.tel_subscription_utils import get_subid_from_slot_index
SETUP_DATA_CALL = 'SETUP_DATA_CALL'
SETUP_DATA_CALL_REQUEST = '> SETUP_DATA_CALL'
SETUP_DATA_CALL_RESPONSE = '< SETUP_DATA_CALL'
IS_CAPTIVEPORTAL = r'isCaptivePortal: isSuccessful()=true'
DEACTIVATE_DATA_CALL = 'DEACTIVATE_DATA_CALL'
DEACTIVATE_DATA_CALL_REQUEST = '> DEACTIVATE_DATA_CALL'
DEACTIVATE_DATA_CALL_RESPONSE = '< DEACTIVATE_DATA_CALL'
UNSOL_DATA_CALL_LIST_CHANGED = 'UNSOL_DATA_CALL_LIST_CHANGED'
IWLAN_DATA_SERVICE = 'IWlanDataService'
IWLAN_SETUP_DATA_CALL_REQUEST = '> REQUEST_SETUP_DATA_CALL'
IWLAN_SETUP_DATA_CALL_RESPONSE = 'setupDataCallResponse'
IWLAN_SEND_ACK = '> send ACK for serial'
IWLAN_DEACTIVATE_DATA_CALL_REQUEST = '> REQUEST_DEACTIVATE_DATA_CALL'
IWLAN_DEACTIVATE_DATA_CALL_RESPONSE = 'deactivateDataCallResponse'
SET_PREFERRED_DATA_MODEM = 'SET_PREFERRED_DATA_MODEM'
WHI_IWLAN_DATA_SERVICE = 'IwlanDataService'
WHI_IWLAN_SETUP_DATA_CALL_REQUEST = r'IwlanDataService\[\d\]: Setup data call'
WHI_IWLAN_SETUP_DATA_CALL_RESPONSE = r'IwlanDataService\[\d\]: Tunnel opened!'
WHI_IWLAN_DEACTIVATE_DATA_CALL_REQUEST = r'IwlanDataService\[\d\]: Deactivate data call'
WHI_IWLAN_DEACTIVATE_DATA_CALL_RESPONSE = r'IwlanDataService\[\d\]: Tunnel closed!'
ON_ENABLE_APN_IMS_SLOT0 = 'DCT-C-0 : onEnableApn: apnType=ims, request type=NORMAL'
ON_ENABLE_APN_IMS_SLOT1 = 'DCT-C-1 : onEnableApn: apnType=ims, request type=NORMAL'
ON_ENABLE_APN_IMS_HANDOVER_SLOT0 = 'DCT-C-0 : onEnableApn: apnType=ims, request type=HANDOVER'
ON_ENABLE_APN_IMS_HANDOVER_SLOT1 = 'DCT-C-1 : onEnableApn: apnType=ims, request type=HANDOVER'
RADIO_ON_4G_SLOT0 = r'GsmCdmaPhone: \[0\] Event EVENT_RADIO_ON Received'
RADIO_ON_4G_SLOT1 = r'GsmCdmaPhone: \[1\] Event EVENT_RADIO_ON Received'
RADIO_ON_IWLAN = 'Switching to new default network.*WIFI CONNECTED'
WIFI_OFF = 'setWifiEnabled.*enable=false'
ON_IMS_MM_TEL_CONNECTED_4G_SLOT0 = r'ImsPhone: \[0\].*onImsMmTelConnected imsRadioTech=WWAN'
ON_IMS_MM_TEL_CONNECTED_4G_SLOT1 = r'ImsPhone: \[1\].*onImsMmTelConnected imsRadioTech=WWAN'
ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT0 = r'ImsPhone: \[0\].*onImsMmTelConnected imsRadioTech=WLAN'
ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT1 = r'ImsPhone: \[1\].*onImsMmTelConnected imsRadioTech=WLAN'
DEFAULT_MO_SMS_BODY = 'MO SMS body not yet found'
DEFAULT_MT_SMS_BODY = 'MT SMS body not yet found'
MMS_SERVICE = 'MmsService:'
MMS_SEND_REQUEST_ID_PATTERN = r'SendRequest@(\S+)'
MMS_DOWNLOAD_REQUEST_ID_PATTERN = r'DownloadRequest@(\S+)'
MMS_START_NEW_NW_REQUEST = 'start new network request'
MMS_200_OK = '200 OK'
SMS_SEND_TEXT_MESSAGE = 'smsSendTextMessage'
MO_SMS_LOGCAT_PATTERN = r'smsSendTextMessage.*"(\S+)", true|false'
SEND_SMS = 'SEND_SMS'
SEND_SMS_REQUEST = '> SEND_SMS'
SEND_SMS_RESPONSE = '< SEND_SMS'
SEND_SMS_EXPECT_MORE = 'SEND_SMS_EXPECT_MORE'
UNSOL_RESPONSE_NEW_SMS = '< UNSOL_RESPONSE_NEW_SMS'
SMS_RECEIVED = 'SmsReceived'
MT_SMS_CONTENT_PATTERN = 'sl4a.*?SmsReceived.*?"Text":"(.*?)"'
SEND_SMS_OVER_IMS = r'ImsSmsDispatcher \[(\d)\]'
SEND_SMS_REQUEST_OVER_IMS = 'sendSms: mRetryCount'
SEND_SMS_RESPONSE_OVER_IMS = 'onSendSmsResult token'
SMS_RECEIVED_OVER_IMS = 'SMS received'
SMS_RECEIVED_OVER_IMS_SLOT0 = r'ImsSmsDispatcher \[0\]: SMS received'
SMS_RECEIVED_OVER_IMS_SLOT1 = r'ImsSmsDispatcher \[1\]: SMS received'
IMS_REGISTERED_CST_SLOT0 = 'IMS_REGISTERED.*CrossStackEpdg.*SLID:0'
IMS_REGISTERED_CST_SLOT1 = 'IMS_REGISTERED.*CrossStackEpdg.*SLID:1'
ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT0 = r'ImsPhone: \[0\].*onImsMmTelConnected imsRadioTech=WLAN'
ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT1 = r'ImsPhone: \[1\].*onImsMmTelConnected imsRadioTech=WLAN'
def print_nested_dict(ad, d):
divider = "------"
for k, v in d.items():
if isinstance(v, dict):
ad.log.info('%s %s %s', divider, k, divider)
print_nested_dict(ad, v)
else:
ad.log.info('%s: %s', k, v)
def get_slot_from_logcat(msg):
"""Get slot index from specific pattern in logcat
Args:
msg: logcat message string
Returns:
0 for pSIM or 1 for eSIM
"""
res = re.findall(r'\[(PHONE[\d])\]', msg)
try:
phone = res[0]
except:
phone = None
return phone
def get_apn_from_logcat(msg):
"""Get APN from logcat
Args:
msg: logcat message string
Returns:
APN
"""
res = re.findall(r'DataProfile=[^/]+/[^/]+/[^/]+/([^/]+)/', msg)
try:
apn = res[0]
except:
apn = None
return apn
def parse_setup_data_call(ad, apn='internet', dds_switch=False):
"""Search in logcat for lines containing data call setup procedure.
Calculate the data call setup time with given APN and validation
time on LTE.
Args:
ad: Android object
apn: access point name
dds_switch: True for switching DDS. Otherwise False.
Returns:
setup_data_call: Dictionary containing data call setup request and
response messages for each data call. The format is shown as
below:
{
message_id:
{
'request':
{
'message': logcat message body of data call setup
request message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'apn': access point name of this request
'phone': 0 for pSIM or 1 for eSIM
}
'response':
{
'message': logcat message body of data call setup
response message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'cause': failure cause if data call setup failed
'cid': PDP context ID
'ifname': the name of the interface of the network
'phone': 0 for pSIM or 1 for eSIM
'unsol_data_call_list_changed': message of
unsol_data_call_list_changed
'unsol_data_call_list_changed_time': time stamp of
the message unsol_data_call_list_changed
'is_captive_portal': message of LTE validation pass
'data_call_setup_time': time between data call setup
request and unsol_data_call_list_changed
'validation_time_on_lte': time between data call
setup response and LTE validation pass
}
}
}
data_call_setup_time_list: List. This is a summary of necessary
messages of data call setup procedure The format is shown as
below:
[
{
'request': logcat message body of data call setup
request message
'response': logcat message body of data call setup
response message
'unsol_data_call_list_changed': message of
unsol_data_call_list_changed
'start': time stamp of data call setup request
'end': time stamp of the message
unsol_data_call_list_changed
'duration': time between data call setup request and
unsol_data_call_list_changed
'validation_time_on_lte': time between data call
setup response and LTE validation pass
}
]
avg_data_call_setup_time: average of data call setup time
avg_validation_time_on_lte: average of time for validation time on
LTE
"""
ad.log.info('====== Start to search logcat ====== ')
logcat = ad.search_logcat(
r'%s\|%s\|%s\|%s' % (
SET_PREFERRED_DATA_MODEM,
SETUP_DATA_CALL,
UNSOL_DATA_CALL_LIST_CHANGED, IS_CAPTIVEPORTAL))
if not logcat:
return False
for msg in logcat:
ad.log.info(msg["log_message"])
dds_slot = get_slot_index_from_data_sub_id(ad)
set_preferred_data_modem = {}
setup_data_call = {}
data_call_setup_time_list = []
last_message_id = None
for line in logcat:
if line['message_id']:
if SET_PREFERRED_DATA_MODEM in line['log_message']:
set_preferred_data_modem['message'] = line['log_message']
set_preferred_data_modem['time_stamp'] = line['time_stamp']
set_preferred_data_modem[
'datetime_obj'] = line['datetime_obj']
if SETUP_DATA_CALL_REQUEST in line['log_message']:
found_apn = get_apn_from_logcat(line['log_message'])
if found_apn != apn:
continue
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if not dds_switch:
if str(dds_slot) not in phone:
continue
msg_id = line['message_id']
last_message_id = line['message_id']
if msg_id not in setup_data_call:
setup_data_call[msg_id] = {}
setup_data_call[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'apn': found_apn,
'phone': phone}
if set_preferred_data_modem:
setup_data_call[msg_id]['request'][
'set_preferred_data_modem_message'] = set_preferred_data_modem['message']
setup_data_call[msg_id]['request'][
'set_preferred_data_modem_time_stamp'] = set_preferred_data_modem['time_stamp']
setup_data_call[msg_id]['request'][
'set_preferred_data_modem_datetime_obj'] = set_preferred_data_modem['datetime_obj']
set_preferred_data_modem = {}
if SETUP_DATA_CALL_RESPONSE in line['log_message']:
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if not dds_switch:
if str(dds_slot) not in phone:
continue
msg_id = line['message_id']
if msg_id not in setup_data_call:
continue
if 'request' not in setup_data_call[msg_id]:
continue
last_message_id = line['message_id']
setup_data_call[msg_id]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'cause': '0',
'cid': None,
'ifname': None,
'phone': phone,
'unsol_data_call_list_changed': None,
'unsol_data_call_list_changed_time': None,
'is_captive_portal': None,
'data_call_setup_time': None,
'validation_time_on_lte': None}
res = re.findall(r'cause=(\d+)', line['log_message'])
try:
cause = res[0]
setup_data_call[msg_id]['response']['cause'] = cause
except:
pass
res = re.findall(r'cid=(\d+)', line['log_message'])
try:
cid = res[0]
setup_data_call[msg_id]['response']['cid'] = cid
except:
pass
res = re.findall(r'ifname=(\S+)', line['log_message'])
try:
ifname = res[0]
setup_data_call[msg_id]['response']['ifname'] = ifname
except:
pass
if UNSOL_DATA_CALL_LIST_CHANGED in line['log_message']:
if not last_message_id:
continue
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if not dds_switch:
if str(dds_slot) not in phone:
continue
if 'request' not in setup_data_call[last_message_id]:
continue
if 'response' not in setup_data_call[last_message_id]:
continue
cid = setup_data_call[last_message_id]['response']['cid']
if 'cid = %s' % cid not in line['log_message']:
continue
if setup_data_call[last_message_id]['response']['cause'] != '0':
continue
if dds_switch:
if 'set_preferred_data_modem_message' not in setup_data_call[
last_message_id]['request']:
continue
data_call_start_time = setup_data_call[last_message_id][
'request']['set_preferred_data_modem_datetime_obj']
else:
data_call_start_time = setup_data_call[last_message_id][
'request']['datetime_obj']
data_call_end_time = line['datetime_obj']
setup_data_call[last_message_id]['response'][
'unsol_data_call_list_changed_time'] = data_call_end_time
setup_data_call[last_message_id]['response'][
'unsol_data_call_list_changed'] = line['log_message']
data_call_setup_time = data_call_end_time - data_call_start_time
setup_data_call[last_message_id]['response'][
'data_call_setup_time'] = data_call_setup_time.total_seconds()
if apn == 'ims':
data_call_setup_time_list.append(
{'request': setup_data_call[
last_message_id]['request']['message'],
'response': setup_data_call[
last_message_id]['response']['message'],
'unsol_data_call_list_changed': setup_data_call[
last_message_id]['response'][
'unsol_data_call_list_changed'],
'start': data_call_start_time,
'end': data_call_end_time,
'duration': setup_data_call[last_message_id]['response'][
'data_call_setup_time']})
last_message_id = None
if IS_CAPTIVEPORTAL in line['log_message']:
if not last_message_id:
continue
if 'request' not in setup_data_call[last_message_id]:
continue
if 'response' not in setup_data_call[last_message_id]:
continue
if dds_switch:
data_call_start_time = setup_data_call[last_message_id][
'request']['set_preferred_data_modem_datetime_obj']
else:
data_call_start_time = setup_data_call[last_message_id][
'request']['datetime_obj']
setup_data_call[last_message_id]['response'][
'is_captive_portal'] = line['log_message']
validation_start_time_on_lte = setup_data_call[
last_message_id]['response']['datetime_obj']
validation_end_time_on_lte = line['datetime_obj']
validation_time_on_lte = (
validation_end_time_on_lte - validation_start_time_on_lte).total_seconds()
setup_data_call[last_message_id]['response'][
'validation_time_on_lte'] = validation_time_on_lte
data_call_setup_time_list.append(
{'request': setup_data_call[last_message_id]['request'][
'message'],
'response': setup_data_call[last_message_id]['response'][
'message'],
'unsol_data_call_list_changed': setup_data_call[
last_message_id]['response']['unsol_data_call_list_changed'],
'start': data_call_start_time,
'end': setup_data_call[last_message_id]['response'][
'unsol_data_call_list_changed_time'],
'duration': setup_data_call[last_message_id]['response'][
'data_call_setup_time'],
'validation_time_on_lte': validation_time_on_lte})
last_message_id = None
duration_list = []
for item in data_call_setup_time_list:
if 'duration' in item:
duration_list.append(item['duration'])
try:
avg_data_call_setup_time = statistics.mean(duration_list)
except:
avg_data_call_setup_time = None
validation_time_on_lte_list = []
for item in data_call_setup_time_list:
if 'validation_time_on_lte' in item:
validation_time_on_lte_list.append(
item['validation_time_on_lte'])
try:
avg_validation_time_on_lte = statistics.mean(
validation_time_on_lte_list)
except:
avg_validation_time_on_lte = None
return (
setup_data_call,
data_call_setup_time_list,
avg_data_call_setup_time,
avg_validation_time_on_lte)
def parse_setup_data_call_on_iwlan(ad):
"""Search in logcat for lines containing data call setup procedure.
Calculate the data call setup time with given APN on iwlan.
Args:
ad: Android object
apn: access point name
Returns:
setup_data_call: Dictionary containing data call setup request and
response messages for each data call. The format is shown as
below:
{
message_id:
{
'request':
{
'message': logcat message body of data call setup
request message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
}
'response':
{
'message': logcat message body of data call setup
response message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'cause': failure cause if data call setup failed
'data_call_setup_time': time between data call setup
request and response
}
}
}
data_call_setup_time_list:
List. This is a summary of mecessary messages of data call setup
procedure The format is shown as below:
[
{
'request': logcat message body of data call setup
request message
'response': logcat message body of data call setup
response message
'start': time stamp of data call setup request
'end': time stamp of data call setup response
'duration': time between data call setup request and
response
}
]
avg_data_call_setup_time: average of data call setup time
"""
ad.log.info('====== Start to search logcat ====== ')
logcat = ad.search_logcat(r'%s\|%s' % (
IWLAN_DATA_SERVICE, WHI_IWLAN_DATA_SERVICE))
found_iwlan_data_service = 1
if not logcat:
found_iwlan_data_service = 0
if not found_iwlan_data_service:
(
setup_data_call,
data_call_setup_time_list,
avg_data_call_setup_time,
_) = parse_setup_data_call(ad, apn='ims')
return (
setup_data_call,
data_call_setup_time_list,
avg_data_call_setup_time)
for msg in logcat:
ad.log.info(msg["log_message"])
setup_data_call = {}
data_call_setup_time_list = []
last_message_id = None
whi_msg_index = None
for line in logcat:
serial = None
cause = None
if IWLAN_SETUP_DATA_CALL_REQUEST in line['log_message']:
match_res = re.findall(
r'%s:\s(\d+)' % IWLAN_DATA_SERVICE, line['log_message'])
if match_res:
try:
serial = match_res[0]
except:
pass
if not serial:
continue
msg_id = serial
last_message_id = serial
if msg_id not in setup_data_call:
setup_data_call[msg_id] = {}
setup_data_call[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
else:
if re.search(
WHI_IWLAN_SETUP_DATA_CALL_REQUEST, line['log_message']):
if whi_msg_index is None:
whi_msg_index = 0
else:
whi_msg_index = whi_msg_index + 1
if str(whi_msg_index) not in setup_data_call:
setup_data_call[str(whi_msg_index)] = {}
setup_data_call[str(whi_msg_index)]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if IWLAN_SETUP_DATA_CALL_RESPONSE in line['log_message']:
match_res = re.findall(r'Serial = (\d+)', line['log_message'])
if match_res:
try:
serial = match_res[0]
except:
pass
if serial:
msg_id = serial
else:
msg_id = last_message_id
if msg_id not in setup_data_call:
continue
if 'request' not in setup_data_call[msg_id]:
continue
setup_data_call[msg_id]['response'] = {
'message': None,
'time_stamp': None,
'datetime_obj': None,
'cause': None,
'data_call_setup_time': None}
match_res = re.findall(
r'Fail Cause = (\d+)', line['log_message'])
if match_res:
try:
cause = match_res[0]
except:
cause = None
if cause != '0':
continue
setup_data_call[msg_id]['response']['message'] = line[
'log_message']
setup_data_call[msg_id]['response']['time_stamp'] = line[
'time_stamp']
setup_data_call[msg_id]['response']['datetime_obj'] = line[
'datetime_obj']
setup_data_call[msg_id]['response']['cause'] = 0
data_call_start_time = setup_data_call[last_message_id][
'request']['datetime_obj']
data_call_end_time = line['datetime_obj']
data_call_setup_time = data_call_end_time - data_call_start_time
setup_data_call[last_message_id]['response'][
'data_call_setup_time'] = data_call_setup_time.total_seconds()
data_call_setup_time_list.append(
{'request': setup_data_call[last_message_id]['request'][
'message'],
'response': setup_data_call[last_message_id]['response'][
'message'],
'start': setup_data_call[last_message_id]['request'][
'datetime_obj'],
'end': setup_data_call[last_message_id]['response'][
'datetime_obj'],
'duration': setup_data_call[last_message_id]['response'][
'data_call_setup_time']})
last_message_id = None
else:
if re.search(
WHI_IWLAN_SETUP_DATA_CALL_RESPONSE, line['log_message']):
if whi_msg_index is None:
continue
if 'response' in setup_data_call[str(whi_msg_index)]:
ad.log.error('Duplicated setup data call response is '
'found or the request message is lost.')
continue
setup_data_call[str(whi_msg_index)]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'data_call_setup_time': None}
data_call_start_time = setup_data_call[str(whi_msg_index)][
'request']['datetime_obj']
data_call_end_time = line['datetime_obj']
data_call_setup_time = data_call_end_time - data_call_start_time
setup_data_call[str(whi_msg_index)]['response'][
'data_call_setup_time'] = data_call_setup_time.total_seconds()
data_call_setup_time_list.append(
{'request': setup_data_call[str(whi_msg_index)][
'request']['message'],
'response': setup_data_call[str(whi_msg_index)][
'response']['message'],
'start': setup_data_call[str(whi_msg_index)]['request'][
'datetime_obj'],
'end': setup_data_call[str(whi_msg_index)]['response'][
'datetime_obj'],
'duration': setup_data_call[str(whi_msg_index)][
'response']['data_call_setup_time']})
duration_list = []
for item in data_call_setup_time_list:
if 'duration' in item:
duration_list.append(item['duration'])
try:
avg_data_call_setup_time = statistics.mean(duration_list)
except:
avg_data_call_setup_time = None
ad.log.warning('setup_data_call: %s', setup_data_call)
ad.log.warning('duration list: %s', duration_list)
ad.log.warning('avg_data_call_setup_time: %s', avg_data_call_setup_time)
return (
setup_data_call,
data_call_setup_time_list,
avg_data_call_setup_time)
def parse_deactivate_data_call(ad):
"""Search in logcat for lines containing data call deactivation procedure.
Calculate the data call deactivation time on LTE.
Args:
ad: Android object
Returns:
deactivate_data_call: Dictionary containing data call deactivation
request and response messages for each data call. The format is
shown as below:
{
message_id:
{
'request':
{
'message': logcat message body of data call
deactivation request message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'cid': PDP context ID
'phone': 0 for pSIM or 1 for eSIM
}
'response':
{
'message': logcat message body of data call
deactivation response message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'phone': 0 for pSIM or 1 for eSIM
'unsol_data_call_list_changed': message of
unsol_data_call_list_changed
'deactivate_data_call_time': time between data call
deactivation request and unsol_data_call_list_changed
}
}
}
deactivate_data_call_time_list: List. This is a summary of necessary
messages of data call deactivation procedure The format is shown
as below:
[
{
'request': logcat message body of data call
deactivation request message
'response': logcat message body of data call
deactivation response message
'unsol_data_call_list_changed': message of
unsol_data_call_list_changed
'start': time stamp of data call deactivation request
'end': time stamp of the message
unsol_data_call_list_changed
'duration': time between data call deactivation
request and unsol_data_call_list_changed
}
]
avg_deactivate_data_call_time: average of data call deactivation time
"""
ad.log.info('====== Start to search logcat ====== ')
logcat = ad.search_logcat(
r'%s\|%s' % (DEACTIVATE_DATA_CALL, UNSOL_DATA_CALL_LIST_CHANGED))
if not logcat:
return False
for msg in logcat:
ad.log.info(msg["log_message"])
dds_slot = get_slot_index_from_data_sub_id(ad)
deactivate_data_call = {}
deactivate_data_call_time_list = []
last_message_id = None
for line in logcat:
if line['message_id']:
if DEACTIVATE_DATA_CALL_REQUEST in line['log_message']:
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if str(dds_slot) not in phone:
continue
msg_id = line['message_id']
last_message_id = line['message_id']
if msg_id not in deactivate_data_call:
deactivate_data_call[msg_id] = {}
deactivate_data_call[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'cid': None,
'phone': dds_slot}
res = re.findall(r'cid = (\d+)', line['log_message'])
try:
cid = res[0]
deactivate_data_call[msg_id]['request']['cid'] = cid
except:
pass
if DEACTIVATE_DATA_CALL_RESPONSE in line['log_message']:
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if str(dds_slot) not in phone:
continue
msg_id = line['message_id']
if msg_id not in deactivate_data_call:
continue
if 'request' not in deactivate_data_call[msg_id]:
continue
last_message_id = line['message_id']
deactivate_data_call[msg_id]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'phone': dds_slot,
'unsol_data_call_list_changed': None,
'deactivate_data_call_time': None}
if UNSOL_DATA_CALL_LIST_CHANGED in line['log_message']:
if not last_message_id:
continue
phone = get_slot_from_logcat(line['log_message'])
if not phone:
continue
if str(dds_slot) not in phone:
continue
if 'request' not in deactivate_data_call[last_message_id]:
continue
if 'response' not in deactivate_data_call[last_message_id]:
continue
cid = deactivate_data_call[last_message_id]['request']['cid']
if 'cid = %s' % cid not in line['log_message']:
continue
deactivate_data_call_start_time = deactivate_data_call[
last_message_id]['request']['datetime_obj']
deactivate_data_call_end_time = line['datetime_obj']
deactivate_data_call[last_message_id]['response'][
'unsol_data_call_list_changed'] = line['log_message']
deactivate_data_call_time = (
deactivate_data_call_end_time - deactivate_data_call_start_time)
deactivate_data_call[last_message_id]['response'][
'deactivate_data_call_time'] = deactivate_data_call_time.total_seconds()
deactivate_data_call_time_list.append(
{'request': deactivate_data_call[last_message_id][
'request']['message'],
'response': deactivate_data_call[last_message_id][
'response']['message'],
'unsol_data_call_list_changed': deactivate_data_call[
last_message_id]['response'][
'unsol_data_call_list_changed'],
'start': deactivate_data_call_start_time,
'end': deactivate_data_call_end_time,
'duration': deactivate_data_call_time.total_seconds()})
last_message_id = None
duration_list = []
for item in deactivate_data_call_time_list:
if 'duration' in item:
duration_list.append(item['duration'])
try:
avg_deactivate_data_call_time = statistics.mean(duration_list)
except:
avg_deactivate_data_call_time = None
return (
deactivate_data_call,
deactivate_data_call_time_list,
avg_deactivate_data_call_time)
def parse_deactivate_data_call_on_iwlan(ad):
"""Search in logcat for lines containing data call deactivation procedure.
Calculate the data call deactivation time on iwlan.
Args:
ad: Android object
Returns:
deactivate_data_call: Dictionary containing data call deactivation
request and response messages for each data call. The format is
shown as below:
{
message_id:
{
'request':
{
'message': logcat message body of data call
deactivation request message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
}
'response':
{
'message': logcat message body of data call
deactivation response message
'time_stamp': time stamp in text format
'datetime_obj': datetime object of time stamp
'send_ack_for_serial_time': time stamp of ACK
'deactivate_data_call_time': time between data call
deactivation request and ACK
}
}
}
deactivate_data_call_time_list: List. This is a summary of necessary
messages of data call deactivation procedure The format is shown
as below:
[
{
'request': logcat message body of data call
deactivation request message
'response': logcat message body of data call
deactivation response message
'start': time stamp of data call deactivation request
'end': time stamp of the ACK
'duration': time between data call deactivation
request and ACK
}
]
avg_deactivate_data_call_time: average of data call deactivation time
"""
ad.log.info('====== Start to search logcat ====== ')
logcat = ad.search_logcat(r'%s\|%s' % (
IWLAN_DATA_SERVICE, WHI_IWLAN_DATA_SERVICE))
found_iwlan_data_service = 1
if not logcat:
found_iwlan_data_service = 0
if not found_iwlan_data_service:
(
deactivate_data_call,
deactivate_data_call_time_list,
avg_deactivate_data_call_time) = parse_deactivate_data_call(ad)
return (
deactivate_data_call,
deactivate_data_call_time_list,
avg_deactivate_data_call_time)
for msg in logcat:
ad.log.info(msg["log_message"])
deactivate_data_call = {}
deactivate_data_call_time_list = []
last_message_id = None
whi_msg_index = None
for line in logcat:
serial = None
if IWLAN_DEACTIVATE_DATA_CALL_REQUEST in line['log_message']:
match_res = re.findall(
r'%s:\s(\d+)' % IWLAN_DATA_SERVICE, line['log_message'])
if match_res:
try:
serial = match_res[0]
except:
serial = None
if not serial:
continue
msg_id = serial
last_message_id = serial
if msg_id not in deactivate_data_call:
deactivate_data_call[msg_id] = {}
deactivate_data_call[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
else:
if re.search(WHI_IWLAN_DEACTIVATE_DATA_CALL_REQUEST, line[
'log_message']):
if whi_msg_index is None:
whi_msg_index = 0
else:
whi_msg_index = whi_msg_index + 1
if str(whi_msg_index) not in deactivate_data_call:
deactivate_data_call[str(whi_msg_index)] = {}
deactivate_data_call[str(whi_msg_index)]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if IWLAN_DEACTIVATE_DATA_CALL_RESPONSE in line['log_message']:
if 'response' not in deactivate_data_call[last_message_id]:
deactivate_data_call[msg_id]['response'] = {}
deactivate_data_call[msg_id]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'send_ack_for_serial_time': None,
'deactivate_data_call_time': None}
else:
if re.search(WHI_IWLAN_DEACTIVATE_DATA_CALL_RESPONSE, line[
'log_message']):
if whi_msg_index is None:
continue
if 'response' in deactivate_data_call[str(whi_msg_index)]:
ad.log.error('Duplicated deactivate data call response'
'is found or the request message is lost.')
continue
deactivate_data_call[str(whi_msg_index)]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'deactivate_data_call_time': None}
deactivate_data_call_start_time = deactivate_data_call[
str(whi_msg_index)]['request']['datetime_obj']
deactivate_data_call_end_time = line['datetime_obj']
deactivate_data_call_time = (
deactivate_data_call_end_time - deactivate_data_call_start_time)
deactivate_data_call[str(whi_msg_index)]['response'][
'deactivate_data_call_time'] = deactivate_data_call_time.total_seconds()
deactivate_data_call_time_list.append(
{'request': deactivate_data_call[str(whi_msg_index)][
'request']['message'],
'response': deactivate_data_call[str(whi_msg_index)][
'response']['message'],
'start': deactivate_data_call_start_time,
'end': deactivate_data_call_end_time,
'duration': deactivate_data_call_time.total_seconds()})
if IWLAN_SEND_ACK in line['log_message']:
match_res = re.findall(
r'%s:\s(\d+)' % IWLAN_DATA_SERVICE, line['log_message'])
if match_res:
try:
serial = match_res[0]
except:
serial = None
if not serial:
continue
msg_id = serial
if msg_id not in deactivate_data_call:
continue
if 'response' not in deactivate_data_call[msg_id]:
continue
deactivate_data_call[msg_id]['response'][
'send_ack_for_serial_time'] = line['datetime_obj']
deactivate_data_call_start_time = deactivate_data_call[msg_id][
'request']['datetime_obj']
deactivate_data_call_end_time = line['datetime_obj']
deactivate_data_call_time = (
deactivate_data_call_end_time - deactivate_data_call_start_time)
deactivate_data_call[msg_id]['response'][
'deactivate_data_call_time'] = deactivate_data_call_time.total_seconds()
deactivate_data_call_time_list.append(
{'request': deactivate_data_call[msg_id]['request'][
'message'],
'response': deactivate_data_call[msg_id]['response'][
'message'],
'start': deactivate_data_call_start_time,
'end': deactivate_data_call_end_time,
'duration': deactivate_data_call_time.total_seconds()})
last_message_id = None
duration_list = []
for item in deactivate_data_call_time_list:
if 'duration' in item:
duration_list.append(item['duration'])
try:
avg_deactivate_data_call_time = statistics.mean(duration_list)
except:
avg_deactivate_data_call_time = None
return (
deactivate_data_call,
deactivate_data_call_time_list,
avg_deactivate_data_call_time)
def parse_ims_reg(
ad,
search_intervals=None,
rat='4g',
reboot_or_apm='reboot',
slot=None):
"""Search in logcat for lines containing messages about IMS registration.
Args:
ad: Android object
search_intervals: List. Only lines with time stamp in given time
intervals will be parsed.
E.g., [(begin_time1, end_time1), (begin_time2, end_time2)]
Both begin_time and end_time should be datetime object.
rat: "4g" for IMS over LTE or "iwlan" for IMS over Wi-Fi
reboot_or_apm: specify the scenario "reboot" or "apm"
slot: 0 for pSIM and 1 for eSIM
Returns:
(ims_reg, parsing_fail, avg_ims_reg_duration)
ims_reg: List of dictionaries containing found lines for start and
end time stamps. Each dict represents a cycle of the test.
[
{'start': message on start time stamp,
'end': message on end time stamp,
'duration': time difference between start and end}
]
parsing_fail: List of dictionaries containing the cycle number and
missing messages of each failed cycle
[
'attempt': failed cycle number
'missing_msg' missing messages which should be found
]
avg_ims_reg_duration: average of the duration in ims_reg
"""
if slot is None:
slot = get_slot_index_from_voice_sub_id(ad)
ad.log.info('Default voice slot: %s', slot)
else:
if get_subid_from_slot_index(ad.log, ad, slot) == INVALID_SUB_ID:
ad.log.error('Slot %s is invalid.', slot)
raise signals.TestFailure('Failed',
extras={'fail_reason': 'Slot %s is invalid.' % slot})
ad.log.info('Assigned slot: %s', slot)
start_command = {
'reboot': {
'0': {'4g': ON_ENABLE_APN_IMS_SLOT0,
'iwlan': ON_ENABLE_APN_IMS_HANDOVER_SLOT0 + '\|' + ON_ENABLE_APN_IMS_SLOT0},
'1': {'4g': ON_ENABLE_APN_IMS_SLOT1,
'iwlan': ON_ENABLE_APN_IMS_HANDOVER_SLOT1 + '\|' + ON_ENABLE_APN_IMS_SLOT1}
},
'apm':{
'0': {'4g': RADIO_ON_4G_SLOT0, 'iwlan': RADIO_ON_IWLAN},
'1': {'4g': RADIO_ON_4G_SLOT1, 'iwlan': RADIO_ON_IWLAN}
},
'wifi_off':{
'0': {'4g': WIFI_OFF, 'iwlan': WIFI_OFF},
'1': {'4g': WIFI_OFF, 'iwlan': WIFI_OFF}
},
}
end_command = {
'0': {'4g': ON_IMS_MM_TEL_CONNECTED_4G_SLOT0,
'iwlan': ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT0},
'1': {'4g': ON_IMS_MM_TEL_CONNECTED_4G_SLOT1,
'iwlan': ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT1}
}
ad.log.info('====== Start to search logcat ======')
logcat = ad.search_logcat('%s\|%s' % (
start_command[reboot_or_apm][str(slot)][rat],
end_command[str(slot)][rat]))
if not logcat:
raise signals.TestFailure('Failed',
extras={'fail_reason': 'No line matching the given pattern can '
'be found in logcat.'})
for msg in logcat:
ad.log.info(msg["log_message"])
ims_reg = []
ims_reg_duration_list = []
parsing_fail = []
start_command['reboot'] = {
'0': {'4g': ON_ENABLE_APN_IMS_SLOT0,
'iwlan': ON_ENABLE_APN_IMS_HANDOVER_SLOT0 + '|' + ON_ENABLE_APN_IMS_SLOT0},
'1': {'4g': ON_ENABLE_APN_IMS_SLOT1,
'iwlan': ON_ENABLE_APN_IMS_HANDOVER_SLOT1 + '|' + ON_ENABLE_APN_IMS_SLOT1}
}
keyword_dict = {
'start': start_command[reboot_or_apm][str(slot)][rat],
'end': end_command[str(slot)][rat]
}
for attempt, interval in enumerate(search_intervals):
if isinstance(interval, list):
try:
begin_time, end_time = interval
except Exception as e:
ad.log.error(e)
continue
ad.log.info('Parsing begin time: %s', begin_time)
ad.log.info('Parsing end time: %s', end_time)
temp_keyword_dict = copy.deepcopy(keyword_dict)
for line in logcat:
if begin_time and line['datetime_obj'] < begin_time:
continue
if end_time and line['datetime_obj'] > end_time:
break
for key in temp_keyword_dict:
if temp_keyword_dict[key] and not isinstance(
temp_keyword_dict[key], dict):
res = re.findall(
temp_keyword_dict[key], line['log_message'])
if res:
ad.log.info('Found: %s', line['log_message'])
temp_keyword_dict[key] = {
'message': line['log_message'],
'time_stamp': line['datetime_obj']}
break
for key in temp_keyword_dict:
if temp_keyword_dict[key] == keyword_dict[key]:
ad.log.error(
'"%s" is missing in cycle %s.',
keyword_dict[key],
attempt)
parsing_fail.append({
'attempt': attempt,
'missing_msg': keyword_dict[key]})
try:
ims_reg_duration = (
temp_keyword_dict['end'][
'time_stamp'] - temp_keyword_dict[
'start'][
'time_stamp']).total_seconds()
ims_reg_duration_list.append(ims_reg_duration)
ims_reg.append({
'start': temp_keyword_dict['start'][
'message'],
'end': temp_keyword_dict['end'][
'message'],
'duration': ims_reg_duration})
except Exception as e:
ad.log.error(e)
try:
avg_ims_reg_duration = statistics.mean(ims_reg_duration_list)
except:
avg_ims_reg_duration = None
return ims_reg, parsing_fail, avg_ims_reg_duration
def parse_mo_sms(logcat):
"""Search in logcat for lines containing messages about SMS sending on
LTE.
Args:
logcat: List containing lines of logcat
Returns:
send_sms: Dictionary containing found lines for each SMS
request and response messages together with their time stamps.
{
'message_id':{
'request':{
'message': logcat message body of SMS request
'time_stamp': time stamp in text format
'datetime_obj': datetime object of the time stamp
},
'response':{
'message': logcat message body of SMS response
'time_stamp': time stamp in text format
'datetime_obj': datetime object of the time stamp
'sms_delivery_time': time between SMS request and
response
}
}
}
summary: the format is listed below:
{
'request': logcat message body of SMS request
'response': logcat message body of SMS response
'unsol_response_new_sms': unsolicited response message upon
SMS receiving on MT UE
'sms_body': message body of SMS
'mo_start': time stamp of MO SMS request message
'mo_end': time stamp of MO SMS response message
'mo_signal_duration': time between MO SMS request and response
'delivery_time': time between MO SMS request and
unsol_response_new_sms on MT UE
}
avg_setup_time: average of mo_signal_duration
"""
send_sms = {}
summary = []
sms_body = DEFAULT_MO_SMS_BODY
msg_id = None
if not logcat:
return False
for line in logcat:
res = re.findall(MO_SMS_LOGCAT_PATTERN, line['log_message'])
if res:
try:
sms_body = res[0]
except:
sms_body = 'Cannot find MO SMS body'
if line['message_id']:
msg_id = line['message_id']
if SEND_SMS_REQUEST in line[
'log_message'] and SEND_SMS_EXPECT_MORE not in line[
'log_message']:
if msg_id not in send_sms:
send_sms[msg_id] = {}
send_sms[msg_id]['sms_body'] = sms_body
sms_body = DEFAULT_MO_SMS_BODY
send_sms[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if SEND_SMS_RESPONSE in line[
'log_message'] and SEND_SMS_EXPECT_MORE not in line[
'log_message']:
if msg_id not in send_sms:
continue
if 'request' not in send_sms[msg_id]:
continue
if "error" in line['log_message']:
continue
send_sms[msg_id]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'sms_delivery_time': None}
mo_sms_start_time = send_sms[msg_id]['request'][
'datetime_obj']
mo_sms_end_time = line['datetime_obj']
sms_delivery_time = mo_sms_end_time - mo_sms_start_time
send_sms[msg_id]['response'][
'sms_delivery_time'] = sms_delivery_time.total_seconds()
summary.append(
{'request': send_sms[msg_id]['request']['message'],
'response': send_sms[msg_id]['response']['message'],
'unsol_response_new_sms': None,
'sms_body': send_sms[msg_id]['sms_body'],
'mo_start': mo_sms_start_time,
'mo_end': mo_sms_end_time,
'mo_signal_duration': sms_delivery_time.total_seconds(),
'delivery_time': None})
duration_list = []
for item in summary:
if 'mo_signal_duration' in item:
duration_list.append(item['mo_signal_duration'])
try:
avg_setup_time = statistics.mean(duration_list)
except:
avg_setup_time = None
return send_sms, summary, avg_setup_time
def parse_mo_sms_iwlan(logcat):
"""Search in logcat for lines containing messages about SMS sending on
iwlan.
Args:
logcat: List containing lines of logcat
Returns:
send_sms: Dictionary containing found lines for each SMS
request and response messages together with their time stamps.
{
'message_id':{
'request':{
'message': logcat message body of SMS request
'time_stamp': time stamp in text format
'datetime_obj': datetime object of the time stamp
},
'response':{
'message': logcat message body of SMS response
'time_stamp': time stamp in text format
'datetime_obj': datetime object of the time stamp
'sms_delivery_time': time between SMS request and
response
}
}
}
summary: List containing dictionaries for each SMS. The format is
listed below:
[
{
'request': logcat message body of SMS request
'response': logcat message body of SMS response
'sms_body': message body of SMS
'mo_start': time stamp of MO SMS request message
'mo_end': time stamp of MO SMS response message
'mo_signal_duration': time between MO SMS request and
response
'delivery_time': time between MO SMS request and
MT SMS received message
}
]
avg_setup_time: average of mo_signal_duration
"""
send_sms = {}
summary = []
sms_body = DEFAULT_MO_SMS_BODY
msg_id = None
if not logcat:
return False
for line in logcat:
res = re.findall(MO_SMS_LOGCAT_PATTERN, line['log_message'])
if res:
try:
sms_body = res[0]
except:
sms_body = 'Cannot find MO SMS body'
if SEND_SMS_REQUEST_OVER_IMS in line['log_message']:
if msg_id is None:
msg_id = '0'
else:
msg_id = str(int(msg_id) + 1)
if msg_id not in send_sms:
send_sms[msg_id] = {}
send_sms[msg_id]['sms_body'] = sms_body
sms_body = DEFAULT_MO_SMS_BODY
send_sms[msg_id]['request'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if SEND_SMS_RESPONSE_OVER_IMS in line['log_message']:
if msg_id not in send_sms:
continue
if 'request' not in send_sms[msg_id]:
continue
if "error" in line['log_message']:
continue
send_sms[msg_id]['response'] = {
'message': line['log_message'],
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'sms_delivery_time': None}
mo_sms_start_time = send_sms[msg_id]['request'][
'datetime_obj']
mo_sms_end_time = line['datetime_obj']
sms_delivery_time = mo_sms_end_time - mo_sms_start_time
send_sms[msg_id]['response'][
'sms_delivery_time'] = sms_delivery_time.total_seconds()
summary.append(
{'request': send_sms[msg_id]['request']['message'],
'response': send_sms[msg_id]['response']['message'],
'unsol_response_new_sms': None,
'sms_body': send_sms[msg_id]['sms_body'],
'mo_start': mo_sms_start_time,
'mo_end': mo_sms_end_time,
'mo_signal_duration': sms_delivery_time.total_seconds(),
'delivery_time': None})
duration_list = []
for item in summary:
if 'mo_signal_duration' in item:
duration_list.append(item['mo_signal_duration'])
try:
avg_setup_time = statistics.mean(duration_list)
except:
avg_setup_time = None
return send_sms, summary, avg_setup_time
def parse_mt_sms(logcat):
"""Search in logcat for lines containing messages about SMS receiving on
LTE.
Args:
logcat: List containing lines of logcat
Returns:
received_sms_list: List containing dictionaries for each received
SMS. The format is listed below:
[
{
'message': logcat message body of unsolicited response
message
'sms_body': message body of SMS
'time_stamp': time stamp of unsolicited response message in
text format
'datetime_obj': datetime object of the time stamp
'sms_delivery_time': time between SMS request and
response
}
]
"""
received_sms_list = []
if not logcat:
return False
for line in logcat:
if UNSOL_RESPONSE_NEW_SMS in line['log_message']:
# if received_sms_list:
# if received_sms_list[-1]['sms_body'] is None:
# del received_sms_list[-1]
received_sms_list.append(
{'message': line['log_message'],
'sms_body': DEFAULT_MT_SMS_BODY,
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']})
else:
res = re.findall(MT_SMS_CONTENT_PATTERN, line['log_message'])
if res:
try:
sms_body = res[0]
except:
sms_body = 'Cannot find MT SMS body'
if received_sms_list[-1]['sms_body'] == DEFAULT_MT_SMS_BODY:
received_sms_list[-1]['sms_body'] = sms_body
continue
return received_sms_list
def parse_mt_sms_iwlan(logcat):
"""Search in logcat for lines containing messages about SMS receiving on
iwlan.
Args:
logcat: List containing lines of logcat
Returns:
received_sms_list: List containing dictionaries for each received
SMS. The format is listed below:
[
{
'message': logcat message body of SMS received message
'sms_body': message body of SMS
'time_stamp': time stamp of SMS received message in
text format
'datetime_obj': datetime object of the time stamp
}
]
"""
received_sms_list = []
if not logcat:
return False
for line in logcat:
if re.findall(
SMS_RECEIVED_OVER_IMS_SLOT0 + '|' + SMS_RECEIVED_OVER_IMS_SLOT1,
line['log_message']):
received_sms_list.append(
{'message': line['log_message'],
'sms_body': DEFAULT_MT_SMS_BODY,
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']})
else:
res = re.findall(MT_SMS_CONTENT_PATTERN, line['log_message'])
if res:
try:
sms_body = res[0]
except:
sms_body = 'Cannot find MT SMS body'
if received_sms_list[-1]['sms_body'] == DEFAULT_MT_SMS_BODY:
received_sms_list[-1]['sms_body'] = sms_body
continue
return received_sms_list
def parse_sms_delivery_time(log, ad_mo, ad_mt, rat='4g'):
"""Calculate the SMS delivery time (time between MO SMS request and MT
unsolicited response message or MT SMS received message) from logcat
of both MO and MT UE.
Args:
ad_mo: MO Android object
ad_mt: MT Android object
rat: '4g' for LTE and 'iwlan' for iwlan
Returns:
None
"""
ad_mo.log.info('====== Start to search logcat ====== ')
mo_logcat = ad_mo.search_logcat(
r'%s\|%s\|%s\|%s' % (
SMS_SEND_TEXT_MESSAGE,
SEND_SMS,
SEND_SMS_REQUEST_OVER_IMS,
SEND_SMS_RESPONSE_OVER_IMS))
ad_mt.log.info('====== Start to search logcat ====== ')
mt_logcat = ad_mt.search_logcat(
r'%s\|%s\|%s' % (
UNSOL_RESPONSE_NEW_SMS, SMS_RECEIVED, SMS_RECEIVED_OVER_IMS))
for msg in mo_logcat:
ad_mo.log.info(msg["log_message"])
for msg in mt_logcat:
ad_mt.log.info(msg["log_message"])
if rat == 'iwlan':
_, mo_sms_summary, avg = parse_mo_sms_iwlan(mo_logcat)
received_sms_list = parse_mt_sms_iwlan(mt_logcat)
else:
_, mo_sms_summary, avg = parse_mo_sms(mo_logcat)
received_sms_list = parse_mt_sms(mt_logcat)
sms_delivery_time = []
for mo_sms in mo_sms_summary:
for mt_sms in received_sms_list:
if mo_sms['sms_body'] == mt_sms['sms_body']:
mo_sms['delivery_time'] = (
mt_sms['datetime_obj'] - mo_sms['mo_start']).total_seconds()
mo_sms['unsol_response_new_sms'] = mt_sms['message']
sms_delivery_time.append(mo_sms['delivery_time'])
try:
avg_sms_delivery_time = statistics.mean(sms_delivery_time)
except:
avg_sms_delivery_time = None
ad_mo.log.info('====== MO SMS summary ======')
for item in mo_sms_summary:
ad_mo.log.info('------------------')
print_nested_dict(ad_mo, item)
ad_mt.log.info('====== Received SMS list ======')
for item in received_sms_list:
ad_mt.log.info('------------------')
print_nested_dict(ad_mt, item)
ad_mo.log.info('%s SMS were actually sent.', len(mo_sms_summary))
ad_mt.log.info('%s SMS were actually received.', len(received_sms_list))
ad_mo.log.info('Average MO SMS setup time: %.2f sec.', avg)
log.info(
'Average SMS delivery time: %.2f sec.', avg_sms_delivery_time)
def parse_mms(ad_mo, ad_mt):
"""Search in logcat for lines containing messages about SMS sending and
receiving. Calculate MO & MT MMS setup time.
Args:
ad_mo: MO Android object
ad_mt: MT Android object
Returns:
send_mms: Dictionary containing each sent MMS. The format is shown
as below:
{
mms_msg_id:
{
MMS_START_NEW_NW_REQUEST:
{
'time_stamp': time stamp of MMS request on MO UE in
text format
'datetime_obj': datetime object of time stamp
},
MMS_200_OK:
{
'time_stamp': time stamp of '200 OK' for MMS request
in text format
'datetime_obj': datetime object of time stamp
'setup_time': MO MMS setup time. Time between MMS
request and 200 OK
}
}
}
mo_avg_setup_time: average of MO MMS setup time
receive_mms: Dictionary containing each received MMS. The format is
shown as below:
{
mms_msg_id:
{
MMS_START_NEW_NW_REQUEST:
{
'time_stamp': time stamp of MMS request on MT UE in
text format
'datetime_obj': datetime object of time stamp
},
MMS_200_OK:
{
'time_stamp': time stamp of '200 OK' for MMS request
in text format
'datetime_obj': datetime object of time stamp
'setup_time': MT MMS setup time. Time between MMS
request and 200 OK
}
}
}
mt_avg_setup_time: average of MT MMS setup time
"""
send_mms = {}
receive_mms = {}
mo_setup_time_list = []
mt_setup_time_list = []
ad_mo.log.info('====== Start to search logcat ====== ')
mo_logcat = ad_mo.search_logcat(MMS_SERVICE)
for msg in mo_logcat:
ad_mo.log.info(msg["log_message"])
ad_mt.log.info('====== Start to search logcat ====== ')
mt_logcat = ad_mt.search_logcat(MMS_SERVICE)
for msg in mt_logcat:
ad_mt.log.info(msg["log_message"])
if not mo_logcat or not mt_logcat:
return False
for line in mo_logcat:
find_res = re.findall(
MMS_SEND_REQUEST_ID_PATTERN, line['log_message'])
message_id = None
try:
message_id = find_res[0]
except:
pass
if message_id:
mms_msg_id = message_id
if mms_msg_id not in send_mms:
send_mms[mms_msg_id] = {}
if MMS_START_NEW_NW_REQUEST in line['log_message']:
send_mms[mms_msg_id][MMS_START_NEW_NW_REQUEST] = {
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if MMS_200_OK in line['log_message']:
send_mms[mms_msg_id][MMS_200_OK] = {
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'setup_time': None}
if MMS_START_NEW_NW_REQUEST in send_mms[mms_msg_id]:
setup_time = line['datetime_obj'] - send_mms[mms_msg_id][
MMS_START_NEW_NW_REQUEST]['datetime_obj']
send_mms[mms_msg_id][MMS_200_OK][
'setup_time'] = setup_time.total_seconds()
mo_setup_time_list.append(setup_time.total_seconds())
for line in mt_logcat:
find_res = re.findall(
MMS_DOWNLOAD_REQUEST_ID_PATTERN, line['log_message'])
message_id = None
try:
message_id = find_res[0]
except:
pass
if message_id:
mms_msg_id = message_id
if mms_msg_id not in receive_mms:
receive_mms[mms_msg_id] = {}
if MMS_START_NEW_NW_REQUEST in line['log_message']:
receive_mms[mms_msg_id][MMS_START_NEW_NW_REQUEST] = {
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj']}
if MMS_200_OK in line['log_message']:
receive_mms[mms_msg_id][MMS_200_OK] = {
'time_stamp': line['time_stamp'],
'datetime_obj': line['datetime_obj'],
'setup_time': None}
if MMS_START_NEW_NW_REQUEST in receive_mms[mms_msg_id]:
setup_time = line['datetime_obj'] - receive_mms[
mms_msg_id][MMS_START_NEW_NW_REQUEST]['datetime_obj']
receive_mms[mms_msg_id][MMS_200_OK][
'setup_time'] = setup_time.total_seconds()
mt_setup_time_list.append(setup_time.total_seconds())
try:
mo_avg_setup_time = statistics.mean(mo_setup_time_list)
except:
mo_avg_setup_time = None
try:
mt_avg_setup_time = statistics.mean(mt_setup_time_list)
except:
mt_avg_setup_time = None
return send_mms, mo_avg_setup_time, receive_mms, mt_avg_setup_time
def parse_cst_reg(ad, slot, search_intervals=None):
""" Check if IMS CST and WFC is registered at given slot by parsing logcat.
Args:
ad: Android object
slot: 0 for pSIM and 1 for eSIM
search_intervals: List. Only lines with time stamp in given time
intervals will be parsed.
E.g., [(begin_time1, end_time1), (begin_time2, end_time2)]
Both begin_time and end_time should be datetime object.
Returns: List of attampt number and error messages of not found pattern
of failing cycles
"""
cst = {
'ims_registered': {
'0': IMS_REGISTERED_CST_SLOT0,
'1': IMS_REGISTERED_CST_SLOT1
},
'iwlan': {
'0': ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT0,
'1': ON_IMS_MM_TEL_CONNECTED_IWLAN_SLOT1
}
}
ad.log.info('====== Start to search logcat ====== ')
logcat = ad.search_logcat(
'%s\|%s' % (
cst['ims_registered'][str(slot)], cst['iwlan'][str(slot)]))
for line in logcat:
msg = line["log_message"]
ad.log.info(msg)
parsing_fail = []
keyword_dict = {
'ims_registered': cst['ims_registered'][str(slot)],
'iwlan': cst['iwlan'][str(slot)]
}
for attempt, interval in enumerate(search_intervals):
if isinstance(interval, list):
try:
begin_time, end_time = interval
except Exception as e:
ad.log.error(e)
continue
ad.log.info('Parsing begin time: %s', begin_time)
ad.log.info('Parsing end time: %s', end_time)
temp_keyword_dict = copy.deepcopy(keyword_dict)
for line in logcat:
if begin_time and line['datetime_obj'] < begin_time:
continue
if end_time and line['datetime_obj'] > end_time:
break
for key in temp_keyword_dict:
if temp_keyword_dict[key] and not isinstance(
temp_keyword_dict[key], dict):
res = re.findall(
temp_keyword_dict[key], line['log_message'])
if res:
ad.log.info('Found: %s', line['log_message'])
temp_keyword_dict[key] = {
'message': line['log_message'],
'time_stamp': line['datetime_obj']}
break
for key in temp_keyword_dict:
if temp_keyword_dict[key] == keyword_dict[key]:
ad.log.error(
'"%s" is missing in cycle %s.',
keyword_dict[key],
attempt+1)
parsing_fail.append({
'attempt': attempt+1,
'missing_msg': keyword_dict[key]})
return parsing_fail
def check_ims_cst_reg(ad, slot, search_interval=None):
""" Check if IMS CST is registered at given slot by parsing logcat.
Args:
ad: Android object
slot: 0 for pSIM and 1 for eSIM
search_intervals: List. Only lines with time stamp in given time
intervals will be parsed.
E.g., [(begin_time1, end_time1), (begin_time2, end_time2)]
Both begin_time and end_time should be datetime object.
Returns: True for successful registration. Otherwise False
"""
ims_cst_reg = {
'0': IMS_REGISTERED_CST_SLOT0,
'1': IMS_REGISTERED_CST_SLOT1
}
logcat = ad.search_logcat('%s' % ims_cst_reg[str(slot)])
if isinstance(search_interval, list):
try:
begin_time, end_time = search_interval
except Exception as e:
ad.log.error(e)
for line in logcat:
if begin_time and line['datetime_obj'] < begin_time:
continue
if end_time and line['datetime_obj'] > end_time:
break
res = re.findall(ims_cst_reg[str(slot)], line['log_message'])
if res:
ad.log.info(
'IMS CST is registered due to following message '
'found: %s', line['log_message'])
return True
return False