blob: 0a5e2146136e0be9e3cd22780c961ebb5c00792a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2016, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import unittest
import command
import dtls
import mle
import thread_cert
from command import CheckType
COMMISSIONER = 1
JOINER = 2
class Cert_8_1_01_Commissioning(thread_cert.TestCase):
SUPPORT_NCP = False
TOPOLOGY = {
COMMISSIONER: {
'masterkey': '00112233445566778899aabbccddeeff',
'mode': 'rsdn',
'panid': 0xface
},
JOINER: {
'masterkey': 'deadbeefdeadbeefdeadbeefdeadbeef',
'mode': 'rsdn',
'router_selection_jitter': 1
},
}
def test(self):
self.nodes[COMMISSIONER].interface_up()
self.nodes[COMMISSIONER].thread_start()
self.simulator.go(5)
self.assertEqual(self.nodes[COMMISSIONER].get_state(), 'leader')
self.nodes[COMMISSIONER].commissioner_start()
self.simulator.go(3)
self.nodes[COMMISSIONER].commissioner_add_joiner(
self.nodes[JOINER].get_eui64(), 'PSKD01')
self.nodes[JOINER].interface_up()
self.nodes[JOINER].joiner_start('PSKD01')
self.simulator.go(10)
self.simulator.read_cert_messages_in_commissioning_log(
[COMMISSIONER, JOINER])
self.assertEqual(
self.nodes[JOINER].get_masterkey(),
self.nodes[COMMISSIONER].get_masterkey(),
)
joiner_messages = self.simulator.get_messages_sent_by(JOINER)
commissioner_messages = self.simulator.get_messages_sent_by(
COMMISSIONER)
# 2 - N/A
# 3 - Joiner_1
msg = joiner_messages.next_mle_message(
mle.CommandType.DISCOVERY_REQUEST)
command.check_discovery_request(msg)
request_src_addr = msg.mac_header.src_address
# 4 - Commissioner
msg = commissioner_messages.next_mle_message(
mle.CommandType.DISCOVERY_RESPONSE)
command.check_discovery_response(msg,
request_src_addr,
steering_data=CheckType.CONTAIN)
udp_port_set_by_commissioner = command.get_joiner_udp_port_in_discovery_response(
msg)
# 5.2 - Joiner_1
msg = joiner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.CLIENT_HELLO)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# 5.3 - Commissioner
msg = commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.HELLO_VERIFY_REQUEST)
commissioner_cookie = msg.dtls.body.cookie
# 5.4 - Joiner_1
msg = joiner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.CLIENT_HELLO)
self.assertEqual(commissioner_cookie, msg.dtls.body.cookie)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# 5.5 - Commissioner
commissioner_messages.next_dtls_message(dtls.ContentType.HANDSHAKE,
dtls.HandshakeType.SERVER_HELLO)
commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_KEY_EXCHANGE)
commissioner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.SERVER_HELLO_DONE)
# 5.6 - Joiner_1
msg = joiner_messages.next_dtls_message(
dtls.ContentType.HANDSHAKE, dtls.HandshakeType.CLIENT_KEY_EXCHANGE)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
msg = joiner_messages.next_dtls_message(
dtls.ContentType.CHANGE_CIPHER_SPEC)
self.assertEqual(msg.get_dst_udp_port(), udp_port_set_by_commissioner)
# TODO(wgtdkp): It's required to verify DTLS FINISHED message here.
# Currently not handled as it is encrypted.
# 5.7 - Commissioner
commissioner_messages.next_dtls_message(
dtls.ContentType.CHANGE_CIPHER_SPEC)
# TODO(wgtdkp): It's required to verify DTLS FINISHED message here.
# Currently not handled as it is encrypted.
# 5.8,9,10,11
# - Joiner_1
command.check_joiner_commissioning_messages(
joiner_messages.commissioning_messages)
# - Commissioner
command.check_commissioner_commissioning_messages(
commissioner_messages.commissioning_messages)
# As commissioner is also joiner router
command.check_joiner_router_commissioning_messages(
commissioner_messages.commissioning_messages)
self.nodes[JOINER].thread_start()
self.simulator.go(5)
self.assertEqual(self.nodes[JOINER].get_state(), 'router')
if __name__ == '__main__':
unittest.main()