blob: 3445e10dd0a3aec45c2d1ed80c3a062ee02e7680 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import os
import re
import telnetlib
import time
try:
# python 2
from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, build_opener
except ImportError:
# python 3
from urllib.request import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, build_opener
logger = logging.getLogger(__name__)
try:
from pysnmp.hlapi import SnmpEngine, CommunityData, UdpTransportTarget, ContextData, getCmd, setCmd, ObjectType, ObjectIdentity, Integer32
except ImportError:
logger.warning('PySNMP module is not installed. Install if EATON_PDU_CONTROLLER is used')
class PduController(object):
def open(self, **params):
"""Open PDU controller connection"""
raise NotImplementedError
def reboot(self, **params):
"""Reboot an outlet or a board passed as params"""
raise NotImplementedError
def close(self):
"""Close PDU controller connection"""
raise NotImplementedError
class DummyPduController(PduController):
"""Dummy implementation which only says that PDU controller is not connected"""
def open(self, **params):
pass
def reboot(self, **params):
logger.info('No PDU controller connected.')
def close(self):
pass
class ApcPduController(PduController):
def __init__(self):
self.tn = None
def __del__(self):
self.close()
def _init(self):
"""Initialize the telnet connection
"""
self.tn = telnetlib.Telnet(self.ip, self.port)
self.tn.read_until('User Name')
self.tn.write('apc\r\n')
self.tn.read_until('Password')
self.tn.write('apc\r\n')
self.until_done()
def open(self, **params):
"""Open telnet connection
Args:
params (dict), must contain two parameters "ip" - ip address or hostname and "port" - port number
Example:
params = {'port': 23, 'ip': 'localhost'}
"""
logger.info('opening telnet')
self.port = params['port']
self.ip = params['ip']
self.tn = None
self._init()
def close(self):
"""Close telnet connection"""
logger.info('closing telnet')
if self.tn:
self.tn.close()
def until_done(self):
"""Wait until the prompt encountered
"""
self.until(r'^>')
def until(self, regex):
"""Wait until the regex encountered
"""
logger.debug('waiting for %s', regex)
r = re.compile(regex, re.M)
self.tn.expect([r])
def reboot(self, **params):
"""Reboot outlet
Args:
params (dict), must contain parameter "outlet" - outlet number
Example:
params = {'outlet': 1}
"""
outlet = params['outlet']
# main menu
self.tn.write('\x1b\r\n')
self.until_done()
# Device Manager
self.tn.write('1\r\n')
self.until_done()
# Outlet Management
self.tn.write('2\r\n')
self.until_done()
# Outlet Control
self.tn.write('1\r\n')
self.until_done()
# Select outlet
self.tn.write('%d\r\n' % outlet)
self.until_done()
# Control
self.tn.write('1\r\n')
self.until_done()
# off
self.tn.write('2\r\n')
self.until('to cancel')
self.tn.write('YES\r\n')
self.until('to continue')
self.tn.write('\r\n')
self.until_done()
time.sleep(5)
# on
self.tn.write('1\r\n')
self.until('to cancel')
self.tn.write('YES\r\n')
self.until('to continue')
self.tn.write('\r\n')
self.until_done()
class NordicBoardPduController(PduController):
def open(self, **params):
pass
def _pin_reset(self, serial_number):
os.system('nrfjprog -f NRF52 --snr {} -p'.format(serial_number))
def reboot(self, **params):
boards_serial_numbers = params['boards_serial_numbers']
for serial_number in boards_serial_numbers:
logger.info('Resetting board with the serial number: %s', serial_number)
self._pin_reset(serial_number)
def close(self):
pass
class EatonPduController(PduController):
outlet_oid_cmd_get_state_base = '1.3.6.1.4.1.534.6.6.7.6.6.1.2.0.'
outlet_oid_cmd_set_on_base = '1.3.6.1.4.1.534.6.6.7.6.6.1.4.0.'
outlet_oid_cmd_set_off_base = '1.3.6.1.4.1.534.6.6.7.6.6.1.3.0.'
outlet_oid_cmd_reboot_base = '1.3.6.1.4.1.534.6.6.7.6.6.1.5.0.'
outlet_oid_cmd_set_reboot_delay_seconds_base = '1.3.6.1.4.1.534.6.6.7.6.6.1.8.0.'
PDU_COMMAND_TIMEOUT = 5
def open(self, **params):
missing_fields = ['ip', 'port']
missing_fields = [field for field in missing_fields if field not in params.keys()]
if missing_fields:
raise KeyError('Missing keys in PDU params: {}'.format(missing_fields))
self.params = params
self.type = 'pdu'
self.ip = self.params['ip']
self.snmp_agent_port = int(self.params['port'])
self._community = 'public'
self._snmp_engine = SnmpEngine()
self._community_data = CommunityData(self._community, mpModel=0)
self._udp_transport_target = UdpTransportTarget((self.ip, self.snmp_agent_port))
self._context = ContextData()
def _outlet_oid_get(self, param, socket):
"""
Translates command to the OID number representing a command for the specific power socket.
Args:
param (str), command string
socket (int), socket index
Return:
full OID identifying the SNMP object (str)
"""
parameters = {
'get_state': self.outlet_oid_cmd_get_state_base,
'set_on': self.outlet_oid_cmd_set_on_base,
'set_off': self.outlet_oid_cmd_set_off_base,
'set_reboot_delay': self.outlet_oid_cmd_set_reboot_delay_seconds_base,
'reboot': self.outlet_oid_cmd_reboot_base
}
return parameters[param.lower()] + str(socket)
# Performs set command to specific OID with a given value by sending a SNMP Set message.
def _oid_set(self, oid, value):
"""
Performs set command to specific OID with a given value by sending a SNMP Set message.
Args:
oid (str): Full OID identifying the object to be read.
value (int): Value to be written to the OID as Integer32.
"""
errorIndication, errorStatus, errorIndex, varBinds = next(
setCmd(self._snmp_engine, self._community_data, self._udp_transport_target, self._context,
ObjectType(ObjectIdentity(oid), Integer32(value))))
if errorIndication:
msg = 'Found PDU errorIndication: {}'.format(errorIndication)
logger.exception(msg)
raise RuntimeError(msg)
elif errorStatus:
msg = 'Found PDU errorStatus: {}'.format(errorStatus)
logger.exception(msg)
raise RuntimeError(msg)
def _oid_get(self, oid):
"""
Performs SNMP get command and returns OID value by sending a SNMP Get message.
Args:
oid (str): Full OID identifying the object to be read.
Return:
OID value (int)
"""
errorIndication, errorStatus, errorIndex, varBinds = next(
getCmd(self._snmp_engine, self._community_data, self._udp_transport_target, self._context,
ObjectType(ObjectIdentity(oid))))
if errorIndication:
msg = 'Found PDU errorIndication: {}'.format(errorIndication)
logger.exception(msg)
raise RuntimeError(msg)
elif errorStatus:
msg = 'Found PDU errorStatus: {}'.format(errorStatus)
logger.exception(msg)
raise RuntimeError(msg)
return int(str(varBinds[-1]).partition('= ')[-1])
def _outlet_value_set(self, cmd, socket, value=1):
"""
Sets outlet parameter value.
Args:
cmd (str): OID base
socket (int): socket index (last OID number)
value (int): value to be set
"""
oid = self._outlet_oid_get(cmd, socket)
# Values other than 1 does not make sense with commands other than "set_reboot_delay".
if cmd != 'set_reboot_delay':
value = 1
self._oid_set(oid, value)
def _outlet_value_get(self, cmd, socket):
"""
Read outlet parameter value.
Args:
cmd (str): OID base
socket (int): socket index (last OID number)
Return:
parameter value (int)
"""
oid = self._outlet_oid_get(cmd, socket)
return self._oid_get(oid)
def validate_state(self, socket, state):
return (self._outlet_value_get('get_state', socket) == state)
def turn_off(self, sockets):
"""
Turns the specified socket off.
Args:
sockets (list(int)): sockets to be turned off
"""
logger.info('Executing turn OFF for: {}'.format(sockets))
for socket in sockets:
self._outlet_value_set('set_off', socket)
time.sleep(2)
timeout = time.time() + self.PDU_COMMAND_TIMEOUT
while ((time.time() < timeout) and not self.validate_state(socket, 0)):
time.sleep(0.1)
if self.validate_state(socket, 0):
logger.debug('Turned OFF socket {} at {}'.format(socket, self.ip))
else:
logger.error('Failed to turn OFF socket {} at {}'.format(socket, self.ip))
def turn_on(self, sockets):
"""
Turns the specified socket on.
Args:
sockets (list(int)): sockets to be turned on
"""
logger.info('Executing turn ON for: {}'.format(sockets))
for socket in sockets:
self._outlet_value_set('set_on', socket)
time.sleep(2)
timeout = time.time() + self.PDU_COMMAND_TIMEOUT
while ((time.time() < timeout) and not self.validate_state(socket, 1)):
time.sleep(0.1)
if self.validate_state(socket, 1):
logger.debug('Turned ON socket {} at {}'.format(socket, self.ip))
else:
logger.error('Failed to turn ON socket {} at {}'.format(socket, self.ip))
def close(self):
self._community = None
self._snmp_engine = None
self._community_data = None
self._udp_transport_target = None
self._context = None
def reboot(self, **params):
"""
Reboots the sockets specified in the constructor with off and on delays.
Args:
sockets (list(int)): sockets to reboot
"""
logger.info('Executing power cycle for: {}'.format(params['sockets']))
self.turn_off(params['sockets'])
time.sleep(10)
self.turn_on(params['sockets'])
time.sleep(5)
class IpPowerSocketPduController(PduController):
def open(self, **params):
self._base_url = 'http://{}/outs.cgi?out'.format(params['ip'])
password_manager = HTTPPasswordMgrWithDefaultRealm()
password_manager.add_password(None, self._base_url, params['user'], params['pass'])
authentication_handler = HTTPBasicAuthHandler(password_manager)
self._opener = build_opener(authentication_handler)
def reboot(self, **params):
logger.info('Executing power cycle')
for socket in params['sockets']:
self._turn_off(socket)
time.sleep(2)
self._turn_on(socket)
time.sleep(2)
def _change_state(self, socket, state):
self._opener.open('{}{}={}'.format(self._base_url, socket, state))
def _turn_off(self, socket):
self._change_state(socket, 1)
def _turn_on(self, socket):
self._change_state(socket, 0)
def close(self):
self._base_url = None
self._opener = None
class ManualPduController(PduController):
def open(self, **kwargs):
pass
def reboot(self, **kwargs):
input('Reset all devices and press enter to continue..')
def close(self):
pass