| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2021, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS' |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| import ipaddress |
| import json |
| import logging |
| import unittest |
| |
| import config |
| import thread_cert |
| |
| # Test description: |
| # This test verifies DNS-SD server works on a BR and is accessible from a Host. |
| # |
| # Topology: |
| # ----------------(eth)-------------------- |
| # | | |
| # BR1 (Leader, Server) HOST |
| # / \ |
| # CLIENT1 CLIENT2 |
| |
| SERVER = BR1 = 1 |
| CLIENT1, CLIENT2 = 2, 3 |
| HOST = 4 |
| |
| DOMAIN = 'default.service.arpa.' |
| SERVICE = '_testsrv._udp' |
| SERVICE_FULL_NAME = f'{SERVICE}.{DOMAIN}' |
| |
| VALID_SERVICE_NAMES = [ |
| '_abc._udp.default.service.arpa.', |
| '_abc._tcp.default.service.arpa.', |
| ] |
| |
| WRONG_SERVICE_NAMES = [ |
| '_testsrv._udp.default.service.xxxx.', |
| '_testsrv._txp,default.service.arpa.', |
| ] |
| |
| |
| class TestDnssdServerOnBr(thread_cert.TestCase): |
| USE_MESSAGE_FACTORY = False |
| |
| TOPOLOGY = { |
| BR1: { |
| 'name': 'SERVER', |
| 'is_otbr': True, |
| 'version': '1.2', |
| }, |
| CLIENT1: { |
| 'name': 'CLIENT1', |
| }, |
| CLIENT2: { |
| 'name': 'CLIENT2', |
| }, |
| HOST: { |
| 'name': 'Host', |
| 'is_host': True |
| }, |
| } |
| |
| def test(self): |
| server = br1 = self.nodes[BR1] |
| client1 = self.nodes[CLIENT1] |
| client2 = self.nodes[CLIENT2] |
| digger = host = self.nodes[HOST] |
| |
| host.start(start_radvd=False) |
| self.simulator.go(5) |
| |
| br1.start() |
| self.simulator.go(config.LEADER_STARTUP_DELAY) |
| self.assertEqual('leader', br1.get_state()) |
| server.srp_server_set_enabled(True) |
| server.dns_upstream_query_state = False |
| |
| client1.start() |
| |
| self.simulator.go(config.ROUTER_STARTUP_DELAY) |
| self.assertEqual('router', client1.get_state()) |
| |
| client2.start() |
| self.simulator.go(config.ROUTER_STARTUP_DELAY) |
| self.assertEqual('router', client2.get_state()) |
| |
| self.simulator.go(10) |
| |
| server_addr = server.get_ip6_address(config.ADDRESS_TYPE.OMR)[0] |
| |
| # Router1 can ping to/from the Host on infra link. |
| self.assertTrue(br1.ping(host.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)[0], backbone=True)) |
| self.assertTrue(host.ping(br1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0], backbone=True)) |
| |
| client1_addrs = [client1.get_mleid(), client1.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]] |
| self._config_srp_client_services(client1, 'ins1', 'host1', 11111, 1, 1, client1_addrs) |
| |
| client2_addrs = [client2.get_mleid(), client2.get_ip6_address(config.ADDRESS_TYPE.OMR)[0]] |
| self._config_srp_client_services(client2, 'ins2', 'host2', 22222, 2, 2, client2_addrs) |
| |
| ins1_full_name = f'ins1.{SERVICE_FULL_NAME}' |
| ins2_full_name = f'ins2.{SERVICE_FULL_NAME}' |
| host1_full_name = f'host1.{DOMAIN}' |
| host2_full_name = f'host2.{DOMAIN}' |
| EMPTY_TXT = {} |
| |
| # check if PTR query works |
| dig_result = digger.dns_dig(server_addr, SERVICE_FULL_NAME, 'PTR') |
| |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(SERVICE_FULL_NAME, 'IN', 'PTR')], |
| 'ANSWER': [(SERVICE_FULL_NAME, 'IN', 'PTR', f'ins1.{SERVICE_FULL_NAME}'), |
| (SERVICE_FULL_NAME, 'IN', 'PTR', f'ins2.{SERVICE_FULL_NAME}')], |
| }) |
| |
| # check if SRV query works |
| dig_result = digger.dns_dig(server_addr, ins1_full_name, 'SRV') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(ins1_full_name, 'IN', 'SRV')], |
| 'ANSWER': [(ins1_full_name, 'IN', 'SRV', 1, 1, 11111, host1_full_name),], |
| 'ADDITIONAL': [ |
| (host1_full_name, 'IN', 'AAAA', client1_addrs[0]), |
| (host1_full_name, 'IN', 'AAAA', client1_addrs[1]), |
| ], |
| }) |
| |
| dig_result = digger.dns_dig(server_addr, ins2_full_name, 'SRV') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(ins2_full_name, 'IN', 'SRV')], |
| 'ANSWER': [(ins2_full_name, 'IN', 'SRV', 2, 2, 22222, host2_full_name),], |
| 'ADDITIONAL': [ |
| (host2_full_name, 'IN', 'AAAA', client2_addrs[0]), |
| (host2_full_name, 'IN', 'AAAA', client2_addrs[1]), |
| ], |
| }) |
| |
| # check if TXT query works |
| dig_result = digger.dns_dig(server_addr, ins1_full_name, 'TXT') |
| self._assert_dig_result_matches(dig_result, { |
| 'QUESTION': [(ins1_full_name, 'IN', 'TXT')], |
| 'ANSWER': [(ins1_full_name, 'IN', 'TXT', EMPTY_TXT),], |
| }) |
| |
| dig_result = digger.dns_dig(server_addr, ins2_full_name, 'TXT') |
| self._assert_dig_result_matches(dig_result, { |
| 'QUESTION': [(ins2_full_name, 'IN', 'TXT')], |
| 'ANSWER': [(ins2_full_name, 'IN', 'TXT', EMPTY_TXT),], |
| }) |
| |
| # check if AAAA query works |
| dig_result = digger.dns_dig(server_addr, host1_full_name, 'AAAA') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(host1_full_name, 'IN', 'AAAA'),], |
| 'ANSWER': [ |
| (host1_full_name, 'IN', 'AAAA', client1_addrs[0]), |
| (host1_full_name, 'IN', 'AAAA', client1_addrs[1]), |
| ], |
| }) |
| |
| dig_result = digger.dns_dig(server_addr, host2_full_name, 'AAAA') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(host2_full_name, 'IN', 'AAAA'),], |
| 'ANSWER': [ |
| (host2_full_name, 'IN', 'AAAA', client2_addrs[0]), |
| (host2_full_name, 'IN', 'AAAA', client2_addrs[1]), |
| ], |
| }) |
| |
| # check some invalid queries |
| for qtype in ['A', 'CNAME']: |
| dig_result = digger.dns_dig(server_addr, host1_full_name, qtype) |
| self._assert_dig_result_matches(dig_result, { |
| 'status': 'NOTIMP', |
| }) |
| |
| for service_name in WRONG_SERVICE_NAMES: |
| dig_result = digger.dns_dig(server_addr, service_name, 'PTR') |
| self._assert_dig_result_matches(dig_result, { |
| 'status': 'NXDOMAIN', |
| }) |
| |
| # verify Discovery Proxy works for _meshcop._udp |
| self._verify_discovery_proxy_meshcop(server_addr, server.get_network_name(), digger) |
| |
| def _verify_discovery_proxy_meshcop(self, server_addr, network_name, digger): |
| dp_service_name = '_meshcop._udp.default.service.arpa.' |
| dp_hostname = lambda x: x.endswith('.default.service.arpa.') |
| |
| def check_border_agent_port(port): |
| return 0 < port <= 65535 |
| |
| dig_result = digger.dns_dig(server_addr, dp_service_name, 'PTR') |
| for answer in dig_result['ANSWER']: |
| if len(answer) >= 2 and answer[-2] == 'PTR': |
| dp_instance_name = answer[-1] |
| break |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(dp_service_name, 'IN', 'PTR'),], |
| 'ANSWER': [(dp_service_name, 'IN', 'PTR', dp_instance_name),], |
| 'ADDITIONAL': [ |
| (dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname), |
| (dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get( |
| 'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)), |
| ], |
| }) |
| |
| # Find the actual host name and IPv6 address |
| dp_ip6_address = None |
| for rr in dig_result['ADDITIONAL']: |
| if rr[3] == 'SRV': |
| dp_hostname = rr[7] |
| elif rr[3] == 'AAAA': |
| dp_ip6_address = rr[4] |
| |
| assert isinstance(dp_hostname, str), dig_result |
| |
| dig_result = digger.dns_dig(server_addr, dp_instance_name, 'SRV') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(dp_instance_name, 'IN', 'SRV'),], |
| 'ANSWER': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),], |
| 'ADDITIONAL': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get( |
| 'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),], |
| }) |
| |
| dig_result = digger.dns_dig(server_addr, dp_instance_name, 'TXT') |
| self._assert_dig_result_matches( |
| dig_result, { |
| 'QUESTION': [(dp_instance_name, 'IN', 'TXT'),], |
| 'ANSWER': [(dp_instance_name, 'IN', 'TXT', lambda txt: (isinstance(txt, dict) and txt.get( |
| 'nn') == network_name and 'xp' in txt and 'tv' in txt and 'xa' in txt)),], |
| 'ADDITIONAL': [(dp_instance_name, 'IN', 'SRV', 0, 0, check_border_agent_port, dp_hostname),], |
| }) |
| |
| if dp_ip6_address is not None: |
| dig_result = digger.dns_dig(server_addr, dp_hostname, 'AAAA') |
| |
| self._assert_dig_result_matches(dig_result, { |
| 'QUESTION': [(dp_hostname, 'IN', 'AAAA'),], |
| 'ANSWER': [(dp_hostname, 'IN', 'AAAA', dp_ip6_address),], |
| }) |
| |
| def _config_srp_client_services(self, client, instancename, hostname, port, priority, weight, addrs): |
| client.srp_client_enable_auto_start_mode() |
| client.srp_client_set_host_name(hostname) |
| client.srp_client_set_host_address(*addrs) |
| client.srp_client_add_service(instancename, SERVICE, port, priority, weight) |
| |
| self.simulator.go(5) |
| self.assertEqual(client.srp_client_get_host_state(), 'Registered') |
| |
| def _assert_have_question(self, dig_result, question): |
| for dig_question in dig_result['QUESTION']: |
| if self._match_record(dig_question, question): |
| return |
| |
| self.fail((dig_result, question)) |
| |
| def _assert_have_answer(self, dig_result, record, additional=False): |
| for dig_answer in dig_result['ANSWER' if not additional else 'ADDITIONAL']: |
| dig_answer = list(dig_answer) |
| dig_answer[1:2] = [] # remove TTL from answer |
| |
| record = list(record) |
| |
| # convert IPv6 addresses to `ipaddress.IPv6Address` before matching |
| if dig_answer[2] == 'AAAA': |
| dig_answer[3] = ipaddress.IPv6Address(dig_answer[3]) |
| |
| if record[2] == 'AAAA': |
| record[3] = ipaddress.IPv6Address(record[3]) |
| |
| if self._match_record(dig_answer, record): |
| return |
| |
| print('not match: ', dig_answer, record, |
| list(a == b or (callable(b) and b(a)) for a, b in zip(dig_answer, record))) |
| |
| self.fail((record, dig_result)) |
| |
| def _match_record(self, record, match): |
| assert not any(callable(elem) for elem in record), record |
| |
| if record == match: |
| return True |
| |
| return all(a == b or (callable(b) and b(a)) for a, b in zip(record, match)) |
| |
| def _assert_dig_result_matches(self, dig_result, expected_result): |
| self.assertEqual(dig_result['opcode'], expected_result.get('opcode', 'QUERY'), dig_result) |
| self.assertEqual(dig_result['status'], expected_result.get('status', 'NOERROR'), dig_result) |
| |
| if 'QUESTION' in expected_result: |
| self.assertEqual(len(dig_result['QUESTION']), len(expected_result['QUESTION']), dig_result) |
| |
| for question in expected_result['QUESTION']: |
| self._assert_have_question(dig_result, question) |
| |
| if 'ANSWER' in expected_result: |
| self.assertEqual(len(dig_result['ANSWER']), len(expected_result['ANSWER']), dig_result) |
| |
| for record in expected_result['ANSWER']: |
| self._assert_have_answer(dig_result, record, additional=False) |
| |
| if 'ADDITIONAL' in expected_result: |
| self.assertGreaterEqual(len(dig_result['ADDITIONAL']), len(expected_result['ADDITIONAL']), dig_result) |
| |
| for record in expected_result['ADDITIONAL']: |
| self._assert_have_answer(dig_result, record, additional=True) |
| |
| logging.info("dig result matches:\r%s", json.dumps(dig_result, indent=True)) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |