blob: 4ac414b246a8bcaacfac6272f0d4ebe78c2023ac [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from acts.controllers.rohdeschwarz_lib import cmw500
from acts.controllers import cellular_simulator as cc
from acts.controllers.rohdeschwarz_lib import cmw500_scenario_generator as cg
from acts.controllers.cellular_lib import LteSimulation
CMW_TM_MAPPING = {
LteSimulation.TransmissionMode.TM1: cmw500.TransmissionModes.TM1,
LteSimulation.TransmissionMode.TM2: cmw500.TransmissionModes.TM2,
LteSimulation.TransmissionMode.TM3: cmw500.TransmissionModes.TM3,
LteSimulation.TransmissionMode.TM4: cmw500.TransmissionModes.TM4,
LteSimulation.TransmissionMode.TM7: cmw500.TransmissionModes.TM7,
LteSimulation.TransmissionMode.TM8: cmw500.TransmissionModes.TM8,
LteSimulation.TransmissionMode.TM9: cmw500.TransmissionModes.TM9
}
CMW_SCH_MAPPING = {
LteSimulation.SchedulingMode.STATIC: cmw500.SchedulingMode.USERDEFINEDCH
}
CMW_MIMO_MAPPING = {
LteSimulation.MimoMode.MIMO_1x1: cmw500.MimoModes.MIMO1x1,
LteSimulation.MimoMode.MIMO_2x2: cmw500.MimoModes.MIMO2x2,
LteSimulation.MimoMode.MIMO_4x4: cmw500.MimoModes.MIMO4x4
}
MIMO_ANTENNA_COUNT = {
LteSimulation.MimoMode.MIMO_1x1: 1,
LteSimulation.MimoMode.MIMO_2x2: 2,
LteSimulation.MimoMode.MIMO_4x4: 4,
}
IP_ADDRESS_TYPE_MAPPING = {
LteSimulation.IPAddressType.IPV4: cmw500.IPAddressType.IPV4,
LteSimulation.IPAddressType.IPV6: cmw500.IPAddressType.IPV6,
LteSimulation.IPAddressType.IPV4V6: cmw500.IPAddressType.IPV4V6,
}
# get mcs vs tbsi map with 256-qam disabled(downlink)
get_mcs_tbsi_map_dl = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 1,
2: 2,
3: 3,
4: 4,
5: 5,
6: 6,
7: 7,
8: 8,
9: 9
},
cmw500.ModulationType.Q16: {
10: 9,
11: 10,
12: 11,
13: 12,
14: 13,
15: 14,
16: 15
},
cmw500.ModulationType.Q64: {
17: 15,
18: 16,
19: 17,
20: 18,
21: 19,
22: 20,
23: 21,
24: 22,
25: 23,
26: 24,
27: 25,
28: 26
}
}
# get mcs vs tbsi map with 256-qam enabled(downlink)
get_mcs_tbsi_map_for_256qam_dl = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 2,
2: 4,
3: 6,
4: 8,
},
cmw500.ModulationType.Q16: {
5: 10,
6: 11,
7: 12,
8: 13,
9: 14,
10: 15
},
cmw500.ModulationType.Q64: {
11: 16,
12: 17,
13: 18,
14: 19,
15: 20,
16: 21,
17: 22,
18: 23,
19: 24
},
cmw500.ModulationType.Q256: {
20: 25,
21: 27,
22: 28,
23: 29,
24: 30,
25: 31,
26: 32,
27: 33
}
}
# get mcs vs tbsi map (uplink)
get_mcs_tbsi_map_ul = {
cmw500.ModulationType.QPSK: {
0: 0,
1: 1,
2: 2,
3: 3,
4: 4,
5: 5,
6: 6,
7: 7,
8: 8,
9: 9
},
cmw500.ModulationType.Q16: {
10: 10,
11: 10,
12: 11,
13: 12,
14: 13,
15: 14,
16: 15,
17: 16,
18: 17,
19: 18,
20: 19,
21: 19,
22: 20,
23: 21,
24: 22,
25: 23,
26: 24,
27: 25,
28: 26
}
}
class CMW500CellularSimulator(cc.AbstractCellularSimulator):
""" A cellular simulator for telephony simulations based on the CMW 500
controller. """
# The maximum number of carriers that this simulator can support for LTE
LTE_MAX_CARRIERS = 1
def __init__(self, ip_address, port):
""" Initializes the cellular simulator.
Args:
ip_address: the ip address of the CMW500
port: the port number for the CMW500 controller
"""
super().__init__()
try:
self.cmw = cmw500.Cmw500(ip_address, port)
except cmw500.CmwError:
raise cc.CellularSimulatorError('Could not connect to CMW500.')
self.bts = None
self.dl_modulation = None
self.ul_modulation = None
def destroy(self):
""" Sends finalization commands to the cellular equipment and closes
the connection. """
self.cmw.disconnect()
def setup_lte_scenario(self):
""" Configures the equipment for an LTE simulation. """
self.cmw.connection_type = cmw500.ConnectionType.DAU
# if bts has not yet been initialized
if not self.bts:
self.bts = [self.cmw.get_base_station()]
self.cmw.switch_lte_signalling(cmw500.LteState.LTE_ON)
def set_band_combination(self, bands, mimo_modes):
""" Prepares the test equipment for the indicated band/mimo combination.
Args:
bands: a list of bands represented as ints or strings
mimo_modes: a list of LteSimulation.MimoMode to use for each carrier
"""
self.num_carriers = len(bands)
self.bts = [self.cmw.get_base_station(cmw500.BtsNumber.BTS1)]
for i in range(1, len(bands)):
self.bts.append(
self.cmw.get_base_station(cmw500.BtsNumber('SCC{}'.format(i))))
antennas = [MIMO_ANTENNA_COUNT[m] for m in mimo_modes]
self.set_scenario(bands, antennas)
def get_band_combination(self):
return [int(bts.band.strip('OB')) for bts in self.bts]
def set_lte_rrc_state_change_timer(self, enabled, time=10):
""" Configures the LTE RRC state change timer.
Args:
enabled: a boolean indicating if the timer should be on or off.
time: time in seconds for the timer to expire
"""
if enabled:
self.cmw.rrc_connection = cmw500.RrcState.RRC_OFF
self.cmw.rrc_connection_timer = time
else:
self.cmw.rrc_connection = cmw500.RrcState.RRC_ON
def set_band(self, bts_index, band):
""" Sets the band for the indicated base station.
Args:
bts_index: the base station number
band: the new band
"""
bts = self.bts[bts_index]
bts.duplex_mode = self.get_duplex_mode(band)
band = 'OB' + band
bts.band = band
self.log.debug('Band set to {}'.format(band))
def get_duplex_mode(self, band):
""" Determines if the band uses FDD or TDD duplex mode
Args:
band: a band number
Returns:
an variable of class DuplexMode indicating if band is FDD or TDD
"""
if 33 <= int(band) <= 46:
return cmw500.DuplexMode.TDD
else:
return cmw500.DuplexMode.FDD
def set_input_power(self, bts_index, input_power):
""" Sets the input power for the indicated base station.
Args:
bts_index: the base station number
input_power: the new input power
"""
bts = self.bts[bts_index]
if input_power > 23:
self.log.warning('Open loop supports-50dBm to 23 dBm. '
'Setting it to max power 23 dBm')
input_power = 23
bts.tpc_closed_loop_target_power = input_power
bts.uplink_power_control = input_power
bts.tpc_power_control = cmw500.TpcPowerControl.CLOSED_LOOP
def set_output_power(self, bts_index, output_power):
""" Sets the output power for the indicated base station.
Args:
bts_index: the base station number
output_power: the new output power
"""
bts = self.bts[bts_index]
bts.downlink_power_level = output_power
def set_tdd_config(self, bts_index, tdd_config):
""" Sets the tdd configuration number for the indicated base station.
Args:
bts_index: the base station number
tdd_config: the new tdd configuration number
"""
self.bts[bts_index].uldl_configuration = tdd_config
def set_ssf_config(self, bts_index, ssf_config):
""" Sets the Special Sub-Frame config number for the indicated
base station.
Args:
bts_index: the base station number
ssf_config: the new ssf config number
"""
if not 0 <= ssf_config <= 9:
raise ValueError('The Special Sub-Frame configuration has to be a '
'number between 0 and 9.')
self.bts[bts_index].tdd_special_subframe = ssf_config
def set_bandwidth(self, bts_index, bandwidth):
""" Sets the bandwidth for the indicated base station.
Args:
bts_index: the base station number
bandwidth: the new bandwidth
"""
bts = self.bts[bts_index]
if bandwidth == 20:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_20MHz
elif bandwidth == 15:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_15MHz
elif bandwidth == 10:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_10MHz
elif bandwidth == 5:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_5MHz
elif bandwidth == 3:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_3MHz
elif bandwidth == 1.4:
bts.bandwidth = cmw500.LteBandwidth.BANDWIDTH_1MHz
else:
msg = 'Bandwidth {} MHz is not valid for LTE'.format(bandwidth)
raise ValueError(msg)
def set_downlink_channel_number(self, bts_index, channel_number):
""" Sets the downlink channel number for the indicated base station.
Args:
bts_index: the base station number
channel_number: the new channel number
"""
bts = self.bts[bts_index]
bts.dl_channel = channel_number
self.log.debug('Downlink Channel set to {}'.format(bts.dl_channel))
def set_mimo_mode(self, bts_index, mimo_mode):
""" Sets the mimo mode for the indicated base station.
Args:
bts_index: the base station number
mimo_mode: the new mimo mode
"""
bts = self.bts[bts_index]
mimo_mode = CMW_MIMO_MAPPING[mimo_mode]
if mimo_mode == cmw500.MimoModes.MIMO1x1:
self.set_bts_antenna(bts_index, 1)
bts.dl_antenna = cmw500.MimoModes.MIMO1x1
elif mimo_mode == cmw500.MimoModes.MIMO2x2:
self.set_bts_antenna(bts_index, 2)
bts.dl_antenna = cmw500.MimoModes.MIMO2x2
# set default transmission mode and DCI for this scenario
bts.transmode = cmw500.TransmissionModes.TM3
bts.dci_format = cmw500.DciFormat.D2A
elif mimo_mode == cmw500.MimoModes.MIMO4x4:
self.set_bts_antenna(bts_index, 4)
bts.dl_antenna = cmw500.MimoModes.MIMO4x4
else:
raise RuntimeError('The requested MIMO mode is not supported.')
def set_bts_antenna(self, bts_index, antenna_count):
""" Sets the mimo mode for a particular component carrier.
Args:
bts_index: index of the base station to configure.
antenna_count: number of antennas to use for the component carrier
either (1, 2, 4)
"""
antennas = self.get_antenna_combination()
antennas[bts_index] = antenna_count
bands = self.get_band_combination()
self.set_scenario(bands, antennas)
def get_antenna_combination(self):
""" Gets the current antenna configuration.
Returns:
antenna_count: a list containing the number of antennas to use for
each bts/CC.
"""
cmd = 'ROUTe:LTE:SIGN:SCENario?'
scenario_name = self.cmw.send_and_recv(cmd)
return cg.get_antennas(scenario_name)
def set_scenario(self, bands, antennas):
""" Sets the MIMO scenario on the CMW.
Args:
bands: a list defining the bands to use for each CC.
antennas: a list of integers defining the number of antennas to use
for each bts/CC.
"""
self.log.debug('Setting scenario: bands: {}, antennas: {}'.format(
bands, antennas))
scenario = cg.get_scenario(bands, antennas)
cmd = 'ROUTe:LTE:SIGN:SCENario:{}:FLEXible {}'.format(
scenario.name, scenario.routing)
self.cmw.send_and_recv(cmd)
self.cmw.scc_activation_mode = cmw500.SccActivationMode.AUTO
# apply requested bands now
for i, band in enumerate(bands):
self.set_band(i, str(band))
def set_transmission_mode(self, bts_index, tmode):
""" Sets the transmission mode for the indicated base station.
Args:
bts_index: the base station number
tmode: the new transmission mode
"""
bts = self.bts[bts_index]
tmode = CMW_TM_MAPPING[tmode]
if (tmode in [
cmw500.TransmissionModes.TM1, cmw500.TransmissionModes.TM7
] and bts.dl_antenna == cmw500.MimoModes.MIMO1x1.value):
bts.transmode = tmode
elif (tmode.value in cmw500.TransmissionModes.__members__
and bts.dl_antenna == cmw500.MimoModes.MIMO2x2.value):
bts.transmode = tmode
elif (tmode in [
cmw500.TransmissionModes.TM2, cmw500.TransmissionModes.TM3,
cmw500.TransmissionModes.TM4, cmw500.TransmissionModes.TM9
] and bts.dl_antenna == cmw500.MimoModes.MIMO4x4.value):
bts.transmode = tmode
else:
raise ValueError('Transmission modes should support the current '
'mimo mode')
def set_scheduling_mode(self,
bts_index,
scheduling,
mcs_dl=None,
mcs_ul=None,
nrb_dl=None,
nrb_ul=None):
""" Sets the scheduling mode for the indicated base station.
Args:
bts_index: the base station number.
scheduling: the new scheduling mode.
mcs_dl: Downlink MCS.
mcs_ul: Uplink MCS.
nrb_dl: Number of RBs for downlink.
nrb_ul: Number of RBs for uplink.
"""
bts = self.bts[bts_index]
bts.reduced_pdcch = cmw500.ReducedPdcch.ON
scheduling = CMW_SCH_MAPPING[scheduling]
bts.scheduling_mode = scheduling
if not (self.ul_modulation and self.dl_modulation):
raise ValueError('Modulation should be set prior to scheduling '
'call')
if scheduling == cmw500.SchedulingMode.RMC:
if not nrb_ul and nrb_dl:
raise ValueError('nrb_ul and nrb dl should not be none')
bts.rb_configuration_ul = (nrb_ul, self.ul_modulation, 'KEEP')
self.log.info('ul rb configurations set to {}'.format(
bts.rb_configuration_ul))
time.sleep(1)
self.log.debug('Setting rb configurations for down link')
bts.rb_configuration_dl = (nrb_dl, self.dl_modulation, 'KEEP')
self.log.info('dl rb configurations set to {}'.format(
bts.rb_configuration_ul))
elif scheduling == cmw500.SchedulingMode.USERDEFINEDCH:
if not all([nrb_ul, nrb_dl, mcs_dl, mcs_ul]):
raise ValueError('All parameters are mandatory.')
tbs = get_mcs_tbsi_map_ul[self.ul_modulation][mcs_ul]
bts.rb_configuration_ul = (nrb_ul, 0, self.ul_modulation, tbs)
self.log.info('ul rb configurations set to {}'.format(
bts.rb_configuration_ul))
time.sleep(1)
if self.dl_256_qam_enabled:
tbs = get_mcs_tbsi_map_for_256qam_dl[
self.dl_modulation][mcs_dl]
else:
tbs = get_mcs_tbsi_map_dl[self.dl_modulation][mcs_dl]
bts.rb_configuration_dl = (nrb_dl, 0, self.dl_modulation, tbs)
self.log.info('dl rb configurations set to {}'.format(
bts.rb_configuration_dl))
def set_dl_256_qam_enabled(self, bts_index, enabled):
""" Determines what MCS table should be used for the downlink.
This only saves the setting that will be used when configuring MCS.
Args:
bts_index: the base station number
enabled: whether 256 QAM should be used
"""
self.log.info('Set 256 QAM DL MCS enabled: ' + str(enabled))
self.dl_modulation = cmw500.ModulationType.Q256 if enabled \
else cmw500.ModulationType.Q64
self.dl_256_qam_enabled = enabled
def set_ul_64_qam_enabled(self, bts_index, enabled):
""" Determines what MCS table should be used for the uplink.
This only saves the setting that will be used when configuring MCS.
Args:
bts_index: the base station number
enabled: whether 64 QAM should be used
"""
self.log.info('Set 64 QAM UL MCS enabled: ' + str(enabled))
self.ul_modulation = cmw500.ModulationType.Q64 if enabled \
else cmw500.ModulationType.Q16
self.ul_64_qam_enabled = enabled
def set_mac_padding(self, bts_index, mac_padding):
""" Enables or disables MAC padding in the indicated base station.
Args:
bts_index: the base station number
mac_padding: the new MAC padding setting
"""
# TODO (b/143918664): CMW500 doesn't have an equivalent setting.
def set_cfi(self, bts_index, cfi):
""" Sets the Channel Format Indicator for the indicated base station.
Args:
bts_index: the base station number
cfi: the new CFI setting
"""
# TODO (b/143497738): implement.
self.log.error('Setting CFI is not yet implemented in the CMW500 '
'controller.')
def set_paging_cycle(self, bts_index, cycle_duration):
""" Sets the paging cycle duration for the indicated base station.
Args:
bts_index: the base station number
cycle_duration: the new paging cycle duration in milliseconds
"""
# TODO (b/146068532): implement.
self.log.error('Setting the paging cycle duration is not yet '
'implemented in the CMW500 controller.')
def set_phich_resource(self, bts_index, phich):
""" Sets the PHICH Resource setting for the indicated base station.
Args:
bts_index: the base station number
phich: the new PHICH resource setting
"""
self.log.error('Configuring the PHICH resource setting is not yet '
'implemented in the CMW500 controller.')
def set_drx_connected_mode(self, bts_index, active):
""" Sets the time interval to wait before entering DRX mode
Args:
bts_index: the base station number
active: Boolean indicating whether cDRX mode
is active
"""
self.cmw.drx_connected_mode = (cmw500.DrxMode.USER_DEFINED
if active else cmw500.DrxMode.OFF)
def set_drx_on_duration_timer(self, bts_index, timer):
""" Sets the amount of PDCCH subframes to wait for data after
waking up from a DRX cycle
Args:
bts_index: the base station number
timer: Number of PDCCH subframes to wait and check for user data
after waking from the DRX cycle
"""
timer = 'PSF{}'.format(timer)
self.cmw.drx_on_duration_timer = timer
def set_drx_inactivity_timer(self, bts_index, timer):
""" Sets the number of PDCCH subframes to wait before entering DRX mode
Args:
bts_index: the base station number
timer: The amount of time to wait before entering DRX mode
"""
timer = 'PSF{}'.format(timer)
self.cmw.drx_inactivity_timer = timer
def set_drx_retransmission_timer(self, bts_index, timer):
""" Sets the number of consecutive PDCCH subframes to wait
for retransmission
Args:
bts_index: the base station number
timer: Number of PDCCH subframes to remain active
"""
timer = 'PSF{}'.format(timer)
self.cmw.drx_retransmission_timer = timer
def set_drx_long_cycle(self, bts_index, cycle):
""" Sets the amount of subframes representing a DRX long cycle.
Args:
bts_index: the base station number
cycle: The amount of subframes representing one long DRX cycle.
One cycle consists of DRX sleep + DRX on duration
"""
cycle = 'SF{}'.format(cycle)
self.cmw.drx_long_cycle = cycle
def set_drx_long_cycle_offset(self, bts_index, offset):
""" Sets the offset used to determine the subframe number
to begin the long drx cycle
Args:
bts_index: the base station number
offset: Number in range 0 to (long cycle - 1)
"""
self.cmw.drx_long_cycle_offset = offset
def set_apn(self, apn):
""" Configures the callbox network Access Point Name.
Args:
apn: the APN name
"""
self.cmw.apn = apn
def set_ip_type(self, ip_type):
""" Configures the callbox network IP type.
Args:
ip_type: the network type to use.
"""
self.cmw.ip_type = IP_ADDRESS_TYPE_MAPPING[ip_type]
def set_mtu(self, mtu):
""" Configures the callbox network Maximum Transmission Unit.
Args:
mtu: the MTU size.
"""
self.cmw.mtu = mtu
def lte_attach_secondary_carriers(self, ue_capability_enquiry):
""" Activates the secondary carriers for CA. Requires the DUT to be
attached to the primary carrier first.
Args:
ue_capability_enquiry: UE capability enquiry message to be sent to
the UE before starting carrier aggregation.
"""
for i in range(1, len(self.bts)):
bts = self.bts[i]
if bts.scc_state == cmw500.SccState.OFF.value:
if (self.cmw.scc_activation_mode ==
cmw500.SccActivationMode.MANUAL.value):
self.cmw.switch_scc_state(i, cmw500.SccState.ON)
# SEMI_AUTO -> directly to MAC activation
if (self.cmw.scc_activation_mode ==
cmw500.SccActivationMode.SEMI_AUTO.value):
self.cmw.switch_scc_state(i, cmw500.SccState.MAC)
# no need to do anything for auto
if self.cmw.scc_activation_mode != cmw500.SccActivationMode.AUTO.value:
if bts.scc_state == cmw500.SccState.ON.value:
self.cmw.switch_scc_state(i, cmw500.SccState.RRC)
if bts.scc_state == cmw500.SccState.RRC.value:
self.cmw.switch_scc_state(i, cmw500.SccState.MAC)
self.cmw.wait_for_scc_state(i, [cmw500.SccState.MAC])
def wait_until_attached(self, timeout=120):
""" Waits until the DUT is attached to the primary carrier.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_attached_state(timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'attached state before '
'the timeout period ended.')
def wait_until_communication_state(self, timeout=120):
""" Waits until the DUT is in Communication state.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_rrc_state(cmw500.LTE_CONN_RESP, timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'Communication state before '
'the timeout period ended.')
def wait_until_idle_state(self, timeout=120):
""" Waits until the DUT is in Idle state.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
try:
self.cmw.wait_for_rrc_state(cmw500.LTE_IDLE_RESP, timeout=timeout)
except cmw500.CmwError:
raise cc.CellularSimulatorError('The phone was not in '
'Idle state before '
'the timeout period ended.')
def wait_until_quiet(self, timeout=120):
"""Waits for all pending operations to finish on the simulator.
Args:
timeout: after this amount of time the method will raise a
CellularSimulatorError exception. Default is 120 seconds.
"""
self.cmw.send_and_recv('*OPC?')
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
self.cmw.detach()
def stop(self):
""" Stops current simulation. After calling this method, the simulator
will need to be set up again. """
self.detach()
self.cmw.switch_lte_signalling(cmw500.LteState.LTE_OFF)
def start_data_traffic(self):
""" Starts transmitting data from the instrument to the DUT. """
raise NotImplementedError()
def stop_data_traffic(self):
""" Stops transmitting data from the instrument to the DUT. """
raise NotImplementedError()
def send_sms(self, message):
""" Sends an SMS message to the DUT.
Args:
message: the SMS message to send.
"""
self.cmw.set_sms(message)
self.cmw.send_sms()