| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import ipaddress |
| import re |
| |
| from antlion.controllers.utils_lib.ssh import connection |
| |
| |
| class Error(Exception): |
| """Exception thrown when a valid ip command experiences errors.""" |
| |
| |
| class NetworkInterfaceDown(Error): |
| """Exception thrown when a network interface is down.""" |
| |
| |
| class LinuxRouteCommand(object): |
| """Interface for doing standard ip route commands on a linux system.""" |
| |
| DEFAULT_ROUTE = "default" |
| |
| def __init__(self, runner): |
| """ |
| Args: |
| runner: Object that can take unix commands and run them in an |
| environment. |
| """ |
| self._runner = runner |
| |
| def add_route(self, net_interface, address, proto="static"): |
| """Add an entry to the ip routing table. |
| |
| Will add a route for either a specific ip address, or a network. |
| |
| Args: |
| net_interface: string, Any packet that sends through this route |
| will be sent using this network interface |
| (eg. wlan0). |
| address: ipaddress.IPv4Address, ipaddress.IPv4Network, |
| or DEFAULT_ROUTE. The address to use. If a network |
| is given then the entire subnet will be routed. |
| If DEFAULT_ROUTE is given then this will set the |
| default route. |
| proto: string, Routing protocol identifier of this route |
| (e.g. kernel, redirect, boot, static, ra). |
| See `man ip-route(8)` for details. |
| |
| Raises: |
| NetworkInterfaceDown: Raised when the network interface is down. |
| """ |
| try: |
| self._runner.run( |
| f"ip route add {address} dev {net_interface} proto {proto}" |
| ) |
| except connection.CommandError as e: |
| if "File exists" in e.result.stderr: |
| raise Error("Route already exists.") |
| if "Network is down" in e.result.stderr: |
| raise NetworkInterfaceDown("Device must be up for adding a route.") |
| raise |
| |
| def get_routes(self, net_interface=None): |
| """Get the routes in the ip routing table. |
| |
| Args: |
| net_interface: string, If given, only retrieve routes that have |
| been registered to go through this network |
| interface (eg. wlan0). |
| |
| Returns: An iterator that returns a tuple of (address, net_interface). |
| If it is the default route then address |
| will be the DEFAULT_ROUTE. If the route is a subnet then |
| it will be a ipaddress.IPv4Network otherwise it is a |
| ipaddress.IPv4Address. |
| """ |
| result_ipv4 = self._runner.run("ip -4 route show") |
| result_ipv6 = self._runner.run("ip -6 route show") |
| |
| lines = result_ipv4.stdout.splitlines() + result_ipv6.stdout.splitlines() |
| |
| # Scan through each line for valid route entries |
| # Example output: |
| # default via 192.168.1.254 dev eth0 proto static |
| # 192.168.1.0/24 dev eth0 proto kernel scope link src 172.22.100.19 metric 1 |
| # 192.168.2.1 dev eth2 proto kernel scope link metric 1 |
| # fe80::/64 dev wlan0 proto static metric 1024 |
| for line in lines: |
| if not "dev" in line: |
| continue |
| |
| if line.startswith(self.DEFAULT_ROUTE): |
| # The default route entry is formatted differently. |
| match = re.search("dev (?P<net_interface>\S+)", line) |
| pair = None |
| if match: |
| # When there is a match for the route entry pattern create |
| # A pair to hold the info. |
| pair = (self.DEFAULT_ROUTE, match.groupdict()["net_interface"]) |
| else: |
| # Test the normal route entry pattern. |
| match = re.search( |
| "(?P<address>[0-9A-Fa-f\.\:/]+) dev (?P<net_interface>\S+)", line |
| ) |
| pair = None |
| if match: |
| # When there is a match for the route entry pattern create |
| # A pair to hold the info. |
| d = match.groupdict() |
| # Route can be either a network or specific address |
| try: |
| address = ipaddress.ip_address(d["address"]) |
| except ValueError: |
| address = d["address"] |
| |
| pair = (address, d["net_interface"]) |
| |
| # No pair means no pattern was found. |
| if not pair: |
| continue |
| |
| if net_interface: |
| # If a net_interface was passed in then only give the pair when it is |
| # The correct net_interface. |
| if pair[1] == net_interface: |
| yield pair |
| else: |
| # No net_interface given give all valid route entries. |
| yield pair |
| |
| def is_route(self, address, net_interface=None): |
| """Checks to see if a route exists. |
| |
| Args: |
| address: ipaddress.IPv4Address, ipaddress.IPv4Network, |
| or DEFAULT_ROUTE, The address to use. |
| net_interface: string, If specified, the route must be |
| registered to go through this network interface |
| (eg. wlan0). |
| |
| Returns: True if the route is found, False otherwise. |
| """ |
| for route, _ in self.get_routes(net_interface): |
| if route == address: |
| return True |
| |
| return False |
| |
| def remove_route(self, address, net_interface=None): |
| """Removes a route from the ip routing table. |
| |
| Removes a route from the ip routing table. If the route does not exist |
| nothing is done. |
| |
| Args: |
| address: ipaddress.IPv4Address, ipaddress.IPv4Network, |
| or DEFAULT_ROUTE, The address of the route to remove. |
| net_interface: string, If specified the route being removed is |
| registered to go through this network interface |
| (eg. wlan0) |
| """ |
| try: |
| if net_interface: |
| self._runner.run(f"ip route del {address} dev {net_interface}") |
| else: |
| self._runner.run(f"ip route del {address}") |
| except connection.CommandError as e: |
| if "No such process" in e.result.stderr: |
| # The route didn't exist. |
| return |
| raise |
| |
| def clear_routes(self, net_interface=None): |
| """Clears all routes. |
| |
| Args: |
| net_interface: The network interface to clear routes on. |
| If not given then all routes will be removed on all network |
| interfaces (eg. wlan0). |
| """ |
| routes = self.get_routes(net_interface) |
| |
| for a, d in routes: |
| self.remove_route(a, d) |