# blob: 43566a1ff6ccff66a1d5fecdb75eb6a0b871c225 [file] [log] [blame]
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from acts import asserts
from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from scapy.all import rdpcap, DNSRR, DNSQR, IP, IPv6
# Wireless interface name. Not referenced by any code visible in this module.
WLAN = "wlan0"
# Host name of a ping target. Not referenced by any code visible in this
# module; the tests below ping randomly generated names instead.
PING_ADDR = "google.com"
class DNSTest(WifiBaseTest):
    """DNS related test for Android."""

    def setup_class(self):
        """Prepare the DUT and the OpenWrt access point shared by all tests.

        Uses the first Android device as the DUT and the first access point
        as the OpenWrt AP. Unless the optional ``configure_OpenWrt`` user
        parameter equals ``"skip"``, a WPA network is started on the AP and
        stored in ``self.wifi_network``.

        Raises:
            A test failure signal (via ``asserts.assert_true``) when no
            OpenWrt AP is present in the testbed or its Wifi interface does
            not report ready.
        """
        self.dut = self.android_devices[0]
        wutils.wifi_test_device_init(self.dut)

        req_params = []
        opt_param = ["wifi_network", "configure_OpenWrt"]
        self.unpack_userparams(
            req_param_names=req_params, opt_param_names=opt_param)

        asserts.assert_true(OPENWRT in self.user_params,
                            "OpenWrtAP is not in testbed.")
        self.openwrt = self.access_points[0]

        if getattr(self, "configure_OpenWrt", None) == "skip":
            # presumably self.wifi_network then comes from the optional
            # "wifi_network" user parameter -- TODO confirm against testbed.
            self.dut.log.info("Skip configure Wifi interface due to config setup.")
        else:
            self.configure_openwrt_ap_and_start(wpa_network=True)
            self.wifi_network = self.openwrt.get_wifi_network()

        asserts.assert_true(self.openwrt.verify_wifi_status(),
                            "OpenWrt Wifi interface is not ready.")

    def teardown_class(self):
        """Reset wifi on the DUT once the whole test class has finished."""
        wutils.reset_wifi(self.dut)

    def teardown_test(self):
        """Reset wifi on the DUT after each test case."""
        wutils.reset_wifi(self.dut)

    def ping(self, addr, ignore_status=True, timeout=60):
        """Start a ping from DUT and return ping result.

        Sends a single echo request (``ping -c 1``) through adb shell.

        Args:
            addr: Address to ping.
            ignore_status: ignore non zero return.
            timeout: cmd timeout.

        Returns:
            True when the command output does not contain "100%", otherwise
            False.
        """
        # NOTE(review): any output lacking "100%" (for example a name
        # resolution error message) is reported as success -- confirm that
        # this is the intended meaning of a "passing" ping.
        output = self.dut.adb.shell("ping -c 1 %s" % addr,
                                    ignore_status=ignore_status,
                                    timeout=timeout)
        return "100%" not in output

    def generate_query_qname(self):
        """Return a random query name."""
        # A random label makes the name easy to pick out of the capture.
        return "%s-ds.metric.gstatic.com" % random.randint(0, 99999999)

    @staticmethod
    def _name_matches(raw_name, expected):
        """Return True if the captured DNS name equals the expected name.

        Args:
            raw_name: Name as bytes, as read from a DNS layer field.
            expected: Query name as str, without leading/trailing dots.
        """
        return raw_name.decode().strip(".") == expected

    @test_tracker_info(uuid="dd7b8c92-c0f4-4403-a0ae-57a703162d83")
    def test_dns_query(self):
        """Ping a random host name and check the DNS response on the AP.

        Fails when the ping fails; if the ping failed and no DNS response
        record for the queried name was captured, the failure message points
        at the missing DNS response instead.
        """
        # Setup environment
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)

        # Start tcpdump on OpenWrt
        remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)

        # Generate query name
        test_qname = self.generate_query_qname()
        self.dut.log.info("Test query name = %s" % test_qname)

        # Send a query by pinging the generated name
        ping_result = self.ping(test_qname)
        local_pcap_path = self.openwrt.network_setting.stop_tcpdump(
            remote_pcap_path, self.dut.device_log_path)

        # Check DNSRR.rrname in tcpdump to verify DNS response
        packets = rdpcap(local_pcap_path)
        self.dut.log.info("pcap file path : %s" % local_pcap_path)
        pkt_count = sum(
            1 for pkt in packets
            if pkt.haslayer(DNSRR)
            and self._name_matches(pkt[DNSRR].rrname, test_qname))
        self.dut.log.info("DNS query response count : %s" % pkt_count)

        # On ping failure, first report whether the DNS response was missing.
        if not ping_result:
            asserts.assert_true(pkt_count > 0,
                                "Did not find match standard query response in tcpdump.")
        asserts.assert_true(ping_result, "Device ping fail.")

    @test_tracker_info(uuid="cd20c6e7-9c2e-4286-b08e-c8e40e413da5")
    def test_dns_query_retransmit(self):
        """Block DNS responses on the AP and check the DUT resends the query.

        Passes when at least two queries for the generated name were captured
        over IPv4, or at least two over IPv6.
        """
        # Setup environment
        wutils.connect_to_wifi_network(self.dut, self.wifi_network)
        self.openwrt.network_setting.block_dns_response()
        try:
            # Start tcpdump on OpenWrt
            remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name)

            # Generate query name
            test_qname = self.generate_query_qname()
            self.dut.log.info("Test query name = %s" % test_qname)

            # Send a query; the ping result itself is irrelevant here.
            self.ping(test_qname)
            local_pcap_path = self.openwrt.network_setting.stop_tcpdump(
                remote_pcap_path, self.dut.device_log_path)
        finally:
            # Always lift the block, even when the capture or ping raised, so
            # a failure here cannot leave DNS blocked for later tests.
            self.openwrt.network_setting.unblock_dns_response()

        # Check DNSQR.qname in tcpdump to verify device retransmit the query
        packets = rdpcap(local_pcap_path)
        self.dut.log.info("pcap file path : %s" % local_pcap_path)
        pkt_count = 0
        pkt6_count = 0
        for pkt in packets:
            if not pkt.haslayer(DNSQR):
                continue
            if not self._name_matches(pkt[DNSQR].qname, test_qname):
                continue
            if pkt.haslayer(IP):
                pkt_count += 1
            if pkt.haslayer(IPv6):
                pkt6_count += 1
        self.dut.log.info("IPv4 DNS query count : %s" % pkt_count)
        self.dut.log.info("IPv6 DNS query count : %s" % pkt6_count)

        asserts.assert_true(pkt_count >= 2 or pkt6_count >= 2,
                            "Did not find match standard query in tcpdump.")