blob: e5b8fcef4d8cb0cc84c9a8a14c8a16bc179939b2 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
import time
from acts import logger
from acts import signals
import typing
if typing.TYPE_CHECKING:
from acts.controllers.fuchsia_device import FuchsiaDevice
SAVED_NETWORKS = "saved_networks"
CLIENT_STATE = "client_connections_state"
CONNECTIONS_ENABLED = "ConnectionsEnabled"
CONNECTIONS_DISABLED = "ConnectionsDisabled"
STATE_CONNECTED = 'Connected'
STATE_CONNECTING = 'Connecting'
STATE_DISCONNECTED = 'Disconnected'
STATE_CONNECTION_STOPPED = 'ConnectionStopped'
class WlanPolicyControllerError(signals.ControllerError):
pass
class WlanPolicyController:
"""Contains methods related to the wlan policy layer, to be used in the
FuchsiaDevice object.
"""
def __init__(self, fuchsia_device):
self.device: FuchsiaDevice = fuchsia_device
self.log = logger.create_tagged_trace_logger(
'WlanPolicyController for FuchsiaDevice | %s' % self.device.ip)
self.client_controller = False
self.preserved_networks_and_client_state = None
self.policy_configured = False
self._paused_session = False
def _configure_wlan(self, preserve_saved_networks, timeout=15):
"""Sets up wlan policy layer.
Args:
preserve_saved_networks: bool, whether to clear existing saved
networks and client state, to be restored at test close.
"""
end_time = time.time() + timeout
# Kill basemgr (Component v1 version of session manager)
while time.time() < end_time:
response = self.device.basemgr_lib.killBasemgr()
if not response.get('error'):
self.log.debug('Basemgr kill call successfully issued.')
break
self.log.debug(response['error'])
time.sleep(1)
else:
raise WlanPolicyControllerError(
'Failed to issue successful basemgr kill call.')
# Stop the session manager, which also holds the Policy controller.
response = self.device.session_manager_lib.pauseSession()
if response.get('error'):
self.log.error('Failed to stop the session.')
raise WlanPolicyControllerError(response['error'])
else:
if response.get('result') == 'Success':
self._paused_session = True
self.log.debug(f"Paused session: {response.get('result')}")
# Acquire control of policy layer
while time.time() < end_time:
# Create a client controller
response = self.device.wlan_policy_lib.wlanCreateClientController()
if response.get('error'):
self.log.debug(response['error'])
time.sleep(1)
continue
# Attempt to use the client controller (failure indicates a closed
# channel, meaning the client controller was rejected.
response = self.device.wlan_policy_lib.wlanGetSavedNetworks()
if response.get('error'):
self.log.debug(response['error'])
time.sleep(1)
continue
break
else:
raise WlanPolicyControllerError(
'Failed to create and use a WLAN policy client controller.')
self.log.info('ACTS tests now have control of the WLAN policy layer.')
if preserve_saved_networks and not self.preserved_networks_and_client_state:
self.preserved_networks_and_client_state = self.remove_and_preserve_networks_and_client_state(
)
if not self.start_client_connections():
raise WlanPolicyControllerError(
'Failed to start client connections during configuration.')
self.policy_configured = True
def _deconfigure_wlan(self):
if not self.stop_client_connections():
raise WlanPolicyControllerError(
'Failed to stop client connections during deconfiguration.')
self.policy_configured = False
def _clean_up(self):
if self.preserved_networks_and_client_state:
# It is possible for policy to have been configured before, but
# deconfigured before test end. In this case, in must be setup
# before restoring networks
if not self.policy_configured:
self._configure_wlan()
self.restore_preserved_networks_and_client_state()
if self._paused_session:
response = self.device.session_manager_lib.resumeSession()
if response.get('error'):
self.log.warning('Failed to resume the session.')
self.log.warning(response['error'])
else:
self.log.debug(f"Resumed session: {response.get('result')}")
def start_client_connections(self):
"""Allow device to connect to networks via policy layer (including
autoconnecting to saved networks).
Returns:
True, if successful. False otherwise."""
start_response = self.device.wlan_policy_lib.wlanStartClientConnections(
)
if start_response.get('error'):
self.log.error('Failed to start client connections. Err: %s' %
start_response['error'])
return False
return True
def stop_client_connections(self):
"""Prevent device from connecting and autoconnecting to networks via the
policy layer.
Returns:
True, if successful. False otherwise."""
stop_response = self.device.wlan_policy_lib.wlanStopClientConnections()
if stop_response.get('error'):
self.log.error('Failed to stop client connections. Err: %s' %
stop_response['error'])
return False
return True
def save_and_connect(self, ssid, security, password=None, timeout=30):
""" Saves and connects to the network. This is the policy version of
connect and check_connect_response because the policy layer
requires a saved network and the policy connect does not return
success or failure
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
# Save network and check response
if not self.save_network(ssid, security, password=password):
return False
# Make connect call and check response
self.device.wlan_policy_lib.wlanSetNewListener()
if not self.send_connect_command(ssid, security):
return False
return self.wait_for_connect(ssid, security, timeout=timeout)
def save_and_wait_for_autoconnect(self,
ssid,
security,
password=None,
timeout=30):
"""Saves a network and waits, expecting an autoconnection to the newly
saved network. This differes from save_and_connect, as it doesn't
expressly trigger a connection first. There are cases in which an
autoconnect won't occur after a save (like if the device is connected
already), so this should be used with caution to test very specific
situations.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
if not self.save_network(ssid, security, password=password):
return False
return self.wait_for_connect(ssid, security, timeout=timeout)
def remove_and_wait_for_disconnect(self,
ssid,
security_type,
password=None,
state=None,
status=None,
timeout=30):
"""Removes a single network and waits for a disconnect. It is not
guaranteed the device will stay disconnected, as it may autoconnect
to a different saved network.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
state: string, The connection state we are expecting, ie "Disconnected" or
"Failed"
status: string, The disconnect status we expect, it "ConnectionStopped" or
"ConnectionFailed"
timeout: int, time in seconds to wait for connection
Returns:
True, if successful. False otherwise.
"""
self.device.wlan_policy_lib.wlanSetNewListener()
if not self.remove_network(ssid, security_type, password=password):
return False
return self.wait_for_disconnect(ssid,
security_type,
state=state,
status=status,
timeout=timeout)
def remove_all_networks_and_wait_for_no_connections(self, timeout=30):
"""Removes all networks and waits until device is not connected to any
networks. This should be used as the policy version of disconnect.
Returns:
True, if successful. False otherwise.
"""
self.device.wlan_policy_lib.wlanSetNewListener()
if not self.remove_all_networks():
self.log.error('Failed to remove all networks. Cannot continue to '
'wait_for_no_connections.')
return False
return self.wait_for_no_connections(timeout=timeout)
def save_network(self, ssid, security_type, password=None):
"""Save a network via the policy layer.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if successful. False otherwise.
"""
save_response = self.device.wlan_policy_lib.wlanSaveNetwork(
ssid, security_type, target_pwd=password)
if save_response.get('error'):
self.log.error('Failed to save network %s with error: %s' %
(ssid, save_response['error']))
return False
return True
def remove_network(self, ssid, security_type, password=None):
"""Remove a saved network via the policy layer.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if successful. False otherwise.
"""
remove_response = self.device.wlan_policy_lib.wlanRemoveNetwork(
ssid, security_type, target_pwd=password)
if remove_response.get('error'):
self.log.error('Failed to remove network %s with error: %s' %
(ssid, remove_response['error']))
return False
return True
def remove_all_networks(self):
"""Removes all saved networks from device.
Returns:
True, if successful. False otherwise.
"""
remove_all_response = self.device.wlan_policy_lib.wlanRemoveAllNetworks(
)
if remove_all_response.get('error'):
self.log.error('Error occurred removing all networks: %s' %
remove_all_response['error'])
return False
return True
def get_saved_networks(self):
"""Retrieves saved networks from device.
Returns:
list of saved networks
Raises:
WlanPolicyControllerError, if retrieval fails.
"""
saved_networks_response = self.device.wlan_policy_lib.wlanGetSavedNetworks(
)
if saved_networks_response.get('error'):
raise WlanPolicyControllerError(
'Failed to retrieve saved networks: %s' %
saved_networks_response['error'])
return saved_networks_response['result']
def send_connect_command(self, ssid, security_type):
"""Sends a connect command to a network that is already saved. This does
not wait to guarantee the connection is successful (for that, use
save_and_connect).
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
password: string, the credential of the network if applicable
Returns:
True, if command send successfully. False otherwise.
"""
connect_response = self.device.wlan_policy_lib.wlanConnect(
ssid, security_type)
if connect_response.get('error'):
self.log.error(
'Error occurred when sending policy connect command: %s' %
connect_response['error'])
return False
return True
def wait_for_connect(self, ssid, security_type, timeout=30):
""" Wait until the device has connected to the specified network.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
timeout: int, seconds to wait for a update showing connection
Returns:
True if we see a connect to the network, False otherwise.
"""
security_type = str(security_type)
# Wait until we've connected.
end_time = time.time() + timeout
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.device.wlan_policy_lib.wlanGetUpdate(
timeout=time_left)
except requests.exceptions.Timeout:
self.log.error('Timed out waiting for response from device '
'while waiting for network with SSID "%s" to '
'connect. Device took too long to connect or '
'the request timed out for another reason.' %
ssid)
self.device.wlan_policy_lib.wlanSetNewListener()
return False
if update.get('error'):
# This can occur for many reasons, so it is not necessarily a
# failure.
self.log.debug('Error occurred getting status update: %s' %
update['error'])
continue
for network in update['result']['networks']:
if network['id']['ssid'] == ssid or network['id'][
'type_'].lower() == security_type.lower():
if 'state' not in network:
raise WlanPolicyControllerError(
'WLAN status missing state field.')
elif network['state'].lower() == STATE_CONNECTED.lower():
return True
# Wait a bit before requesting another status update
time.sleep(1)
# Stopped getting updates because out timeout
self.log.error('Timed out waiting for network with SSID "%s" to '
"connect" % ssid)
return False
def wait_for_disconnect(self,
ssid,
security_type,
state=None,
status=None,
timeout=30):
""" Wait for a disconnect of the specified network on the given device. This
will check that the correct connection state and disconnect status are
given in update. If we do not see a disconnect after some time,
return false.
Args:
ssid: string, the network name
security: string, security type of network (see wlan_policy_lib)
state: string, The connection state we are expecting, ie "Disconnected" or
"Failed"
status: string, The disconnect status we expect, it "ConnectionStopped" or
"ConnectionFailed"
timeout: int, seconds to wait before giving up
Returns: True if we saw a disconnect as specified, or False otherwise.
"""
if not state:
state = STATE_DISCONNECTED
if not status:
status = STATE_CONNECTION_STOPPED
end_time = time.time() + timeout
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.device.wlan_policy_lib.wlanGetUpdate(
timeout=time_left)
except requests.exceptions.Timeout:
self.log.error(
'Timed out waiting for response from device '
'while waiting for network with SSID "%s" to '
'disconnect. Device took too long to disconnect '
'or the request timed out for another reason.' % ssid)
self.device.wlan_policy_lib.wlanSetNewListener()
return False
if update.get('error'):
# This can occur for many reasons, so it is not necessarily a
# failure.
self.log.debug('Error occurred getting status update: %s' %
update['error'])
continue
# Update should include network, either connected to or recently disconnected.
if len(update['result']['networks']) == 0:
raise WlanPolicyControllerError(
'WLAN state update is missing network.')
for network in update['result']['networks']:
if network['id']['ssid'] == ssid or network['id'][
'type_'].lower() == security_type.lower():
if 'state' not in network or 'status' not in network:
raise WlanPolicyControllerError(
'Client state summary\'s network is missing fields'
)
# If still connected, we will wait for another update and check again
elif network['state'].lower() == STATE_CONNECTED.lower():
continue
elif network['state'].lower() == STATE_CONNECTING.lower():
self.log.error(
'Update is "Connecting", but device should already be '
'connected; expected disconnect')
return False
# Check that the network state and disconnect status are expected, ie
# that it isn't ConnectionFailed when we expect ConnectionStopped
elif network['state'].lower() != state.lower(
) or network['status'].lower() != status.lower():
self.log.error(
'Connection failed: a network failure occurred that is unrelated'
'to remove network or incorrect status update. \nExpected state: '
'%s, Status: %s,\nActual update: %s' %
(state, status, network))
return False
else:
return True
# Wait a bit before requesting another status update
time.sleep(1)
# Stopped getting updates because out timeout
self.log.error('Timed out waiting for network with SSID "%s" to '
'connect' % ssid)
return False
def wait_for_no_connections(self, timeout=30):
""" Waits to see that there are no existing connections the device. This
is the simplest way to watch for disconnections when only a single
network is saved/present.
Args:
timeout: int, time in seconds to wait to see no connections
Returns:
True, if successful. False, if still connected after timeout.
"""
# If there are already no existing connections when this function is called,
# then an update won't be generated by the device, and we'll time out.
# Force an update by getting a new listener.
self.device.wlan_policy_lib.wlanSetNewListener()
end_time = time.time() + timeout
while time.time() < end_time:
time_left = max(1, int(end_time - time.time()))
try:
update = self.device.wlan_policy_lib.wlanGetUpdate(
timeout=time_left)
except requests.exceptions.Timeout:
self.log.info(
"Timed out getting status update while waiting for all"
" connections to end.")
self.device.wlan_policy_lib.wlanSetNewListener()
return False
if update["error"] != None:
self.log.info("Failed to get status update")
return False
# If any network is connected or being connected to, wait for them
# to disconnect.
if any(network['state'].lower() in
{STATE_CONNECTED.lower(),
STATE_CONNECTING.lower()}
for network in update['result']['networks']):
continue
else:
return True
return False
def remove_and_preserve_networks_and_client_state(self):
""" Preserves networks already saved on devices before removing them to
setup up for a clean test environment. Records the state of client
connections before tests.
Raises:
WlanPolicyControllerError, if the network removal is unsuccessful
"""
# Save preexisting saved networks
preserved_networks_and_state = {}
saved_networks_response = self.device.wlan_policy_lib.wlanGetSavedNetworks(
)
if saved_networks_response.get('error'):
raise WlanPolicyControllerError(
'Failed to get preexisting saved networks: %s' %
saved_networks_response['error'])
if saved_networks_response.get('result') != None:
preserved_networks_and_state[
SAVED_NETWORKS] = saved_networks_response['result']
# Remove preexisting saved networks
if not self.remove_all_networks():
raise WlanPolicyControllerError(
'Failed to clear networks and disconnect at FuchsiaDevice creation.'
)
self.device.wlan_policy_lib.wlanSetNewListener()
update_response = self.device.wlan_policy_lib.wlanGetUpdate()
update_result = update_response.get('result', {})
if update_result.get('state'):
preserved_networks_and_state[CLIENT_STATE] = update_result['state']
else:
self.log.warn('Failed to get update; test will not start or '
'stop client connections at the end of the test.')
self.log.info('Saved networks cleared and preserved.')
return preserved_networks_and_state
def restore_preserved_networks_and_client_state(self):
""" Restore saved networks and client state onto device if they have
been preserved.
"""
if not self.remove_all_networks():
self.log.warn('Failed to remove saved networks before restore.')
restore_success = True
for network in self.preserved_networks_and_client_state[
SAVED_NETWORKS]:
if not self.save_network(network["ssid"], network["security_type"],
network["credential_value"]):
self.log.warn('Failed to restore network (%s).' %
network['ssid'])
restore_success = False
starting_state = self.preserved_networks_and_client_state[CLIENT_STATE]
if starting_state == CONNECTIONS_ENABLED:
state_restored = self.start_client_connections()
else:
state_restored = self.stop_client_connections()
if not state_restored:
self.log.warn('Failed to restore client connections state.')
restore_success = False
if restore_success:
self.log.info('Preserved networks and client state restored.')
self.preserved_networks_and_client_state = None
return restore_success