blob: b051448e568f6cf1d3383b5a2381223c3ce121d5 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright 2020 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import itertools
import logging
import os
from acts import asserts
from acts import base_test
from acts import utils
from acts.controllers import iperf_server as ipf
from acts.controllers import iperf_client as ipc
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi import ota_sniffer
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from functools import partial
from WifiRvrTest import WifiRvrTest
# Lightweight record type exposing a single ``ap_settings`` field.
AccessPointTuple = collections.namedtuple('AccessPointTuple', 'ap_settings')
class WifiTdlsRvrTest(WifiRvrTest):
    """Throughput-versus-attenuation tests over a TDLS link between two DUTs.

    Android device 0 hosts the iperf server and device 1 the iperf client.
    Both devices join the same network on the retail access point, TDLS is
    then enabled on device 0 towards device 1's MAC address, and the sweep
    itself is delegated to ``run_rvr_test`` (not defined in this class;
    presumably inherited from WifiRvrTest).
    """

    def __init__(self, controllers):
        """Creates the metric loggers used to publish per-test results.

        Args:
            controllers: forwarded unchanged to BaseTestClass.__init__.
        """
        # NOTE(review): BaseTestClass.__init__ is called directly, so
        # WifiRvrTest.__init__ does not run -- presumably intentional; confirm.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True

    def setup_class(self):
        """Initializes common test hardware and parameters.

        This function initializes hardware and compiles parameters that are
        common to all tests in this class.
        """
        req_params = [
            'tdls_rvr_test_params', 'testbed_params', 'RetailAccessPoints'
        ]
        opt_params = ['ap_networks', 'OTASniffer']
        self.unpack_userparams(req_params, opt_params)
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        self.testclass_params = self.tdls_rvr_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        # Device 0 runs the iperf server, device 1 the iperf client.
        self.iperf_server = ipf.create([{
            'AndroidDevice': self.android_devices[0].serial,
            'port': '5201'
        }])[0]
        self.iperf_client = ipc.create([{
            'AndroidDevice': self.android_devices[1].serial,
            'port': '5201'
        }])[0]

        self.log_path = os.path.join(logging.log_path, 'results')
        if hasattr(self,
                   'OTASniffer') and self.testbed_params['sniffer_enable']:
            self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
        os.makedirs(self.log_path, exist_ok=True)
        # Only build the golden file list if it has not been set already.
        if not hasattr(self, 'golden_files_list'):
            if 'golden_results_path' in self.testbed_params:
                golden_dir = self.testbed_params['golden_results_path']
                self.golden_files_list = [
                    os.path.join(golden_dir, file_name)
                    for file_name in os.listdir(golden_dir)
                ]
            else:
                self.log.warning('No golden files found.')
                self.golden_files_list = []
        self.testclass_results = []

        # Optionally force airplane mode (enabled unless 'airplane_mode' is
        # set to a falsy value in the test class params).
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            for ad in self.android_devices:
                asserts.assert_true(utils.force_airplane_mode(ad, True),
                                    "Can not turn on airplane mode.")
        # Turn WiFi ON
        for ad in self.android_devices:
            wutils.wifi_toggle_state(ad, True)

    def teardown_class(self):
        """Turns WiFi off, processes class results and tears down the AP."""
        try:
            # Turn WiFi OFF
            for dev in self.android_devices:
                wutils.wifi_toggle_state(dev, False)
            self.process_testclass_results()
        finally:
            # Teardown AP and release its lockfile, even if a step above
            # raised; the original exception still propagates.
            self.access_point.teardown()

    def setup_test(self):
        """Starts WiFi logging on every device before each test."""
        for ad in self.android_devices:
            wputils.start_wifi_logging(ad)

    def teardown_test(self):
        """Stops the iperf server, then resets WiFi and stops WiFi logging."""
        self.iperf_server.stop()
        for ad in self.android_devices:
            wutils.reset_wifi(ad)
            wputils.stop_wifi_logging(ad)

    def on_exception(self, test_name, begin_time):
        """Collects a bug report, adb log and ssrdumps from every device.

        Args:
            test_name: name of the test, passed to the collection helpers.
            begin_time: test start time, passed to the collection helpers.
        """
        for ad in self.android_devices:
            ad.take_bug_report(test_name, begin_time)
            ad.cat_adb_log(test_name, begin_time)
            wutils.get_ssrdumps(ad)

    def compute_test_metrics(self, rvr_result):
        """Computes peak throughput and high/low throughput range metrics.

        The metrics are stored in ``rvr_result['metrics']`` and, when
        ``self.publish_testcase_metrics`` is set, added to the test case
        metric logger.

        Args:
            rvr_result: dict read for 'throughput_receive',
                'total_attenuation' and ``['testcase_params']['mode']``; a
                'metrics' entry is written into it.
        """
        # Set test metrics
        metrics = {}
        rvr_result['metrics'] = metrics
        metrics['peak_tput'] = max(rvr_result['throughput_receive'])
        if self.publish_testcase_metrics:
            self.testcase_metric_logger.add_metric('peak_tput',
                                                   metrics['peak_tput'])

        test_mode = rvr_result['testcase_params']['mode']

        # High range: attenuation of the last point before throughput drops
        # below the 'high' target for the rest of the sweep. Stays -1 when
        # throughput never reaches the target or never drops below it.
        high_target = self.testclass_params['tput_metric_targets'][test_mode][
            'high']
        tput_below_limit = [
            tput < high_target for tput in rvr_result['throughput_receive']
        ]
        metrics['high_tput_range'] = -1
        for idx in range(len(tput_below_limit)):
            if all(tput_below_limit[idx:]):
                # idx == 0 means throughput was never above the limit, in
                # which case the -1 default is kept.
                if idx != 0:
                    metrics['high_tput_range'] = rvr_result[
                        'total_attenuation'][idx - 1]
                break
        if self.publish_testcase_metrics:
            self.testcase_metric_logger.add_metric(
                'high_tput_range', metrics['high_tput_range'])

        # Low range: same search against the 'low' target. Unlike the high
        # range, idx == 0 reports the first attenuation point rather than -1.
        low_target = self.testclass_params['tput_metric_targets'][test_mode][
            'low']
        tput_below_limit = [
            tput < low_target for tput in rvr_result['throughput_receive']
        ]
        for idx in range(len(tput_below_limit)):
            if all(tput_below_limit[idx:]):
                metrics['low_tput_range'] = rvr_result['total_attenuation'][
                    max(idx, 1) - 1]
                break
        else:
            # Throughput was still at or above the target at the last point.
            metrics['low_tput_range'] = -1
        if self.publish_testcase_metrics:
            self.testcase_metric_logger.add_metric('low_tput_range',
                                                   metrics['low_tput_range'])

    def setup_aps(self, testcase_params):
        """Sets the AP channel and bandwidth on the interface under test.

        Args:
            testcase_params: dict providing 'interface_id', 'channel' and
                'bandwidth'.
        """
        self.log.info('Setting AP to channel {} {}'.format(
            testcase_params['channel'], testcase_params['bandwidth']))
        self.access_point.set_channel(testcase_params['interface_id'],
                                      testcase_params['channel'])
        self.access_point.set_bandwidth(testcase_params['interface_id'],
                                        testcase_params['bandwidth'])

    def setup_duts(self, testcase_params):
        """Health-checks, resets and connects every device to the network.

        Args:
            testcase_params: dict providing 'interface_id', used to select
                the network from ``self.ap_networks[0]``.
        """
        for ad in self.android_devices:
            # Check battery level before test
            if not wputils.health_check(ad, 20):
                asserts.skip('Overheating or Battery low. Skipping test.')
            # Turn screen off to preserve battery
            ad.go_to_sleep()
            wutils.reset_wifi(ad)
        # Connect only after all devices have been reset.
        for ad in self.android_devices:
            wutils.wifi_connect(
                ad,
                self.ap_networks[0][testcase_params['interface_id']],
                num_of_tries=5,
                check_connectivity=True)

    def setup_tdls_connection(self, testcase_params):
        """Enables TDLS from device 0 to device 1 and records link details.

        Adds 'iperf_server_address', 'tdls_config', 'mode' and 'test_network'
        to testcase_params.

        Args:
            testcase_params: dict providing 'bandwidth' and 'interface_id';
                updated in place.
        """
        tdls_config = {}
        for idx, ad in enumerate(self.android_devices):
            tdls_config[idx] = {
                'ip_address':
                ad.droid.connectivityGetIPv4Addresses('wlan0')[0],
                'mac_address': ad.droid.wifiGetConnectionInfo()['mac_address'],
                'tdls_supported': ad.droid.wifiIsTdlsSupported(),
                'off_channel_supported':
                ad.droid.wifiIsOffChannelTdlsSupported()
            }
        self.android_devices[0].droid.wifiSetTdlsEnabledWithMacAddress(
            tdls_config[1]['mac_address'], True)

        # Device 0 hosts the iperf server, so traffic targets its address.
        testcase_params['iperf_server_address'] = tdls_config[0]['ip_address']
        testcase_params['tdls_config'] = tdls_config
        # 'mode' is the key later used to look up throughput metric targets.
        testcase_params['mode'] = testcase_params['bandwidth']
        testcase_params['test_network'] = self.ap_networks[0][
            testcase_params['interface_id']]

    def setup_tdls_rvr_test(self, testcase_params):
        """Prepares the AP, devices, attenuators and TDLS link for a test.

        Args:
            testcase_params: dict of test parameters, as returned by
                compile_test_params.
        """
        # Setup the aps
        self.setup_aps(testcase_params)
        # Setup the duts
        self.setup_duts(testcase_params)
        # Set attenuator to 0 dB
        for attenuator in self.attenuators:
            attenuator.set_atten(0, strict=False)
        # Setup the TDLS connection
        self.setup_tdls_connection(testcase_params)
        # Set DUT to monitor RSSI and LLStats on
        self.monitored_dut = self.android_devices[1]

    def compile_test_params(self, testcase_params):
        """Function that completes all test params based on the test name.

        Args:
            testcase_params: dict containing test-specific parameters
        Returns:
            The same testcase_params dict, updated in place with the
            attenuation range, iperf settings and AP interface id.
        """
        for ad in self.android_devices:
            wputils.check_skip_conditions(testcase_params, ad,
                                          self.access_point)

        # Compile RvR parameters. 'atten_stop' itself is excluded from the
        # generated range.
        atten_start = self.testclass_params['atten_start']
        atten_step = self.testclass_params['atten_step']
        num_atten_steps = int(
            (self.testclass_params['atten_stop'] - atten_start) / atten_step)
        testcase_params['atten_range'] = [
            atten_start + x * atten_step for x in range(0, num_atten_steps)
        ]

        # Compile iperf arguments
        if testcase_params['traffic_type'] == 'TCP':
            testcase_params['iperf_socket_size'] = self.testclass_params.get(
                'tcp_socket_size', None)
            testcase_params['iperf_processes'] = self.testclass_params.get(
                'tcp_processes', 1)
        elif testcase_params['traffic_type'] == 'UDP':
            testcase_params['iperf_socket_size'] = self.testclass_params.get(
                'udp_socket_size', None)
            testcase_params['iperf_processes'] = self.testclass_params.get(
                'udp_processes', 1)
        downlink = testcase_params['traffic_direction'] == 'DL'
        testcase_params['iperf_args'] = wputils.get_iperf_arg_string(
            duration=self.testclass_params['iperf_duration'],
            reverse_direction=downlink,
            socket_size=testcase_params['iperf_socket_size'],
            num_processes=testcase_params['iperf_processes'],
            traffic_type=testcase_params['traffic_type'],
            ipv6=False)
        testcase_params['use_client_output'] = downlink

        # Compile AP and infrastructure connection parameters. The 2.4 GHz
        # band spans channels 1-14, so all of them map to the '2G' interface.
        testcase_params['interface_id'] = '2G' if testcase_params[
            'channel'] <= 14 else '5G_1'
        return testcase_params

    def _test_tdls_rvr(self, testcase_params):
        """ Function that gets called for each test case

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)

        # Prepare devices and run test
        self.setup_tdls_rvr_test(testcase_params)
        rvr_result = self.run_rvr_test(testcase_params)

        # Post-process results
        self.testclass_results.append(rvr_result)
        self.process_test_results(rvr_result)
        self.pass_fail_check(rvr_result)

    def generate_test_cases(self, ap_config_list, traffic_type,
                            traffic_directions):
        """Function that auto-generates test cases for a test class.

        One test is attached to this instance for every combination of AP
        configuration and traffic direction.

        Args:
            ap_config_list: sequence of [channel, bandwidth] pairs.
            traffic_type: traffic type string stored in each test's params
                and used in the test name.
            traffic_directions: iterable of traffic direction strings.
        Returns:
            List of the generated test names.
        """

        def bind_test_case(params):
            # Capture params per call. A lambda written directly in the loop
            # body would late-bind to the loop variable, making every
            # generated test run with the last parameter set.
            return lambda: self._test_tdls_rvr(params)

        test_class = self.__class__.__name__
        test_cases = []
        for ap_config, traffic_direction in itertools.product(
                ap_config_list, traffic_directions):
            test_name = 'test_tdls_rvr_{}_{}_ch{}_{}'.format(
                traffic_type, traffic_direction, ap_config[0], ap_config[1])
            test_params = collections.OrderedDict(
                traffic_type=traffic_type,
                traffic_direction=traffic_direction,
                channel=ap_config[0],
                bandwidth=ap_config[1])
            if "uuid_list" in self.user_params:
                test_tracker_uuid = self.user_params["uuid_list"][test_class][
                    test_name]
                test_case = test_tracker_info(uuid=test_tracker_uuid)(
                    bind_test_case(test_params))
            else:
                test_case = partial(self._test_tdls_rvr, test_params)
            setattr(self, test_name, test_case)
            test_cases.append(test_name)
        return test_cases
class WifiTdlsRvr_FCC_TCP_Test(WifiTdlsRvrTest):
    """TCP TDLS RvR test cases (DL and UL) with country code 'US'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        self.country_code = 'US'
        # Channel 6 at 'bw20' only; channels 36 and 149 at every bandwidth.
        multi_bw_configs = [[channel, bandwidth]
                            for channel in (36, 149)
                            for bandwidth in ('bw20', 'bw40', 'bw80')]
        self.tests = self.generate_test_cases(
            ap_config_list=[[6, 'bw20']] + multi_bw_configs,
            traffic_type='TCP',
            traffic_directions=['DL', 'UL'])
class WifiTdlsRvr_FCC_UDP_Test(WifiTdlsRvrTest):
    """UDP TDLS RvR test cases (DL and UL) with country code 'US'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        self.country_code = 'US'
        # Channel 6 at 'bw20' only; channels 36 and 149 at every bandwidth.
        multi_bw_configs = [[channel, bandwidth]
                            for channel in (36, 149)
                            for bandwidth in ('bw20', 'bw40', 'bw80')]
        self.tests = self.generate_test_cases(
            ap_config_list=[[6, 'bw20']] + multi_bw_configs,
            traffic_type='UDP',
            traffic_directions=['DL', 'UL'])
class WifiTdlsRvr_ETSI_TCP_Test(WifiTdlsRvrTest):
    """TCP TDLS RvR test cases (DL and UL) with country code 'GB'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        self.country_code = 'GB'
        # Channel 6 at 'bw20' only; channels 36 and 149 at every bandwidth.
        multi_bw_configs = [[channel, bandwidth]
                            for channel in (36, 149)
                            for bandwidth in ('bw20', 'bw40', 'bw80')]
        self.tests = self.generate_test_cases(
            ap_config_list=[[6, 'bw20']] + multi_bw_configs,
            traffic_type='TCP',
            traffic_directions=['DL', 'UL'])
class WifiTdlsRvr_ETSI_UDP_Test(WifiTdlsRvrTest):
    """UDP TDLS RvR test cases (DL and UL) with country code 'GB'."""

    def __init__(self, controllers):
        super().__init__(controllers)
        self.country_code = 'GB'
        # Channel 6 at 'bw20' only; channels 36 and 149 at every bandwidth.
        multi_bw_configs = [[channel, bandwidth]
                            for channel in (36, 149)
                            for bandwidth in ('bw20', 'bw40', 'bw80')]
        self.tests = self.generate_test_cases(
            ap_config_list=[[6, 'bw20']] + multi_bw_configs,
            traffic_type='UDP',
            traffic_directions=['DL', 'UL'])