| #!/usr/bin/env python3 |
| # |
| # Copyright 2022 The Fuchsia Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import time |
| from dataclasses import dataclass |
| from ipaddress import IPv4Address, IPv4Network |
| |
| from mobly import asserts, signals |
| |
| from antlion import utils |
| from antlion.controllers.access_point import AccessPoint, setup_ap |
| from antlion.controllers.ap_lib import dhcp_config, hostapd_constants |
| from antlion.controllers.ap_lib.hostapd_security import Security, SecurityMode |
| from antlion.controllers.ap_lib.hostapd_utils import generate_random_password |
| from antlion.controllers.fuchsia_device import FuchsiaDevice |
| from antlion.test_utils.abstract_devices.wlan_device import ( |
| AssociationMode, |
| create_wlan_device, |
| ) |
| from antlion.test_utils.wifi import base_test |
| |
| |
@dataclass
class APParams:
    """Describes an access point network brought up for a single test.

    Attributes:
        id: Identifier of the AP instance (first entry returned by the AP
            setup call).
        ssid: SSID the AP is broadcasting.
        security: Security settings (mode and password) clients must use to
            associate.
        ip: Router address of the subnet configured on the AP.
        network: The subnet configured on the AP.
    """

    id: str
    ssid: str
    security: Security
    ip: IPv4Address
    network: IPv4Network
| |
class Dhcpv4InteropFixture(base_test.WifiBaseTest):
    """Test helpers for validating DHCPv4 Interop

    Test Bed Requirement:
    * One Android device or Fuchsia device
    * One Access Point
    """

    def setup_class(self) -> None:
        """Selects the DUT from the "dut" user param and stops all APs.

        Raises:
            ValueError: if the "dut" user param is neither "fuchsia_devices"
                nor "android_devices".
        """
        super().setup_class()
        self.log = logging.getLogger()

        # Stays None unless the DUT is a Fuchsia device; get_device_ipv4_addr
        # refuses to run without it.
        self.fuchsia_device: FuchsiaDevice | None = None

        device_type = self.user_params.get("dut", "fuchsia_devices")
        if device_type == "fuchsia_devices":
            self.fuchsia_device = self.fuchsia_devices[0]
            self.dut = create_wlan_device(self.fuchsia_device, AssociationMode.POLICY)
        elif device_type == "android_devices":
            self.dut = create_wlan_device(
                self.android_devices[0], AssociationMode.POLICY
            )
        else:
            # Note the trailing space: the two literals are concatenated.
            raise ValueError(
                f'Invalid "dut" type specified in config: "{device_type}". '
                'Expected "fuchsia_devices" or "android_devices".'
            )

        self.access_point: AccessPoint = self.access_points[0]
        self.access_point.stop_all_aps()

    def setup_test(self) -> None:
        """Wakes any Android devices and turns WiFi on for the DUT."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockAcquireBright()
                ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)

    def teardown_test(self) -> None:
        """Puts Android devices to sleep, resets DUT WiFi and stops all APs."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockRelease()
                ad.droid.goToSleepNow()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.dut.reset_wifi()
        self.access_point.stop_all_aps()

    def connect(self, ap_params: APParams) -> None:
        """Associates the DUT to the given AP, failing the test otherwise.

        Args:
            ap_params: SSID and security settings of the AP to join.
        """
        asserts.assert_true(
            self.dut.associate(
                ap_params.ssid,
                target_pwd=ap_params.security.password,
                target_security=ap_params.security.security_mode,
            ),
            "Failed to connect.",
        )

    def setup_ap(self) -> APParams:
        """Generates a hostapd config and sets up the AP with that config.

        Does not run a DHCP server.

        Returns:
            APParams for the newly setup AP.

        Raises:
            Exception: if the AP does not report exactly one SSID or exactly
                one configured subnet.
        """
        ssid = utils.rand_ascii_str(20)
        security = Security(
            security_mode=SecurityMode.WPA2,
            password=generate_random_password(length=20),
            wpa_cipher="CCMP",
            wpa2_cipher="CCMP",
        )

        ap_ids = setup_ap(
            access_point=self.access_point,
            profile_name="whirlwind",
            mode=hostapd_constants.MODE_11N_MIXED,
            channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
            n_capabilities=[],
            ac_capabilities=[],
            force_wmm=True,
            ssid=ssid,
            security=security,
        )

        # "!= 1" also rejects an empty result, which would otherwise surface
        # as an opaque IndexError when indexing [0] below.
        if len(ap_ids) != 1:
            raise Exception("Expected only one SSID on AP")

        configured_subnets = self.access_point.get_configured_subnets()
        if len(configured_subnets) != 1:
            raise Exception("Expected only one subnet on AP")
        subnet = configured_subnets[0]

        # Tests start their own DHCP server with a custom configuration.
        self.access_point.stop_dhcp()

        return APParams(
            id=ap_ids[0],
            ssid=ssid,
            security=security,
            ip=subnet.router,
            network=subnet.network,
        )

    def device_can_ping(self, dest_ip: IPv4Address) -> bool:
        """Checks if the DUT can ping the given address.

        Args:
            dest_ip: address to ping from the DUT.

        Returns:
            True if can ping, False otherwise
        """
        self.log.info(f"Attempting to ping {dest_ip}...")
        ping_result = self.dut.can_ping(str(dest_ip), count=2)
        if ping_result:
            self.log.info(f"Success pinging: {dest_ip}")
        else:
            self.log.info(f"Failure pinging: {dest_ip}")
        return ping_result

    def get_device_ipv4_addr(
        self, interface: str | None = None, timeout_sec: float = 20.0
    ) -> IPv4Address:
        """Waits for the DUT to have a private ipv4 address and returns it.

        Only supported on Fuchsia.

        Args:
            interface: name of interface from which to get ipv4 address. When
                None, the DUT's default WLAN test interface is used.
            timeout_sec: seconds to wait until raising ConnectionError

        Raises:
            TypeError, if the DUT is not a FuchsiaDevice.
            ConnectionError, if DUT does not have an ipv4 address after the
                timeout.

        Returns:
            The device's IP address
        """
        fuchsia_device = self.fuchsia_device
        if fuchsia_device is None:
            # TODO(http://b/292289291): Add get_(ipv4|ipv6)_addr to SupportsIP.
            raise TypeError(
                "TODO(http://b/292289291): get_device_ipv4_addr only supports "
                "FuchsiaDevice"
            )

        self.log.debug("Fetching updated WLAN interface list")
        if interface is None:
            interface = self.dut.get_default_wlan_test_interface()
        self.log.info(
            f"Checking if DUT has received an ipv4 addr on iface {interface}. "
            f"Will retry for {timeout_sec} seconds."
        )

        # Use the monotonic clock so wall-clock adjustments on the test host
        # cannot shorten or extend the wait.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            ip_addrs = fuchsia_device.get_interface_ip_addresses(interface)
            private_addrs = ip_addrs["ipv4_private"]
            if private_addrs:
                ip = private_addrs[0]
                self.log.info(f"DUT has an ipv4 address: {ip}")
                return IPv4Address(ip)

            self.log.debug(
                "DUT does not yet have an ipv4 address...retrying in 1 second."
            )
            time.sleep(1)

        raise ConnectionError("DUT failed to get an ipv4 address.")

    def run_test_case_expect_dhcp_success(
        self, dhcp_parameters: dict[str, str], dhcp_options: dict[str, int | str]
    ) -> None:
        """Starts the AP and DHCP server, and validates that the client
        connects and obtains an address.

        After the DUT reports an address, the DHCP server logs are checked
        for exactly one DISCOVER and one OFFER, at least one REQUEST and one
        ACK for that address, and the DUT must be able to ping the router.

        Args:
            dhcp_parameters: a dictionary of DHCP parameters
            dhcp_options: a dictionary of DHCP options
        """
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(
            subnet=ap_params.network,
            router=ap_params.ip,
            additional_parameters=dhcp_parameters,
            additional_options=dhcp_options,
        )
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])

        self.log.debug("DHCP Configuration:\n" + dhcp_conf.render_config_file() + "\n")

        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
        self.connect(ap_params=ap_params)

        # Typical log lines look like:
        # dhcpd[26695]: DHCPDISCOVER from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPOFFER on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPREQUEST for 192.168.9.2 (192.168.9.1) from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1

        try:
            ip = self.get_device_ipv4_addr()
        except ConnectionError:
            # Dump the server-side view before failing to aid debugging.
            dhcp_logs = self.access_point.get_dhcp_logs()
            self.log.warning(dhcp_logs)
            asserts.fail("DUT failed to get an IP address")

        # Get updates to DHCP logs
        dhcp_logs = self.access_point.get_dhcp_logs()
        if dhcp_logs is None:
            raise signals.TestFailure("No DHCP logs")

        expected_string = "DHCPDISCOVER from"
        asserts.assert_equal(
            dhcp_logs.count(expected_string),
            1,
            f'Incorrect count of DHCP Discovers ("{expected_string}") in logs:\n'
            + dhcp_logs
            + "\n",
        )

        expected_string = f"DHCPOFFER on {ip}"
        asserts.assert_equal(
            dhcp_logs.count(expected_string),
            1,
            f'Incorrect count of DHCP Offers ("{expected_string}") in logs:\n'
            + dhcp_logs
            + "\n",
        )

        # Requests and Acks may legitimately repeat, so only require one.
        expected_string = f"DHCPREQUEST for {ip}"
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
            + dhcp_logs
            + "\n",
        )

        expected_string = f"DHCPACK on {ip}"
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: '
            + dhcp_logs
            + "\n",
        )

        asserts.assert_true(
            self.device_can_ping(ap_params.ip),
            f"DUT failed to ping router at {ap_params.ip}",
        )