blob: 8bff2b2e665e57ca399875e69b6b90e0de1157a2 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2021, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import ipaddress
import json
import logging
import unittest
import config
import thread_cert
# Test description:
# This test verifies DNS-SD server works on a BR and is accessible from a Host.
#
# Topology:
# ----------------(eth)--------------------
# | |
# BR1 (Leader, Server) HOST
# / \
# CLIENT1 CLIENT2
SERVER = BR1 = 1
CLIENT1, CLIENT2 = 2, 3
HOST = 4
DOMAIN = 'default.service.arpa.'
SERVICE = '_testsrv._udp'
SERVICE_FULL_NAME = f'{SERVICE}.{DOMAIN}'
VALID_SERVICE_NAMES = [
'_abc._udp.default.service.arpa.',
'_abc._tcp.default.service.arpa.',
]
WRONG_SERVICE_NAMES = [
'_testsrv._udp.default.service.xxxx.',
'_testsrv._txp,default.service.arpa.',
]
class TestDnssdServerOnBr(thread_cert.TestCase):
USE_MESSAGE_FACTORY = False
TOPOLOGY = {
BR1: {
'name': 'SERVER',
'is_otbr': True,
'version': '1.2',
},
CLIENT1: {
'name': 'CLIENT1',
},
CLIENT2: {
'name': 'CLIENT2',
},
HOST: {
'name': 'Host',
'is_host': True
},
}
def test(self):
server = br1 = self.nodes[BR1]
client1 = self.nodes[CLIENT1]
client2 = self.nodes[CLIENT2]
digger = host = self.nodes[HOST]
host.start(start_radvd=False)
self.simulator.go(5)
br1.start()
self.simulator.go(config.LEADER_STARTUP_DELAY)
self.assertEqual('leader', br1.get_state())
server.srp_server_set_enabled(True)
server.dns_upstream_query_state = False
client1.start()
self.simulator.go(config.ROUTER_STARTUP_DELAY)
self.assertEqual('router', client1.get_state())
client2.start()
self.simulator.go(config.ROUTER_STARTUP_DELAY)
self.assertEqual('router', client2.get_state())
self.simulator.go(10)
server_addr = server.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]
# Router1 can ping to/from the Host on infra link.
self.assertTrue(br1.ping(host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0], backbone=True))
self.assertTrue(host.ping(br1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0], backbone=True))
client1_addrs = [client1.get_mleid(), client1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
self._config_srp_client_services(client1, 'ins1', 'host1', 11111, 1, 1, client1_addrs)
client2_addrs = [client2.get_mleid(), client2.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]]
self._config_srp_client_services(client2, 'ins2', 'host2', 22222, 2, 2, client2_addrs)
ins1_full_name = f'ins1.{SERVICE_FULL_NAME}'
ins2_full_name = f'ins2.{SERVICE_FULL_NAME}'
host1_full_name = f'host1.{DOMAIN}'
host2_full_name = f'host2.{DOMAIN}'
EMPTY_TXT = {}
# check if PTR query works
dig_result = digger.dns_dig(server_addr, SERVICE_FULL_NAME, 'PTR')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(SERVICE_FULL_NAME, 'IN', 'PTR')],
'ANSWER': [(SERVICE_FULL_NAME, 'IN', 'PTR', f'ins1.{SERVICE_FULL_NAME}'),
(SERVICE_FULL_NAME, 'IN', 'PTR', f'ins2.{SERVICE_FULL_NAME}')],
})
# check if SRV query works
dig_result = digger.dns_dig(server_addr, ins1_full_name, 'SRV')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(ins1_full_name, 'IN', 'SRV')],
'ANSWER': [(ins1_full_name, 'IN', 'SRV', 1, 1, 11111, host1_full_name),],
'ADDITIONAL': [
(host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
(host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
],
})
dig_result = digger.dns_dig(server_addr, ins2_full_name, 'SRV')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(ins2_full_name, 'IN', 'SRV')],
'ANSWER': [(ins2_full_name, 'IN', 'SRV', 2, 2, 22222, host2_full_name),],
'ADDITIONAL': [
(host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
(host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
],
})
# check if TXT query works
dig_result = digger.dns_dig(server_addr, ins1_full_name, 'TXT')
self._assert_dig_result_matches(dig_result, {
'QUESTION': [(ins1_full_name, 'IN', 'TXT')],
'ANSWER': [(ins1_full_name, 'IN', 'TXT', EMPTY_TXT),],
})
dig_result = digger.dns_dig(server_addr, ins2_full_name, 'TXT')
self._assert_dig_result_matches(dig_result, {
'QUESTION': [(ins2_full_name, 'IN', 'TXT')],
'ANSWER': [(ins2_full_name, 'IN', 'TXT', EMPTY_TXT),],
})
# check if AAAA query works
dig_result = digger.dns_dig(server_addr, host1_full_name, 'AAAA')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(host1_full_name, 'IN', 'AAAA'),],
'ANSWER': [
(host1_full_name, 'IN', 'AAAA', client1_addrs[0]),
(host1_full_name, 'IN', 'AAAA', client1_addrs[1]),
],
})
dig_result = digger.dns_dig(server_addr, host2_full_name, 'AAAA')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(host2_full_name, 'IN', 'AAAA'),],
'ANSWER': [
(host2_full_name, 'IN', 'AAAA', client2_addrs[0]),
(host2_full_name, 'IN', 'AAAA', client2_addrs[1]),
],
})
# check some invalid queries
for qtype in ['A', 'CNAME']:
dig_result = digger.dns_dig(server_addr, host1_full_name, qtype)
self._assert_dig_result_matches(dig_result, {
'status': 'NOTIMP',
})
for service_name in WRONG_SERVICE_NAMES:
dig_result = digger.dns_dig(server_addr, service_name, 'PTR')
self._assert_dig_result_matches(dig_result, {
'status': 'NXDOMAIN',
})
# verify Discovery Proxy works for _meshcop._udp
self._verify_discovery_proxy_meshcop(server_addr, server.get_network_name(), digger)
def _verify_discovery_proxy_meshcop(self, server_addr, network_name, digger):
dp_service_name = '_meshcop._udp.default.service.arpa.'
dp_hostname = lambda x: x.endswith('.default.service.arpa.')
def check_border_agent_port(port):
return 0 < port <= 65535
dig_result = digger.dns_dig(server_addr, dp_service_name, 'PTR')
for answer in dig_result['ANSWER']:
if len(answer) >= 2 and answer[-2] == 'PTR':
dp_instance_name = answer[-1]
break
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(dp_service_name, 'IN', 'PTR'),],
'ANSWER': [(dp_service_name, 'IN', 'PTR', dp_instance_name),],
'ADDITIONAL': [
(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),
(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),
],
})
# Find the actual host name and IPv6 address
dp_ip6_address = None
for rr in dig_result['ADDITIONAL']:
if rr[3] == 'SRV':
dp_hostname = rr[7]
elif rr[3] == 'AAAA':
dp_ip6_address = rr[4]
assert isinstance(dp_hostname, str), dig_result
dig_result = digger.dns_dig(server_addr, dp_instance_name, 'SRV')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(dp_instance_name, 'IN', 'SRV'),],
'ANSWER': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
'ADDITIONAL': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),],
})
dig_result = digger.dns_dig(server_addr, dp_instance_name, 'TXT')
self._assert_dig_result_matches(
dig_result, {
'QUESTION': [(dp_instance_name, 'IN', 'TXT'),],
'ANSWER': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get(
'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),],
'ADDITIONAL': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),],
})
if dp_ip6_address is not None:
dig_result = digger.dns_dig(server_addr, dp_hostname, 'AAAA')
self._assert_dig_result_matches(dig_result, {
'QUESTION': [(dp_hostname, 'IN', 'AAAA'),],
'ANSWER': [(dp_hostname, 'IN', 'AAAA', dp_ip6_address),],
})
def _config_srp_client_services(self, client, instancename, hostname, port, priority, weight, addrs):
client.srp_client_enable_auto_start_mode()
client.srp_client_set_host_name(hostname)
client.srp_client_set_host_address(*addrs)
client.srp_client_add_service(instancename, SERVICE, port, priority, weight)
self.simulator.go(5)
self.assertEqual(client.srp_client_get_host_state(), 'Registered')
def _assert_have_question(self, dig_result, question):
for dig_question in dig_result['QUESTION']:
if self._match_record(dig_question, question):
return
self.fail((dig_result, question))
def _assert_have_answer(self, dig_result, record, additional=False):
for dig_answer in dig_result['ANSWER' if not additional else 'ADDITIONAL']:
dig_answer = list(dig_answer)
dig_answer[1:2] = [] # remove TTL from answer
record = list(record)
# convert IPv6 addresses to `ipaddress.IPv6Address` before matching
if dig_answer[2] == 'AAAA':
dig_answer[3] = ipaddress.IPv6Address(dig_answer[3])
if record[2] == 'AAAA':
record[3] = ipaddress.IPv6Address(record[3])
if self._match_record(dig_answer, record):
return
print('not match: ', dig_answer, record,
list(a == b or (callable(b) and b(a)) for a, b in zip(dig_answer, record)))
self.fail((record, dig_result))
def _match_record(self, record, match):
assert not any(callable(elem) for elem in record), record
if record == match:
return True
return all(a == b or (callable(b) and b(a)) for a, b in zip(record, match))
def _assert_dig_result_matches(self, dig_result, expected_result):
self.assertEqual(dig_result['opcode'], expected_result.get('opcode', 'QUERY'), dig_result)
self.assertEqual(dig_result['status'], expected_result.get('status', 'NOERROR'), dig_result)
if 'QUESTION' in expected_result:
self.assertEqual(len(dig_result['QUESTION']), len(expected_result['QUESTION']), dig_result)
for question in expected_result['QUESTION']:
self._assert_have_question(dig_result, question)
if 'ANSWER' in expected_result:
self.assertEqual(len(dig_result['ANSWER']), len(expected_result['ANSWER']), dig_result)
for record in expected_result['ANSWER']:
self._assert_have_answer(dig_result, record, additional=False)
if 'ADDITIONAL' in expected_result:
self.assertGreaterEqual(len(dig_result['ADDITIONAL']), len(expected_result['ADDITIONAL']), dig_result)
for record in expected_result['ADDITIONAL']:
self._assert_have_answer(dig_result, record, additional=True)
logging.info("dig result matches:\r%s", json.dumps(dig_result, indent=True))
if __name__ == '__main__':
unittest.main()