blob: 7836644263e02b6ab27f2b573ad87d2a7ff1645b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING
from antlion.libs.proc import job
if TYPE_CHECKING:
from antlion.controllers.access_point import AccessPoint
# Shell commands executed on the access point through its SSH connection.
# Lists the entries of /sys/class/net, used as the set of all interfaces.
GET_ALL_INTERFACE = "ls /sys/class/net"
# Lists the entries of /sys/devices/virtual/net, used as the set of virtual
# interfaces (subtracted from the full set to find the physical ones).
GET_VIRTUAL_INTERFACE = "ls /sys/devices/virtual/net"
# Prints the bridge table; its first tab-separated column is parsed for names.
BRCTL_SHOW = "brctl show"
class ApInterfacesError(Exception):
    """Error related to AP interfaces.

    Raised when interface discovery on the access point fails, e.g. a
    required command cannot be executed or an expected interface is missing.
    """
class ApInterfaces(object):
"""Class to get network interface information for the device."""
def __init__(
self, ap: "AccessPoint", wan_interface_override: str | None = None
) -> None:
"""Initialize the ApInterface class.
Args:
ap: the ap object within ACTS
wan_interface_override: wan interface to use if specified by config
"""
self.ssh = ap.ssh
self.wan_interface_override = wan_interface_override
def get_all_interface(self) -> list[str]:
"""Get all network interfaces on the device.
Returns:
interfaces_all: list of all the network interfaces on device
"""
output = self.ssh.run(GET_ALL_INTERFACE)
interfaces_all = output.stdout.split("\n")
return interfaces_all
def get_virtual_interface(self) -> list[str]:
"""Get all virtual interfaces on the device.
Returns:
interfaces_virtual: list of all the virtual interfaces on device
"""
output = self.ssh.run(GET_VIRTUAL_INTERFACE)
interfaces_virtual = output.stdout.split("\n")
return interfaces_virtual
def get_physical_interface(self) -> list[str]:
"""Get all the physical interfaces of the device.
Get all physical interfaces such as eth ports and wlan ports
Returns:
interfaces_phy: list of all the physical interfaces
"""
interfaces_all = self.get_all_interface()
interfaces_virtual = self.get_virtual_interface()
interfaces_phy = list(set(interfaces_all) - set(interfaces_virtual))
return interfaces_phy
def get_bridge_interface(self) -> list[str]:
"""Get all the bridge interfaces of the device.
Returns:
interfaces_bridge: the list of bridge interfaces, return None if
bridge utility is not available on the device
Raises:
ApInterfaceError: Failing to run brctl
"""
try:
output = self.ssh.run(BRCTL_SHOW)
except job.Error as e:
raise ApInterfacesError(f'failed to execute "{BRCTL_SHOW}"') from e
lines = output.stdout.split("\n")
interfaces_bridge = []
for line in lines:
interfaces_bridge.append(line.split("\t")[0])
interfaces_bridge.pop(0)
return [x for x in interfaces_bridge if x != ""]
def get_wlan_interface(self) -> tuple[str, str]:
"""Get all WLAN interfaces and specify 2.4 GHz and 5 GHz interfaces.
Returns:
interfaces_wlan: all wlan interfaces
Raises:
ApInterfacesError: Missing at least one WLAN interface
"""
wlan_2g = None
wlan_5g = None
interfaces_phy = self.get_physical_interface()
for iface in interfaces_phy:
output = self.ssh.run(f"iwlist {iface} freq")
if "Channel 06" in output.stdout and "Channel 36" not in output.stdout:
wlan_2g = iface
elif "Channel 36" in output.stdout and "Channel 06" not in output.stdout:
wlan_5g = iface
if wlan_2g is None or wlan_5g is None:
raise ApInterfacesError("Missing at least one WLAN interface")
return (wlan_2g, wlan_5g)
def get_wan_interface(self) -> str:
"""Get the WAN interface which has internet connectivity. If a wan
interface is already specified return that instead.
Returns:
wan: the only one WAN interface
Raises:
ApInterfacesError: no running WAN can be found
"""
if self.wan_interface_override:
return self.wan_interface_override
wan = None
interfaces_phy = self.get_physical_interface()
interfaces_wlan = self.get_wlan_interface()
interfaces_eth = list(set(interfaces_phy) - set(interfaces_wlan))
for iface in interfaces_eth:
network_status = self.check_ping(iface)
if network_status == 1:
wan = iface
break
if wan:
return wan
output = self.ssh.run("ifconfig")
interfaces_all = output.stdout.split("\n")
logging.info(f"IFCONFIG output = {interfaces_all}")
raise ApInterfacesError("No WAN interface available")
def get_lan_interface(self) -> str | None:
"""Get the LAN interface connecting to local devices.
Returns:
lan: the only one running LAN interface of the devices
None, if nothing was found.
"""
lan = None
interfaces_phy = self.get_physical_interface()
interfaces_wlan = self.get_wlan_interface()
interfaces_eth = list(set(interfaces_phy) - set(interfaces_wlan))
interface_wan = self.get_wan_interface()
interfaces_eth.remove(interface_wan)
for iface in interfaces_eth:
output = self.ssh.run(f"ifconfig {iface}")
if "RUNNING" in output.stdout:
lan = iface
break
return lan
def check_ping(self, iface: str) -> int:
"""Check the ping status on specific interface to determine the WAN.
Args:
iface: the specific interface to check
Returns:
network_status: the connectivity status of the interface
"""
try:
self.ssh.run(f"ping -c 3 -I {iface} 8.8.8.8")
return 1
except job.Error:
return 0