blob: 035bd829c9c10002ccc13a31d2389056c5e9359e [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from antlion import logger, signals
from antlion.controllers.ap_lib.hostapd_security import FuchsiaSecurityType
from antlion.controllers.fuchsia_lib.sl4f import SL4F
from antlion.controllers.fuchsia_lib.ssh import FuchsiaSSHProvider
from antlion.controllers.fuchsia_lib.wlan_lib import FuchsiaWlanLib
SAVED_NETWORKS = "saved_networks"
CLIENT_STATE = "client_connections_state"
CONNECTIONS_ENABLED = "ConnectionsEnabled"
CONNECTIONS_DISABLED = "ConnectionsDisabled"
STATE_CONNECTED = "Connected"
STATE_CONNECTING = "Connecting"
STATE_DISCONNECTED = "Disconnected"
STATE_CONNECTION_STOPPED = "ConnectionStopped"
SESSION_MANAGER_TIMEOUT_SEC = 10
FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT = 30
DEFAULT_GET_UPDATE_TIMEOUT = 60
class WlanPolicyControllerError(signals.ControllerError):
pass
@dataclass
class PreservedState:
saved_networks: Optional[List[Dict[str, Any]]]
client_connections_state: Optional[str]
@dataclass
class GetCountryCodeFromPhyParams:
wlan_lib: FuchsiaWlanLib
id: str
def get_country_code_from_phy(params: GetCountryCodeFromPhyParams) -> Tuple[str, str]:
get_country_response = params.wlan_lib.wlanGetCountry(params.id)
if get_country_response.get("error"):
raise ConnectionError(
f"Failed to query PHY ID ({params.id}) for country. "
f'Error: {get_country_response["error"]}'
)
country_code = "".join(
[chr(ascii_char) for ascii_char in get_country_response["result"]]
)
return (params.id, country_code)
class WlanPolicyController:
"""Contains methods related to the wlan policy layer, to be used in the
FuchsiaDevice object.
Attributes:
sl4f: sl4f module for communicating to the WLAN policy controller.
ssh: transport to fuchsia device to stop component processes.
"""
def __init__(self, sl4f: SL4F, ssh: FuchsiaSSHProvider):
"""
Args:
sl4f: sl4f module for communicating to the WLAN policy controller.
ssh: transport to fuchsia device to stop component processes.
"""
self.preserved_networks_and_client_state: Optional[PreservedState] = None
self.policy_configured = False
self.sl4f = sl4f
self.ssh = ssh
self.log = logger.create_tagged_trace_logger(
f"WlanPolicyController | {self.ssh.config.host_name}"
)
def configure_wlan(
self,
preserve_saved_networks: bool,
timeout_sec: int = FUCHSIA_DEFAULT_WLAN_CONFIGURE_TIMEOUT,
) -> None:
"""Sets up wlan policy layer.
Args:
preserve_saved_networks: whether to clear existing saved
networks and client state, to be restored at test close.
timeout_sec: time to wait for device to configure WLAN.
"""
# We need to stop session manager to free control of
# fuchsia.wlan.policy.ClientController, which can only be used by a
# single caller at a time. SL4F needs the ClientController to trigger
# WLAN policy state changes. On eng builds the session_manager can be
# restarted after being stopped during reboot so we attempt killing the
# session manager process for 10 seconds.
# See https://cs.opensource.google/fuchsia/fuchsia/+/main:sdk/fidl/fuchsia.wlan.policy/client_provider.fidl
if "cast_agent.cm" in self.ssh.run("ps").stdout:
end_time_session_manager_sec = time.time() + SESSION_MANAGER_TIMEOUT_SEC
while time.time() < end_time_session_manager_sec:
self.ssh.stop_component("session_manager", is_cfv2_component=True)
# Acquire control of policy layer
end_time_config_sec = time.time() + timeout_sec
controller_errors = []
while time.time() < end_time_config_sec:
# Create a client controller
response = self.sl4f.wlan_policy_lib.wlanCreateClientController()
if response.get("error"):
controller_errors.append(response["error"])
self.log.debug(response["error"])
time.sleep(1)
continue
break
else:
self.log.warning(
"Failed to create and use a WLAN policy client controller. Errors: ["
+ "; ".join(controller_errors)
+ "]"
)
raise WlanPolicyControllerError(
"Failed to create and use a WLAN policy client controller."
)
self.log.info("ACTS tests now have control of the WLAN policy layer.")
if preserve_saved_networks and not self.preserved_networks_and_client_state:
self.preserved_networks_and_client_state = (
self.remove_and_preserve_networks_and_client_state()
)
if not self.start_client_connections():
raise WlanPolicyControllerError(
"Failed to start client connections during configuration."
)
self.policy_configured = True
def _deconfigure_wlan(self) -> None:
if not self.stop_client_connections():
raise WlanPolicyControllerError(
"Failed to stop client connections during deconfiguration."
)
self.policy_configured = False
def clean_up(self) -> None:
if self.preserved_networks_and_client_state is not None:
# It is possible for policy to have been configured before, but
# deconfigured before test end. In this case, in must be setup
# before restoring networks
if not self.policy_configured:
self.configure_wlan(False)
self.restore_preserved_networks_and_client_state()
def start_client_connections(self) -> bool:
"""Allow device to connect to networks via policy layer (including
autoconnecting to saved networks).
Returns:
True, if successful. False otherwise."""
start_response = self.sl4f.wlan_policy_lib.wlanStartClientConnections()
if start_response.get("error"):
self.log.error(
f"Failed to start client connections. Err: {start_response['error']}"
)
return False
return True
def stop_client_connections(self) -> bool:
"""Prevent device from connecting and autoconnecting to networks via the
policy layer.
Returns:
True, if successful. False otherwise."""
stop_response = self.sl4f.wlan_policy_lib.wlanStopClientConnections()
if stop_response.get("error"):
self.log.error(
f"Failed to stop client connections. Err: {stop_response['error']}"
)
return False
return True
def save_and_connect(
self,
ssid: str,
security: FuchsiaSecurityType,
password: Optional[str] = None,
timeout: int = 30,
):
"""Saves and connects to the network. This is the policy version of
connect and check_connect_response because the policy layer
requires a saved network and the policy connect does not return
success or failure
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
# Save network and check response
if not self.save_network(ssid, security, password=password):
return False
# Make connect call and check response
self.sl4f.wlan_policy_lib.wlanSetNewListener()
if not self.send_connect_command(ssid, security):
return False
return self.wait_for_connect(ssid, security, timeout=timeout)
def save_and_wait_for_autoconnect(
self,
ssid: str,
security: FuchsiaSecurityType,
password: Optional[str] = None,
timeout: int = 30,
) -> bool:
"""Saves a network and waits, expecting an autoconnection to the newly
saved network. This differes from save_and_connect, as it doesn't
expressly trigger a connection first. There are cases in which an
autoconnect won't occur after a save (like if the device is connected
already), so this should be used with caution to test very specific
situations.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
if not self.save_network(ssid, security, password=password):
return False
return self.wait_for_connect(ssid, security, timeout=timeout)
def remove_and_wait_for_disconnect(
self,
ssid: str,
security_type: FuchsiaSecurityType,
password: Optional[str] = None,
state: Optional[str] = None,
status: Optional[str] = None,
timeout: int = 30,
) -> bool:
"""Removes a single network and waits for a disconnect. It is not
guaranteed the device will stay disconnected, as it may autoconnect
to a different saved network.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
state: string, The connection state we are expecting, ie "Disconnected" or
"Failed"
status: string, The disconnect status we expect, it "ConnectionStopped" or
"ConnectionFailed"
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
self.sl4f.wlan_policy_lib.wlanSetNewListener()
if not self.remove_network(ssid, security_type, password=password):
return False
return self.wait_for_disconnect(
ssid, security_type, state=state, status=status, timeout=timeout
)
def remove_all_networks_and_wait_for_no_connections(
self, timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT
) -> bool:
"""Removes all networks and waits until device is not connected to any
networks. This should be used as the policy version of disconnect.
Args:
timeout_sec: The time to wait to see no connections.
Returns:
True, if successful. False otherwise.
"""
self.sl4f.wlan_policy_lib.wlanSetNewListener()
if not self.remove_all_networks():
self.log.error(
"Failed to remove all networks. Cannot continue to "
"wait_for_no_connections."
)
return False
return self.wait_for_no_connections(timeout_sec=timeout_sec)
def save_network(
self,
ssid: str,
security_type: FuchsiaSecurityType,
password: Optional[str] = None,
) -> bool:
"""Save a network via the policy layer.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if successful. False otherwise.
"""
save_response = self.sl4f.wlan_policy_lib.wlanSaveNetwork(
ssid, security_type, target_pwd=password
)
if save_response.get("error"):
self.log.error(
f"Failed to save network {ssid} with error: {save_response['error']}"
)
return False
return True
def remove_network(
self,
ssid: str,
security_type: FuchsiaSecurityType,
password: Optional[str] = None,
) -> bool:
"""Remove a saved network via the policy layer.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if successful. False otherwise.
"""
remove_response = self.sl4f.wlan_policy_lib.wlanRemoveNetwork(
ssid, security_type, target_pwd=password
)
if remove_response.get("error"):
self.log.error(
"Failed to remove network %s with error: %s"
% (ssid, remove_response["error"])
)
return False
return True
def remove_all_networks(self) -> bool:
"""Removes all saved networks from device.
Returns:
True, if successful. False otherwise.
"""
remove_all_response = self.sl4f.wlan_policy_lib.wlanRemoveAllNetworks()
if remove_all_response.get("error"):
self.log.error(
f"Error occurred removing all networks: {remove_all_response['error']}"
)
return False
return True
def get_saved_networks(self) -> List[Any]:
"""Retrieves saved networks from device.
Returns:
list of saved networks
Raises:
WlanPolicyControllerError, if retrieval fails.
"""
saved_networks_response = self.sl4f.wlan_policy_lib.wlanGetSavedNetworks()
if saved_networks_response.get("error"):
raise WlanPolicyControllerError(
f"Failed to retrieve saved networks: {saved_networks_response['error']}"
)
return saved_networks_response["result"]
def send_connect_command(
self, ssid: str, security_type: FuchsiaSecurityType
) -> bool:
"""Sends a connect command to a network that is already saved. This does
not wait to guarantee the connection is successful (for that, use
save_and_connect).
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if command send successfully. False otherwise.
"""
connect_response = self.sl4f.wlan_policy_lib.wlanConnect(ssid, security_type)
if connect_response.get("error"):
self.log.error(
"Error occurred when sending policy connect command: %s"
% connect_response["error"]
)
return False
return True
def wait_for_connect(
self, ssid: str, security_type: FuchsiaSecurityType, timeout: int = 30
) -> bool:
"""Wait until the device has connected to the specified network.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
timeout: int, seconds to wait for a update showing connection
Returns:
True if we see a connect to the network, False otherwise.
"""
# Wait until we've connected.
end_time = time.time() + timeout
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left)
except TimeoutError:
self.log.error(
"Timed out waiting for response from device "
'while waiting for network with SSID "%s" to '
"connect. Device took too long to connect or "
"the request timed out for another reason." % ssid
)
self.sl4f.wlan_policy_lib.wlanSetNewListener()
return False
if update.get("error"):
# This can occur for many reasons, so it is not necessarily a
# failure.
self.log.debug(
f"Error occurred getting status update: {update['error']}"
)
continue
for network in update["result"]["networks"]:
if network["id"]["ssid"] == ssid or network["id"][
"type_"
].lower() == str(security_type):
if "state" not in network:
raise WlanPolicyControllerError(
"WLAN status missing state field."
)
elif network["state"].lower() == STATE_CONNECTED.lower():
return True
# Wait a bit before requesting another status update
time.sleep(1)
# Stopped getting updates because out timeout
self.log.error(f'Timed out waiting for network with SSID "{ssid}" to connect')
return False
def wait_for_disconnect(
self,
ssid: str,
security_type: FuchsiaSecurityType,
state: Optional[str] = None,
status: Optional[str] = None,
timeout: int = 30,
) -> bool:
"""Wait for a disconnect of the specified network on the given device. This
will check that the correct connection state and disconnect status are
given in update. If we do not see a disconnect after some time,
return false.
Args:
ssid: string, the network name
security: string, security type of network (see sl4f.wlan_policy_lib)
state: string, The connection state we are expecting, ie "Disconnected" or
"Failed"
status: string, The disconnect status we expect, it "ConnectionStopped" or
"ConnectionFailed"
timeout: int, seconds to wait before giving up
Returns: True if we saw a disconnect as specified, or False otherwise.
"""
if not state:
state = STATE_DISCONNECTED
if not status:
status = STATE_CONNECTION_STOPPED
end_time = time.time() + timeout
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left)
except TimeoutError:
self.log.error(
"Timed out waiting for response from device "
'while waiting for network with SSID "%s" to '
"disconnect. Device took too long to disconnect "
"or the request timed out for another reason." % ssid
)
self.sl4f.wlan_policy_lib.wlanSetNewListener()
return False
if update.get("error"):
# This can occur for many reasons, so it is not necessarily a
# failure.
self.log.debug(
f"Error occurred getting status update: {update['error']}"
)
continue
# Update should include network, either connected to or recently disconnected.
if len(update["result"]["networks"]) == 0:
raise WlanPolicyControllerError("WLAN state update is missing network.")
for network in update["result"]["networks"]:
if network["id"]["ssid"] == ssid or network["id"][
"type_"
].lower() == str(security_type):
if "state" not in network or "status" not in network:
raise WlanPolicyControllerError(
"Client state summary's network is missing fields"
)
# If still connected, we will wait for another update and check again
elif network["state"].lower() == STATE_CONNECTED.lower():
continue
elif network["state"].lower() == STATE_CONNECTING.lower():
self.log.error(
'Update is "Connecting", but device should already be '
"connected; expected disconnect"
)
return False
# Check that the network state and disconnect status are expected, ie
# that it isn't ConnectionFailed when we expect ConnectionStopped
elif (
network["state"].lower() != state.lower()
or network["status"].lower() != status.lower()
):
self.log.error(
"Connection failed: a network failure occurred that is unrelated"
"to remove network or incorrect status update. \nExpected state: "
f"{state}, Status: {status},\nActual update: {network}"
)
return False
else:
return True
# Wait a bit before requesting another status update
time.sleep(1)
# Stopped getting updates because out timeout
self.log.error(f'Timed out waiting for network with SSID "{ssid}" to connect')
return False
def wait_for_no_connections(
self, timeout_sec: int = DEFAULT_GET_UPDATE_TIMEOUT
) -> bool:
"""Waits to see that there are no existing connections the device. This
is the simplest way to watch for disconnections when only a single
network is saved/present.
Args:
timeout_sec: The time to wait to see no connections.
Returns:
True, if successful. False, if still connected after timeout.
"""
# If there are already no existing connections when this function is called,
# then an update won't be generated by the device, and we'll time out.
# Force an update by getting a new listener.
self.sl4f.wlan_policy_lib.wlanSetNewListener()
end_time = time.time() + timeout_sec
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.sl4f.wlan_policy_lib.wlanGetUpdate(timeout=time_left)
except TimeoutError:
self.log.info(
"Timed out getting status update while waiting for all"
" connections to end."
)
self.sl4f.wlan_policy_lib.wlanSetNewListener()
return False
if update["error"] != None:
self.log.info("Failed to get status update")
return False
# If any network is connected or being connected to, wait for them
# to disconnect.
if any(
network["state"].lower()
in {STATE_CONNECTED.lower(), STATE_CONNECTING.lower()}
for network in update["result"]["networks"]
):
continue
else:
return True
return False
def remove_and_preserve_networks_and_client_state(self) -> PreservedState:
"""Preserves networks already saved on devices before removing them to
setup up for a clean test environment. Records the state of client
connections before tests.
Raises:
WlanPolicyControllerError, if the network removal is unsuccessful
"""
# Save preexisting saved networks
state = PreservedState(saved_networks=None, client_connections_state=None)
saved_networks_response = self.sl4f.wlan_policy_lib.wlanGetSavedNetworks()
if saved_networks_response.get("error"):
raise WlanPolicyControllerError(
"Failed to get preexisting saved networks: "
f'{saved_networks_response["error"]}'
)
state.saved_networks = saved_networks_response.get("result")
# Remove preexisting saved networks
if not self.remove_all_networks():
raise WlanPolicyControllerError(
"Failed to clear networks and disconnect at FuchsiaDevice creation."
)
self.sl4f.wlan_policy_lib.wlanSetNewListener()
update_response = self.sl4f.wlan_policy_lib.wlanGetUpdate()
update_result = update_response.get("result", {})
state.client_connections_state = update_result.get("state")
if update_result.get("state") is None:
self.log.warn(
"Failed to get update; test will not start or "
"stop client connections at the end of the test."
)
self.log.info("Saved networks cleared and preserved.")
return state
def restore_preserved_networks_and_client_state(self) -> None:
"""Restore saved networks and client state onto device if they have
been preserved.
"""
if self.preserved_networks_and_client_state is None:
self.log.info("No preserved networks or client state to restore")
return
if not self.remove_all_networks():
self.log.warn("Failed to remove saved networks before restore.")
restore_success = True
saved_networks = self.preserved_networks_and_client_state.saved_networks
if saved_networks is not None:
for network in saved_networks:
if not self.save_network(
network["ssid"],
network["security_type"],
network["credential_value"],
):
self.log.warn(f'Failed to restore network "{network["ssid"]}"')
restore_success = False
client_state = self.preserved_networks_and_client_state.client_connections_state
if client_state is not None:
state_restored = (
self.start_client_connections()
if client_state == CONNECTIONS_ENABLED
else self.stop_client_connections()
)
if not state_restored:
self.log.warn("Failed to restore client connections state.")
restore_success = False
if restore_success:
self.log.info("Preserved networks and client state restored.")
self.preserved_networks_and_client_state = None
def _get_phy_ids(self) -> List[str]:
"""Get a list of WLAN physical interfaces."""
phy_ids_response = self.sl4f.wlan_lib.wlanPhyIdList()
if phy_ids_response.get("error"):
raise ConnectionError(
f'Failed to get phy ids from DUT. Error: {phy_ids_response["error"]}'
)
return phy_ids_response["result"]
def _get_phy_country_codes(self) -> Dict[str, str]:
"""Get mapping of WLAN interfaces to the country code they are set to."""
phy_ids = self._get_phy_ids()
args = [GetCountryCodeFromPhyParams(self.sl4f.wlan_lib, id) for id in phy_ids]
with multiprocessing.Pool() as p:
items = p.map(get_country_code_from_phy, args)
return dict(items)
def set_country_code(self, country_code: str, timeout_sec: int = 10) -> None:
"""Set and verify the country code on the device.
Args:
country_code: 2 character country code to set
timeout_sec: seconds to wait until timing out
Raises:
TimeoutError: the device did not set all interfaces to country_code within
timeout_sec
"""
unique_country_codes = set(self._get_phy_country_codes().values())
if len(unique_country_codes) == 1 and country_code in unique_country_codes:
# The country code is already set on all WLAN phys; skip setting the country
# code again.
self.log.debug(f"Country code already set to {country_code}")
return
self.log.debug(f"Setting DUT country code to {country_code}")
country_code_response = self.sl4f.regulatory_region_lib.setRegion(country_code)
if country_code_response.get("error"):
raise EnvironmentError(
f"Failed to set country code ({country_code}) on DUT. "
f'Error: {country_code_response["error"]}'
)
self.log.debug(
f"Verifying DUT country code was correctly set to {country_code}."
)
end_time = time.time() + timeout_sec
while time.time() < end_time:
for phy_id, code in self._get_phy_country_codes().items():
if code != country_code:
# Interface has incorrect country code; continue to wait.
self.log.debug(
f"WLAN interface {phy_id} has incorrect country code set: "
f"got {code}, wanted {country_code}"
)
break
else:
# All interfaces have the expected country code.
self.log.info(f"Successfully set DUT country code to {country_code}.")
return
time.sleep(0.5)
raise TimeoutError(
f"Failed to set country code to {country_code} within {timeout_sec}s"
)