blob: 713d3cfad376985a629b3d434fe01572801145d0 [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import socket
import time
import paramiko
import re
from antlion.controllers.cellular_simulator import AbstractCellularSimulator
class UXMCellularSimulator(AbstractCellularSimulator):
"""A cellular simulator for UXM callbox."""
# Keys to obtain data from cell_info dictionary.
KEY_CELL_NUMBER = "cell_number"
KEY_CELL_TYPE = "cell_type"
# UXM socket port
UXM_PORT = 5125
# UXM SCPI COMMAND
SCPI_IMPORT_STATUS_QUERY_CMD = 'SYSTem:SCPI:IMPort:STATus?'
SCPI_SYSTEM_ERROR_CHECK_CMD = 'SYST:ERR?\n'
# require: path to SCPI file
SCPI_IMPORT_SCPI_FILE_CMD = 'SYSTem:SCPI:IMPort "{}"\n'
# require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
SCPI_CELL_ON_CMD = 'BSE:CONFig:{}:{}:ACTive 1'
# require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
SCPI_CELL_OFF_CMD = 'BSE:CONFig:{}:{}:ACTive 0'
# require: 1. cell type (E.g. NR5G), 2. cell number (E.g CELL1)
SCPI_GET_CELL_STATUS = 'BSE:STATus:{}:{}?'
SCPI_CHECK_CONNECTION_CMD = '*IDN?\n'
# UXM's Test Application recovery
TA_BOOT_TIME = 100
# shh command
SSH_START_GUI_APP_CMD_FORMAT = 'psexec -s -d -i 1 "{exe_path}"'
SSH_CHECK_APP_RUNNING_CMD_FORMAT = 'tasklist | findstr /R {regex_app_name}'
# start process success regex
PSEXEC_PROC_STARTED_REGEX_FORMAT = 'started on * with process ID {proc_id}'
def __init__(self, ip_address, custom_files, uxm_user,
ssh_private_key_to_uxm, ta_exe_path, ta_exe_name):
"""Initializes the cellular simulator.
Args:
ip_address: the ip address of host where Keysight Test Application (TA)
is installed.
custom_files: a list of file path for custom files.
uxm_user: username of host where Keysight TA resides.
ssh_private_key_to_uxm: private key for key based ssh to
host where Keysight TA resides.
ta_exe_path: path to TA exe.
ta_exe_name: name of TA exe.
"""
super().__init__()
self.custom_files = custom_files
self.rockbottom_script = None
self.cells = []
self.uxm_ip = ip_address
self.uxm_user = uxm_user
self.ssh_private_key_to_uxm = ssh_private_key_to_uxm
self.ta_exe_path = ta_exe_path
self.ta_exe_name = ta_exe_name
self.ssh_client = self._create_ssh_client()
# get roclbottom file
for file in self.custom_files:
if 'rockbottom_' in file:
self.rockbottom_script = file
# connect to Keysight Test Application via socket
self.recovery_ta()
self.socket = self._socket_connect(self.uxm_ip, self.UXM_PORT)
self.check_socket_connection()
self.timeout = 120
def _create_ssh_client(self):
"""Create a ssh client to host."""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
mykey = paramiko.Ed25519Key.from_private_key_file(
self.ssh_private_key_to_uxm)
ssh.connect(hostname=self.uxm_ip, username=self.uxm_user, pkey=mykey)
self.log.info('SSH client to %s is connected' % self.uxm_ip)
return ssh
def is_ta_running(self):
is_running_cmd = self.SSH_CHECK_APP_RUNNING_CMD_FORMAT.format(
regex_app_name=self.ta_exe_name)
stdin, stdout, stderr = self.ssh_client.exec_command(is_running_cmd)
stdin.close()
err = ''.join(stderr.readlines())
out = ''.join(stdout.readlines())
final_output = str(out) + str(err)
self.log.info(final_output)
return (out != '' and err == '')
def _start_test_app(self):
"""Start Test Application on Windows."""
# start GUI exe via ssh
start_app_cmd = self.SSH_START_GUI_APP_CMD_FORMAT.format(
exe_path=self.ta_exe_path)
stdin, stdout, stderr = self.ssh_client.exec_command(start_app_cmd)
self.log.info(f'Command sent to {self.uxm_ip}: {start_app_cmd}')
stdin.close()
err = ''.join(stderr.readlines())
out = ''.join(stdout.readlines())
# psexec return process ID as part of the exit code
exit_status = stderr.channel.recv_exit_status()
is_started = re.search(
self.PSEXEC_PROC_STARTED_REGEX_FORMAT.format(proc_id=exit_status),
err[-1])
if is_started:
raise RuntimeError('Fail to start TA: ' + out + err)
# wait for ta completely boot up
self.log.info('TA is starting')
time.sleep(self.TA_BOOT_TIME)
def recovery_ta(self):
"""Start TA if it is not running."""
if not self.is_ta_running():
self._start_test_app()
# checking if ta booting process complete
# by checking socket connection
s = None
retries = 12
for _ in range(retries):
try:
s = self._socket_connect(self.uxm_ip, self.UXM_PORT)
s.close()
return
except ConnectionRefusedError as cre:
self.log.info(
'Connection refused, wait 10s for TA to boot')
time.sleep(10)
raise RuntimeError('TA does not start on time')
def set_rockbottom_script_path(self, path):
"""Set path to rockbottom script.
Args:
path: path to rockbottom script.
"""
self.rockbottom_script = path
def set_cell_info(self, cell_info):
"""Set type and number for multiple cells.
Args:
cell_info: list of dictionaries,
each dictionary contain cell type
and cell number for each cell
that the simulator need to control.
"""
if not cell_info:
raise ValueError('Missing cell info from configurations file')
self.cells = cell_info
def turn_cell_on(self, cell_type, cell_number):
"""Turn UXM's cell on.
Args:
cell_type: type of cell (e.g NR5G, LTE).
cell_number: ordinal number of a cell.
"""
if cell_type and cell_number:
self._socket_send_SCPI_command(
self.SCPI_CELL_ON_CMD.format(cell_type, cell_number))
else:
raise ValueError('Invalid cell info\n' +
f' cell type: {cell_type}\n' +
f' cell number: {cell_number}\n')
def turn_cell_off(self, cell_type, cell_number):
"""Turn UXM's cell off.
Args:
cell_type: type of cell (e.g NR5G, LTE).
cell_number: ordinal number of a cell.
"""
if cell_type and cell_number:
self._socket_send_SCPI_command(
self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number))
else:
raise ValueError('Invalid cell info\n' +
f' cell type: {cell_type}\n' +
f' cell number: {cell_number}\n')
def get_cell_status(self, cell_type, cell_number):
"""Get status of cell.
Args:
cell_type: type of cell (e.g NR5G, LTE).
cell_number: ordinal number of a cell.
"""
if not cell_type or not cell_number:
raise ValueError('Invalid cell with\n' +
f' cell type: {cell_type}\n' +
f' cell number: {cell_number}\n')
return self._socket_send_SCPI_for_result_command(
self.SCPI_GET_CELL_STATUS.format(cell_type, cell_number))
def check_socket_connection(self):
"""Check if the socket connection is established.
Query the identification of the Keysight Test Application
we are trying to connect to. Empty response indicates
connection fail, and vice versa.
"""
self.socket.sendall(self.SCPI_CHECK_CONNECTION_CMD.encode())
response = self.socket.recv(1024).decode()
if response:
self.log.info(f'Connected to: {response}')
else:
self.log.error('Fail to connect to callbox')
def _socket_connect(self, host, port):
"""Create socket connection.
Args:
host: IP address of desktop where Keysight Test Application resides.
port: port that Keysight Test Application is listening for socket
communication.
Return:
s: socket object.
"""
self.log.info('Establishing connection to callbox via socket')
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
return s
def _socket_send_SCPI_command(self, command):
"""Send SCPI command without expecting response.
Args:
command: a string SCPI command.
"""
# make sure there is a line break for the socket to send command
command = command + '\n'
# send command
self.socket.sendall(command.encode())
self.log.info(f'Sent {command}')
def _socket_receive_SCPI_result(self):
"""Receive response from socket. """
i = 1
response = ''
while i < self.timeout and not response:
response = self.socket.recv(1024).decode()
i += 1
return response
def _socket_send_SCPI_for_result_command(self, command):
"""Send SCPI command and expecting response.
Args:
command: a string SCPI command.
"""
self._socket_send_SCPI_command(command)
response = self._socket_receive_SCPI_result()
return response
def check_system_error(self):
"""Query system error from Keysight Test Application.
Return:
status: a message indicate the number of errors
and detail of errors if any.
a string `0,"No error"` indicates no error.
"""
status = self._socket_send_SCPI_for_result_command(
self.SCPI_SYSTEM_ERROR_CHECK_CMD)
self.log.info(f'System error status: {status}')
return status
def import_configuration(self, path):
"""Import SCPI config file.
Args:
path: path to SCPI file.
"""
self._socket_send_SCPI_command(
self.SCPI_IMPORT_SCPI_FILE_CMD.format(path))
time.sleep(45)
def destroy(self):
"""Close socket connection with UXM. """
self.socket.close()
def setup_lte_scenario(self, path):
"""Configures the equipment for an LTE simulation.
Args:
path: path to SCPI config file.
"""
self.import_configuration(path)
def dut_rockbottom(self, dut):
"""Set the dut to rockbottom state.
Args:
dut: a CellularAndroid controller.
"""
# The rockbottom script might include a device reboot, so it is
# necessary to stop SL4A during its execution.
dut.ad.stop_services()
self.log.info('Executing rockbottom script for ' + dut.ad.model)
os.chmod(self.rockbottom_script, 0o777)
os.system('{} {}'.format(self.rockbottom_script, dut.ad.serial))
# Make sure the DUT is in root mode after coming back
dut.ad.root_adb()
# Restart SL4A
dut.ad.start_services()
def wait_until_attached_one_cell(self,
cell_type,
cell_number,
dut,
wait_for_camp_interval,
attach_retries,
change_dut_setting_allow=True):
"""Wait until connect to given UXM cell.
After turn off airplane mode, sleep for
wait_for_camp_interval seconds for device to camp.
If not device is not connected after the wait,
either toggle airplane mode on/off or reboot device.
Args:
cell_type: type of cell
which we are trying to connect to.
cell_number: ordinal number of a cell
which we are trying to connect to.
dut: a CellularAndroid controller.
wait_for_camp_interval: sleep interval,
wait for device to camp.
attach_retries: number of retry
to wait for device
to connect to 1 basestation.
change_dut_setting_allow: turn on/off APM
or reboot device helps with device camp time.
However, if we are trying to connect to second cell
changing APM status or reboot is not allowed.
Raise:
AbstractCellularSimulator.CellularSimulatorError:
device unable to connect to cell.
"""
# airplane mode off
# dut.ad.adb.shell('settings put secure adaptive_connectivity_enabled 0')
dut.toggle_airplane_mode(False)
time.sleep(5)
# turn cell on
self.turn_cell_on(cell_type, cell_number)
time.sleep(5)
# waits for connect
for index in range(1, attach_retries):
# airplane mode on
time.sleep(wait_for_camp_interval)
cell_state = self.get_cell_status(cell_type, cell_number)
self.log.info(f'cell state: {cell_state}')
if cell_state == 'CONN\n':
return True
if cell_state == 'OFF\n':
self.turn_cell_on(cell_type, cell_number)
time.sleep(5)
if change_dut_setting_allow:
if (index % 4) == 0:
dut.ad.reboot()
if self.rockbottom_script:
self.dut_rockbottom(dut)
else:
self.log.warning(
f'Rockbottom script {self} was not executed after reboot'
)
else:
# airplane mode on
dut.toggle_airplane_mode(True)
time.sleep(5)
# airplane mode off
dut.toggle_airplane_mode(False)
# Phone cannot connected to basestation of callbox
raise RuntimeError(
f'Phone was unable to connect to cell: {cell_type}-{cell_number}')
def wait_until_attached(self, dut, timeout, attach_retries):
"""Waits until the DUT is attached to all required cells.
Args:
dut: a CellularAndroid controller.
timeout: sleep interval,
wait for device to camp in 1 try.
attach_retries: number of retry
to wait for device
to connect to 1 basestation.
"""
# get cell info
first_cell_type = self.cells[0][self.KEY_CELL_TYPE]
first_cell_number = self.cells[0][self.KEY_CELL_NUMBER]
if len(self.cells) == 2:
second_cell_type = self.cells[1][self.KEY_CELL_TYPE]
second_cell_number = self.cells[1][self.KEY_CELL_NUMBER]
# connect to 1st cell
try:
self.wait_until_attached_one_cell(first_cell_type,
first_cell_number, dut, timeout,
attach_retries)
except Exception as exc:
raise RuntimeError(f'Cannot connect to first cell') from exc
# connect to 2nd cell
if len(self.cells) == 2:
self.turn_cell_on(
second_cell_type,
second_cell_number,
)
self._socket_send_SCPI_command(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL None')
self._socket_send_SCPI_command(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:UL None')
self._socket_send_SCPI_command(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
self._socket_send_SCPI_command(
'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:DL CELL1')
time.sleep(1)
self._socket_send_SCPI_command(
"BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly")
try:
self.wait_until_attached_one_cell(second_cell_type,
second_cell_number, dut,
timeout, attach_retries,
False)
except Exception as exc:
raise RuntimeError(f'Cannot connect to second cell') from exc
def set_lte_rrc_state_change_timer(self, enabled, time=10):
"""Configures the LTE RRC state change timer.
Args:
enabled: a boolean indicating if the timer should be on or off.
time: time in seconds for the timer to expire.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_band(self, bts_index, band):
"""Sets the band for the indicated base station.
Args:
bts_index: the base station number.
band: the new band.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def get_duplex_mode(self, band):
"""Determines if the band uses FDD or TDD duplex mode
Args:
band: a band number.
Returns:
an variable of class DuplexMode indicating if band is FDD or TDD.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_input_power(self, bts_index, input_power):
"""Sets the input power for the indicated base station.
Args:
bts_index: the base station number.
input_power: the new input power.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_output_power(self, bts_index, output_power):
"""Sets the output power for the indicated base station.
Args:
bts_index: the base station number.
output_power: the new output power.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_tdd_config(self, bts_index, tdd_config):
"""Sets the tdd configuration number for the indicated base station.
Args:
bts_index: the base station number.
tdd_config: the new tdd configuration number.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_ssf_config(self, bts_index, ssf_config):
"""Sets the Special Sub-Frame config number for the indicated.
base station.
Args:
bts_index: the base station number.
ssf_config: the new ssf config number.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_bandwidth(self, bts_index, bandwidth):
"""Sets the bandwidth for the indicated base station.
Args:
bts_index: the base station number
bandwidth: the new bandwidth
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_downlink_channel_number(self, bts_index, channel_number):
"""Sets the downlink channel number for the indicated base station.
Args:
bts_index: the base station number.
channel_number: the new channel number.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_mimo_mode(self, bts_index, mimo_mode):
"""Sets the mimo mode for the indicated base station.
Args:
bts_index: the base station number
mimo_mode: the new mimo mode
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_transmission_mode(self, bts_index, tmode):
"""Sets the transmission mode for the indicated base station.
Args:
bts_index: the base station number.
tmode: the new transmission mode.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_scheduling_mode(self,
bts_index,
scheduling,
mcs_dl=None,
mcs_ul=None,
nrb_dl=None,
nrb_ul=None):
"""Sets the scheduling mode for the indicated base station.
Args:
bts_index: the base station number.
scheduling: the new scheduling mode.
mcs_dl: Downlink MCS.
mcs_ul: Uplink MCS.
nrb_dl: Number of RBs for downlink.
nrb_ul: Number of RBs for uplink.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_dl_256_qam_enabled(self, bts_index, enabled):
"""Determines what MCS table should be used for the downlink.
This only saves the setting that will be used when configuring MCS.
Args:
bts_index: the base station number.
enabled: whether 256 QAM should be used.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_ul_64_qam_enabled(self, bts_index, enabled):
"""Determines what MCS table should be used for the uplink.
This only saves the setting that will be used when configuring MCS.
Args:
bts_index: the base station number.
enabled: whether 64 QAM should be used.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_mac_padding(self, bts_index, mac_padding):
"""Enables or disables MAC padding in the indicated base station.
Args:
bts_index: the base station number.
mac_padding: the new MAC padding setting.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_cfi(self, bts_index, cfi):
"""Sets the Channel Format Indicator for the indicated base station.
Args:
bts_index: the base station number.
cfi: the new CFI setting.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_paging_cycle(self, bts_index, cycle_duration):
"""Sets the paging cycle duration for the indicated base station.
Args:
bts_index: the base station number.
cycle_duration: the new paging cycle duration in milliseconds.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def set_phich_resource(self, bts_index, phich):
"""Sets the PHICH Resource setting for the indicated base station.
Args:
bts_index: the base station number.
phich: the new PHICH resource setting.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def lte_attach_secondary_carriers(self, ue_capability_enquiry):
"""Activates the secondary carriers for CA.
Requires the DUT to be attached to the primary carrier first.
Args:
ue_capability_enquiry: UE capability enquiry message to be sent to
the UE before starting carrier aggregation.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def wait_until_communication_state(self, timeout=120):
"""Waits until the DUT is in Communication state.
Args:
timeout: after this amount of time the method will raise
a CellularSimulatorError exception. Default is 120 seconds.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def wait_until_idle_state(self, timeout=120):
"""Waits until the DUT is in Idle state.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
for cell in self.cells:
cell_type = cell[self.KEY_CELL_TYPE]
cell_number = cell[self.KEY_CELL_NUMBER]
self._socket_send_SCPI_command(
self.SCPI_CELL_OFF_CMD.format(cell_type, cell_number))
def stop(self):
"""Stops current simulation.
After calling this method, the simulator will need to be set up again.
"""
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def start_data_traffic(self):
"""Starts transmitting data from the instrument to the DUT. """
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')
def stop_data_traffic(self):
"""Stops transmitting data from the instrument to the DUT. """
raise NotImplementedError(
'This UXM callbox simulator does not support this feature.')