# blob: d721295644fcfeab861c5bd5a4f1c4af5da75e91 [file] [log] [blame]
import struct
from Monsoon import Operations as op
import ctypes
import platform
import usb.core
import usb.util
import os
import platform
import time
class USB_protocol(object):
    """Uses native python usb functions to communicate with the Power Monitor.

    Best choice for connecting to a single Power Monitor.
    """

    # USB identifiers used to locate Power Monitor hardware on the bus.
    _VENDOR_ID = 0x2AB9
    _PRODUCT_ID = 0x0001
    # Oldest firmware revision whose start status carries the 'Sample mode' bit.
    _MIN_STATUS_FIRMWARE = 26
    # Mask of the 'Sample mode' bit inside the start-status byte.
    _SAMPLE_MODE_BIT = 0x80
    # struct formats used by getValue, keyed by byte width: (signed, unsigned).
    # Explicit little-endian so decoding does not depend on the host CPU.
    _VALUE_FORMATS = {4: ("<i", "<I"), 2: ("<h", "<H"), 1: ("<b", "<B")}

    def __init__(self):
        # pyusb device handle and bulk endpoints; all populated by Connect().
        self.DEVICE = None
        self.epBulkWriter = None
        self.epBulkReader = None

    @staticmethod
    def _u32Bytes(value):
        """Return value as a tuple of 4 unsigned bytes, least significant first.

        Raises struct.error if value does not fit an unsigned 32-bit integer.
        """
        return struct.unpack("4B", struct.pack("<I", value))

    @classmethod
    def _packSetValue(cls, operation, value):
        """Build the (wValue, wIndex, payload) fields for a set-value transfer.

        wValue holds the low 16 bits of value. wIndex holds the opcode in its
        low byte and bits 16-23 of value in its high byte. payload is the
        4 bytes of value, least significant first.
        """
        value_bytes = cls._u32Bytes(value)
        # Unsigned unpacking is what lets opcodes >= 0x80 through; a signed
        # byte would be negative and could not be packed into wIndex.
        opcode_byte = cls._u32Bytes(operation)[0]
        wValue = value_bytes[0] | (value_bytes[1] << 8)
        wIndex = opcode_byte | (value_bytes[2] << 8)
        return wValue, wIndex, value_bytes

    @classmethod
    def _decodeValue(cls, raw, valueLength, signed=False):
        """Decode the first valueLength bytes of raw as a little-endian integer.

        Widths other than 1, 2 or 4 are not decoded; raw is returned as-is.
        """
        formats = cls._VALUE_FORMATS.get(valueLength)
        if formats is None:
            return raw
        fmt = formats[0] if signed else formats[1]
        return struct.unpack(fmt, raw[0:valueLength])[0]

    def enumerateDevices(self):
        """Returns a list of the serial numbers of all devices connected to the system.

        Includes both HVPM and LVPM hardware.
        """
        devices = usb.core.find(find_all=True, idVendor=self._VENDOR_ID, idProduct=self._PRODUCT_ID)
        return [str(device.serial_number) for device in devices]

    def reconnect(self, deviceType, serialno):
        """Reset the port and reconnect to the power monitor.

        Useful for some cases.
        deviceType = LVPM or HVPM
        serialno = device serial number to reconnect to.
        """
        self.DEVICE.reset()
        # Give the device time to re-enumerate.
        # TODO: We should be able to replace this with waiting for the event that fires when
        # a new device enumerates on the system.
        time.sleep(5)
        self.Connect(deviceType, serialno)

    def Connect(self, deviceType, serialno=None):
        """Connect to a Power Monitor.

        deviceType = LVPM or HVPM
        serialno = device serial number. If None, connect to the first device found.

        Prints a message and returns without connecting when no matching
        device is found. A hardware-model mismatch or an older firmware
        revision only prints a warning.
        """
        def device_matcher(d):
            try:
                return (d.idVendor == self._VENDOR_ID
                        and d.idProduct == self._PRODUCT_ID
                        and (serialno is None or d.serial_number == str(serialno)))
            except Exception:
                # Catches some platform-specific errors when connecting to
                # multiple PMs simultaneously; such a device is skipped.
                return False

        self.DEVICE = usb.core.find(custom_match=device_matcher)
        if self.DEVICE is None:
            print('Unable to find device')
            return

        connectedDeviceType = self.getValue(op.OpCodes.HardwareModel, 2)
        if connectedDeviceType != deviceType:
            print('warning: Device type mismatch. Found ' + repr(connectedDeviceType) + " expected " + repr(deviceType))
        firmwareRev = self.getValue(op.OpCodes.FirmwareVersion, 1)
        if firmwareRev < op.ReturnCodes.CURRENT_FIRMWARE_REV:
            print('Warning: Detected firmware revision ' + repr(firmwareRev) + ", current release is " + repr(op.ReturnCodes.CURRENT_FIRMWARE_REV))

        # On Linux we need to detach usb HID first.
        if platform.system() == "Linux":
            try:
                self.DEVICE.detach_kernel_driver(0)
            except (usb.core.USBError, NotImplementedError):
                pass  # already unregistered

        self.DEVICE.set_configuration()
        cfg = self.DEVICE.get_active_configuration()
        intf = cfg[(0, 0)]

        def find_endpoint(direction):
            # First endpoint of the interface whose direction matches.
            return usb.util.find_descriptor(
                intf,
                custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == direction)

        self.epBulkWriter = find_endpoint(usb.util.ENDPOINT_OUT)
        self.epBulkReader = find_endpoint(usb.util.ENDPOINT_IN)

    def BulkRead(self):
        """Read up to 64 bytes from endpoint 0x81 with a 1000 ms timeout."""
        return self.DEVICE.read(0x81, 64, timeout=1000)

    def sendCommand(self, operation, value):
        """Send a USB Control transfer. Normally this is used to set an EEPROM value.

        operation = opcode of the value to set.
        value = new value; converted with int() before being sent.
        Raises ValueError (after stopping sampling) if the unit is in sample mode.
        """
        if not self.verifyReady(operation):
            self.stopSampling()
            # TODO: We might smooth this behavior over later, but for now we want to explicitly fail if this occurs.
            raise ValueError("Power Monitor Error, attempted to send a command while the unit is in Sample Mode.")
        wValue, wIndex, payload = self._packSetValue(operation, int(value))
        self.DEVICE.ctrl_transfer(op.Control_Codes.USB_OUT_PACKET, op.Control_Codes.USB_SET_VALUE, wValue, wIndex, payload, 5000)

    def stopSampling(self):
        """Send a control transfer instructing the Power Monitor to stop sampling."""
        # The readiness result is not used here; the stop request is sent
        # regardless. NOTE(review): presumably kept for the status query's
        # side effect on the device - confirm before removing.
        self.verifyReady(0x02)
        self.DEVICE.ctrl_transfer(op.Control_Codes.USB_OUT_PACKET, op.Control_Codes.USB_REQUEST_STOP, 0, 0, 0, 5000)

    def startSampling(self, calTime, maxTime):
        """Instruct the Power Monitor to enter sample mode.

        calTime = Amount of time, in ms, between calibration samples.
        maxTime = Number of samples to take before exiting sample mode automatically.
        Raises ValueError (after stopping sampling) if already in sample mode.
        """
        if not self.verifyReady(0x02):
            self.stopSampling()
            raise ValueError("Power Monitor Error, attempted to start while already started.")
        cal_bytes = self._u32Bytes(calTime)
        # Only the low 16 bits of calTime travel in wValue; wIndex is zero and
        # maxTime is sent as the 4-byte data stage.
        wValue = cal_bytes[0] | (cal_bytes[1] << 8)
        wIndex = 0
        self.DEVICE.ctrl_transfer(op.Control_Codes.USB_OUT_PACKET, op.Control_Codes.USB_REQUEST_START, wValue, wIndex, self._u32Bytes(maxTime), 1000)

    def resetToBootloader(self):
        """Send the reset-to-bootloader control request."""
        # This will cause a disconnect event, which throws an exception in libusb.
        try:
            self.DEVICE.ctrl_transfer(op.Control_Codes.USB_OUT_PACKET, op.Control_Codes.USB_REQUEST_RESET_TO_BOOTLOADER, 0, 0, 0, 1000)
        except usb.core.USBError:
            print("Resetting to bootloader")

    def getValue(self, operation, valueLength, signed=False):
        """Get an EEPROM value from the Power Monitor.

        operation = opcode of the value to read.
        valueLength = width of the value in bytes (1, 2 or 4). Any other
        width returns the raw transfer result undecoded.
        signed = decode the value as a signed integer when True.
        Raises ValueError (after stopping sampling) if the transfer result
        equals the error return code.
        """
        wIndex = self._u32Bytes(operation)[0]
        result = self.DEVICE.ctrl_transfer(op.Control_Codes.USB_IN_PACKET, op.Control_Codes.USB_SET_VALUE, 0, wIndex, 4, 5000)
        # NOTE(review): result is the raw transfer buffer; if ReturnCodes.ERROR
        # is a plain integer this comparison may never be true - confirm.
        if result == op.ReturnCodes.ERROR:
            self.stopSampling()
            raise ValueError("Error code returned. Attempted to query Power Monitor while in sample mode.")
        return self._decodeValue(result, valueLength, signed)

    def closeDevice(self):
        """Cleanup any loose ends, if present."""
        self.stopSampling()
        # Releases the bulk endpoint, which dispose_resources apparently doesn't release.
        self.DEVICE.reset()
        usb.util.dispose_resources(self.DEVICE)

    def verifyReady(self, opcode):
        """Check whether we're currently in sample mode.

        Some commands can cause errors if we are.
        Current behavior checks for all opcodes, though there are some specific ones which will not return an error code.
        Returns True when the unit is not in sample mode.
        """
        # Only check if the firmware version is 26 or greater;
        # getStartStatus does not include the 'Sample mode' bit before that revision.
        firmwareRev = self.getValue(op.OpCodes.FirmwareVersion, 1)
        if firmwareRev < self._MIN_STATUS_FIRMWARE:
            return True
        status = self.getValue(op.OpCodes.getStartStatus, 1)
        return not (status & self._SAMPLE_MODE_BIT)
class CPP_Backend_Protocol(object):
    """Uses C++ backend with libusb.

    When connecting to multiple Power Monitors with the same computer, native Python code is often too slow, and results in many dropped samples.
    This offloads the time-critical parts of sample collection to C++, but isn't compatible with every OS yet.
    """

    def __init__(self):
        # Handle to the compiled backend library (a ctypes.CDLL).
        self.DEVICE = self.loadLibrary()
        self.DEVICE.pySetup.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int)
        self.DEVICE.pyStart.argtypes = (ctypes.c_int, ctypes.c_int)
        self.DEVICE.pyGetBulkData.argtypes = (ctypes.c_int, ctypes.POINTER(ctypes.c_uint8))
        self.DEVICE.pySendCommand.argtypes = (ctypes.c_ubyte, ctypes.c_int)
        self.DEVICE.pyGetValue.argtypes = (ctypes.c_ubyte, ctypes.c_int)
        self.queueSize = 1024
        # Byte buffer handed to pyGetBulkData. It is passed as the array
        # itself; ctypes accepts a compatible array for a POINTER argtype,
        # so no explicit cast is required.
        self.Queue = (ctypes.c_uint8 * self.queueSize)()

    def Connect(self, deviceType, serialno=None):
        """Connect to a Power Monitor through the C++ backend.

        deviceType = accepted for signature compatibility; not used here.
        serialno = device serial number, forwarded to pySetup.
        """
        VID = 0x2AB9
        PID = 0x0001
        # NOTE(review): pySetup declares its third argument as c_int, so the
        # default serialno=None is presumably rejected by ctypes - confirm
        # what the backend expects for "first device found".
        self.DEVICE.pySetup(VID, PID, serialno)

    def BulkRead(self):
        """Fetch buffered sample data from the backend.

        Returns the first count*64 entries of the queue buffer, where count
        is what pyQueueCount reports after pyGetBulkData has run.
        """
        self.DEVICE.pyGetBulkData(self.queueSize, self.Queue)
        count = self.DEVICE.pyQueueCount()
        # NOTE(review): 64 presumably is the size of one sample packet in bytes - confirm.
        return self.Queue[0:count * 64]

    def sendCommand(self, operation, value):
        """Send a USB Control transfer. Normally this is used to set an EEPROM value.

        value is converted with int() before being handed to the backend.
        """
        self.DEVICE.pySendCommand(operation, int(value))

    def stopSampling(self):
        """Send a control transfer instructing the Power Monitor to stop sampling."""
        self.DEVICE.pyStop()

    def startSampling(self, calTime, maxTime):
        """Instruct the Power Monitor to enter sample mode.

        calTime = Amount of time, in ms, between calibration samples.
        maxTime = Number of samples to take before exiting sample mode automatically.
        """
        self.DEVICE.pyStart(calTime, maxTime)

    def getValue(self, operation, valueLength):
        """Get an EEPROM value from the Power Monitor.

        Returns whatever the backend's pyGetValue returns for the opcode.
        """
        return self.DEVICE.pyGetValue(operation, valueLength)

    def closeDevice(self):
        """Cleanup any loose ends, if present."""
        self.DEVICE.pyClose()

    def loadLibrary(self):
        """Tries to load the library in an OS-neutral way.

        Returns the loaded ctypes.CDLL.
        Raises NotImplementedError on systems other than Linux and Windows.
        """
        # TODO: Currently we're distributing pre-compiled .dll and .so files.
        # The proper way to do this will be including a makefile and compiling these libraries on installation.
        # The library is looked up relative to this file's real location.
        path = os.path.dirname(os.path.realpath(os.path.abspath(__file__)))
        system = platform.system()
        if system == "Linux":
            libLocation = os.path.join(path, "Compiled/Linux/libcpp_backend.so")
        elif system == "Windows":
            libLocation = os.path.join(path, "Compiled//WIN32//Cpp_backend.dll")
        else:
            raise NotImplementedError("OS not currently supported.")
        return ctypes.CDLL(libLocation)

    def reconnect(self, deviceType=None, serialno=None):
        """Not supported by this backend; always raises NotImplementedError.

        The optional arguments mirror USB_protocol.reconnect so the two
        protocol classes can be called the same way.
        """
        raise NotImplementedError

    def findAllSerialNumbers(self):
        """Not supported by this backend; always raises NotImplementedError."""
        raise NotImplementedError