Merge "Add cmx event watcher"
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
index 6336b7a..a59a43a 100644
--- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_cellular_simulator.py
@@ -15,6 +15,7 @@
# limitations under the License.
import time
+from enum import Enum
from acts.controllers.rohdeschwarz_lib import cmx500
from acts.controllers.rohdeschwarz_lib.cmx500 import LteBandwidth
@@ -43,6 +44,10 @@
}
+class ConfigurationMode(Enum):
+ """Pre-defined configuration modes accepted by CMX500CellularSimulator.
+
+ A member of this enum is passed as the `config_mode` constructor argument
+ of the simulator.
+ """
+ # When this mode is selected, the simulator calls
+ # configure_for_power_measurement().
+ Power = "Power"
+
+
class CMX500CellularSimulator(cc.AbstractCellularSimulator):
""" A cellular simulator for telephony simulations based on the CMX 500
controller. """
@@ -50,12 +55,16 @@
# The maximum power that the equipment is able to transmit
MAX_DL_POWER = -25
- def __init__(self, ip_address, port='5025'):
+ def __init__(self,
+ ip_address,
+ port='5025',
+ config_mode=ConfigurationMode.Power):
""" Initializes the cellular simulator.
Args:
ip_address: the ip address of the CMX500
port: the port number for the CMX500 controller
+ config_mode: A pre-defined configuration mode to use.
"""
super().__init__()
try:
@@ -63,6 +72,7 @@
except:
raise cc.CellularSimulatorError('Error when Initializes CMX500.')
+ self._config_mode = config_mode
self.bts = self.cmx.bts
def destroy(self):
@@ -339,6 +349,14 @@
self.bts[1].attach_as_secondary_cell()
time.sleep(10)
+ if self._config_mode and self._config_mode == ConfigurationMode.Power:
+ self.configure_for_power_measurement()
+
+ self.log.info('The radio connectivity is {}'.format(
+ self.cmx.dut.state.radio_connectivity))
+
+ def configure_for_power_measurement(self):
+ """ Applies a pre-defined configuration for PDCCH power testing."""
self.log.info('set lte cdrx for nr nsa scenario')
self.bts[0].set_cdrx_config()
time.sleep(5)
@@ -360,9 +378,6 @@
self.bts[1].disable_all_ul_slots()
time.sleep(5)
- self.log.info('The radio connectivity is {}'.format(
- self.cmx.dut.state.radio_connectivity))
-
def wait_until_attached(self, timeout=120):
""" Waits until the DUT is attached to the primary carrier.
@@ -401,6 +416,15 @@
self.log.info('wait for rrc off state')
return self.cmx.wait_for_rrc_state(cmx500.RrcState.RRC_OFF, timeout)
+ def wait_until_quiet(self, timeout=120):
+ """Waits for all pending operations to finish on the simulator.
+
+ This is done by calling apply_changes() on the CMX network object
+ (self.cmx._network); the method returns when that call returns.
+
+ Args:
+ timeout: intended as the number of seconds after which a
+ CellularSimulatorError is raised. Default is 120 seconds.
+ NOTE(review): this value is not read by the body below, so no
+ timeout is enforced here and no CellularSimulatorError is
+ raised by this method itself.
+ """
+ # NOTE(review): presumably apply_changes() blocks until the pending
+ # changes are applied -- confirm against the CMX500 library.
+ self.cmx._network.apply_changes()
+
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
self.log.info('Bypass simulator detach step for now')
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_events.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_events.py
new file mode 100644
index 0000000..f4abb0c
--- /dev/null
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmx500_events.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+#
+# Copyright 2023 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import logger as acts_logger
+
+logger = acts_logger.create_logger()
+
+
+def on_emm_registered(callback):
+ """Registers a callback to watch for EMM attach events.
+
+ Subscribes a wrapper to EmmAttachPub. The wrapper logs the registration
+ state of every message it receives at debug level, and invokes `callback`
+ only when that state is EmmRegistrationState.COMBINED_REGISTERED.
+
+ Args:
+ callback: a callable taking no arguments, invoked when a received
+ message reports the COMBINED_REGISTERED state. The message
+ itself is not forwarded to it.
+
+ Returns:
+ cancel: a callable taking no arguments that unsubscribes the event
+ watcher by calling unsubscribe() on the subscription.
+ """
+ # Imported inside the function rather than at module level; presumably so
+ # this module can be imported without the rs_mrt package -- TODO confirm.
+ from rs_mrt.testenvironment.signaling.sri.nas.eps.pubsub import EmmAttachPub
+ from rs_mrt.testenvironment.signaling.sri.nas.eps import EmmRegistrationState
+
+ def wrapped(msg):
+ # Log every state update, including those that do not trigger callback.
+ logger.debug("CMX received EMM registration state: {}".format(
+ msg.registration_state))
+ # Written as a tuple membership test, with a single accepted state.
+ if msg.registration_state in (
+ EmmRegistrationState.COMBINED_REGISTERED, ):
+ callback()
+
+ sub = EmmAttachPub.multi_subscribe(callback=wrapped)
+
+ # Hand back a canceller so the caller can stop watching for events.
+ return lambda: sub.unsubscribe()