| #!/usr/bin/env python |
| """Enhanced Serial Port class |
| part of pyserial (http://pyserial.sf.net) (C)2002 cliechti@gmx.net |
| |
| another implementation of the readline and readlines method. |
| this one should be more efficient because a bunch of characters are read |
| on each access, but the drawback is that a timeout must be specified to |
| make it work (enforced by the class __init__). |
| |
| this class could be enhanced with a read_until() method and more |
| like found in the telnetlib. |
| """ |
| |
import time

from serial import Serial
| |
class EnhancedSerial(Serial):
    """Serial port with chunk-buffered readline() and readlines().

    Data is read from the port in chunks of up to 512 bytes and kept in
    ``self.buf`` until a complete line can be handed out. Because every chunk
    read relies on the port's read timeout to return, the constructor
    enforces a small, non-zero timeout.
    """

    def __init__(self, *args, **kwargs):
        """Create the port, ensuring that a reasonable read timeout is set.

        All arguments are forwarded to ``Serial.__init__``. A missing
        ``timeout`` keyword defaults to 0.1 s; a ``timeout`` of ``None`` or
        below 0.01 s is replaced by 0.1 s.
        """
        # NOTE(review): only a *keyword* timeout is seen here; a timeout
        # passed positionally would presumably collide with the keyword set
        # below -- confirm against Serial's signature.
        timeout = kwargs.get('timeout', 0.1)
        # None must be tested explicitly: ordering None against a float
        # raises TypeError on Python 3.
        if timeout is None or timeout < 0.01:
            timeout = 0.1
        kwargs['timeout'] = timeout
        Serial.__init__(self, *args, **kwargs)
        # Bytes received but not yet returned to the caller. A bytes literal
        # is a plain str on Python 2 and matches read()'s bytes on Python 3.
        self.buf = b''

    def readline(self, maxsize=None, timeout=1):
        """Return the next line, including its trailing newline.

        maxsize is ignored. timeout is the maximum time in seconds to wait
        for a complete line. When it expires, everything buffered so far
        (possibly nothing) is returned without a trailing newline. At least
        one read is attempted if no complete line is buffered yet.
        """
        # Measure real elapsed time instead of counting reads, so the limit
        # holds whatever self.timeout is currently set to.
        clock = getattr(time, 'monotonic', time.time)
        deadline = clock() + timeout

        # Serve a line that is already buffered without touching the port.
        newline = self.buf.find(b'\n')
        while newline < 0:
            searched = len(self.buf)
            self.buf += self.read(512)
            # Only the freshly received part can contain a new newline.
            newline = self.buf.find(b'\n', searched)
            if newline < 0 and clock() >= deadline:
                # Timed out: hand back the partial data and reset the buffer.
                partial, self.buf = self.buf, b''
                return partial
        line, self.buf = self.buf[:newline + 1], self.buf[newline + 1:]
        return line

    def readlines(self, sizehint=None, timeout=1):
        """Read all lines that are available.

        sizehint is ignored. Lines are collected via readline() with the
        given timeout until a call returns nothing or only a partial line
        (one without a trailing newline), i.e. no more data arrived in time.
        A trailing partial line is included in the result.
        """
        lines = []
        while True:
            line = self.readline(timeout=timeout)
            if line:
                lines.append(line)
            # An empty or unterminated result means readline() timed out.
            if not line.endswith(b'\n'):
                break
        return lines
| |
if __name__ == '__main__':
    # Do some simple tests with a loopback HW (see test.py for details).
    # Test only with loopback HW (shortcut RX/TX pins (3+4 on DSUB 9 and 25)).
    PORT = 0
    s = EnhancedSerial(PORT)
    try:
        # Write out some test data lines; the last word has no newline, so
        # it comes back as a partial line once the read times out.
        s.write(b'\n'.join(b"hello how are you".split()))
        # ...and read them back.
        print(s.readlines())
        # This one should print an empty list.
        print(s.readlines(timeout=0.4))
    finally:
        # Release the port even if a write or read fails.
        s.close()