blob: d024c4b9f168cfe7d5b99ba300f2fe8fd014a401 [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ipaddress
import re
from antlion.controllers.utils_lib.ssh import connection
class Error(Exception):
"""Exception thrown when a valid ip command experiences errors."""
class NetworkInterfaceDown(Error):
"""Exception thrown when a network interface is down."""
class LinuxRouteCommand(object):
"""Interface for doing standard ip route commands on a linux system."""
DEFAULT_ROUTE = 'default'
def __init__(self, runner):
"""
Args:
runner: Object that can take unix commands and run them in an
enviroment.
"""
self._runner = runner
def add_route(self, net_interface, address):
"""Add an entry to the ip routing table.
Will add a route for either a specific ip address, or a network.
Args:
net_interface: string, Any packet that sends through this route
will be sent using this network interface
(eg. wlan0).
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE. The address to use. If a network
is given then the entire subnet will be routed.
If DEFAULT_ROUTE is given then this will set the
default route.
Raises:
NetworkInterfaceDown: Raised when the network interface is down.
"""
try:
self._runner.run('ip route add %s dev %s' %
(address, net_interface))
except connection.CommandError as e:
if 'File exists' in e.result.stderr:
raise Error('Route already exists.')
if 'Network is down' in e.result.stderr:
raise NetworkInterfaceDown(
'Device must be up for adding a route.')
raise
def get_routes(self, net_interface=None):
"""Get the routes in the ip routing table.
Args:
net_interface: string, If given, only retrive routes that have
been registered to go through this network
interface (eg. wlan0).
Returns: An iterator that returns a tuple of (address, net_interface).
If it is the default route then address
will be the DEFAULT_ROUTE. If the route is a subnet then
it will be a ipaddress.IPv4Network otherwise it is a
ipaddress.IPv4Address.
"""
result = self._runner.run('ip route show')
lines = result.stdout.splitlines()
# Scan through each line for valid route entries
# Example output:
# default via 192.168.1.254 dev eth0 proto static
# 192.168.1.0/24 dev eth0 proto kernel scope link src 172.22.100.19 metric 1
# 192.168.2.1 dev eth2 proto kernel scope link metric 1
for line in lines:
if not 'dev' in line:
continue
if line.startswith(self.DEFAULT_ROUTE):
# The default route entry is formatted differently.
match = re.search('dev (?P<net_interface>.*)', line)
pair = None
if match:
# When there is a match for the route entry pattern create
# A pair to hold the info.
pair = (self.DEFAULT_ROUTE,
match.groupdict()['net_interface'])
else:
# Test the normal route entry pattern.
match = re.search(
'(?P<address>[^\s]*) dev (?P<net_interface>[^\s]*)', line)
pair = None
if match:
# When there is a match for the route entry pattern create
# A pair to hold the info.
d = match.groupdict()
# Route can be either a network or specific address
try:
address = ipaddress.IPv4Address(d['address'])
except ipaddress.AddressValueError:
address = ipaddress.IPv4Network(d['address'])
pair = (address, d['net_interface'])
# No pair means no pattern was found.
if not pair:
continue
if net_interface:
# If a net_interface was passed in then only give the pair when it is
# The correct net_interface.
if pair[1] == net_interface:
yield pair
else:
# No net_interface given give all valid route entries.
yield pair
def is_route(self, address, net_interface=None):
"""Checks to see if a route exists.
Args:
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE, The address to use.
net_interface: string, If specified, the route must be
registered to go through this network interface
(eg. wlan0).
Returns: True if the route is found, False otherwise.
"""
for route, _ in self.get_routes(net_interface):
if route == address:
return True
return False
def remove_route(self, address, net_interface=None):
"""Removes a route from the ip routing table.
Removes a route from the ip routing table. If the route does not exist
nothing is done.
Args:
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE, The address of the route to remove.
net_interface: string, If specified the route being removed is
registered to go through this network interface
(eg. wlan0)
"""
try:
if net_interface:
self._runner.run('ip route del %s dev %s' %
(address, net_interface))
else:
self._runner.run('ip route del %s' % address)
except connection.CommandError as e:
if 'No such process' in e.result.stderr:
# The route didn't exist.
return
raise
def clear_routes(self, net_interface=None):
"""Clears all routes.
Args:
net_interface: The network interface to clear routes on.
If not given then all routes will be removed on all network
interfaces (eg. wlan0).
"""
routes = self.get_routes(net_interface)
for a, d in routes:
self.remove_route(a, d)