blob: d817c86e32ec9461cf8d59f1cc50e0d8345eee3f [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
# http://www.secdev.org/projects/scapy/
# On ubuntu, sudo pip3 install scapy
import scapy.all as scapy
from antlion.controllers.adb_lib.error import AdbCommandError
from antlion.controllers.ap_lib import bridge_interface as bi
from antlion.controllers.ap_lib import hostapd_ap_preset, hostapd_security
from antlion.libs.proc import job
from antlion.test_utils.wifi import wifi_test_utils as wutils
GET_FROM_PHONE = "get_from_dut"
GET_FROM_AP = "get_from_ap"
ENABLED_MODULATED_DTIM = "gEnableModulatedDTIM="
MAX_MODULATED_DTIM = "gMaxLIModulatedDTIM="
def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=10):
"""Function to change the DTIM setting in the phone.
Args:
ad: the target android device, AndroidDevice object
gEnableModulatedDTIM: Modulated DTIM, int
gMaxLIModulatedDTIM: Maximum modulated DTIM, int
"""
ad.log.info(f"Sets dtim to {gEnableModulatedDTIM}")
# In P21 the dtim setting method changed and an AdbCommandError will take
# place to get ini_file_phone. Thus add try/except block for the old method.
# If error occurs, use change_dtim_adb method later. Otherwise, first trying
# to find the ini file with DTIM settings
try:
ini_file_phone = ad.adb.shell("ls /vendor/firmware/wlan/*/*.ini")
except AdbCommandError as e:
# Gets AdbCommandError, change dtim later with change_dtim_adb merthod.
# change_dtim_adb requires that wifi connection is on.
ad.log.info(
f"Gets AdbCommandError, change dtim with change_dtim_adb. Error: {e}"
)
change_dtim_adb(ad, gEnableModulatedDTIM)
return 0
ini_file_local = ini_file_phone.split("/")[-1]
# Pull the file and change the DTIM to desired value
ad.adb.pull(f"{ini_file_phone} {ini_file_local}")
with open(ini_file_local, "r") as fin:
for line in fin:
if ENABLED_MODULATED_DTIM in line:
gE_old = line.strip("\n")
gEDTIM_old = line.strip(ENABLED_MODULATED_DTIM).strip("\n")
if MAX_MODULATED_DTIM in line:
gM_old = line.strip("\n")
gMDTIM_old = line.strip(MAX_MODULATED_DTIM).strip("\n")
fin.close()
if (
int(gEDTIM_old) == gEnableModulatedDTIM
and int(gMDTIM_old) == gMaxLIModulatedDTIM
):
ad.log.info("Current DTIM is already the desired value," "no need to reset it")
return 0
gE_new = ENABLED_MODULATED_DTIM + str(gEnableModulatedDTIM)
gM_new = MAX_MODULATED_DTIM + str(gMaxLIModulatedDTIM)
sed_gE = f"sed -i 's/{gE_old}/{gE_new}/g' {ini_file_local}"
sed_gM = f"sed -i 's/{gM_old}/{gM_new}/g' {ini_file_local}"
job.run(sed_gE)
job.run(sed_gM)
# Push the file to the phone
push_file_to_phone(ad, ini_file_local, ini_file_phone)
ad.log.info("DTIM changes checked in and rebooting...")
ad.reboot()
# Wait for auto-wifi feature to start
time.sleep(20)
ad.adb.shell("dumpsys battery set level 100")
ad.log.info("DTIM updated and device back from reboot")
return 1
def change_dtim_adb(ad, gEnableModulatedDTIM):
"""Function to change the DTIM setting in the P21 phone.
This method should be run after connecting wifi.
Args:
ad: the target android device, AndroidDevice object
gEnableModulatedDTIM: Modulated DTIM, int
"""
ad.log.info(f"Changes DTIM to {gEnableModulatedDTIM} with adb")
ad.adb.root()
screen_status = ad.adb.shell("dumpsys nfc | grep Screen")
screen_is_on = "ON_UNLOCKED" in screen_status
# To read the dtim with 'adb shell wl bcn_li_dtim', the screen should be off
if screen_is_on:
ad.log.info("The screen is on. Set it to off before change dtim")
ad.droid.goToSleepNow()
time_limit_seconds = 60
_wait_screen_off(ad, time_limit_seconds)
old_dtim = _read_dtim_adb(ad)
ad.log.info(f"The dtim before change is {old_dtim}")
try:
if int(old_dtim) == gEnableModulatedDTIM:
ad.log.info(
"Current DTIM is already the desired value," "no need to reset it"
)
if screen_is_on:
ad.log.info("Changes the screen to the original on status")
ad.droid.wakeUpNow()
return
except Exception as e:
ad.log.info(f"old_dtim is not available from adb: {e}")
current_dtim = _set_dtim(ad, gEnableModulatedDTIM)
ad.log.info(f"Old DTIM is {old_dtim}, current DTIM is {current_dtim}")
if screen_is_on:
ad.log.info("Changes the screen to the original on status")
ad.droid.wakeUpNow()
def _set_dtim(ad, gEnableModulatedDTIM):
out = ad.adb.shell(f"halutil -dtim_config {gEnableModulatedDTIM}")
ad.log.info(f"set dtim to {gEnableModulatedDTIM}, stdout: {out}")
return _read_dtim_adb(ad)
def _read_dtim_adb(ad):
try:
old_dtim = ad.adb.shell("wl bcn_li_dtim")
return old_dtim
except Exception as e:
ad.log.info(f"When reading dtim get error {e}")
return "The dtim value is not available from adb"
def _wait_screen_off(ad, time_limit_seconds):
while time_limit_seconds > 0:
screen_status = ad.adb.shell("dumpsys nfc | grep Screen")
if "OFF_UNLOCKED" in screen_status:
ad.log.info(f"The screen status is {screen_status}")
return
time.sleep(1)
time_limit_seconds -= 1
raise TimeoutError(
f"Timed out while waiting the screen off after {time_limit_seconds} seconds."
)
def push_file_to_phone(ad, file_local, file_phone):
"""Function to push local file to android phone.
Args:
ad: the target android device
file_local: the locla file to push
file_phone: the file/directory on the phone to be pushed
"""
ad.adb.root()
cmd_out = ad.adb.remount()
if "Permission denied" in cmd_out:
ad.log.info("Need to disable verity first and reboot")
ad.adb.disable_verity()
time.sleep(1)
ad.reboot()
ad.log.info("Verity disabled and device back from reboot")
ad.adb.root()
ad.adb.remount()
time.sleep(1)
ad.adb.push(f"{file_local} {file_phone}")
def ap_setup(ap, network, bandwidth=80, dtim_period=None):
"""Set up the whirlwind AP with provided network info.
Args:
ap: access_point object of the AP
network: dict with information of the network, including ssid, password
bssid, channel etc.
bandwidth: the operation bandwidth for the AP, default 80MHz
dtim_period: the dtim period of access point
Returns:
brconfigs: the bridge interface configs
"""
log = logging.getLogger()
bss_settings = []
ssid = network[wutils.WifiEnums.SSID_KEY]
if "password" in network.keys():
password = network["password"]
security = hostapd_security.Security(security_mode="wpa", password=password)
else:
security = hostapd_security.Security(security_mode=None, password=None)
channel = network["channel"]
config = hostapd_ap_preset.create_ap_preset(
channel=channel,
ssid=ssid,
dtim_period=dtim_period,
security=security,
bss_settings=bss_settings,
vht_bandwidth=bandwidth,
profile_name="whirlwind",
iface_wlan_2g=ap.wlan_2g,
iface_wlan_5g=ap.wlan_5g,
)
config_bridge = ap.generate_bridge_configs(channel)
brconfigs = bi.BridgeInterfaceConfigs(
config_bridge[0], config_bridge[1], config_bridge[2]
)
ap.bridge.startup(brconfigs)
ap.start_ap(config)
log.info(f"AP started on channel {channel} with SSID {ssid}")
return brconfigs
def run_iperf_client_nonblocking(ad, server_host, extra_args=""):
"""Start iperf client on the device with nohup.
Return status as true if iperf client start successfully.
And data flow information as results.
Args:
ad: the android device under test
server_host: Address of the iperf server.
extra_args: A string representing extra arguments for iperf client,
e.g. "-i 1 -t 30".
"""
log = logging.getLogger()
ad.adb.shell_nb(
f"nohup >/dev/null 2>&1 sh -c 'iperf3 -c {server_host} {extra_args} &'"
)
log.info("IPerf client started")
def get_wifi_rssi(ad):
"""Get the RSSI of the device.
Args:
ad: the android device under test
Returns:
RSSI: the rssi level of the device
"""
RSSI = ad.droid.wifiGetConnectionInfo()["rssi"]
return RSSI
def get_phone_ip(ad):
"""Get the WiFi IP address of the phone.
Args:
ad: the android device under test
Returns:
IP: IP address of the phone for WiFi, as a string
"""
IP = ad.droid.connectivityGetIPv4Addresses("wlan0")[0]
return IP
def get_phone_mac(ad):
"""Get the WiFi MAC address of the phone.
Args:
ad: the android device under test
Returns:
mac: MAC address of the phone for WiFi, as a string
"""
mac = ad.droid.wifiGetConnectionInfo()["mac_address"]
return mac
def get_phone_ipv6(ad):
"""Get the WiFi IPV6 address of the phone.
Args:
ad: the android device under test
Returns:
IPv6: IPv6 address of the phone for WiFi, as a string
"""
IPv6 = ad.droid.connectivityGetLinkLocalIpv6Address("wlan0")[:-6]
return IPv6
def wait_for_dhcp(interface_name):
"""Wait the DHCP address assigned to desired interface.
Getting DHCP address takes time and the wait time isn't constant. Utilizing
utils.timeout to keep trying until success
Args:
interface_name: desired interface name
Returns:
ip: ip address of the desired interface name
Raise:
TimeoutError: After timeout, if no DHCP assigned, raise
"""
log = logging.getLogger()
reset_host_interface(interface_name)
start_time = time.time()
time_limit_seconds = 60
ip = "0.0.0.0"
while start_time + time_limit_seconds > time.time():
ip = scapy.get_if_addr(interface_name)
if ip == "0.0.0.0":
time.sleep(1)
else:
log.info(f"DHCP address assigned to {interface_name} as {ip}")
return ip
raise TimeoutError(
f"Timed out while getting if_addr after {time_limit_seconds} seconds."
)
def reset_host_interface(intferface_name):
"""Reset the host interface.
Args:
intferface_name: the desired interface to reset
"""
log = logging.getLogger()
intf_down_cmd = f"ifconfig {intferface_name} down"
intf_up_cmd = f"ifconfig {intferface_name} up"
try:
job.run(intf_down_cmd)
time.sleep(10)
job.run(intf_up_cmd)
log.info(f"{intferface_name} has been reset")
except job.Error:
raise Exception("No such interface")
def bringdown_host_interface(intferface_name):
"""Reset the host interface.
Args:
intferface_name: the desired interface to reset
"""
log = logging.getLogger()
intf_down_cmd = f"ifconfig {intferface_name} down"
try:
job.run(intf_down_cmd)
time.sleep(2)
log.info(f"{intferface_name} has been brought down")
except job.Error:
raise Exception("No such interface")
def create_pkt_config(test_class):
"""Creates the config for generating multicast packets
Args:
test_class: object with all networking paramters
Returns:
Dictionary with the multicast packet config
"""
addr_type = (
scapy.IPV6_ADDR_LINKLOCAL
if test_class.ipv6_src_type == "LINK_LOCAL"
else scapy.IPV6_ADDR_GLOBAL
)
mac_dst = test_class.mac_dst
if GET_FROM_PHONE in test_class.mac_dst:
mac_dst = get_phone_mac(test_class.dut)
ipv4_dst = test_class.ipv4_dst
if GET_FROM_PHONE in test_class.ipv4_dst:
ipv4_dst = get_phone_ip(test_class.dut)
ipv6_dst = test_class.ipv6_dst
if GET_FROM_PHONE in test_class.ipv6_dst:
ipv6_dst = get_phone_ipv6(test_class.dut)
ipv4_gw = test_class.ipv4_gwt
if GET_FROM_AP in test_class.ipv4_gwt:
ipv4_gw = test_class.access_point.ssh_settings.hostname
pkt_gen_config = {
"interf": test_class.pkt_sender.interface,
"subnet_mask": test_class.sub_mask,
"src_mac": test_class.mac_src,
"dst_mac": mac_dst,
"src_ipv4": test_class.ipv4_src,
"dst_ipv4": ipv4_dst,
"src_ipv6": test_class.ipv6_src,
"src_ipv6_type": addr_type,
"dst_ipv6": ipv6_dst,
"gw_ipv4": ipv4_gw,
}
return pkt_gen_config