blob: ccc7e74b07ebe36dafc3a83bfb794fc42943ecc8 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from antlion import utils
from antlion.controllers import pdu
import re
import telnetlib
import time
class PduDevice(pdu.PduDevice):
    """Implementation of pure abstract PduDevice object for the Synaccess np02b
    Pdu.

    Commands are sent through a _TNHelperNP02B telnet helper. Every command
    method polls the outlet status afterwards and only returns once the
    device reports the requested state.
    """

    def __init__(self, host, username, password):
        """Initializes the base PduDevice and the telnet helper.

        Args:
            host: host of the np02b; also handed to the telnet helper.
            username: forwarded to the base class only.
            password: forwarded to the base class only.
        """
        super().__init__(host, username, password)
        self.tnhelper = _TNHelperNP02B(host)

    def on_all(self):
        """ Turns on both outlets on the np02b."""
        self.tnhelper.cmd('ps 1')
        self._verify_state({'1': True, '2': True})

    def off_all(self):
        """ Turns off both outlets on the np02b."""
        self.tnhelper.cmd('ps 0')
        self._verify_state({'1': False, '2': False})

    def on(self, outlet):
        """ Turns on specific outlet on the np02b.

        Args:
            outlet: string of the outlet to turn on ('1' or '2')
        """
        # status() keys are always strings, so normalize (e.g. 1 -> '1') to
        # keep the verification below from timing out on a key mismatch.
        outlet = str(outlet)
        self.tnhelper.cmd('pset %s 1' % outlet)
        self._verify_state({outlet: True})

    def off(self, outlet):
        """ Turns off a specific outlet on the np02b.

        Args:
            outlet: string of the outlet to turn off ('1' or '2')
        """
        # See on(): status() keys are strings.
        outlet = str(outlet)
        self.tnhelper.cmd('pset %s 0' % outlet)
        self._verify_state({outlet: False})

    def reboot(self, outlet):
        """ Toggles a specific outlet on the np02b to off, then to on.

        Args:
            outlet: string of the outlet to reboot ('1' or '2')
        """
        outlet = str(outlet)
        self.off(outlet)
        # off() and on() already verify the state themselves. The extra
        # checks are kept so the sequence (and the time the outlet stays
        # off before being switched on again) is unchanged.
        self._verify_state({outlet: False})
        self.on(outlet)
        self._verify_state({outlet: True})

    def status(self):
        """ Returns the status of the np02b outlets.

        Outlets are numbered from '1' in the order their ON/OFF tokens
        appear in the 'pshow' response.

        Return:
            a dict mapping outlet strings ('1' and '2') to:
                True if outlet is ON
                False if outlet is OFF
        """
        res = self.tnhelper.cmd('pshow')
        return {
            str(index): state == 'ON'
            for index, state in enumerate(re.findall(r'ON|OFF', res), start=1)
        }

    def close(self):
        """Ensure connection to device is closed.

        In this implementation, this shouldn't be necessary, but could be in
        others that open on creation.
        """
        self.tnhelper.close()

    def _verify_state(self, expected_state, timeout=3):
        """Returns when expected_state is reached on device.

        In order to prevent command functions from exiting until the desired
        effect has occurred, this function verifies that the expected_state is
        a subset of the actual state reported by the device.

        The state is always polled at least once, even for a zero or negative
        timeout, so the error below can always report the last actual state.

        Args:
            expected_state: a dict representing the expected state of one or
                more outlets on the device. Maps outlet strings ('1' and/or '2')
                to:
                    True if outlet is expected to be ON.
                    False if outlet is expected to be OFF.
            timeout (default: 3): time in seconds until raising an exception.

        Return:
            True, if expected_state is reached.

        Raises:
            PduError if expected_state has not been reached by timeout.
        """
        # Monotonic clock: the deadline must not move if the wall clock is
        # adjusted while polling.
        deadline = time.monotonic() + timeout
        while True:
            actual_state = self.status()
            # Dict item views compare like sets: "<=" is a subset test.
            if expected_state.items() <= actual_state.items():
                return True
            if time.monotonic() >= deadline:
                break
            time.sleep(.1)
        raise pdu.PduError('Timeout while verifying state.\n'
                           'Expected State: %s\n'
                           'Actual State: %s' % (expected_state, actual_state))
class _TNHelperNP02B(object):
    """An internal helper class for Telnet with the Synaccess NP02B Pdu. This
    helper is specific to the idiosyncrasies of the NP02B and therefore should
    not be used with other devices.

    A new telnet session is opened and closed for every command.
    """

    def __init__(self, host):
        """Creates an unconnected telnet object for the given host.

        Args:
            host: host of the device, used when a session is opened in cmd().
        """
        self._tn = telnetlib.Telnet()
        self.host = host
        self.tx_cmd_separator = '\n\r'
        self.rx_cmd_separator = '\r\n'
        self.prompt = '>'

    def cmd(self, cmd_str):
        """Executes a command on the device via telnet.

        Args:
            cmd_str: A string of the command to be run.

        Returns:
            A string of the response from the valid command (often empty),
            with the prompt, line separators and the command text removed.

        Raises:
            PduError if the session cannot be opened or the response contains
            'Invalid'.
        """
        # Open session
        try:
            self._tn.open(self.host, timeout=3)
        except Exception as err:
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # are not converted into a PduError; the cause is chained.
            raise pdu.PduError("Failed to open telnet session to host (%s)" %
                               self.host) from err

        try:
            time.sleep(.1)

            # str.strip() returns a new string, so the result must be kept.
            cmd_str = cmd_str.strip(self.tx_cmd_separator)

            # Read to end of first prompt
            self._tn.read_eager()
            time.sleep(.1)

            # Write command and read all output text
            self._tn.write(utils.ascii_string(cmd_str + self.tx_cmd_separator))
            res = self._tn.read_until(utils.ascii_string(self.prompt), 2)

            # Parses out the commands output
            # NOTE(review): telnetlib's read_until() returns the bytes read
            # so far (possibly b'') on timeout rather than None, so this
            # guard is not expected to trigger.
            if res is None:
                raise pdu.PduError("Command failed: %s" % cmd_str)
            res = res.decode()
            if 'Invalid' in res:
                raise pdu.PduError("Command Invalid: %s" % cmd_str)
        finally:
            # Close session on every path so a failed command does not leave
            # the socket open.
            self._tn.close()

        for token in (self.prompt, self.tx_cmd_separator,
                      self.rx_cmd_separator, cmd_str):
            res = res.replace(token, '')

        # Pause after a successful command before returning to the caller.
        time.sleep(0.5)
        return res

    def close(self):
        """Closes the telnet session, if one is open."""
        self._tn.close()