blob: 54fa4d9a0f7eec1093ce584a86c458997007c7d3 [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ipaddress
import re
from antlion.controllers.utils_lib.ssh import connection
class Error(Exception):
"""Exception thrown when a valid ip command experiences errors."""
class NetworkInterfaceDown(Error):
"""Exception thrown when a network interface is down."""
class LinuxRouteCommand(object):
"""Interface for doing standard ip route commands on a linux system."""
DEFAULT_ROUTE = "default"
def __init__(self, runner):
"""
Args:
runner: Object that can take unix commands and run them in an
environment.
"""
self._runner = runner
def add_route(self, net_interface, address, proto="static"):
"""Add an entry to the ip routing table.
Will add a route for either a specific ip address, or a network.
Args:
net_interface: string, Any packet that sends through this route
will be sent using this network interface
(eg. wlan0).
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE. The address to use. If a network
is given then the entire subnet will be routed.
If DEFAULT_ROUTE is given then this will set the
default route.
proto: string, Routing protocol identifier of this route
(e.g. kernel, redirect, boot, static, ra).
See `man ip-route(8)` for details.
Raises:
NetworkInterfaceDown: Raised when the network interface is down.
"""
try:
self._runner.run(
f"ip route add {address} dev {net_interface} proto {proto}"
)
except connection.CommandError as e:
if "File exists" in e.result.stderr:
raise Error("Route already exists.")
if "Network is down" in e.result.stderr:
raise NetworkInterfaceDown("Device must be up for adding a route.")
raise
def get_routes(self, net_interface=None):
"""Get the routes in the ip routing table.
Args:
net_interface: string, If given, only retrieve routes that have
been registered to go through this network
interface (eg. wlan0).
Returns: An iterator that returns a tuple of (address, net_interface).
If it is the default route then address
will be the DEFAULT_ROUTE. If the route is a subnet then
it will be a ipaddress.IPv4Network otherwise it is a
ipaddress.IPv4Address.
"""
result_ipv4 = self._runner.run("ip -4 route show")
result_ipv6 = self._runner.run("ip -6 route show")
lines = result_ipv4.stdout.splitlines() + result_ipv6.stdout.splitlines()
# Scan through each line for valid route entries
# Example output:
# default via 192.168.1.254 dev eth0 proto static
# 192.168.1.0/24 dev eth0 proto kernel scope link src 172.22.100.19 metric 1
# 192.168.2.1 dev eth2 proto kernel scope link metric 1
# fe80::/64 dev wlan0 proto static metric 1024
for line in lines:
if not "dev" in line:
continue
if line.startswith(self.DEFAULT_ROUTE):
# The default route entry is formatted differently.
match = re.search("dev (?P<net_interface>\S+)", line)
pair = None
if match:
# When there is a match for the route entry pattern create
# A pair to hold the info.
pair = (self.DEFAULT_ROUTE, match.groupdict()["net_interface"])
else:
# Test the normal route entry pattern.
match = re.search(
"(?P<address>[0-9A-Fa-f\.\:/]+) dev (?P<net_interface>\S+)", line
)
pair = None
if match:
# When there is a match for the route entry pattern create
# A pair to hold the info.
d = match.groupdict()
# Route can be either a network or specific address
try:
address = ipaddress.ip_address(d["address"])
except ValueError:
address = d["address"]
pair = (address, d["net_interface"])
# No pair means no pattern was found.
if not pair:
continue
if net_interface:
# If a net_interface was passed in then only give the pair when it is
# The correct net_interface.
if pair[1] == net_interface:
yield pair
else:
# No net_interface given give all valid route entries.
yield pair
def is_route(self, address, net_interface=None):
"""Checks to see if a route exists.
Args:
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE, The address to use.
net_interface: string, If specified, the route must be
registered to go through this network interface
(eg. wlan0).
Returns: True if the route is found, False otherwise.
"""
for route, _ in self.get_routes(net_interface):
if route == address:
return True
return False
def remove_route(self, address, net_interface=None):
"""Removes a route from the ip routing table.
Removes a route from the ip routing table. If the route does not exist
nothing is done.
Args:
address: ipaddress.IPv4Address, ipaddress.IPv4Network,
or DEFAULT_ROUTE, The address of the route to remove.
net_interface: string, If specified the route being removed is
registered to go through this network interface
(eg. wlan0)
"""
try:
if net_interface:
self._runner.run(f"ip route del {address} dev {net_interface}")
else:
self._runner.run(f"ip route del {address}")
except connection.CommandError as e:
if "No such process" in e.result.stderr:
# The route didn't exist.
return
raise
def clear_routes(self, net_interface=None):
"""Clears all routes.
Args:
net_interface: The network interface to clear routes on.
If not given then all routes will be removed on all network
interfaces (eg. wlan0).
"""
routes = self.get_routes(net_interface)
for a, d in routes:
self.remove_route(a, d)