#!/usr/bin/env python3
#
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Script for testing various download stress scenarios.
"""
import logging
import threading
from mobly import signals, test_runner
from antlion.controllers.access_point import setup_ap
from antlion.controllers.ap_lib import hostapd_constants
from antlion.test_utils.abstract_devices.wlan_device import (
AssociationMode,
create_wlan_device,
)
from antlion.test_utils.fuchsia import utils
from antlion.test_utils.wifi import base_test
from antlion.utils import rand_ascii_str
class DownloadStressTest(base_test.WifiBaseTest):
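    """Stress tests that exercise a DUT's WLAN connection with HTTP downloads.

    Each test brings up a local access point, associates the Fuchsia DUT to
    it, and downloads one or more files over HTTP using curl on the device.
    """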
    # Default number of test iterations.
    # Override with the "download_stress_test_iterations" parameter in the
    # config file, e.g. "download_stress_test_iterations": "10".
num_of_iterations = 3
# Timeout for download thread in seconds
download_timeout_s = 60 * 5
    # Download URLs
url_20MB = "http://ipv4.download.thinkbroadband.com/20MB.zip"
url_40MB = "http://ipv4.download.thinkbroadband.com/40MB.zip"
url_60MB = "http://ipv4.download.thinkbroadband.com/60MB.zip"
url_512MB = "http://ipv4.download.thinkbroadband.com/512MB.zip"
# Constants used in test_one_large_multiple_small_downloads
download_small_url = url_20MB
download_large_url = url_512MB
num_of_small_downloads = 5
download_threads_result = []
def setup_class(self):
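        """Bring up the access point and associate the DUT before any tests run."""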
super().setup_class()
self.log = logging.getLogger()
self.ssid = rand_ascii_str(10)
self.fuchsia_device = self.fuchsia_devices[0]
        self.dut = create_wlan_device(self.fuchsia_device, AssociationMode.POLICY)
self.access_point = self.access_points[0]
self.num_of_iterations = int(
self.user_params.get(
"download_stress_test_iterations", self.num_of_iterations
)
)
setup_ap(
access_point=self.access_point,
profile_name="whirlwind",
channel=hostapd_constants.AP_DEFAULT_CHANNEL_2G,
ssid=self.ssid,
)
self.dut.associate(self.ssid)
def teardown_test(self):
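        """Disconnect the DUT, collect AP logs, and stop the AP after each test."""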
self.download_threads_result.clear()
self.dut.disconnect()
self.dut.reset_wifi()
self.download_ap_logs()
self.access_point.stop_all_aps()
super().teardown_test()
def test_download_small(self):
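        """Download a single small (20 MB) file."""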
self.log.info("Downloading small file")
return self.download_file(self.url_20MB)
def test_download_large(self):
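        """Download a single large (512 MB) file."""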
return self.download_file(self.url_512MB)
def test_continuous_download(self):
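        """Download the large (512 MB) file num_of_iterations times in sequence."""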
        for _ in range(self.num_of_iterations):
            if not self.download_file(self.url_512MB):
                return False
        return True
def download_file(self, url):
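        """Download a file over HTTP using curl on the Fuchsia device.

        Args:
            url: URL of the file to download.

        Returns:
            True if the download succeeded, False otherwise.
        """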
self.log.info(f"Start downloading: {url}")
return utils.http_file_download_by_curl(
self.fuchsia_device,
url,
additional_args=f"--max-time {self.download_timeout_s} --silent",
)
def download_thread(self, url):
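        """Download a file and record the result in download_threads_result.

        Intended to run as a threading.Thread target.

        Args:
            url: URL of the file to download.

        Returns:
            True if the download succeeded, False otherwise.
        """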
download_status = self.download_file(url)
if download_status:
self.log.info(f"Success downloading: {url}")
else:
self.log.info(f"Failure downloading: {url}")
self.download_threads_result.append(download_status)
return download_status
def test_multi_downloads(self):
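        """Download several files of different sizes concurrently."""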
download_urls = [self.url_20MB, self.url_40MB, self.url_60MB]
download_threads = []
try:
# Start multiple downloads at the same time
for index, url in enumerate(download_urls):
self.log.info(f"Create and start thread {index}.")
t = threading.Thread(target=self.download_thread, args=(url,))
download_threads.append(t)
t.start()
            # Wait for all threads to complete or time out
for t in download_threads:
t.join(self.download_timeout_s)
        finally:
            # Fail if any download thread is still running after the join
            # timeout. Thread.isAlive() was removed in Python 3.9, so use
            # is_alive().
            for index, t in enumerate(download_threads):
                if t.is_alive():
                    raise signals.TestFailure(f"Thread {index} timed out")

        for index, result in enumerate(self.download_threads_result):
            if not result:
                self.log.info(f"Download {index} failed")
                raise signals.TestFailure(f"Thread {index} failed to download")
        return True
def test_one_large_multiple_small_downloads(self):
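        """Download one large file while sequentially downloading small files.

        Repeats the whole scenario num_of_iterations times.
        """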
        for _ in range(self.num_of_iterations):
download_threads = []
try:
large_thread = threading.Thread(
target=self.download_thread, args=(self.download_large_url,)
)
download_threads.append(large_thread)
large_thread.start()
                for _ in range(self.num_of_small_downloads):
# Start small file download
t = threading.Thread(
target=self.download_thread, args=(self.download_small_url,)
)
download_threads.append(t)
t.start()
# Wait for thread to exit before starting the next iteration
t.join(self.download_timeout_s)
# Wait for the large file download thread to complete
large_thread.join(self.download_timeout_s)
            finally:
                # Fail if any download thread is still running after the
                # join timeout. Thread.isAlive() was removed in Python 3.9,
                # so use is_alive().
                for thread_index, t in enumerate(download_threads):
                    if t.is_alive():
                        raise signals.TestFailure(
                            f"Thread {thread_index} timed out"
                        )

            for thread_index, result in enumerate(self.download_threads_result):
                if not result:
                    self.log.info(f"Download {thread_index} failed")
                    raise signals.TestFailure(
                        f"Thread {thread_index} failed to download"
                    )
# Clear results before looping again
self.download_threads_result.clear()
return True
if __name__ == "__main__":
test_runner.main()