| #!/usr/bin/env python3 |
| # |
| # Copyright 2021 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import time |
| import sys |
| |
| from enum import Enum |
| from os import path |
| from acts.controllers import abstract_inst |
| |
| DEFAULT_XLAPI_PATH = '/home/mobileharness/Rohde-Schwarz/XLAPI/latest/venv/lib/python3.7/site-packages' |
| DEFAULT_LTE_STATE_CHANGE_TIMER = 10 |
| DEFAULT_CELL_SWITCH_ON_TIMER = 60 |
| DEFAULT_ENDC_TIMER = 300 |
| |
| logger = logging.getLogger('Xlapi_cmx500') |
| |
| LTE_CELL_PROPERTIES = [ |
| 'band', |
| 'bandwidth', |
| 'dl_earfcn', |
| 'ul_earfcn', |
| 'total_dl_power', |
| 'p_b', |
| 'dl_epre', |
| 'ref_signal_power', |
| 'm', |
| 'beamforming_antenna_ports', |
| 'p0_nominal_pusch', |
| ] |
| |
| LTE_MHZ_UPPER_BOUND_TO_RB = [ |
| (1.5, 6), |
| (4.0, 15), |
| (7.5, 25), |
| (12.5, 50), |
| (17.5, 75), |
| ] |
| |
| |
| class DciFormat(Enum): |
| """Support DCI Formats for MIMOs.""" |
| DCI_FORMAT_0 = 1 |
| DCI_FORMAT_1 = 2 |
| DCI_FORMAT_1A = 3 |
| DCI_FORMAT_1B = 4 |
| DCI_FORMAT_1C = 5 |
| DCI_FORMAT_2 = 6 |
| DCI_FORMAT_2A = 7 |
| DCI_FORMAT_2B = 8 |
| DCI_FORMAT_2C = 9 |
| DCI_FORMAT_2D = 10 |
| |
| |
| class DuplexMode(Enum): |
| """Duplex Modes.""" |
| FDD = 'FDD' |
| TDD = 'TDD' |
| DL_ONLY = 'DL_ONLY' |
| |
| |
| class LteBandwidth(Enum): |
| """Supported LTE bandwidths.""" |
| BANDWIDTH_1MHz = 6 # MHZ_1 is RB_6 |
| BANDWIDTH_3MHz = 15 # MHZ_3 is RB_15 |
| BANDWIDTH_5MHz = 25 # MHZ_5 is RB_25 |
| BANDWIDTH_10MHz = 50 # MHZ_10 is RB_50 |
| BANDWIDTH_15MHz = 75 # MHZ_15 is RB_75 |
| BANDWIDTH_20MHz = 100 # MHZ_20 is RB_100 |
| |
| |
| class LteState(Enum): |
| """LTE ON and OFF.""" |
| LTE_ON = 'ON' |
| LTE_OFF = 'OFF' |
| |
| |
| class MimoModes(Enum): |
| """MIMO Modes dl antennas.""" |
| MIMO1x1 = 1 |
| MIMO2x2 = 2 |
| MIMO4x4 = 4 |
| |
| |
| class ModulationType(Enum): |
| """Supported Modulation Types.""" |
| Q16 = 0 |
| Q64 = 1 |
| Q256 = 2 |
| |
| |
| class NasState(Enum): |
| """NAS state between callbox and dut.""" |
| DEREGISTERED = 'OFF' |
| EMM_REGISTERED = 'EMM' |
| MM5G_REGISTERED = 'NR' |
| |
| |
| class RrcState(Enum): |
| """States to enable/disable rrc.""" |
| RRC_ON = 'ON' |
| RRC_OFF = 'OFF' |
| |
| |
| class RrcConnectionState(Enum): |
| """RRC Connection states, describes possible DUT RRC connection states.""" |
| IDLE = 1 |
| IDLE_PAGING = 2 |
| IDLE_CONNECTION_ESTABLISHMENT = 3 |
| CONNECTED = 4 |
| CONNECTED_CONNECTION_REESTABLISHMENT = 5 |
| CONNECTED_SCG_FAILURE = 6 |
| CONNECTED_HANDOVER = 7 |
| CONNECTED_CONNECTION_RELEASE = 8 |
| |
| |
| class SchedulingMode(Enum): |
| """Supported scheduling modes.""" |
| USERDEFINEDCH = 'UDCHannels' |
| |
| |
| class TransmissionModes(Enum): |
| """Supported transmission modes.""" |
| TM1 = 1 |
| TM2 = 2 |
| TM3 = 3 |
| TM4 = 4 |
| TM7 = 7 |
| TM8 = 8 |
| TM9 = 9 |
| |
| |
| # For mimo 1x1, also set_num_crs_antenna_ports to 1 |
| MIMO_MAX_LAYER_MAPPING = { |
| MimoModes.MIMO1x1: 2, |
| MimoModes.MIMO2x2: 2, |
| MimoModes.MIMO4x4: 4, |
| } |
| |
| |
| class Cmx500(abstract_inst.SocketInstrument): |
| def __init__(self, ip_addr, port, xlapi_path=DEFAULT_XLAPI_PATH): |
| """Init method to setup variables for the controller. |
| |
| Args: |
| ip_addr: Controller's ip address. |
| port: Port. |
| """ |
| |
| # keeps the socket connection for debug purpose for now |
| super().__init__(ip_addr, port) |
| if not xlapi_path in sys.path: |
| sys.path.insert(0, xlapi_path) |
| self._initial_xlapi() |
| self._settings.system.set_instrument_address(ip_addr) |
| logger.info('The instrument address is {}'.format( |
| self._settings.system.get_instrument_address())) |
| |
| self.bts = [] |
| |
| # Stops all active cells if there is any |
| self.disconnect() |
| |
| self.dut = self._network.get_dut() |
| self.lte_cell = self._network.create_lte_cell('ltecell0') |
| self.nr_cell = self._network.create_nr_cell('nrcell0') |
| self._config_antenna_ports() |
| |
| self.lte_rrc_state_change_timer = DEFAULT_LTE_STATE_CHANGE_TIMER |
| self.rrc_state_change_time_enable = False |
| self.cell_switch_on_timer = DEFAULT_CELL_SWITCH_ON_TIMER |
| |
| # _config_antenna_ports for the special RF connection with cmw500 + cmx500. |
| def _config_antenna_ports(self): |
| from rs_mrt.testenvironment.signaling.sri.rat.common import CsiRsAntennaPorts |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import CrsAntennaPorts |
| |
| max_csi_rs_ports = CsiRsAntennaPorts.NUMBER_CSI_RS_ANTENNA_PORTS_FOUR |
| max_crs_ports = CrsAntennaPorts.NUMBER_CRS_ANTENNA_PORTS_FOUR |
| |
| lte_cell_max_config = self.lte_cell.stub.GetMaximumConfiguration() |
| lte_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports |
| lte_cell_max_config.crs_antenna_ports = max_crs_ports |
| self.lte_cell.stub.SetMaximumConfiguration(lte_cell_max_config) |
| |
| nr_cell_max_config = self.nr_cell.stub.GetMaximumConfiguration() |
| nr_cell_max_config.csi_rs_antenna_ports = max_csi_rs_ports |
| self.nr_cell.stub.SetMaximumConfiguration(nr_cell_max_config) |
| |
| def _initial_xlapi(self): |
| import xlapi |
| import mrtype |
| from xlapi import network |
| from xlapi import settings |
| |
| self._xlapi = xlapi |
| self._network = network |
| self._settings = settings |
| |
| def configure_mimo_settings(self, mimo, bts_index=0): |
| """Sets the mimo scenario for the test. |
| |
| Args: |
| mimo: mimo scenario to set. |
| """ |
| self.bts[bts_index].set_mimo_mode(mimo) |
| |
| @property |
| def connection_type(self): |
| """Gets the connection type applied in callbox.""" |
| state = self.dut.state.rrc_connection_state |
| return RrcConnectionState(state.value) |
| |
| def create_base_station(self, cell): |
| """Creates the base station object with cell and current object. |
| |
| Args: |
| cell: the XLAPI cell. |
| |
| Returns: |
| base station object. |
| Raise: |
| CmxError if the cell is neither LTE nor NR. |
| """ |
| from xlapi.lte_cell import LteCell |
| from xlapi.nr_cell import NrCell |
| if isinstance(cell, LteCell): |
| return LteBaseStation(self, cell) |
| elif isinstance(cell, NrCell): |
| return NrBaseStation(self, cell) |
| else: |
| raise CmxError('The cell type is neither LTE nor NR') |
| |
| def detach(self): |
| """Detach callbox and controller.""" |
| for bts in self.bts: |
| bts.stop() |
| |
| def disable_packet_switching(self): |
| """Disable packet switching in call box.""" |
| raise NotImplementedError() |
| |
| def disconnect(self): |
| """Disconnect controller from device and switch to local mode.""" |
| |
| self.bts.clear() |
| self._network.reset() |
| |
| def enable_packet_switching(self): |
| """Enable packet switching in call box.""" |
| raise NotImplementedError() |
| |
| def get_cell_configs(self, cell): |
| """Getes cell settings. |
| |
| This method is for debugging purpose. In XLAPI, there are many get |
| methods in the cell or component carrier object. When this method is |
| called, the corresponding value could be recorded with logging, which is |
| useful for debug. |
| |
| Args: |
| cell: the lte or nr cell in xlapi |
| """ |
| cell_getters = [attr for attr in dir(cell) if attr.startswith('get')] |
| for attr in cell_getters: |
| try: |
| getter = getattr(cell, attr) |
| logger.info('The {} is {}'.format(attr, getter())) |
| except Exception as e: |
| logger.warning('Error in get {}: {}'.format(attr, e)) |
| |
| def get_base_station(self, bts_index=0): |
| """Gets the base station object based on bts num. By default |
| bts_index set to 0 (PCC). |
| |
| Args: |
| bts_num: base station identifier |
| |
| Returns: |
| base station object. |
| """ |
| return self.bts[bts_index] |
| |
| def get_network(self): |
| """ Gets the network object from cmx500 object.""" |
| return self._network |
| |
| def init_lte_measurement(self): |
| """Gets the class object for lte measurement which can be used to |
| initiate measurements. |
| |
| Returns: |
| lte measurement object. |
| """ |
| raise NotImplementedError() |
| |
| def reset(self): |
| """System level reset.""" |
| |
| self.disconnect() |
| |
| @property |
| def rrc_connection(self): |
| """Gets the RRC connection state.""" |
| return self.dut.state.rrc.is_connected |
| |
| def set_timer(self, timeout): |
| """Sets timer for the Cmx500 class.""" |
| self.rrc_state_change_time_enable = True |
| self.lte_rrc_state_change_timer = timeout |
| |
| def switch_lte_signalling(self, state): |
| """ Turns LTE signalling ON/OFF. |
| |
| Args: |
| state: an instance of LteState indicating the state to which LTE |
| signal has to be set. |
| """ |
| if not isinstance(state, LteState): |
| raise ValueError('state should be the instance of LteState.') |
| |
| if self.bts: |
| self.disconnect() |
| self.bts.append(LteBaseStation(self, self.lte_cell)) |
| # Switch on the primary Lte cell for on state and switch all lte cells |
| # if the state is off state |
| if state.value == 'ON': |
| self.bts[0].start() |
| cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer) |
| if cell_status: |
| logger.info('The LTE pcell status is on') |
| else: |
| raise CmxError('The LTE pcell cannot be switched on') |
| else: |
| for bts in self.bts: |
| if isinstance(bts, LteBaseStation): |
| bts.stop() |
| logger.info('The LTE cell status is {} after stop'.format( |
| bts.is_on())) |
| |
| def switch_on_nsa_signalling(self): |
| |
| from mrtype.counters import N310 |
| |
| if self.bts: |
| self.disconnect() |
| logger.info('Switches on NSA signalling') |
| |
| # Sets n310 timer to N310.N20 to make endc more stable |
| logger.info('set nr cell n310 timer to N310.N20') |
| self.nr_cell.set_n310(N310.N20) |
| self.bts.append(LteBaseStation(self, self.lte_cell)) |
| self.bts.append(NrBaseStation(self, self.nr_cell)) |
| |
| self.bts[0].start() |
| lte_cell_status = self.bts[0].wait_cell_on(self.cell_switch_on_timer) |
| if lte_cell_status: |
| logger.info('The LTE pcell status is on') |
| else: |
| raise CmxError('The LTE pcell cannot be switched on') |
| |
| self.bts[1].start() |
| nr_cell_status = self.bts[1].wait_cell_on(self.cell_switch_on_timer) |
| if nr_cell_status: |
| logger.info('The NR cell status is on') |
| else: |
| raise CmxError('The NR cell cannot be switched on') |
| time.sleep(5) |
| |
| def update_lte_cell_config(self, config): |
| """Updates lte cell settings with config.""" |
| set_counts = 0 |
| for property in LTE_CELL_PROPERTIES: |
| if property in config: |
| setter_name = 'set_' + property |
| setter = getattr(self.lte_cell, setter_name) |
| setter(config[property]) |
| set_counts += 1 |
| if set_counts < len(config): |
| logger.warning('Not all configs were set in update_cell_config') |
| |
| @property |
| def use_carrier_specific(self): |
| """Gets current status of carrier specific duplex configuration.""" |
| raise NotImplementedError() |
| |
| @use_carrier_specific.setter |
| def use_carrier_specific(self, state): |
| """Sets the carrier specific duplex configuration. |
| |
| Args: |
| state: ON/OFF UCS configuration. |
| """ |
| raise NotImplementedError() |
| |
| def wait_for_rrc_state(self, state, timeout=120): |
| """ Waits until a certain RRC state is set. |
| |
| Args: |
| state: the RRC state that is being waited for. |
| timeout: timeout for phone to be in connected state. |
| |
| Raises: |
| CmxError on time out. |
| """ |
| is_idle = (state.value == 'OFF') |
| for idx in range(timeout): |
| time.sleep(1) |
| if self.dut.state.rrc.is_idle == is_idle: |
| logger.info('{} reached at {} s'.format(state.value, idx)) |
| return True |
| error_message = 'Waiting for {} state timeout after {}'.format( |
| state.value, timeout) |
| logger.error(error_message) |
| raise CmxError(error_message) |
| |
| def wait_until_attached(self, timeout=120): |
| """Waits until Lte attached. |
| |
| Args: |
| timeout: timeout for phone to get attached. |
| |
| Raises: |
| CmxError on time out. |
| """ |
| try: |
| self.dut.signaling.wait_for_lte_attach(self.lte_cell, timeout) |
| except: |
| raise CmxError( |
| 'wait_until_attached timeout after {}'.format(timeout)) |
| |
| def send_sms(self, message): |
| """ Sends an SMS message to the DUT. |
| |
| Args: |
| message: the SMS message to send. |
| """ |
| self.dut.signaling.mt_sms(message) |
| |
| |
| class BaseStation(object): |
| """Class to interact with different the base stations.""" |
| def __init__(self, cmx, cell): |
| """Init method to setup variables for base station. |
| |
| Args: |
| cmx: Controller (Cmx500) object. |
| cell: The cell for the base station. |
| """ |
| |
| self._cell = cell |
| self._cmx = cmx |
| self._cc = cmx.dut.cc(cell) |
| self._network = cmx.get_network() |
| |
| @property |
| def band(self): |
| """Gets the current band of cell. |
| |
| Return: |
| the band number in int. |
| """ |
| cell_band = self._cell.get_band() |
| return int(cell_band) |
| |
| @property |
| def dl_power(self): |
| """Gets RSPRE level. |
| |
| Return: |
| the power level in dbm. |
| """ |
| return self._cell.get_total_dl_power().in_dBm() |
| |
| @property |
| def duplex_mode(self): |
| """Gets current duplex of cell.""" |
| band = self._cell.get_band() |
| if band.is_fdd(): |
| return DuplexMode.FDD |
| if band.is_tdd(): |
| return DuplexMode.TDD |
| if band.is_dl_only(): |
| return DuplexMode.DL_ONLY |
| |
| def get_cc(self): |
| """Gets component carrier of the cell.""" |
| return self._cc |
| |
| def is_on(self): |
| """Verifies if the cell is turned on. |
| |
| Return: |
| boolean (if the cell is on). |
| """ |
| return self._cell.is_on() |
| |
| def set_band(self, band): |
| """Sets the Band of cell. |
| |
| Args: |
| band: band of cell. |
| """ |
| self._cell.set_band(band) |
| logger.info('The band is set to {} and is {} after setting'.format( |
| band, self.band)) |
| |
| def set_dl_mac_padding(self, state): |
| """Enables/Disables downlink padding at the mac layer. |
| |
| Args: |
| state: a boolean |
| """ |
| self._cc.set_dl_mac_padding(state) |
| |
| def set_dl_power(self, pwlevel): |
| """Modifies RSPRE level. |
| |
| Args: |
| pwlevel: power level in dBm. |
| """ |
| self._cell.set_total_dl_power(pwlevel) |
| |
| def set_ul_power(self, ul_power): |
| """Sets ul power |
| |
| Args: |
| ul_power: the uplink power in dbm |
| """ |
| self._cc.set_target_ul_power(ul_power) |
| |
| def start(self): |
| """Starts the cell.""" |
| self._cell.start() |
| |
| def stop(self): |
| """Stops the cell.""" |
| self._cell.stop() |
| |
| def wait_cell_on(self, timeout): |
| """Waits the cell on. |
| |
| Args: |
| timeout: the time for waiting the cell on. |
| |
| Raises: |
| CmxError on time out. |
| """ |
| waiting_time = 0 |
| while waiting_time < timeout: |
| if self._cell.is_on(): |
| return True |
| waiting_time += 1 |
| time.sleep(1) |
| return self._cell.is_on() |
| |
| |
| class LteBaseStation(BaseStation): |
| """ LTE base station.""" |
| def __init__(self, cmx, cell): |
| """Init method to setup variables for the LTE base station. |
| |
| Args: |
| cmx: Controller (Cmx500) object. |
| cell: The cell for the LTE base station. |
| """ |
| from xlapi.lte_cell import LteCell |
| if not isinstance(cell, LteCell): |
| raise CmxError( |
| 'The cell is not a LTE cell, LTE base station fails' |
| ' to create.') |
| super().__init__(cmx, cell) |
| |
| def _config_scheduler(self, |
| dl_mcs=None, |
| dl_rb_alloc=None, |
| dl_dci_ncce=None, |
| dl_dci_format=None, |
| dl_tm=None, |
| dl_num_layers=None, |
| dl_mcs_table=None, |
| ul_mcs=None, |
| ul_rb_alloc=None, |
| ul_dci_ncce=None): |
| |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import DciFormat |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import MaxLayersMIMO |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import McsTable |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import PdcchFormat |
| |
| log_list = [] |
| if dl_mcs: |
| log_list.append('dl_mcs: {}'.format(dl_mcs)) |
| if ul_mcs: |
| log_list.append('ul_mcs: {}'.format(ul_mcs)) |
| if dl_rb_alloc: |
| log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) |
| if ul_rb_alloc: |
| log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) |
| if dl_dci_ncce: |
| dl_dci_ncce = PdcchFormat(dl_dci_ncce) |
| log_list.append('dl_dci_ncce: {}'.format(dl_dci_ncce)) |
| if ul_dci_ncce: |
| ul_dci_ncce = PdcchFormat(ul_dci_ncce) |
| log_list.append('ul_dci_ncce: {}'.format(ul_dci_ncce)) |
| if dl_dci_format: |
| dl_dci_format = DciFormat(dl_dci_format) |
| log_list.append('dl_dci_format: {}'.format(dl_dci_format)) |
| if dl_tm: |
| dl_tm = DlTransmissionMode(dl_tm.value) |
| log_list.append('dl_tm: {}'.format(dl_tm)) |
| if dl_num_layers: |
| dl_num_layers = MaxLayersMIMO(dl_num_layers) |
| log_list.append('dl_num_layers: {}'.format(dl_num_layers)) |
| if dl_mcs_table: |
| dl_mcs_table = McsTable(dl_mcs_table) |
| log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) |
| |
| num_crs_antenna_ports = self._cell.get_num_crs_antenna_ports() |
| |
| # Sets num of crs antenna ports to 4 for configuring |
| self._cell.set_num_crs_antenna_ports(4) |
| scheduler = self._cmx.dut.get_scheduler(self._cell) |
| logger.info('configure scheduler for {}'.format(','.join(log_list))) |
| scheduler.configure_scheduler(dl_mcs=dl_mcs, |
| dl_rb_alloc=dl_rb_alloc, |
| dl_dci_ncce=dl_dci_ncce, |
| dl_dci_format=dl_dci_format, |
| dl_tm=dl_tm, |
| dl_num_layers=dl_num_layers, |
| dl_mcs_table=dl_mcs_table, |
| ul_mcs=ul_mcs, |
| ul_rb_alloc=ul_rb_alloc, |
| ul_dci_ncce=ul_dci_ncce) |
| logger.info('Configure scheduler succeeds') |
| |
| # Sets num of crs antenna ports back to previous value |
| self._cell.set_num_crs_antenna_ports(num_crs_antenna_ports) |
| self._network.apply_changes() |
| |
| @property |
| def bandwidth(self): |
| """Get the channel bandwidth of the cell. |
| |
| Return: |
| the number rb of the bandwidth. |
| """ |
| return self._cell.get_bandwidth().num_rb |
| |
| @property |
| def dl_channel(self): |
| """Gets the downlink channel of cell. |
| |
| Return: |
| the downlink channel (earfcn) in int. |
| """ |
| return int(self._cell.get_dl_earfcn()) |
| |
| @property |
| def dl_frequency(self): |
| """Get the downlink frequency of the cell.""" |
| from mrtype.frequency import Frequency |
| return self._cell.get_dl_earfcn().to_freq().in_units( |
| Frequency.Units.GHz) |
| |
| def _to_rb_bandwidth(self, bandwidth): |
| for idx in range(5): |
| if bandwidth < LTE_MHZ_UPPER_BOUND_TO_RB[idx][0]: |
| return LTE_MHZ_UPPER_BOUND_TO_RB[idx][1] |
| return 100 |
| |
| def disable_all_ul_subframes(self): |
| """Disables all ul subframes for LTE cell.""" |
| self._cc.disable_all_ul_subframes() |
| self._network.apply_changes() |
| |
| def set_bandwidth(self, bandwidth): |
| """Sets the channel bandwidth of the cell. |
| |
| Args: |
| bandwidth: channel bandwidth of cell in MHz. |
| """ |
| self._cell.set_bandwidth(self._to_rb_bandwidth(bandwidth)) |
| self._network.apply_changes() |
| |
| def set_cdrx_config(self): |
| """Sets LTE cdrx config for endc.""" |
| from mrtype.lte.drx import ( |
| LteDrxConfig, |
| LteDrxInactivityTimer, |
| LteDrxLongCycleStartOffset, |
| LteDrxOnDurationTimer, |
| LteDrxRetransmissionTimer, |
| ) |
| |
| logger.info('Config Lte drx config') |
| lte_drx_config = LteDrxConfig( |
| on_duration_timer=LteDrxOnDurationTimer.PSF_10, |
| inactivity_timer=LteDrxInactivityTimer.PSF_200, |
| retransmission_timer=LteDrxRetransmissionTimer.PSF_33, |
| long_cycle_start_offset=LteDrxLongCycleStartOffset.ms160(83), |
| short_drx=None) |
| self._cmx.dut.lte_cell_group().set_drx_and_adjust_scheduler( |
| drx_config=lte_drx_config) |
| self._network.apply_changes() |
| |
| def set_cell_frequency_band(self, tdd_cfg=None, ssf_cfg=None): |
| """Sets cell frequency band with tdd and ssf config. |
| |
| Args: |
| tdd_cfg: the tdd subframe assignment config in number (from 0-6). |
| ssf_cfg: the special subframe pattern config in number (from 1-9). |
| """ |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import SpecialSubframePattern |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import SubFrameAssignment |
| from rs_mrt.testenvironment.signaling.sri.rat.lte.config import CellFrequencyBand |
| from rs_mrt.testenvironment.signaling.sri.rat.lte.config import Tdd |
| tdd_subframe = None |
| ssf_pattern = None |
| if tdd_cfg: |
| tdd_subframe = SubFrameAssignment(tdd_cfg + 1) |
| if ssf_cfg: |
| ssf_pattern = SpecialSubframePattern(ssf_cfg) |
| tdd = Tdd(tdd_config=Tdd.TddConfigSignaling( |
| subframe_assignment=tdd_subframe, |
| special_subframe_pattern=ssf_pattern)) |
| self._cell.stub.SetCellFrequencyBand(CellFrequencyBand(tdd=tdd)) |
| self._network.apply_changes() |
| |
| def set_cfi(self, cfi): |
| """Sets number of pdcch symbols (cfi). |
| |
| Args: |
| cfi: the value of NumberOfPdcchSymbols |
| """ |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import NumberOfPdcchSymbols |
| from rs_mrt.testenvironment.signaling.sri.rat.lte.config import PdcchRegionReq |
| |
| logger.info('The cfi enum to set is {}'.format( |
| NumberOfPdcchSymbols(cfi))) |
| req = PdcchRegionReq() |
| req.num_pdcch_symbols = NumberOfPdcchSymbols(cfi) |
| self._cell.stub.SetPdcchControlRegion(req) |
| |
| def set_dci_format(self, dci_format): |
| """Selects the downlink control information (DCI) format. |
| |
| Args: |
| dci_format: supported dci. |
| """ |
| if not isinstance(dci_format, DciFormat): |
| raise CmxError('Wrong type for dci_format') |
| self._config_scheduler(dl_dci_format=dci_format.value) |
| |
| def set_dl_channel(self, channel): |
| """Sets the downlink channel number of cell. |
| |
| Args: |
| channel: downlink channel number of cell. |
| """ |
| if self.dl_channel == channel: |
| logger.info('The dl_channel was at {}'.format(self.dl_channel)) |
| return |
| self._cell.set_earfcn(channel) |
| logger.info('The dl_channel was set to {}'.format(self.dl_channel)) |
| |
| def set_dl_modulation_table(self, modulation): |
| """Sets down link modulation table. |
| |
| Args: |
| modulation: modulation table setting (ModulationType). |
| """ |
| if not isinstance(modulation, ModulationType): |
| raise CmxError('The modulation is not the type of Modulation') |
| self._config_scheduler(dl_mcs_table=modulation.value) |
| |
| def set_mimo_mode(self, mimo): |
| """Sets mimo mode for Lte scenario. |
| |
| Args: |
| mimo: the mimo mode. |
| """ |
| if not isinstance(mimo, MimoModes): |
| raise CmxError("Wrong type of mimo mode") |
| |
| self._cell.set_num_crs_antenna_ports(mimo.value) |
| |
| def set_scheduling_mode(self, |
| mcs_dl=None, |
| mcs_ul=None, |
| nrb_dl=None, |
| nrb_ul=None): |
| """Sets scheduling mode. |
| |
| Args: |
| scheduling: the new scheduling mode. |
| mcs_dl: Downlink MCS. |
| mcs_ul: Uplink MCS. |
| nrb_dl: Number of RBs for downlink. |
| nrb_ul: Number of RBs for uplink. |
| """ |
| self._config_scheduler(dl_mcs=mcs_dl, |
| ul_mcs=mcs_ul, |
| dl_rb_alloc=nrb_dl, |
| ul_rb_alloc=nrb_ul) |
| |
| def set_ssf_config(self, ssf_config): |
| """Sets ssf subframe assignment with tdd_config. |
| |
| Args: |
| ssf_config: the special subframe pattern config (from 1-9). |
| """ |
| self.set_cell_frequency_band(ssf_cfg=ssf_config) |
| |
| def set_tdd_config(self, tdd_config): |
| """Sets tdd subframe assignment with tdd_config. |
| |
| Args: |
| tdd_config: the subframe assignemnt config (from 0-6). |
| """ |
| self.set_cell_frequency_band(tdd_cfg=tdd_config) |
| |
| def set_transmission_mode(self, transmission_mode): |
| """Sets transmission mode with schedular. |
| |
| Args: |
| transmission_mode: the download link transmission mode. |
| """ |
| from rs_mrt.testenvironment.signaling.sri.rat.lte import DlTransmissionMode |
| if not isinstance(transmission_mode, TransmissionModes): |
| raise CmxError('Wrong type of the trasmission mode') |
| dl_tm = DlTransmissionMode(transmission_mode.value) |
| logger.info('set dl tm to {}'.format(dl_tm)) |
| self._cc.set_dl_tm(dl_tm) |
| self._network.apply_changes() |
| |
| def set_ul_channel(self, channel): |
| """Sets the up link channel number of cell. |
| |
| Args: |
| channel: up link channel number of cell. |
| """ |
| if self.ul_channel == channel: |
| logger.info('The ul_channel is at {}'.format(self.ul_channel)) |
| return |
| self._cell.set_earfcn(channel) |
| logger.info('The dl_channel was set to {}'.format(self.ul_channel)) |
| |
| @property |
| def ul_channel(self): |
| """Gets the uplink channel of cell. |
| |
| Return: |
| the uplink channel (earfcn) in int |
| """ |
| return int(self._cell.get_ul_earfcn()) |
| |
| @property |
| def ul_frequency(self): |
| """Get the uplink frequency of the cell. |
| |
| Return: |
| The uplink frequency in GHz. |
| """ |
| from mrtype.frequency import Frequency |
| return self._cell.get_ul_earfcn().to_freq().in_units( |
| Frequency.Units.GHz) |
| |
| def set_ul_modulation_table(self, modulation): |
| """Sets up link modulation table. |
| |
| Args: |
| modulation: modulation table setting (ModulationType). |
| """ |
| if not isinstance(modulation, ModulationType): |
| raise CmxError('The modulation is not the type of Modulation') |
| if modulation == ModulationType.Q16: |
| self._cell.stub.SetPuschCommonConfig(False) |
| else: |
| self._cell.stub.SetPuschCommonConfig(True) |
| |
| |
| class NrBaseStation(BaseStation): |
| """ NR base station.""" |
| def __init__(self, cmx, cell): |
| """Init method to setup variables for the NR base station. |
| |
| Args: |
| cmx: Controller (Cmx500) object. |
| cell: The cell for the NR base station. |
| """ |
| from xlapi.nr_cell import NrCell |
| if not isinstance(cell, NrCell): |
| raise CmxError('the cell is not a NR cell, NR base station fails' |
| ' to creat.') |
| |
| super().__init__(cmx, cell) |
| |
| def _config_scheduler(self, |
| dl_mcs=None, |
| dl_mcs_table=None, |
| dl_rb_alloc=None, |
| dl_mimo_mode=None, |
| ul_mcs=None, |
| ul_mcs_table=None, |
| ul_rb_alloc=None, |
| ul_mimo_mode=None): |
| |
| from rs_mrt.testenvironment.signaling.sri.rat.nr import McsTable |
| |
| log_list = [] |
| if dl_mcs: |
| log_list.append('dl_mcs: {}'.format(dl_mcs)) |
| if ul_mcs: |
| log_list.append('ul_mcs: {}'.format(ul_mcs)) |
| |
| # If rb alloc is not a tuple, add 0 as start RBs for XLAPI NR scheduler |
| if dl_rb_alloc: |
| if not isinstance(dl_rb_alloc, tuple): |
| dl_rb_alloc = (0, dl_rb_alloc) |
| log_list.append('dl_rb_alloc: {}'.format(dl_rb_alloc)) |
| if ul_rb_alloc: |
| if not isinstance(ul_rb_alloc, tuple): |
| ul_rb_alloc = (0, ul_rb_alloc) |
| log_list.append('ul_rb_alloc: {}'.format(ul_rb_alloc)) |
| if dl_mcs_table: |
| dl_mcs_table = McsTable(dl_mcs_table) |
| log_list.append('dl_mcs_table: {}'.format(dl_mcs_table)) |
| if ul_mcs_table: |
| ul_mcs_table = McsTable(ul_mcs_table) |
| log_list.append('ul_mcs_table: {}'.format(ul_mcs_table)) |
| if dl_mimo_mode: |
| log_list.append('dl_mimo_mode: {}'.format(dl_mimo_mode)) |
| if ul_mimo_mode: |
| log_list.append('ul_mimo_mode: {}'.format(ul_mimo_mode)) |
| |
| scheduler = self._cmx.dut.get_scheduler(self._cell) |
| logger.info('configure scheduler for {}'.format(','.join(log_list))) |
| |
| scheduler.configure_ue_scheduler(dl_mcs=dl_mcs, |
| dl_mcs_table=dl_mcs_table, |
| dl_rb_alloc=dl_rb_alloc, |
| dl_mimo_mode=dl_mimo_mode, |
| ul_mcs=ul_mcs, |
| ul_mcs_table=ul_mcs_table, |
| ul_rb_alloc=ul_rb_alloc, |
| ul_mimo_mode=ul_mimo_mode) |
| logger.info('Configure scheduler succeeds') |
| self._network.apply_changes() |
| |
| def attach_as_secondary_cell(self, endc_timer=DEFAULT_ENDC_TIMER): |
| """Enable endc mode for NR cell. |
| |
| Args: |
| endc_timer: timeout for endc state |
| """ |
| logger.info('enable endc mode for nsa dual connection') |
| self._cmx.dut.signaling.nsa_dual_connect(self._cell) |
| time_count = 0 |
| while time_count < endc_timer: |
| if str(self._cmx.dut.state.radio_connectivity) == \ |
| 'RadioConnectivityMode.EPS_LTE_NR': |
| logger.info('enter endc mode') |
| return |
| time.sleep(1) |
| time_count += 1 |
| if time_count % 30 == 0: |
| logger.info('did not reach endc at {} s'.format(time_count)) |
| raise CmxError('Cannot reach endc after {} s'.format(endc_timer)) |
| |
| def config_flexible_slots(self): |
| """Configs flexible slots for NR cell.""" |
| |
| from rs_mrt.testenvironment.signaling.sri.rat.nr.config import CellFrequencyBandReq |
| from rs_mrt.testenvironment.signaling.sri.rat.common import SetupRelease |
| |
| logger.info('Config flexible slots') |
| req = CellFrequencyBandReq() |
| req.band.frequency_range.fr1.tdd.tdd_config_common.setup_release = SetupRelease.RELEASE |
| self._cell.stub.SetCellFrequencyBand(req) |
| self._network.apply_changes() |
| |
| def disable_all_ul_slots(self): |
| """Disables all ul slots for NR cell""" |
| self._cc.disable_all_ul_slots() |
| |
| @property |
| def dl_channel(self): |
| """Gets the downlink channel of cell. |
| |
| Return: |
| the downlink channel (nr_arfcn) in int. |
| """ |
| return int(self._cell.get_dl_ref_a()) |
| |
| def _bandwidth_to_carrier_bandwidth(self, bandwidth): |
| """Converts bandwidth in MHz to CarrierBandwidth. |
| CarrierBandwidth Enum in XLAPI: |
| MHZ_5 = 0 |
| MHZ_10 = 1 |
| MHZ_15 = 2 |
| MHZ_20 = 3 |
| MHZ_25 = 4 |
| MHZ_30 = 5 |
| MHZ_40 = 6 |
| MHZ_50 = 7 |
| MHZ_60 = 8 |
| MHZ_70 = 9 |
| MHZ_80 = 10 |
| MHZ_90 = 11 |
| MHZ_100 = 12 |
| MHZ_200 = 13 |
| MHZ_400 = 14 |
| Args: |
| bandwidth: channel bandwidth in MHz. |
| |
| Return: |
| the corresponding NR Carrier Bandwidth. |
| """ |
| from mrtype.nr.frequency import CarrierBandwidth |
| if bandwidth > 100: |
| return CarrierBandwidth(12 + bandwidth // 200) |
| elif bandwidth > 30: |
| return CarrierBandwidth(2 + bandwidth // 10) |
| else: |
| return CarrierBandwidth(bandwidth // 5 - 1) |
| |
| def set_bandwidth(self, bandwidth, scs=None): |
| """Sets the channel bandwidth of the cell. |
| |
| Args: |
| bandwidth: channel bandwidth of cell. |
| scs: subcarrier spacing (SCS) of resource grid 0 |
| """ |
| if not scs: |
| scs = self._cell.get_scs() |
| self._cell.set_carrier_bandwidth_and_scs( |
| self._bandwidth_to_carrier_bandwidth(bandwidth), scs) |
| logger.info( |
| 'The bandwidth in MHz is {}. After setting, the value is {}'. |
| format(bandwidth, str(self._cell.get_carrier_bandwidth()))) |
| |
| def set_dl_channel(self, channel): |
| """Sets the downlink channel number of cell. |
| |
| Args: |
| channel: downlink channel number of cell or Frequency Range ('LOW', |
| 'MID' or 'HIGH'). |
| """ |
| from mrtype.nr.frequency import NrArfcn |
| from mrtype.frequency import FrequencyRange |
| |
| # When the channel is string, set it use Frenquency Range |
| if isinstance(channel, str): |
| logger.info('Sets dl channel Frequency Range {}'.format(channel)) |
| frequency_range = FrequencyRange.LOW |
| if channel.upper() == 'MID': |
| frequency_range = FrequencyRange.MID |
| elif channel.upper() == 'HIGH': |
| frequency_range = FrequencyRange.HIGH |
| self._cell.set_dl_ref_a_offset(self.band, frequency_range) |
| logger.info('The dl_channel was set to {}'.format(self.dl_channel)) |
| return |
| if self.dl_channel == channel: |
| logger.info('The dl_channel was at {}'.format(self.dl_channel)) |
| return |
| self._cell.set_dl_ref_a_offset(self.band, NrArfcn(channel)) |
| logger.info('The dl_channel was set to {}'.format(self.dl_channel)) |
| |
| def set_dl_modulation_table(self, modulation): |
| """Sets down link modulation table. |
| |
| Args: |
| modulation: modulation table setting (ModulationType). |
| """ |
| if not isinstance(modulation, ModulationType): |
| raise CmxError('The modulation is not the type of Modulation') |
| self._config_scheduler(dl_mcs_table=modulation.value) |
| |
| def set_mimo_mode(self, mimo): |
| """Sets mimo mode for NR nsa scenario. |
| |
| Args: |
| mimo: the mimo mode. |
| """ |
| from rs_mrt.testenvironment.signaling.sri.rat.nr import DownlinkMimoMode |
| if not isinstance(mimo, MimoModes): |
| raise CmxError("Wrong type of mimo mode") |
| |
| self._config_scheduler(dl_mimo_mode=DownlinkMimoMode.Enum(mimo.value)) |
| |
| def set_scheduling_mode(self, |
| mcs_dl=None, |
| mcs_ul=None, |
| nrb_dl=None, |
| nrb_ul=None): |
| """Sets scheduling mode. |
| |
| Args: |
| mcs_dl: Downlink MCS. |
| mcs_ul: Uplink MCS. |
| nrb_dl: Number of RBs for downlink. |
| nrb_ul: Number of RBs for uplink. |
| """ |
| self._config_scheduler(dl_mcs=mcs_dl, |
| ul_mcs=mcs_ul, |
| dl_rb_alloc=nrb_dl, |
| ul_rb_alloc=nrb_ul) |
| |
| def set_ssf_config(self, ssf_config): |
| """Sets ssf subframe assignment with tdd_config. |
| |
| Args: |
| ssf_config: the special subframe pattern config (from 1-9). |
| """ |
| raise CmxError('the set ssf config for nr did not implemente yet') |
| |
| def set_tdd_config(self, tdd_config): |
| """Sets tdd subframe assignment with tdd_config. |
| |
| Args: |
| tdd_config: the subframe assignemnt config (from 0-6). |
| """ |
| raise CmxError('the set tdd config for nr did not implemente yet') |
| |
| def set_transmission_mode(self, transmission_mode): |
| """Sets transmission mode with schedular. |
| |
| Args: |
| transmission_mode: the download link transmission mode. |
| """ |
| logger.info('The set transmission mode for nr is set by mimo mode') |
| |
| def set_ul_modulation_table(self, modulation): |
| """Sets down link modulation table. |
| |
| Args: |
| modulation: modulation table setting (ModulationType). |
| """ |
| if not isinstance(modulation, ModulationType): |
| raise CmxError('The modulation is not the type of Modulation') |
| self._config_scheduler(ul_mcs_table=modulation.value) |
| |
| |
| class CmxError(Exception): |
| """Class to raise exceptions related to cmx.""" |