Re-indent to 4 space PEP-8 indentation.
Our internal Python style is 2 space indents, that is rightfully
considered ugly in an open source code base. Fixed.
diff --git a/setup.py b/setup.py
index 58e58c1..d6ec9d2 100644
--- a/setup.py
+++ b/setup.py
@@ -6,48 +6,48 @@
def main():
- requires = []
- scripts = []
- py_version = sys.version_info[:2]
- if py_version < (3, 3):
- requires.append('mock(>=1.0)')
- if py_version == (3, 3):
- requires.append('asyncio(>=3.4)')
- if py_version >= (3, 3):
- # The example portserver implementation requires Python 3 and asyncio.
- scripts.append('src/portserver.py')
+ requires = []
+ scripts = []
+ py_version = sys.version_info[:2]
+ if py_version < (3, 3):
+ requires.append('mock(>=1.0)')
+ if py_version == (3, 3):
+ requires.append('asyncio(>=3.4)')
+ if py_version >= (3, 3):
+ # The example portserver implementation requires Python 3 and asyncio.
+ scripts.append('src/portserver.py')
- distutils.core.setup(
- name='portpicker',
- version='1.0.0',
- description='A library to choose unique available network ports.',
- long_description=textwrap.dedent("""
+ distutils.core.setup(
+ name='portpicker',
+ version='1.0.0',
+ description='A library to choose unique available network ports.',
+ long_description=textwrap.dedent("""
Portpicker provides an API to find and return an available network
port for an application to bind to. Ideally suited for use from
unittests or for test harnesses that launch a local server."""),
- license='Apache 2.0',
- maintainer='Google',
- url='https://github.com/google/python_portpicker',
- package_dir={'': 'src'},
- py_modules=['portpicker'],
- platforms=['POSIX'],
- requires=requires,
- scripts=scripts,
- classifiers=[
- 'Development Status :: 5 - Production/Stable',
- 'License :: OSI Approved :: Apache Software License',
- 'Intended Audience :: Developers',
- 'Programming Language :: Python',
- 'Programming Language :: Python :: 2',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 3',
- 'Programming Language :: Python :: 3.3',
- 'Programming Language :: Python :: 3.4',
- 'Programming Language :: Python :: Implementation :: CPython',
- 'Programming Language :: Python :: Implementation :: Jython',
- 'Programming Language :: Python :: Implementation :: PyPy']
- )
+ license='Apache 2.0',
+ maintainer='Google',
+ url='https://github.com/google/python_portpicker',
+ package_dir={'': 'src'},
+ py_modules=['portpicker'],
+ platforms=['POSIX'],
+ requires=requires,
+ scripts=scripts,
+ classifiers=[
+ 'Development Status :: 5 - Production/Stable',
+ 'License :: OSI Approved :: Apache Software License',
+ 'Intended Audience :: Developers',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.3',
+ 'Programming Language :: Python :: 3.4',
+ 'Programming Language :: Python :: Implementation :: CPython',
+ 'Programming Language :: Python :: Implementation :: Jython',
+ 'Programming Language :: Python :: Implementation :: PyPy']
+ )
if __name__ == '__main__':
- main()
+ main()
diff --git a/src/portpicker.py b/src/portpicker.py
old mode 100755
new mode 100644
index 5ea9f0c..e20ac00
--- a/src/portpicker.py
+++ b/src/portpicker.py
@@ -47,153 +47,154 @@
def Bind(port, socket_type, socket_proto):
- """Try to bind to a socket of the specified type, protocol, and port.
+ """Try to bind to a socket of the specified type, protocol, and port.
- This is primarily a helper function for PickUnusedPort, used to see
- if a particular port number is available.
+ This is primarily a helper function for PickUnusedPort, used to see
+ if a particular port number is available.
- Args:
- port: The port number to bind to, or 0 to have the OS pick a free port.
- socket_type: The type of the socket (ex: socket.SOCK_STREAM).
- socket_proto: The protocol of the socket (ex: socket.IPPROTO_TCP).
+ Args:
+ port: The port number to bind to, or 0 to have the OS pick a free port.
+ socket_type: The type of the socket (ex: socket.SOCK_STREAM).
+ socket_proto: The protocol of the socket (ex: socket.IPPROTO_TCP).
- Returns:
- The port number on success or None on failure.
- """
- s = socket.socket(socket.AF_INET, socket_type, socket_proto)
- try:
+ Returns:
+ The port number on success or None on failure.
+ """
+ s = socket.socket(socket.AF_INET, socket_type, socket_proto)
try:
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.bind(('', port))
- return s.getsockname()[1]
- except socket.error:
- return None
- finally:
- s.close()
+ try:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind(('', port))
+ return s.getsockname()[1]
+ except socket.error:
+ return None
+ finally:
+ s.close()
def IsPortFree(port):
- """Check if specified port is free.
+ """Check if specified port is free.
- Args:
- port: integer, port to check
- Returns:
- boolean, whether it is free to use for both TCP and UDP
- """
- return (Bind(port, _PROTOS[0][0], _PROTOS[0][1]) and
- Bind(port, _PROTOS[1][0], _PROTOS[1][1]))
+ Args:
+ port: integer, port to check
+ Returns:
+ boolean, whether it is free to use for both TCP and UDP
+ """
+ return (Bind(port, _PROTOS[0][0], _PROTOS[0][1]) and
+ Bind(port, _PROTOS[1][0], _PROTOS[1][1]))
def PickUnusedPort(pid=None):
- """A pure python implementation of PickUnusedPort.
+ """A pure python implementation of PickUnusedPort.
- Args:
- pid: PID to tell the portserver to associate the reservation with. If None,
- the current process's PID is used.
+ Args:
+ pid: PID to tell the portserver to associate the reservation with. If
+ None,
+ the current process's PID is used.
- Returns:
- A port number that is unused on both TCP and UDP.
- """
- port = None
- # Provide access to the portserver on an opt-in basis.
- if 'PORTSERVER_ADDRESS' in os.environ:
- port = GetPortFromPortServer(os.environ['PORTSERVER_ADDRESS'], pid=pid)
- if not port:
- return _PickUnusedPortWithoutServer()
- return port
+ Returns:
+ A port number that is unused on both TCP and UDP.
+ """
+ port = None
+ # Provide access to the portserver on an opt-in basis.
+ if 'PORTSERVER_ADDRESS' in os.environ:
+ port = GetPortFromPortServer(os.environ['PORTSERVER_ADDRESS'], pid=pid)
+ if not port:
+ return _PickUnusedPortWithoutServer()
+ return port
def _PickUnusedPortWithoutServer():
- """A pure python implementation of PickUnusedPort_NoServer().
+ """A pure python implementation of PickUnusedPort_NoServer().
- This code ensures that the port is available on both TCP and UDP.
+ This code ensures that the port is available on both TCP and UDP.
- This function is an implementation detail of PickUnusedPort(), and
- should not be called by code outside of this module.
+ This function is an implementation detail of PickUnusedPort(), and
+ should not be called by code outside of this module.
- Returns:
- A port number that is unused on both TCP and UDP. None on error.
- """
- # Try random ports first.
- r = random.Random()
- for _ in range(10):
- port = int(r.randrange(32768, 60000))
- if IsPortFree(port):
- return port
+ Returns:
+ A port number that is unused on both TCP and UDP. None on error.
+ """
+ # Try random ports first.
+ r = random.Random()
+ for _ in range(10):
+ port = int(r.randrange(32768, 60000))
+ if IsPortFree(port):
+ return port
- # Try OS-assigned ports next.
- # Ambrose discovered that on the 2.6 kernel, calling Bind() on UDP socket
- # returns the same port over and over. So always try TCP first.
- while True:
- # Ask the OS for an unused port.
- port = Bind(0, _PROTOS[0][0], _PROTOS[0][1])
- # Check if this port is unused on the other protocol.
- if port and Bind(port, _PROTOS[1][0], _PROTOS[1][1]):
- return port
+ # Try OS-assigned ports next.
+ # Ambrose discovered that on the 2.6 kernel, calling Bind() on UDP socket
+ # returns the same port over and over. So always try TCP first.
+ while True:
+ # Ask the OS for an unused port.
+ port = Bind(0, _PROTOS[0][0], _PROTOS[0][1])
+ # Check if this port is unused on the other protocol.
+ if port and Bind(port, _PROTOS[1][0], _PROTOS[1][1]):
+ return port
def GetPortFromPortServer(portserver_address, pid=None):
- """Request a free a port from a system-wide portserver.
+ """Request a free a port from a system-wide portserver.
- This follows a very simple portserver protocol:
- The request consists of our pid (in ASCII) followed by a newline.
- The response is a port number and a newline, 0 on failure.
+ This follows a very simple portserver protocol:
+ The request consists of our pid (in ASCII) followed by a newline.
+ The response is a port number and a newline, 0 on failure.
- This function is an implementation detail of PickUnusedPort(), and
- should not normally be called by code outside of this module.
+ This function is an implementation detail of PickUnusedPort(), and
+ should not normally be called by code outside of this module.
- Args:
- portserver_address: The address (path) of a unix domain socket
- with which to connect to the portserver. A leading '@'
- character indicates an address in the "abstract namespace."
- pid: The PID to tell the portserver to associate the reservation with.
- If None, the current process's PID is used.
+ Args:
+ portserver_address: The address (path) of a unix domain socket
+ with which to connect to the portserver. A leading '@'
+ character indicates an address in the "abstract namespace."
+ pid: The PID to tell the portserver to associate the reservation with.
+ If None, the current process's PID is used.
- Returns:
- The port number on success or None on failure.
- """
- if not portserver_address:
- return None
- # An AF_UNIX address may start with a zero byte, in which case it is in the
- # "abstract namespace", and doesn't have any filesystem representation.
- # See 'man 7 unix' for details.
- # The convention is to write '@' in the address to represent this zero byte.
- if portserver_address[0] == '@':
- portserver_address = '\0' + portserver_address[1:]
+ Returns:
+ The port number on success or None on failure.
+ """
+ if not portserver_address:
+ return None
+ # An AF_UNIX address may start with a zero byte, in which case it is in the
+ # "abstract namespace", and doesn't have any filesystem representation.
+ # See 'man 7 unix' for details.
+ # The convention is to write '@' in the address to represent this zero byte.
+ if portserver_address[0] == '@':
+ portserver_address = '\0' + portserver_address[1:]
- if pid is None:
- pid = os.getpid()
+ if pid is None:
+ pid = os.getpid()
- try:
- # Create socket.
- sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
- # Connect to portserver.
- sock.connect(portserver_address)
+ # Create socket.
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ # Connect to portserver.
+ sock.connect(portserver_address)
- # Write request.
- sock.sendall(('%d\n' % pid).encode('ascii'))
+ # Write request.
+ sock.sendall(('%d\n' % pid).encode('ascii'))
- # Read response.
- # 1K should be ample buffer space.
- buf = sock.recv(1024)
- finally:
- sock.close()
- except socket.error:
- print ('Socket error when connecting to portserver.', file=sys.stderr)
- return None
+ # Read response.
+ # 1K should be ample buffer space.
+ buf = sock.recv(1024)
+ finally:
+ sock.close()
+ except socket.error:
+ print ('Socket error when connecting to portserver.', file=sys.stderr)
+ return None
- try:
- return int(buf.split(b'\n')[0])
- except ValueError:
- print ('Portserver failed to find a port.', file=sys.stderr)
- return None
+ try:
+ return int(buf.split(b'\n')[0])
+ except ValueError:
+ print ('Portserver failed to find a port.', file=sys.stderr)
+ return None
if __name__ == '__main__':
- # If passed an argument, cast it to int and treat it as a PID, otherwise pass
- # pid=None to use portpicker's PID.
- port = PickUnusedPort(pid=int(sys.argv[1]) if len(sys.argv) > 1 else None)
- if not port:
- sys.exit(1)
- print (port)
+ # If passed an argument, cast it to int and treat it as a PID, otherwise pass
+ # pid=None to use portpicker's PID.
+ port = PickUnusedPort(pid=int(sys.argv[1]) if len(sys.argv) > 1 else None)
+ if not port:
+ sys.exit(1)
+ print (port)
diff --git a/src/portserver.py b/src/portserver.py
old mode 100755
new mode 100644
index f24ab19..c4d3714
--- a/src/portserver.py
+++ b/src/portserver.py
@@ -41,294 +41,296 @@
def _get_process_command_line(pid):
- try:
- with open('/proc/{}/cmdline'.format(pid), 'rt') as cmdline_f:
- return cmdline_f.read()
- except IOError:
- return ''
+ try:
+ with open('/proc/{}/cmdline'.format(pid), 'rt') as cmdline_f:
+ return cmdline_f.read()
+ except IOError:
+ return ''
def _get_process_start_time(pid):
- try:
- with open('/proc/{}/stat'.format(pid), 'rt') as pid_stat_f:
- return int(pid_stat_f.readline().split()[21])
- except IOError:
- return 0
+ try:
+ with open('/proc/{}/stat'.format(pid), 'rt') as pid_stat_f:
+ return int(pid_stat_f.readline().split()[21])
+ except IOError:
+ return 0
def _port_is_available(port):
- """Return False if the given network port is currently in use."""
- for socket_type, proto in ((socket.SOCK_STREAM, socket.IPPROTO_TCP),
- (socket.SOCK_DGRAM, 0)):
- sock = None
- try:
- sock = socket.socket(socket.AF_INET, socket_type, proto)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('', port))
- if socket_type == socket.SOCK_STREAM:
- sock.listen(1)
- except socket.error:
- return False
- finally:
- if sock:
- sock.close()
- return True
+ """Return False if the given network port is currently in use."""
+ for socket_type, proto in ((socket.SOCK_STREAM, socket.IPPROTO_TCP),
+ (socket.SOCK_DGRAM, 0)):
+ sock = None
+ try:
+ sock = socket.socket(socket.AF_INET, socket_type, proto)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('', port))
+ if socket_type == socket.SOCK_STREAM:
+ sock.listen(1)
+ except socket.error:
+ return False
+ finally:
+ if sock:
+ sock.close()
+ return True
def _should_allocate_port(pid):
- """Determine if we should allocate a port for use by the given process id."""
- if pid <= 0:
- log.info('Not allocating a port to invalid pid')
- return False
- if pid == 1:
- # The client probably meant to send us its parent pid but
- # had been reparented to init.
- log.info('Not allocating a port to init.')
- return False
- try:
- os.kill(pid, 0)
- except ProcessLookupError:
- log.info('Not allocating a port to a non-existent process')
- return False
- return True
+ """Determine if we should allocate a port for use by the given process id."""
+ if pid <= 0:
+ log.info('Not allocating a port to invalid pid')
+ return False
+ if pid == 1:
+ # The client probably meant to send us its parent pid but
+ # had been reparented to init.
+ log.info('Not allocating a port to init.')
+ return False
+ try:
+ os.kill(pid, 0)
+ except ProcessLookupError:
+ log.info('Not allocating a port to a non-existent process')
+ return False
+ return True
class _PortInfo(object):
- """Container class for information about a given port assignment.
+ """Container class for information about a given port assignment.
- Attributes:
- port: integer port number
- pid: integer process id or 0 if unassigned.
- start_time: Time in seconds since the epoch that the process started.
- """
+ Attributes:
+ port: integer port number
+ pid: integer process id or 0 if unassigned.
+ start_time: Time in seconds since the epoch that the process started.
+ """
- __slots__ = ('port', 'pid', 'start_time')
+ __slots__ = ('port', 'pid', 'start_time')
- def __init__(self, port):
- self.port = port
- self.pid = 0
- self.start_time = 0
+ def __init__(self, port):
+ self.port = port
+ self.pid = 0
+ self.start_time = 0
class _PortPool(object):
- """Manage available ports for processes.
+ """Manage available ports for processes.
- Ports are reclaimed when the reserving process exits and the reserved port
- is no longer in use. Only ports which are free for both TCP and UDP will be
- handed out. It is easier to not differentiate between protocols.
+ Ports are reclaimed when the reserving process exits and the reserved port
+ is no longer in use. Only ports which are free for both TCP and UDP will be
+ handed out. It is easier to not differentiate between protocols.
- The pool must be pre-seeded with add_port_to_free_pool() calls
- after which get_port_for_process() will allocate and reclaim ports.
- The len() of a _PortPool returns the total number of ports being managed.
+ The pool must be pre-seeded with add_port_to_free_pool() calls
+ after which get_port_for_process() will allocate and reclaim ports.
+ The len() of a _PortPool returns the total number of ports being managed.
- Attributes:
- ports_checked_for_last_request: The number of ports examined in order to
- return from the most recent get_port_for_process() request. A high
- number here likely means the number of available ports with no active
- process using them is getting low.
- """
+ Attributes:
+ ports_checked_for_last_request: The number of ports examined in order to
+ return from the most recent get_port_for_process() request. A high
+ number here likely means the number of available ports with no active
+ process using them is getting low.
+ """
- def __init__(self):
- self._port_queue = collections.deque()
- self.ports_checked_for_last_request = 0
+ def __init__(self):
+ self._port_queue = collections.deque()
+ self.ports_checked_for_last_request = 0
- def num_ports(self):
- return len(self._port_queue)
+ def num_ports(self):
+ return len(self._port_queue)
- def get_port_for_process(self, pid):
- """Allocates and returns port for pid or 0 if none could be allocated."""
- if not self._port_queue:
- raise RuntimeError('No ports being managed.')
+ def get_port_for_process(self, pid):
+ """Allocates and returns port for pid or 0 if none could be allocated."""
+ if not self._port_queue:
+ raise RuntimeError('No ports being managed.')
- # Avoid an infinite loop if all ports are currently assigned.
- check_count = 0
- max_ports_to_test = len(self._port_queue)
- while check_count < max_ports_to_test:
- # Get the next candidate port and move it to the back of the queue.
- candidate = self._port_queue.pop()
- self._port_queue.appendleft(candidate)
- check_count += 1
- if (candidate.start_time == 0 or
- candidate.start_time != _get_process_start_time(candidate.pid)):
- if _port_is_available(candidate.pid):
- candidate.pid = pid
- candidate.start_time = _get_process_start_time(pid)
- if not candidate.start_time:
- log.info("Can't read start time for pid %d.", pid)
- self.ports_checked_for_last_request = check_count
- return candidate.port
- else:
- log.info('Port %d unexpectedly in use, last owning pid %d.',
- candidate.port, candidate.pid)
+ # Avoid an infinite loop if all ports are currently assigned.
+ check_count = 0
+ max_ports_to_test = len(self._port_queue)
+ while check_count < max_ports_to_test:
+ # Get the next candidate port and move it to the back of the queue.
+ candidate = self._port_queue.pop()
+ self._port_queue.appendleft(candidate)
+ check_count += 1
+ if (candidate.start_time == 0 or
+ candidate.start_time != _get_process_start_time(candidate.pid)):
+ if _port_is_available(candidate.pid):
+ candidate.pid = pid
+ candidate.start_time = _get_process_start_time(pid)
+ if not candidate.start_time:
+ log.info("Can't read start time for pid %d.", pid)
+ self.ports_checked_for_last_request = check_count
+ return candidate.port
+ else:
+ log.info('Port %d unexpectedly in use, last owning pid %d.',
+ candidate.port, candidate.pid)
- log.info('All ports in use.')
- self.ports_checked_for_last_request = check_count
- return 0
+ log.info('All ports in use.')
+ self.ports_checked_for_last_request = check_count
+ return 0
- def add_port_to_free_pool(self, port):
- """Add a new port to the free pool for allocation."""
- if port < 1 or port > 65535:
- raise ValueError('Port must be in the [1, 65535] range, not %d.' % port)
- port_info = _PortInfo(port=port)
- self._port_queue.append(port_info)
+ def add_port_to_free_pool(self, port):
+ """Add a new port to the free pool for allocation."""
+ if port < 1 or port > 65535:
+ raise ValueError(
+ 'Port must be in the [1, 65535] range, not %d.' % port)
+ port_info = _PortInfo(port=port)
+ self._port_queue.append(port_info)
class _PortServerRequestHandler(object):
- """A class to handle port allocation and status requests.
+ """A class to handle port allocation and status requests.
- Allocates ports to process ids via the dead simple port server protocol
- when the handle_port_request asyncio.coroutine handler has been registered.
- Statistics can be logged using the dump_stats method.
- """
-
- def __init__(self, ports_to_serve):
- """Initialize a new port server.
-
- Args:
- ports_to_serve: A sequence of unique port numbers to test and offer
- up to clients.
+ Allocates ports to process ids via the dead simple port server protocol
+ when the handle_port_request asyncio.coroutine handler has been registered.
+ Statistics can be logged using the dump_stats method.
"""
- self._port_pool = _PortPool()
- self._total_allocations = 0
- self._denied_allocations = 0
- self._client_request_errors = 0
- for port in ports_to_serve:
- self._port_pool.add_port_to_free_pool(port)
- @asyncio.coroutine
- def handle_port_request(self, reader, writer):
- client_data = yield from reader.read(100)
- self._handle_port_request(client_data, writer)
- writer.close()
+ def __init__(self, ports_to_serve):
+ """Initialize a new port server.
- def _handle_port_request(self, client_data, writer):
- """Given a port request body, parse it and respond appropriately.
+ Args:
+ ports_to_serve: A sequence of unique port numbers to test and offer
+ up to clients.
+ """
+ self._port_pool = _PortPool()
+ self._total_allocations = 0
+ self._denied_allocations = 0
+ self._client_request_errors = 0
+ for port in ports_to_serve:
+ self._port_pool.add_port_to_free_pool(port)
- Args:
- client_data: The request bytes from the client.
- writer: The asyncio Writer for the response to be written to.
- """
- try:
- pid = int(client_data)
- except ValueError as error:
- self._client_request_errors += 1
- log.warning('Could not parse request: %s', error)
- return
+ @asyncio.coroutine
+ def handle_port_request(self, reader, writer):
+ # XXX#client_data = yield from reader.read(100)
+ self._handle_port_request(client_data, writer)
+ writer.close()
- log.info('Request on behalf of pid %d.', pid)
- log.info('cmdline: %s', _get_process_command_line(pid))
+ def _handle_port_request(self, client_data, writer):
+ """Given a port request body, parse it and respond appropriately.
- if not _should_allocate_port(pid):
- self._denied_allocations += 1
- return
+ Args:
+ client_data: The request bytes from the client.
+ writer: The asyncio Writer for the response to be written to.
+ """
+ try:
+ pid = int(client_data)
+ except ValueError as error:
+ self._client_request_errors += 1
+ log.warning('Could not parse request: %s', error)
+ return
- port = self._port_pool.get_port_for_process(pid)
- if port > 0:
- self._total_allocations += 1
- writer.write('{:d}\n'.format(port).encode('utf-8'))
- log.debug('Allocated port %d to pid %d', port, pid)
- else:
- self._denied_allocations += 1
+ log.info('Request on behalf of pid %d.', pid)
+ log.info('cmdline: %s', _get_process_command_line(pid))
- def dump_stats(self):
- """Logs statistics of our operation."""
- log.info('Dumping statistics:')
- stats = []
- stats.append(
- 'client-request-errors {}'.format(self._client_request_errors))
- stats.append('denied-allocations {}'.format(self._denied_allocations))
- stats.append('num-ports-managed {}'.format(self._port_pool.num_ports()))
- stats.append(
- 'num-ports-checked-for-last-request {}'.format(
- self._port_pool.ports_checked_for_last_request))
- stats.append('total-allocations {}'.format(self._total_allocations))
- for stat in stats:
- log.info(stat)
+ if not _should_allocate_port(pid):
+ self._denied_allocations += 1
+ return
+
+ port = self._port_pool.get_port_for_process(pid)
+ if port > 0:
+ self._total_allocations += 1
+ writer.write('{:d}\n'.format(port).encode('utf-8'))
+ log.debug('Allocated port %d to pid %d', port, pid)
+ else:
+ self._denied_allocations += 1
+
+ def dump_stats(self):
+ """Logs statistics of our operation."""
+ log.info('Dumping statistics:')
+ stats = []
+ stats.append(
+ 'client-request-errors {}'.format(self._client_request_errors))
+ stats.append('denied-allocations {}'.format(self._denied_allocations))
+ stats.append('num-ports-managed {}'.format(self._port_pool.num_ports()))
+ stats.append(
+ 'num-ports-checked-for-last-request {}'.format(
+ self._port_pool.ports_checked_for_last_request))
+ stats.append('total-allocations {}'.format(self._total_allocations))
+ for stat in stats:
+ log.info(stat)
def _parse_command_line():
- """Configure and parse our command line flags."""
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '--portserver_static_pool', type=str,
- default='32768-60000',
- help='Comma separated N-P Range(s) of ports to manage.')
- parser.add_argument(
- '--portserver_unix_socket_address', type=str,
- default='@unittest-portserver',
- help='Address of AF_UNIX socket on which to listen (first @ is a NUL).')
- parser.add_argument('--verbose', action='store_true',
- default=False, help='Enable verbose messages.')
- parser.add_argument('--debug', action='store_true',
- default=False, help='Enable full debug messages.')
- return parser.parse_args(sys.argv[1:])
+ """Configure and parse our command line flags."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--portserver_static_pool', type=str,
+ default='32768-60000',
+ help='Comma separated N-P Range(s) of ports to manage.')
+ parser.add_argument(
+ '--portserver_unix_socket_address', type=str,
+ default='@unittest-portserver',
+ help='Address of AF_UNIX socket on which to listen (first @ is a NUL).')
+ parser.add_argument('--verbose', action='store_true',
+ default=False, help='Enable verbose messages.')
+ parser.add_argument('--debug', action='store_true',
+ default=False, help='Enable full debug messages.')
+ return parser.parse_args(sys.argv[1:])
def _parse_port_ranges(pool_str):
- """Given a 'N-P,X-Y' description of port ranges, return a set of ints."""
- ports = set()
- for range_str in pool_str.split(','):
- try:
- a, b = range_str.split('-', 1)
- start, end = int(a), int(b)
- except ValueError:
- log.error('Ignoring unparsable port range %r.', range_str)
- continue
- if start < 1 or end > 65535:
- log.error('Ignoring out of bounds port range %r.', range_str)
- continue
- ports.update(set(range(start, end + 1)))
- return ports
+ """Given a 'N-P,X-Y' description of port ranges, return a set of ints."""
+ ports = set()
+ for range_str in pool_str.split(','):
+ try:
+ a, b = range_str.split('-', 1)
+ start, end = int(a), int(b)
+ except ValueError:
+ log.error('Ignoring unparsable port range %r.', range_str)
+ continue
+ if start < 1 or end > 65535:
+ log.error('Ignoring out of bounds port range %r.', range_str)
+ continue
+ ports.update(set(range(start, end + 1)))
+ return ports
def _configure_logging(verbose=False, debug=False):
- """Configure the log global, message format, and verbosity settings."""
- overall_level = logging.DEBUG if debug else logging.INFO
- logging.basicConfig(
- format=('{levelname[0]}{asctime}.{msecs:03.0f} {thread} '
- '{filename}:{lineno}] {message}'),
- datefmt='%m%d %H:%M:%S', style='{', level=overall_level)
- global log
- log = logging.getLogger('portserver')
- # The verbosity controls our loggers logging level, not the global
- # one above. This avoids debug messages from libraries such as asyncio.
- log.setLevel(logging.DEBUG if verbose else overall_level)
+ """Configure the log global, message format, and verbosity settings."""
+ overall_level = logging.DEBUG if debug else logging.INFO
+ logging.basicConfig(
+ format=('{levelname[0]}{asctime}.{msecs:03.0f} {thread} '
+ '{filename}:{lineno}] {message}'),
+ datefmt='%m%d %H:%M:%S', style='{', level=overall_level)
+ global log
+ log = logging.getLogger('portserver')
+ # The verbosity controls our loggers logging level, not the global
+ # one above. This avoids debug messages from libraries such as asyncio.
+ log.setLevel(logging.DEBUG if verbose else overall_level)
def main():
- config = _parse_command_line()
- if config.debug:
- asyncio.tasks._DEBUG = True # Equivalent of PYTHONASYNCIODEBUG=1 in 3.4; pylint: disable=protected-access
- _configure_logging(verbose=config.verbose, debug=config.debug)
- ports_to_serve = _parse_port_ranges(config.portserver_static_pool)
- if not ports_to_serve:
- log.error('No ports. Invalid port ranges in --portserver_static_pool?')
- sys.exit(1)
+ config = _parse_command_line()
+ if config.debug:
+ # Equivalent of PYTHONASYNCIODEBUG=1 in 3.4; pylint: disable=protected-access
+ asyncio.tasks._DEBUG = True
+ _configure_logging(verbose=config.verbose, debug=config.debug)
+ ports_to_serve = _parse_port_ranges(config.portserver_static_pool)
+ if not ports_to_serve:
+ log.error('No ports. Invalid port ranges in --portserver_static_pool?')
+ sys.exit(1)
- request_handler = _PortServerRequestHandler(ports_to_serve)
+ request_handler = _PortServerRequestHandler(ports_to_serve)
- event_loop = asyncio.get_event_loop()
- event_loop.add_signal_handler(signal.SIGUSR1, request_handler.dump_stats)
- coro = asyncio.start_unix_server(
- request_handler.handle_port_request,
- path=config.portserver_unix_socket_address.replace('@', '\0', 1),
- loop=event_loop)
- server_address = config.portserver_unix_socket_address
+ event_loop = asyncio.get_event_loop()
+ event_loop.add_signal_handler(signal.SIGUSR1, request_handler.dump_stats)
+ coro = asyncio.start_unix_server(
+ request_handler.handle_port_request,
+ path=config.portserver_unix_socket_address.replace('@', '\0', 1),
+ loop=event_loop)
+ server_address = config.portserver_unix_socket_address
- server = event_loop.run_until_complete(coro)
- log.info('Serving on %s', server_address)
- try:
- event_loop.run_forever()
- except KeyboardInterrupt:
- log.info('Stopping due to ^C.')
+ server = event_loop.run_until_complete(coro)
+ log.info('Serving on %s', server_address)
+ try:
+ event_loop.run_forever()
+ except KeyboardInterrupt:
+ log.info('Stopping due to ^C.')
- server.close()
- event_loop.run_until_complete(server.wait_closed())
- event_loop.remove_signal_handler(signal.SIGUSR1)
- event_loop.close()
- request_handler.dump_stats()
- log.info('Goodbye.')
+ server.close()
+ event_loop.run_until_complete(server.wait_closed())
+ event_loop.remove_signal_handler(signal.SIGUSR1)
+ event_loop.close()
+ request_handler.dump_stats()
+ log.info('Goodbye.')
if __name__ == '__main__':
- main()
+ main()
diff --git a/src/tests/portpicker_test.py b/src/tests/portpicker_test.py
old mode 100755
new mode 100644
index bf800cd..05724da
--- a/src/tests/portpicker_test.py
+++ b/src/tests/portpicker_test.py
@@ -22,117 +22,121 @@
import unittest
try:
- # pylint: disable=no-name-in-module
- from unittest import mock # Python >= 3.3.
+ # pylint: disable=no-name-in-module
+ from unittest import mock # Python >= 3.3.
except ImportError:
- import mock # https://pypi.python.org/pypi/mock
+ import mock # https://pypi.python.org/pypi/mock
import portpicker
class PickUnusedPortTest(unittest.TestCase):
- def IsUnusedTCPPort(self, port):
- return self._bind(port, socket.SOCK_STREAM, socket.IPPROTO_TCP)
+ def IsUnusedTCPPort(self, port):
+ return self._bind(port, socket.SOCK_STREAM, socket.IPPROTO_TCP)
- def IsUnusedUDPPort(self, port):
- return self._bind(port, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ def IsUnusedUDPPort(self, port):
+ return self._bind(port, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
- def setUp(self):
- # So we can Bind even if portpicker.Bind is stubbed out.
- self._bind = portpicker.Bind
+ def setUp(self):
+ # So we can Bind even if portpicker.Bind is stubbed out.
+ self._bind = portpicker.Bind
- def testPickUnusedPortActuallyWorks(self):
- """This test can be flaky."""
- for _ in range(10):
- port = portpicker.PickUnusedPort()
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ def testPickUnusedPortActuallyWorks(self):
+ """This test can be flaky."""
+ for _ in range(10):
+ port = portpicker.PickUnusedPort()
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
- @unittest.skipIf('PORTSERVER_ADDRESS' not in os.environ,
- 'no port server to test against')
- def testPickUnusedCanSuccessfullyUsePortServer(self):
+ @unittest.skipIf('PORTSERVER_ADDRESS' not in os.environ,
+ 'no port server to test against')
+ def testPickUnusedCanSuccessfullyUsePortServer(self):
- with mock.patch.object(portpicker, '_PickUnusedPortWithoutServer'):
- portpicker._PickUnusedPortWithoutServer.side_effect = Exception('eek!')
+ with mock.patch.object(portpicker, '_PickUnusedPortWithoutServer'):
+ portpicker._PickUnusedPortWithoutServer.side_effect = (
+ Exception('eek!'))
- # Since _PickUnusedPortWithoutServer() raises an exception, if we
- # can successfully obtain a port, the portserver must be working.
- port = portpicker.PickUnusedPort()
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ # Since _PickUnusedPortWithoutServer() raises an exception, if we
+ # can successfully obtain a port, the portserver must be working.
+ port = portpicker.PickUnusedPort()
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
- @unittest.skipIf('PORTSERVER_ADDRESS' not in os.environ,
- 'no port server to test against')
- def testGetPortFromPortServer(self):
- """Exercise the GetPortFromPortServer() helper function."""
- for _ in range(10):
- port = portpicker.GetPortFromPortServer(os.environ['PORTSERVER_ADDRESS'])
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ @unittest.skipIf('PORTSERVER_ADDRESS' not in os.environ,
+ 'no port server to test against')
+ def testGetPortFromPortServer(self):
+ """Exercise the GetPortFromPortServer() helper function."""
+ for _ in range(10):
+ port = portpicker.GetPortFromPortServer(
+ os.environ['PORTSERVER_ADDRESS'])
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
- def testSendsPidToPortServer(self):
- server = mock.Mock()
- server.recv.return_value = b'42768\n'
- with mock.patch.object(socket, 'socket', return_value=server):
- port = portpicker.GetPortFromPortServer('portserver', pid=1234)
- server.sendall.assert_called_once_with(b'1234\n')
- self.assertEqual(port, 42768)
+ def testSendsPidToPortServer(self):
+ server = mock.Mock()
+ server.recv.return_value = b'42768\n'
+ with mock.patch.object(socket, 'socket', return_value=server):
+ port = portpicker.GetPortFromPortServer('portserver', pid=1234)
+ server.sendall.assert_called_once_with(b'1234\n')
+ self.assertEqual(port, 42768)
- def testPidDefaultsToOwnPid(self):
- server = mock.Mock()
- server.recv.return_value = b'52768\n'
- with mock.patch.object(socket, 'socket', return_value=server):
- with mock.patch.object(os, 'getpid', return_value=9876):
- port = portpicker.GetPortFromPortServer('portserver')
- server.sendall.assert_called_once_with(b'9876\n')
- self.assertEqual(port, 52768)
+ def testPidDefaultsToOwnPid(self):
+ server = mock.Mock()
+ server.recv.return_value = b'52768\n'
+ with mock.patch.object(socket, 'socket', return_value=server):
+ with mock.patch.object(os, 'getpid', return_value=9876):
+ port = portpicker.GetPortFromPortServer('portserver')
+ server.sendall.assert_called_once_with(b'9876\n')
+ self.assertEqual(port, 52768)
- def testRandomlyChosenPorts(self):
- # Unless this box is under an overwhelming socket load, this test
- # will heavily exercise the "pick a port randomly" part of the
- # port picking code, but may never hit the "OS assigns a port"
- # code.
- for _ in range(100):
- port = portpicker._PickUnusedPortWithoutServer()
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ def testRandomlyChosenPorts(self):
+ # Unless this box is under an overwhelming socket load, this test
+ # will heavily exercise the "pick a port randomly" part of the
+ # port picking code, but may never hit the "OS assigns a port"
+ # code.
+ for _ in range(100):
+ port = portpicker._PickUnusedPortWithoutServer()
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
- def testOSAssignedPorts(self):
- self.last_assigned_port = None
+ def testOSAssignedPorts(self):
+ self.last_assigned_port = None
- def ErrorForExplicitPorts(port, socket_type, socket_proto):
- # Only successfully return a port if an OS-assigned port is
- # requested, or if we're checking that the last OS-assigned port
- # is unused on the other protocol.
- if port == 0 or port == self.last_assigned_port:
- self.last_assigned_port = self._bind(port, socket_type, socket_proto)
- return self.last_assigned_port
- else:
- return None
+ def ErrorForExplicitPorts(port, socket_type, socket_proto):
+ # Only successfully return a port if an OS-assigned port is
+ # requested, or if we're checking that the last OS-assigned port
+ # is unused on the other protocol.
+ if port == 0 or port == self.last_assigned_port:
+ self.last_assigned_port = self._bind(
+ port, socket_type, socket_proto)
+ return self.last_assigned_port
+ else:
+ return None
- with mock.patch.object(portpicker, 'Bind', ErrorForExplicitPorts):
- for _ in range(100):
- port = portpicker._PickUnusedPortWithoutServer()
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ with mock.patch.object(portpicker, 'Bind', ErrorForExplicitPorts):
+ for _ in range(100):
+ port = portpicker._PickUnusedPortWithoutServer()
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
- def testPickPortsWithError(self):
- r = random.Random()
+ def testPickPortsWithError(self):
+ r = random.Random()
- def BindWithError(port, socket_type, socket_proto):
- # 95% failure rate means both port picking methods will be exercised.
- if int(r.uniform(0, 20)) == 0:
- return self._bind(port, socket_type, socket_proto)
- else:
- return None
+ def BindWithError(port, socket_type, socket_proto):
+ # 95% failure rate means both port picking methods will be
+ # exercised.
+ if int(r.uniform(0, 20)) == 0:
+ return self._bind(port, socket_type, socket_proto)
+ else:
+ return None
- with mock.patch.object(portpicker, 'Bind', BindWithError):
- for _ in range(100):
- port = portpicker._PickUnusedPortWithoutServer()
- self.assertTrue(self.IsUnusedTCPPort(port))
- self.assertTrue(self.IsUnusedUDPPort(port))
+ with mock.patch.object(portpicker, 'Bind', BindWithError):
+ for _ in range(100):
+ port = portpicker._PickUnusedPortWithoutServer()
+ self.assertTrue(self.IsUnusedTCPPort(port))
+ self.assertTrue(self.IsUnusedUDPPort(port))
if __name__ == '__main__':
- unittest.main()
+ unittest.main()
diff --git a/src/tests/portserver_test.py b/src/tests/portserver_test.py
old mode 100755
new mode 100644
index 835fc67..29b0792
--- a/src/tests/portserver_test.py
+++ b/src/tests/portserver_test.py
@@ -27,134 +27,135 @@
import portserver
-
def setUpModule():
- portserver._configure_logging(verbose=True)
+ portserver._configure_logging(verbose=True)
class PortserverFunctionsTest(unittest.TestCase):
- @classmethod
- def setUp(cls):
- cls.port = portpicker.PickUnusedPort()
+ @classmethod
+ def setUp(cls):
+ cls.port = portpicker.PickUnusedPort()
- def test_get_process_command_line(self):
- portserver._get_process_command_line(os.getpid())
+ def test_get_process_command_line(self):
+ portserver._get_process_command_line(os.getpid())
- def test_get_process_start_time(self):
- self.assertGreater(portserver._get_process_start_time(os.getpid()), 0)
+ def test_get_process_start_time(self):
+ self.assertGreater(portserver._get_process_start_time(os.getpid()), 0)
- def test_port_is_available_true(self):
- """This might be flaky unless this test is run with a portserver."""
- # Insert Inception "we must go deeper" meme here.
- self.assertTrue(portserver._port_is_available(self.port))
+ def test_port_is_available_true(self):
+ """This might be flaky unless this test is run with a portserver."""
+ # Insert Inception "we must go deeper" meme here.
+ self.assertTrue(portserver._port_is_available(self.port))
- def test_port_is_available_false(self):
- with mock.patch.object(socket, 'socket') as mock_sock:
- mock_sock.side_effect = socket.error('fake socket error', 0)
- self.assertFalse(portserver._port_is_available(self.port))
+ def test_port_is_available_false(self):
+ with mock.patch.object(socket, 'socket') as mock_sock:
+ mock_sock.side_effect = socket.error('fake socket error', 0)
+ self.assertFalse(portserver._port_is_available(self.port))
- def test_should_allocate_port(self):
- self.assertFalse(portserver._should_allocate_port(0))
- self.assertFalse(portserver._should_allocate_port(1))
- self.assertTrue(portserver._should_allocate_port, os.getpid())
- child_pid = os.fork()
- if child_pid == 0:
- os._exit(0)
- else:
- os.waitpid(child_pid, 0)
- # This test assumes that after waitpid returns the kernel has finished
- # cleaning the process. We also assume that the kernel will not reuse
- # the former child's pid before our next call checks for its existence.
- # Likely assumptions, but not guaranteed.
- self.assertFalse(portserver._should_allocate_port(child_pid))
+ def test_should_allocate_port(self):
+ self.assertFalse(portserver._should_allocate_port(0))
+ self.assertFalse(portserver._should_allocate_port(1))
+ self.assertTrue(portserver._should_allocate_port, os.getpid())
+ child_pid = os.fork()
+ if child_pid == 0:
+ os._exit(0)
+ else:
+ os.waitpid(child_pid, 0)
+ # This test assumes that after waitpid returns the kernel has finished
+ # cleaning the process. We also assume that the kernel will not reuse
+ # the former child's pid before our next call checks for its existence.
+ # Likely assumptions, but not guaranteed.
+ self.assertFalse(portserver._should_allocate_port(child_pid))
- def test_parse_command_line(self):
- with mock.patch.object(
- sys, 'argv', ['program_name', '--verbose',
- '--portserver_static_pool=1-1,3-8',
- '--portserver_unix_socket_address=@hello-test']):
- portserver._parse_command_line()
+ def test_parse_command_line(self):
+ with mock.patch.object(
+ sys, 'argv', ['program_name', '--verbose',
+ '--portserver_static_pool=1-1,3-8',
+ '--portserver_unix_socket_address=@hello-test']):
+ portserver._parse_command_line()
- def test_parse_port_ranges(self):
- self.assertFalse(portserver._parse_port_ranges(''))
- self.assertCountEqual(portserver._parse_port_ranges('1-1'), {1})
- self.assertCountEqual(portserver._parse_port_ranges('1-1,3-8,375-378'),
- {1, 3, 4, 5, 6, 7, 8, 375, 376, 377, 378})
- # Unparsable parts are logged but ignored.
- self.assertEqual({1, 2}, portserver._parse_port_ranges('1-2,not,numbers'))
- self.assertEqual(set(), portserver._parse_port_ranges('8080-8081x'))
- # Port ranges that go out of bounds are logged but ignored.
- self.assertEqual(set(), portserver._parse_port_ranges('0-1138'))
- self.assertEqual(set(range(19, 84 + 1)),
- portserver._parse_port_ranges('1138-65536,19-84'))
+ def test_parse_port_ranges(self):
+ self.assertFalse(portserver._parse_port_ranges(''))
+ self.assertCountEqual(portserver._parse_port_ranges('1-1'), {1})
+ self.assertCountEqual(portserver._parse_port_ranges('1-1,3-8,375-378'),
+ {1, 3, 4, 5, 6, 7, 8, 375, 376, 377, 378})
+ # Unparsable parts are logged but ignored.
+ self.assertEqual(
+ {1, 2}, portserver._parse_port_ranges('1-2,not,numbers'))
+ self.assertEqual(set(), portserver._parse_port_ranges('8080-8081x'))
+ # Port ranges that go out of bounds are logged but ignored.
+ self.assertEqual(set(), portserver._parse_port_ranges('0-1138'))
+ self.assertEqual(set(range(19, 84 + 1)),
+ portserver._parse_port_ranges('1138-65536,19-84'))
- def test_configure_logging(self):
- """Just code coverage really."""
- portserver._configure_logging(False)
- portserver._configure_logging(True)
+ def test_configure_logging(self):
+ """Just code coverage really."""
+ portserver._configure_logging(False)
+ portserver._configure_logging(True)
- @mock.patch.object(sys, 'argv',
- ['PortserverFunctionsTest.test_main',
- '--portserver_unix_socket_address=@TST-%d' % os.getpid()])
- @mock.patch.object(portserver, '_parse_port_ranges')
- @mock.patch('asyncio.get_event_loop')
- @mock.patch('asyncio.start_unix_server')
- def test_main(self, *unused_mocks):
- portserver._parse_port_ranges.return_value = set()
- with self.assertRaises(SystemExit):
- portserver.main()
+ @mock.patch.object(
+ sys, 'argv',
+ ['PortserverFunctionsTest.test_main',
+ '--portserver_unix_socket_address=@TST-%d' % os.getpid()])
+ @mock.patch.object(portserver, '_parse_port_ranges')
+ @mock.patch('asyncio.get_event_loop')
+ @mock.patch('asyncio.start_unix_server')
+ def test_main(self, *unused_mocks):
+ portserver._parse_port_ranges.return_value = set()
+ with self.assertRaises(SystemExit):
+ portserver.main()
- # Give it at least one port and try again.
- portserver._parse_port_ranges.return_value = {self.port}
+ # Give it at least one port and try again.
+ portserver._parse_port_ranges.return_value = {self.port}
- mock_event_loop = mock.Mock(spec=asyncio.base_events.BaseEventLoop)
- asyncio.get_event_loop.return_value = mock_event_loop
- asyncio.start_unix_server.return_value = mock.Mock()
- mock_event_loop.run_forever.side_effect = KeyboardInterrupt
+ mock_event_loop = mock.Mock(spec=asyncio.base_events.BaseEventLoop)
+ asyncio.get_event_loop.return_value = mock_event_loop
+ asyncio.start_unix_server.return_value = mock.Mock()
+ mock_event_loop.run_forever.side_effect = KeyboardInterrupt
- portserver.main()
+ portserver.main()
- mock_event_loop.run_until_complete.assert_any_call(
- asyncio.start_unix_server.return_value)
- mock_event_loop.close.assert_called_once_with()
- # NOTE: This could be improved. Tests of main() are often gross.
+ mock_event_loop.run_until_complete.assert_any_call(
+ asyncio.start_unix_server.return_value)
+ mock_event_loop.close.assert_called_once_with()
+ # NOTE: This could be improved. Tests of main() are often gross.
class PortPoolTest(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- cls.port = portpicker.PickUnusedPort()
+ @classmethod
+ def setUpClass(cls):
+ cls.port = portpicker.PickUnusedPort()
- def setUp(self):
- self.pool = portserver._PortPool()
+ def setUp(self):
+ self.pool = portserver._PortPool()
- def test_initialization(self):
- self.assertEqual(0, self.pool.num_ports())
- self.pool.add_port_to_free_pool(self.port)
- self.assertEqual(1, self.pool.num_ports())
- self.pool.add_port_to_free_pool(1138)
- self.assertEqual(2, self.pool.num_ports())
- self.assertRaises(ValueError, self.pool.add_port_to_free_pool, 0)
- self.assertRaises(ValueError, self.pool.add_port_to_free_pool, 65536)
+ def test_initialization(self):
+ self.assertEqual(0, self.pool.num_ports())
+ self.pool.add_port_to_free_pool(self.port)
+ self.assertEqual(1, self.pool.num_ports())
+ self.pool.add_port_to_free_pool(1138)
+ self.assertEqual(2, self.pool.num_ports())
+ self.assertRaises(ValueError, self.pool.add_port_to_free_pool, 0)
+ self.assertRaises(ValueError, self.pool.add_port_to_free_pool, 65536)
- @mock.patch.object(portserver, '_port_is_available')
- def test_get_port_for_process_ok(self, mock_port_is_available):
- self.pool.add_port_to_free_pool(self.port)
- mock_port_is_available.return_value = True
- self.assertEqual(self.port, self.pool.get_port_for_process(os.getpid()))
- self.assertEqual(1, self.pool.ports_checked_for_last_request)
+ @mock.patch.object(portserver, '_port_is_available')
+ def test_get_port_for_process_ok(self, mock_port_is_available):
+ self.pool.add_port_to_free_pool(self.port)
+ mock_port_is_available.return_value = True
+ self.assertEqual(self.port, self.pool.get_port_for_process(os.getpid()))
+ self.assertEqual(1, self.pool.ports_checked_for_last_request)
- @mock.patch.object(portserver, '_port_is_available')
- def test_get_port_for_process_none_left(self, mock_port_is_available):
- self.pool.add_port_to_free_pool(self.port)
- self.pool.add_port_to_free_pool(22)
- mock_port_is_available.return_value = False
- self.assertEqual(2, self.pool.num_ports())
- self.assertEqual(0, self.pool.get_port_for_process(os.getpid()))
- self.assertEqual(2, self.pool.num_ports())
- self.assertEqual(2, self.pool.ports_checked_for_last_request)
+ @mock.patch.object(portserver, '_port_is_available')
+ def test_get_port_for_process_none_left(self, mock_port_is_available):
+ self.pool.add_port_to_free_pool(self.port)
+ self.pool.add_port_to_free_pool(22)
+ mock_port_is_available.return_value = False
+ self.assertEqual(2, self.pool.num_ports())
+ self.assertEqual(0, self.pool.get_port_for_process(os.getpid()))
+ self.assertEqual(2, self.pool.num_ports())
+ self.assertEqual(2, self.pool.ports_checked_for_last_request)
@mock.patch.object(portserver, '_get_process_command_line')
@@ -162,54 +163,54 @@
@mock.patch.object(portserver._PortPool, 'get_port_for_process')
class PortServerRequestHandlerTest(unittest.TestCase):
- def setUp(self):
- portserver._configure_logging(verbose=True)
- self.rh = portserver._PortServerRequestHandler([23, 42, 54])
+ def setUp(self):
+ portserver._configure_logging(verbose=True)
+ self.rh = portserver._PortServerRequestHandler([23, 42, 54])
- def test_stats_reporting(self, *unused_mocks):
- with mock.patch.object(portserver, 'log') as mock_logger:
- self.rh.dump_stats()
- mock_logger.info.assert_called_with('total-allocations 0')
+ def test_stats_reporting(self, *unused_mocks):
+ with mock.patch.object(portserver, 'log') as mock_logger:
+ self.rh.dump_stats()
+ mock_logger.info.assert_called_with('total-allocations 0')
- def test_handle_port_request_bad_data(self, *unused_mocks):
- self._test_bad_data_from_client(b'')
- self._test_bad_data_from_client(b'\n')
- self._test_bad_data_from_client(b'99Z\n')
- self._test_bad_data_from_client(b'99 8\n')
- self.assertEqual([], portserver._get_process_command_line.mock_calls)
+ def test_handle_port_request_bad_data(self, *unused_mocks):
+ self._test_bad_data_from_client(b'')
+ self._test_bad_data_from_client(b'\n')
+ self._test_bad_data_from_client(b'99Z\n')
+ self._test_bad_data_from_client(b'99 8\n')
+ self.assertEqual([], portserver._get_process_command_line.mock_calls)
- def _test_bad_data_from_client(self, data):
- mock_writer = mock.Mock(asyncio.StreamWriter)
- self.rh._handle_port_request(data, mock_writer)
- self.assertFalse(portserver._should_allocate_port.mock_calls)
+ def _test_bad_data_from_client(self, data):
+ mock_writer = mock.Mock(asyncio.StreamWriter)
+ self.rh._handle_port_request(data, mock_writer)
+ self.assertFalse(portserver._should_allocate_port.mock_calls)
- def test_handle_port_request_denied_allocation(self, *unused_mocks):
- portserver._should_allocate_port.return_value = False
- self.assertEqual(0, self.rh._denied_allocations)
- mock_writer = mock.Mock(asyncio.StreamWriter)
- self.rh._handle_port_request(b'5\n', mock_writer)
- self.assertEqual(1, self.rh._denied_allocations)
+ def test_handle_port_request_denied_allocation(self, *unused_mocks):
+ portserver._should_allocate_port.return_value = False
+ self.assertEqual(0, self.rh._denied_allocations)
+ mock_writer = mock.Mock(asyncio.StreamWriter)
+ self.rh._handle_port_request(b'5\n', mock_writer)
+ self.assertEqual(1, self.rh._denied_allocations)
- def test_handle_port_request_bad_port_returned(self, *unused_mocks):
- portserver._should_allocate_port.return_value = True
- self.rh._port_pool.get_port_for_process.return_value = 0
- mock_writer = mock.Mock(asyncio.StreamWriter)
- self.rh._handle_port_request(b'6\n', mock_writer)
- self.rh._port_pool.get_port_for_process.assert_called_once_with(6)
- self.assertEqual(1, self.rh._denied_allocations)
+ def test_handle_port_request_bad_port_returned(self, *unused_mocks):
+ portserver._should_allocate_port.return_value = True
+ self.rh._port_pool.get_port_for_process.return_value = 0
+ mock_writer = mock.Mock(asyncio.StreamWriter)
+ self.rh._handle_port_request(b'6\n', mock_writer)
+ self.rh._port_pool.get_port_for_process.assert_called_once_with(6)
+ self.assertEqual(1, self.rh._denied_allocations)
- def test_handle_port_request_success(self, *unused_mocks):
- portserver._should_allocate_port.return_value = True
- self.rh._port_pool.get_port_for_process.return_value = 999
- mock_writer = mock.Mock(asyncio.StreamWriter)
- self.assertEqual(0, self.rh._total_allocations)
- self.rh._handle_port_request(b'8', mock_writer)
- portserver._should_allocate_port.assert_called_once_with(8)
- self.rh._port_pool.get_port_for_process.assert_called_once_with(8)
- self.assertEqual(1, self.rh._total_allocations)
- self.assertEqual(0, self.rh._denied_allocations)
- mock_writer.write.assert_called_once_with(b'999\n')
+ def test_handle_port_request_success(self, *unused_mocks):
+ portserver._should_allocate_port.return_value = True
+ self.rh._port_pool.get_port_for_process.return_value = 999
+ mock_writer = mock.Mock(asyncio.StreamWriter)
+ self.assertEqual(0, self.rh._total_allocations)
+ self.rh._handle_port_request(b'8', mock_writer)
+ portserver._should_allocate_port.assert_called_once_with(8)
+ self.rh._port_pool.get_port_for_process.assert_called_once_with(8)
+ self.assertEqual(1, self.rh._total_allocations)
+ self.assertEqual(0, self.rh._denied_allocations)
+ mock_writer.write.assert_called_once_with(b'999\n')
if __name__ == '__main__':
- unittest.main()
+ unittest.main()