blob: ef232961f54cefbf54eb9586d8994fbb9675f2df [file] [log] [blame]
#!/usr/bin/env python3.4
# Copyright 2022 The Fuchsia Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import logging
from antlion.keys import Config
from antlion.libs.proc import job
def create(configs):
objs = []
for c in configs:
attn_model = c["Model"]
# Default to telnet.
protocol = c.get("Protocol", "telnet")
module_name = f"antlion.controllers.attenuator_lib.{attn_model}.{protocol}"
module = importlib.import_module(module_name)
inst_cnt = c["InstrumentCount"]
attn_inst = module.AttenuatorInstrument(inst_cnt)
attn_inst.model = attn_model
ip_address = c[Config.key_address.value]
port = c[Config.key_port.value]
for attempt_number in range(1, _ATTENUATOR_OPEN_RETRIES + 1):
try:, port)
except Exception as e:
"Attempt %s to open connection to attenuator "
"failed: %s" % (attempt_number, e)
if attempt_number == _ATTENUATOR_OPEN_RETRIES:
ping_output =
f"ping {ip_address} -c 1 -w 1", ignore_status=True
if ping_output.exit_status == 1:
logging.error(f"Unable to ping attenuator at {ip_address}")
logging.error(f"Able to ping attenuator at {ip_address}")
f'echo "q" | telnet {ip_address} {port}',
for i in range(inst_cnt):
attn = Attenuator(attn_inst, idx=i)
if "Paths" in c:
setattr(attn, "path", c["Paths"][i])
except IndexError:
logging.error("No path specified for attenuator %d.", i)
return objs
def get_info(attenuators):
"""Get information on a list of Attenuator objects.
attenuators: A list of Attenuator objects.
A list of dict, each representing info for Attenuator objects.
device_info = []
for attenuator in attenuators:
info = {
"Address": attenuator.instrument.address,
"Attenuator_Port": attenuator.idx,
return device_info
def destroy(objs):
for attn in objs:
def get_attenuators_for_device(device_attenuator_configs, attenuators, attenuator_key):
"""Gets the list of attenuators associated to a specified device and builds
a list of the attenuator objects associated to the ip address in the
device's section of the ACTS config and the Attenuator's IP address. In the
example below the access point object has an attenuator dictionary with
IP address associated to an attenuator object. The address is the only
mandatory field and the 'attenuator_ports_wifi_2g' and
'attenuator_ports_wifi_5g' are the attenuator_key specified above. These
can be anything and is sent in as a parameter to this function. The numbers
in the list are ports that are in the attenuator object. Below is an
standard Access_Point object and the link to a standard Attenuator object.
Notice the link is the IP address, which is why the IP address is mandatory.
"AccessPoint": [
"ssh_config": {
"user": "root",
"host": ""
"Attenuator": [
"Address": "",
"attenuator_ports_wifi_2g": [
"attenuator_ports_wifi_5g": [
"Attenuator": [
"Model": "minicircuits",
"InstrumentCount": 4,
"Address": "",
"Port": 23
device_attenuator_configs: A list of attenuators config information in
the acts config that are associated a particular device.
attenuators: A list of all of the available attenuators objects
in the testbed.
attenuator_key: A string that is the key to search in the device's
A list of attenuator objects for the specified device and the key in
that device's config.
attenuator_list = []
for device_attenuator_config in device_attenuator_configs:
for attenuator_port in device_attenuator_config[attenuator_key]:
for attenuator in attenuators:
if (
attenuator.instrument.address == device_attenuator_config["Address"]
and attenuator.idx is attenuator_port
return attenuator_list
"""Classes for accessing, managing, and manipulating attenuators.
Users will instantiate a specific child class, but almost all operation should
be performed on the methods and data members defined here in the base classes
or the wrapper classes.
class AttenuatorError(Exception):
"""Base class for all errors generated by Attenuator-related modules."""
class InvalidDataError(AttenuatorError):
""" "Raised when an unexpected result is seen on the transport layer.
When this exception is seen, closing an re-opening the link to the
attenuator instrument is probably necessary. Something has gone wrong in
the transport.
class InvalidOperationError(AttenuatorError):
"""Raised when the attenuator's state does not allow the given operation.
Certain methods may only be accessed when the instance upon which they are
invoked is in a certain state. This indicates that the object is not in the
correct state for a method to be called.
class AttenuatorInstrument(object):
"""Defines the primitive behavior of all attenuator instruments.
The AttenuatorInstrument class is designed to provide a simple low-level
interface for accessing any step attenuator instrument comprised of one or
more attenuators and a controller. All AttenuatorInstruments should override
all the methods below and call AttenuatorInstrument.__init__ in their
constructors. Outside of setup/teardown, devices should be accessed via
this generic "interface".
model = None
def __init__(self, num_atten=0):
"""This is the Constructor for Attenuator Instrument.
num_atten: The number of attenuators contained within the
instrument. In some instances setting this number to zero will
allow the driver to auto-determine the number of attenuators;
however, this behavior is not guaranteed.
NotImplementedError if initialization is called from this class.
if type(self) is AttenuatorInstrument:
raise NotImplementedError("Base class should not be instantiated directly!")
self.num_atten = num_atten
self.max_atten = AttenuatorInstrument.INVALID_MAX_ATTEN = None
def set_atten(self, idx, value, strict=True, retry=False):
"""Sets the attenuation given its index in the instrument.
idx: A zero based index used to identify a particular attenuator in
an instrument.
value: a floating point value for nominal attenuation to be set.
strict: if True, function raises an error when given out of
bounds attenuation values, if false, the function sets out of
bounds values to 0 or max_atten.
retry: if True, command will be retried if possible
raise NotImplementedError("Base class should not be called directly!")
def get_atten(self, idx, retry=False):
"""Returns the current attenuation of the attenuator at index idx.
idx: A zero based index used to identify a particular attenuator in
an instrument.
retry: if True, command will be retried if possible
The current attenuation value as a floating point value
raise NotImplementedError("Base class should not be called directly!")
class Attenuator(object):
"""An object representing a single attenuator in a remote instrument.
A user wishing to abstract the mapping of attenuators to physical
instruments should use this class, which provides an object that abstracts
the physical implementation and allows the user to think only of attenuators
regardless of their location.
def __init__(self, instrument, idx=0, offset=0):
"""This is the constructor for Attenuator
instrument: Reference to an AttenuatorInstrument on which the
Attenuator resides
idx: This zero-based index is the identifier for a particular
attenuator in an instrument.
offset: A power offset value for the attenuator to be used when
performing future operations. This could be used for either
calibration or to allow group operations with offsets between
various attenuators.
TypeError if an invalid AttenuatorInstrument is passed in.
IndexError if the index is out of range.
if not isinstance(instrument, AttenuatorInstrument):
raise TypeError("Must provide an Attenuator Instrument Ref")
self.model = instrument.model
self.instrument = instrument
self.idx = idx
self.offset = offset
if self.idx >= instrument.num_atten:
raise IndexError("Attenuator index out of range for attenuator instrument")
def set_atten(self, value, strict=True, retry=False):
"""Sets the attenuation.
value: A floating point value for nominal attenuation to be set.
strict: if True, function raises an error when given out of
bounds attenuation values, if false, the function sets out of
bounds values to 0 or max_atten.
retry: if True, command will be retried if possible
ValueError if value + offset is greater than the maximum value.
if value + self.offset > self.instrument.max_atten and strict:
raise ValueError("Attenuator Value+Offset greater than Max Attenuation!")
self.idx, value + self.offset, strict=strict, retry=retry
def get_atten(self, retry=False):
"""Returns the attenuation as a float, normalized by the offset."""
return self.instrument.get_atten(self.idx, retry) - self.offset
def get_max_atten(self):
"""Returns the max attenuation as a float, normalized by the offset."""
if self.instrument.max_atten == AttenuatorInstrument.INVALID_MAX_ATTEN:
raise ValueError("Invalid Max Attenuator Value")
return self.instrument.max_atten - self.offset
class AttenuatorGroup(object):
"""An abstraction for groups of attenuators that will share behavior.
Attenuator groups are intended to further facilitate abstraction of testing
functions from the physical objects underlying them. By adding attenuators
to a group, it is possible to operate on functional groups that can be
thought of in a common manner in the test. This class is intended to provide
convenience to the user and avoid re-implementation of helper functions and
small loops scattered throughout user code.
def __init__(self, name=""):
"""This constructor for AttenuatorGroup
name: An optional parameter intended to further facilitate the
passing of easily tracked groups of attenuators throughout code.
It is left to the user to use the name in a way that meets their
""" = name
self.attens = []
self._value = 0
def add_from_instrument(self, instrument, indices):
"""Adds an AttenuatorInstrument to the group.
This function will create Attenuator objects for all of the indices
passed in and add them to the group.
instrument: the AttenuatorInstrument to pull attenuators from.
indices: The index or indices to add to the group. Either a
range, a list, or a single integer.
Requires a valid AttenuatorInstrument to be passed in.
if not instrument or not isinstance(instrument, AttenuatorInstrument):
raise TypeError("Must provide an Attenuator Instrument Ref")
if type(indices) is range or type(indices) is list:
for i in indices:
self.attens.append(Attenuator(instrument, i))
elif type(indices) is int:
self.attens.append(Attenuator(instrument, indices))
def add(self, attenuator):
"""Adds an already constructed Attenuator object to this group.
attenuator: An Attenuator object.
TypeError if the attenuator parameter is not an Attenuator.
if not isinstance(attenuator, Attenuator):
raise TypeError("Must provide an Attenuator")
def synchronize(self):
"""Sets all grouped attenuators to the group's attenuation value."""
def is_synchronized(self):
"""Returns true if all attenuators have the synchronized value."""
for att in self.attens:
if att.get_atten() != self._value:
return False
return True
def set_atten(self, value):
"""Sets the attenuation value of all attenuators in the group.
value: A floating point value for nominal attenuation to be set.
value = float(value)
for att in self.attens:
self._value = value
def get_atten(self):
"""Returns the current attenuation setting of AttenuatorGroup."""
return float(self._value)