| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| import io |
| import random |
| import struct |
| import unittest |
| |
| import common |
| import config |
| import mle |
| import network_data |
| |
| |
def any_address():
    """Return a random integer in [0, 2**16) to use as a test address."""
    bit_width = 16
    return random.getrandbits(bit_width)
| |
| |
def any_receiver():
    """Return a random 0 or 1 to use as the receiver flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_secure():
    """Return a random 0 or 1 to use as the secure flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_device_type():
    """Return a random 0 or 1 to use as the device-type flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_network_data():
    """Return a random 0 or 1 to use as the network-data flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
# Expected decoding of every 4-bit mode byte used by the Mode factory test:
# bit 3 -> receiver, bit 2 -> secure, bit 1 -> device_type, bit 0 -> network_data.
# The loop nesting reproduces the key order of the hand-written table.
mode_map = {
    (receiver << 3) | (secure << 2) | (device_type << 1) | network_data: {
        "receiver": receiver,
        "secure": secure,
        "device_type": device_type,
        "network_data": network_data,
    }
    for network_data in (0, 1)
    for device_type in (0, 1)
    for secure in (0, 1)
    for receiver in (0, 1)
}
| |
| |
def any_mode():
    """Return a random 4-bit mode value (0..15), i.e. a valid mode_map key."""
    bit_width = 4
    return random.getrandbits(bit_width)
| |
| |
def any_timeout():
    """Return a random integer in [0, 2**32) to use as a timeout value."""
    bit_width = 32
    return random.getrandbits(bit_width)
| |
| |
def any_challenge():
    """Return a bytearray of 4 to 8 random bytes to use as a challenge."""
    size = random.randint(4, 8)
    payload = bytearray()
    for _ in range(size):
        payload.append(random.getrandbits(8))
    return payload
| |
| |
def any_response():
    """Return a bytearray of 4 to 8 random bytes to use as a response."""
    size = random.randint(4, 8)
    payload = bytearray()
    for _ in range(size):
        payload.append(random.getrandbits(8))
    return payload
| |
| |
def any_link_layer_frame_counter():
    """Return a random integer in [0, 2**32) to use as a link-layer frame counter."""
    counter = random.getrandbits(32)
    return counter
| |
| |
def any_mle_frame_counter():
    """Return a random integer in [0, 2**32) to use as an MLE frame counter."""
    counter = random.getrandbits(32)
    return counter
| |
| |
def any_output():
    """Return a random 2-bit value (0..3) to use as the output field."""
    bit_width = 2
    return random.getrandbits(bit_width)
| |
| |
def any_input():
    """Return a random 2-bit value (0..3) to use as the input field."""
    bit_width = 2
    return random.getrandbits(bit_width)
| |
| |
def any_route():
    """Return a random 4-bit value (0..15) to use as the route field."""
    bit_width = 4
    return random.getrandbits(bit_width)
| |
| |
def any_router_id_mask():
    """Return a random integer in [0, 2**64) to use as a router ID mask."""
    bit_width = 64
    return random.getrandbits(bit_width)
| |
| |
def any_link_quality_and_route_data(length=None):
    """Return a list of random byte values (0..255).

    When ``length`` is None, the list size is drawn uniformly from 0..63;
    otherwise exactly ``length`` entries are produced.
    """
    if length is None:
        length = random.randint(0, 63)

    entries = []
    for _ in range(length):
        entries.append(random.getrandbits(8))
    return entries
| |
| |
def any_partition_id():
    """Return a random integer in [0, 2**32) to use as a partition ID."""
    bit_width = 32
    return random.getrandbits(bit_width)
| |
| |
def any_weighting():
    """Return a random byte value (0..255) to use as a weighting."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_data_version():
    """Return a random byte value (0..255) to use as a data version."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_stable_data_version():
    """Return a random byte value (0..255) to use as a stable data version."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_leader_router_id():
    """Return a random byte value (0..255) to use as a leader router ID."""
    value = random.getrandbits(8)
    return value
| |
| |
# Expected decoding of each scan-mask byte used by the ScanMask factory test:
# bit 7 -> router, bit 6 -> end_device. Key order matches the original table.
scan_mask_map = {
    (router << 7) | (end_device << 6): {"router": router, "end_device": end_device}
    for router in (0, 1)
    for end_device in (0, 1)
}
| |
| |
def any_scan_mask_router():
    """Return a random 0 or 1 to use as the scan-mask router flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_scan_mask_end_device():
    """Return a random 0 or 1 to use as the scan-mask end-device flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_scan_mask():
    """Return a random scan-mask byte: two random bits placed in bits 7..6."""
    top_bits = random.getrandbits(2)
    return top_bits << 6
| |
| |
def any_link_margin():
    """Return a random byte value (0..255) to use as a link margin."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_status():
    """Return a random byte value (0..255) to use as a status."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_version():
    """Return a random integer in [0, 2**16) to use as a version."""
    bit_width = 16
    return random.getrandbits(bit_width)
| |
| |
def any_channel_page():
    """Return a random byte value (0..255) to use as a channel page."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_channel():
    """Return a random integer in [0, 2**16) to use as a channel."""
    bit_width = 16
    return random.getrandbits(bit_width)
| |
| |
def any_pan_id():
    """Return a random integer in [0, 2**16) to use as a PAN ID."""
    bit_width = 16
    return random.getrandbits(bit_width)
| |
| |
def any_timestamp_seconds():
    """Return a random integer in [0, 2**48) to use as timestamp seconds."""
    bit_width = 48
    return random.getrandbits(bit_width)
| |
| |
def any_timestamp_ticks():
    """Return a random integer in [0, 2**15) to use as timestamp ticks."""
    bit_width = 15
    return random.getrandbits(bit_width)
| |
| |
def any_u():
    """Return a random 0 or 1 to use as the ``u`` flag."""
    flag = random.getrandbits(1)
    return flag
| |
| |
def any_pp():
    """Return a random 2-bit value (0..3) to use as the ``pp`` field."""
    bit_width = 2
    return random.getrandbits(bit_width)
| |
| |
def any_link_quality_3():
    """Return a random byte value (0..255) to use as the link-quality-3 field."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_link_quality_2():
    """Return a random byte value (0..255) to use as the link-quality-2 field."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_link_quality_1():
    """Return a random byte value (0..255) to use as the link-quality-1 field."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_leader_cost():
    """Return a random byte value (0..255) to use as a leader cost."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_id_sequence():
    """Return a random byte value (0..255) to use as an ID sequence."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_active_routers():
    """Return a random byte value (0..255) to use as an active-routers count."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_sed_buffer_size():
    """Return a random integer in [0, 2**16) to use as a SED buffer size."""
    bit_width = 16
    return random.getrandbits(bit_width)
| |
| |
def any_sed_datagram_count():
    """Return a random byte value (0..255) to use as a SED datagram count."""
    value = random.getrandbits(8)
    return value
| |
| |
def any_tlvs(length=None):
    """Return a list of random byte values (0..255).

    When ``length`` is None, the list size is drawn uniformly from 0..16;
    otherwise exactly ``length`` entries are produced.
    """
    count = random.randint(0, 16) if length is None else length

    values = []
    for _ in range(count):
        values.append(random.getrandbits(8))
    return values
| |
| |
def any_cid():
    """Return a random 4-bit value (0..15) to use as a CID."""
    bit_width = 4
    return random.getrandbits(bit_width)
| |
| |
def any_iid():
    """Return a bytearray of 8 random bytes to use as an IID."""
    iid = bytearray()
    for _ in range(8):
        iid.append(random.getrandbits(8))
    return iid
| |
| |
def any_ipv6_address():
    """Return a bytearray of 16 random bytes to use as an IPv6 address."""
    address = bytearray()
    for _ in range(16):
        address.append(random.getrandbits(8))
    return address
| |
| |
def any_addresses():
    """Return a two-element list: one compressed and one full random address.

    The first entry is an mle.AddressCompressed built from a random CID and
    IID; the second is an mle.AddressFull built from a random IPv6 address.
    """
    compressed = mle.AddressCompressed(any_cid(), any_iid())
    full = mle.AddressFull(any_ipv6_address())
    return [compressed, full]
| |
| |
def any_key_id_mode():
    """Return a random 2-bit value (0..3) to use as a key ID mode."""
    bit_width = 2
    return random.getrandbits(bit_width)
| |
| |
def any_security_level():
    """Return a random 3-bit value (0..7) to use as a security level."""
    bit_width = 3
    return random.getrandbits(bit_width)
| |
| |
def any_frame_counter():
    """Return a random integer in [0, 2**32) to use as a frame counter."""
    counter = random.getrandbits(32)
    return counter
| |
| |
def any_key_id(key_id_mode):
    """Return a random key identifier sized for the given key ID mode.

    Args:
        key_id_mode: Key identifier mode; must be 0, 1, 2 or 3.

    Returns:
        A bytearray of random bytes whose length is 0, 1, 5 or 9 for
        modes 0, 1, 2 and 3 respectively.

    Raises:
        ValueError: If ``key_id_mode`` is not one of the supported modes.
    """
    lengths_by_mode = {0: 0, 1: 1, 2: 5, 3: 9}

    # An unknown mode used to leave the length unbound and fail later with an
    # opaque UnboundLocalError; reject it explicitly instead.
    if key_id_mode not in lengths_by_mode:
        raise ValueError("Unsupported key_id_mode: {!r}".format(key_id_mode))

    length = lengths_by_mode[key_id_mode]
    return bytearray([random.getrandbits(8) for _ in range(length)])
| |
| |
def any_eui64():
    """Return a bytearray of 8 random bytes to use as an EUI-64."""
    eui64 = bytearray()
    for _ in range(8):
        eui64.append(random.getrandbits(8))
    return eui64
| |
| |
class TestSourceAddress(unittest.TestCase):
    """Checks that mle.SourceAddress returns the address it was built with."""

    def test_should_return_address_value_when_address_property_is_called(self):
        # GIVEN a SourceAddress wrapping a random address
        expected = any_address()
        tlv = mle.SourceAddress(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.address)
| |
| |
class TestSourceAddressFactory(unittest.TestCase):
    """Checks that mle.SourceAddressFactory parses a big-endian 16-bit address."""

    def test_should_create_SourceAddress_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        address = any_address()

        factory = mle.SourceAddressFactory()

        data = struct.pack(">H", address)

        # WHEN
        actual_source_address = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_source_address, mle.SourceAddress)
        self.assertEqual(address, actual_source_address.address)
| |
| |
class TestMode(unittest.TestCase):
    """Checks that each mle.Mode property returns its constructor argument.

    mle.Mode is built positionally from (receiver, secure, device_type,
    network_data); the arguments not under test are random.
    """

    def test_should_return_receiver_value_when_receiver_property_is_called(self):
        # GIVEN
        expected = any_receiver()
        mode_tlv = mle.Mode(expected, any_secure(), any_device_type(), any_network_data())

        # WHEN / THEN
        self.assertEqual(expected, mode_tlv.receiver)

    def test_should_return_secure_value_when_secure_property_is_called(self):
        # GIVEN
        expected = any_secure()
        mode_tlv = mle.Mode(any_receiver(), expected, any_device_type(), any_network_data())

        # WHEN / THEN
        self.assertEqual(expected, mode_tlv.secure)

    def test_should_return_device_type_value_when_device_type_property_is_called(self):
        # GIVEN
        expected = any_device_type()
        mode_tlv = mle.Mode(any_receiver(), any_secure(), expected, any_network_data())

        # WHEN / THEN
        self.assertEqual(expected, mode_tlv.device_type)

    def test_should_return_network_data_value_when_network_data_property_is_called(self):
        # GIVEN
        expected = any_network_data()
        mode_tlv = mle.Mode(any_receiver(), any_secure(), any_device_type(), expected)

        # WHEN / THEN
        self.assertEqual(expected, mode_tlv.network_data)
| |
| |
class TestModeFactory(unittest.TestCase):
    """Checks that mle.ModeFactory decodes a mode byte into its four flags."""

    def test_should_create_Mode_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        mode = any_mode()

        factory = mle.ModeFactory()

        data = bytearray([mode])

        # WHEN
        actual_mode = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_mode, mle.Mode)

        # mode_map holds the expected flag values for every 4-bit mode.
        expected = mode_map[mode]
        self.assertEqual(expected["receiver"], actual_mode.receiver)
        self.assertEqual(expected["secure"], actual_mode.secure)
        self.assertEqual(expected["device_type"], actual_mode.device_type)
        self.assertEqual(expected["network_data"], actual_mode.network_data)
| |
| |
class TestTimeout(unittest.TestCase):
    """Checks that mle.Timeout returns the timeout it was built with."""

    def test_should_return_timeout_value_when_timeout_property_is_called(self):
        # GIVEN a Timeout wrapping a random value
        expected = any_timeout()
        tlv = mle.Timeout(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.timeout)
| |
| |
class TestTimeoutFactory(unittest.TestCase):
    """Checks that mle.TimeoutFactory parses a big-endian 32-bit timeout."""

    def test_should_create_Timeout_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timeout = any_timeout()

        factory = mle.TimeoutFactory()

        data = struct.pack(">I", timeout)

        # WHEN
        actual_timeout = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_timeout, mle.Timeout)
        self.assertEqual(timeout, actual_timeout.timeout)
| |
| |
class TestChallenge(unittest.TestCase):
    """Checks that mle.Challenge returns the challenge it was built with."""

    def test_should_return_challenge_value_when_challenge_property_is_called(self):
        # GIVEN a Challenge wrapping random bytes
        expected = any_challenge()
        tlv = mle.Challenge(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.challenge)
| |
| |
class TestChallengeFactory(unittest.TestCase):
    """Checks that mle.ChallengeFactory wraps the raw input bytes in a Challenge."""

    def test_should_create_Challenge_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        challenge = any_challenge()

        factory = mle.ChallengeFactory()

        # The encoded form of a challenge is the challenge bytes themselves.
        data = challenge

        # WHEN
        actual_challenge = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_challenge, mle.Challenge)
        self.assertEqual(challenge, actual_challenge.challenge)
| |
| |
class TestResponse(unittest.TestCase):
    """Checks that mle.Response returns the response it was built with."""

    def test_should_return_response_value_when_response_property_is_called(self):
        # GIVEN a Response wrapping random bytes
        expected = any_response()
        tlv = mle.Response(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.response)
| |
| |
class TestResponseFactory(unittest.TestCase):
    """Checks that mle.ResponseFactory wraps the raw input bytes in a Response."""

    # The test name previously said "Challenge" (copy-paste slip) although the
    # test exercises ResponseFactory / Response.
    def test_should_create_Response_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        response = any_response()

        factory = mle.ResponseFactory()

        # The encoded form of a response is the response bytes themselves.
        data = response

        # WHEN
        actual_response = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_response, mle.Response)
        self.assertEqual(response, actual_response.response)
| |
| |
class TestLinkLayerFrameCounter(unittest.TestCase):
    """Checks that mle.LinkLayerFrameCounter returns the counter it was built with."""

    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
        # GIVEN a LinkLayerFrameCounter wrapping a random counter
        expected = any_link_layer_frame_counter()
        tlv = mle.LinkLayerFrameCounter(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.frame_counter)
| |
| |
class TestLinkLayerFrameCounterFactory(unittest.TestCase):
    """Checks that mle.LinkLayerFrameCounterFactory parses a big-endian 32-bit counter."""

    def test_should_create_LinkLayerFrameCounter_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        link_layer_frame_counter = any_link_layer_frame_counter()

        factory = mle.LinkLayerFrameCounterFactory()

        data = struct.pack(">I", link_layer_frame_counter)

        # WHEN
        actual_link_layer_frame_counter = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_link_layer_frame_counter, mle.LinkLayerFrameCounter)
        self.assertEqual(link_layer_frame_counter, actual_link_layer_frame_counter.frame_counter)
| |
| |
class TestMleFrameCounter(unittest.TestCase):
    """Checks that mle.MleFrameCounter returns the counter it was built with."""

    def test_should_return_frame_counter_value_when_frame_counter_property_is_called(self):
        # GIVEN an MleFrameCounter wrapping a random counter
        expected = any_mle_frame_counter()
        tlv = mle.MleFrameCounter(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.frame_counter)
| |
| |
class TestMleFrameCounterFactory(unittest.TestCase):
    """Checks that mle.MleFrameCounterFactory parses a big-endian 32-bit counter."""

    def test_should_create_MleFrameCounter_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        mle_frame_counter = any_mle_frame_counter()

        factory = mle.MleFrameCounterFactory()

        data = struct.pack(">I", mle_frame_counter)

        # WHEN
        actual_mle_frame_counter = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_mle_frame_counter, mle.MleFrameCounter)
        self.assertEqual(mle_frame_counter, actual_mle_frame_counter.frame_counter)
| |
| |
class TestLinkQualityAndRouteData(unittest.TestCase):
    """Checks that each mle.LinkQualityAndRouteData property returns its constructor argument.

    The object is built positionally from (output, input, route); the
    arguments not under test are random.
    """

    def test_should_return_output_value_when_output_property_is_called(self):
        # GIVEN
        expected = any_output()
        entry = mle.LinkQualityAndRouteData(expected, any_input(), any_route())

        # WHEN / THEN
        self.assertEqual(expected, entry.output)

    def test_should_return_input_value_when_input_property_is_called(self):
        # GIVEN
        expected = any_input()
        entry = mle.LinkQualityAndRouteData(any_output(), expected, any_route())

        # WHEN / THEN
        self.assertEqual(expected, entry.input)

    def test_should_return_route_value_when_route_property_is_called(self):
        # GIVEN
        expected = any_route()
        entry = mle.LinkQualityAndRouteData(any_output(), any_input(), expected)

        # WHEN / THEN
        self.assertEqual(expected, entry.route)
| |
| |
class TestLinkQualityAndRouteDataFactory(unittest.TestCase):
    """Checks that mle.LinkQualityAndRouteDataFactory decodes one packed byte.

    The tests expect bits 7..6 to hold output, bits 5..4 input and
    bits 3..0 route.
    """

    def test_should_create_LinkQualityAndRouteData_from_well_known_byte_when_parse_method_is_called(self):
        # GIVEN
        factory = mle.LinkQualityAndRouteDataFactory()

        # 0x66 == 0b01_10_0110 -> output 1, input 2, route 6
        data = bytearray([0x66])

        # WHEN
        actual_lqrd = factory.parse(io.BytesIO(data), {})

        # THEN
        self.assertEqual(1, actual_lqrd.output)
        self.assertEqual(2, actual_lqrd.input)
        self.assertEqual(6, actual_lqrd.route)

    def test_should_create_LinkQualityAndRouteData_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        output = any_output()
        _input = any_input()
        route = any_route()

        lqrd = (output << 6) | (_input << 4) | route

        factory = mle.LinkQualityAndRouteDataFactory()

        data = bytearray([lqrd])

        # WHEN
        actual_lqrd = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_lqrd, mle.LinkQualityAndRouteData)
        self.assertEqual(output, actual_lqrd.output)
        self.assertEqual(_input, actual_lqrd.input)
        self.assertEqual(route, actual_lqrd.route)
| |
| |
class TestRoute64(unittest.TestCase):
    """Checks that each mle.Route64 property returns its constructor argument.

    mle.Route64 is built positionally from (id_sequence, router_id_mask,
    link_quality_and_route_data); the arguments not under test are random.
    """

    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
        # GIVEN
        expected = any_id_sequence()
        route64 = mle.Route64(expected, any_router_id_mask(), any_link_quality_and_route_data())

        # WHEN / THEN
        self.assertEqual(expected, route64.id_sequence)

    def test_should_return_router_id_mask_value_when_router_id_mask_property_is_called(self):
        # GIVEN
        expected = any_router_id_mask()
        route64 = mle.Route64(any_id_sequence(), expected, any_link_quality_and_route_data())

        # WHEN / THEN
        self.assertEqual(expected, route64.router_id_mask)

    def test_should_return_link_quality_and_route_data_value_when_link_quality_and_route_data_property_is_called(self):
        # GIVEN
        expected = any_link_quality_and_route_data()
        route64 = mle.Route64(any_id_sequence(), any_router_id_mask(), expected)

        # WHEN / THEN
        self.assertEqual(expected, route64.link_quality_and_route_data)
| |
| |
class TestRoute64Factory(unittest.TestCase):
    """Checks that mle.Route64Factory parses id sequence, router mask and per-router data.

    The encoded form used here is one id-sequence byte, a big-endian 64-bit
    router ID mask, then one link-quality/route byte per bit set in the mask.
    """

    def test_should_create_Route64_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        class DummyLQRDFactory:
            """Stand-in sub-factory: consumes one byte and returns it as an int."""

            def parse(self, data, context):
                return ord(data.read(1))

        id_sequence = any_id_sequence()
        router_id_mask = any_router_id_mask()

        # One link-quality/route entry is supplied per bit set in the mask.
        router_count = bin(router_id_mask).count("1")

        link_quality_and_route_data = any_link_quality_and_route_data(router_count)

        factory = mle.Route64Factory(DummyLQRDFactory())

        data = bytearray([id_sequence]) + struct.pack(">Q", router_id_mask) + bytearray(link_quality_and_route_data)

        # WHEN
        actual_route64 = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_route64, mle.Route64)
        self.assertEqual(id_sequence, actual_route64.id_sequence)
        self.assertEqual(router_id_mask, actual_route64.router_id_mask)
        # The generated data is already a list of ints, so compare it directly.
        self.assertEqual(link_quality_and_route_data, actual_route64.link_quality_and_route_data)
| |
| |
class TestAddress16(unittest.TestCase):
    """Checks that mle.Address16 returns the address it was built with."""

    def test_should_return_address_value_when_address_property_is_called(self):
        # GIVEN an Address16 wrapping a random address
        expected = any_address()
        tlv = mle.Address16(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.address)
| |
| |
class TestAddress16Factory(unittest.TestCase):
    """Checks that mle.Address16Factory parses a big-endian 16-bit address."""

    def test_should_create_Address16_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        address = any_address()

        factory = mle.Address16Factory()

        data = struct.pack(">H", address)

        # WHEN
        actual_address16 = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_address16, mle.Address16)
        self.assertEqual(address, actual_address16.address)
| |
| |
class TestLeaderData(unittest.TestCase):
    """Checks that each mle.LeaderData property returns its constructor argument."""

    def _leader_data_with(self, field, value):
        # Build a LeaderData whose `field` argument is `value`; every other
        # argument comes from its any_*() generator, called in positional
        # order (partition_id, weighting, data_version, stable_data_version,
        # leader_router_id).
        generators = (
            ("partition_id", any_partition_id),
            ("weighting", any_weighting),
            ("data_version", any_data_version),
            ("stable_data_version", any_stable_data_version),
            ("leader_router_id", any_leader_router_id),
        )
        args = [value if name == field else generate() for name, generate in generators]
        return mle.LeaderData(*args)

    def test_should_return_partition_id_value_when_partition_id_property_is_called(self):
        # GIVEN
        expected = any_partition_id()
        leader_data = self._leader_data_with("partition_id", expected)

        # WHEN / THEN
        self.assertEqual(expected, leader_data.partition_id)

    def test_should_return_weighting_value_when_weighting_property_is_called(self):
        # GIVEN
        expected = any_weighting()
        leader_data = self._leader_data_with("weighting", expected)

        # WHEN / THEN
        self.assertEqual(expected, leader_data.weighting)

    def test_should_return_data_version_value_when_data_version_property_is_called(self):
        # GIVEN
        expected = any_data_version()
        leader_data = self._leader_data_with("data_version", expected)

        # WHEN / THEN
        self.assertEqual(expected, leader_data.data_version)

    def test_should_return_stable_data_version_value_when_stable_data_version_property_is_called(self):
        # GIVEN
        expected = any_stable_data_version()
        leader_data = self._leader_data_with("stable_data_version", expected)

        # WHEN / THEN
        self.assertEqual(expected, leader_data.stable_data_version)

    def test_should_return_leader_router_id_value_when_leader_router_id_property_is_called(self):
        # GIVEN
        expected = any_leader_router_id()
        leader_data = self._leader_data_with("leader_router_id", expected)

        # WHEN / THEN
        self.assertEqual(expected, leader_data.leader_router_id)
| |
| |
class TestLeaderDataFactory(unittest.TestCase):
    """Checks that mle.LeaderDataFactory parses all five leader-data fields.

    The encoded form used here is a big-endian 32-bit partition ID followed
    by one byte each for weighting, data version, stable data version and
    leader router ID.
    """

    # The test name previously said "Address16" (copy-paste slip) although the
    # test exercises LeaderDataFactory / LeaderData.
    def test_should_create_LeaderData_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        partition_id = any_partition_id()
        weighting = any_weighting()
        data_version = any_data_version()
        stable_data_version = any_stable_data_version()
        leader_router_id = any_leader_router_id()

        factory = mle.LeaderDataFactory()

        data = bytearray(struct.pack(">I", partition_id)) + \
            bytearray([weighting, data_version, stable_data_version, leader_router_id])

        # WHEN
        actual_leader_data = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_leader_data, mle.LeaderData)
        self.assertEqual(partition_id, actual_leader_data.partition_id)
        self.assertEqual(weighting, actual_leader_data.weighting)
        self.assertEqual(data_version, actual_leader_data.data_version)
        self.assertEqual(stable_data_version, actual_leader_data.stable_data_version)
        self.assertEqual(leader_router_id, actual_leader_data.leader_router_id)
| |
| |
class TestNetworkData(unittest.TestCase):
    """Checks that mle.NetworkData returns the TLV list it was built with."""

    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
        # GIVEN a NetworkData wrapping a random TLV list
        expected = any_tlvs()
        tlv = mle.NetworkData(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, tlv.tlvs)
| |
| |
class TestNetworkDataFactory(unittest.TestCase):
    """Checks that mle.NetworkDataFactory delegates TLV parsing to its sub-factory."""

    # The test name previously said "TlvRequest" (copy-paste slip) although the
    # test exercises NetworkDataFactory / NetworkData.
    def test_should_create_NetworkData_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        class DummyNetworkTlvsFactory:
            """Stand-in sub-factory: returns the remaining bytes as a list of ints."""

            def parse(self, data, context):
                return list(bytearray(data.read()))

        tlvs = any_tlvs()

        factory = mle.NetworkDataFactory(DummyNetworkTlvsFactory())

        data = bytearray(tlvs)

        # WHEN
        actual_network_data = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_network_data, mle.NetworkData)
        self.assertEqual(tlvs, actual_network_data.tlvs)
| |
| |
class TestTlvRequest(unittest.TestCase):
    """Checks that mle.TlvRequest returns the TLV list it was built with."""

    def test_should_return_tlvs_value_when_tlvs_property_is_called(self):
        # GIVEN a TlvRequest wrapping a random TLV list
        expected = any_tlvs()
        request = mle.TlvRequest(expected)

        # WHEN / THEN the property echoes the constructor argument
        self.assertEqual(expected, request.tlvs)
| |
| |
class TestTlvRequestFactory(unittest.TestCase):
    """Checks that mle.TlvRequestFactory turns each input byte into one requested TLV."""

    def test_should_create_TlvRequest_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        tlvs = any_tlvs()

        factory = mle.TlvRequestFactory()

        data = bytearray(tlvs)

        # WHEN
        actual_tlv_request = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_tlv_request, mle.TlvRequest)
        self.assertEqual(tlvs, actual_tlv_request.tlvs)
| |
| |
class TestScanMask(unittest.TestCase):
    """Checks that each mle.ScanMask property returns its constructor argument.

    mle.ScanMask is built positionally from (router, end_device); the
    argument not under test is random.
    """

    def test_should_return_router_value_when_router_property_is_called(self):
        # GIVEN
        expected = any_scan_mask_router()
        mask = mle.ScanMask(expected, any_scan_mask_end_device())

        # WHEN / THEN
        self.assertEqual(expected, mask.router)

    def test_should_return_end_device_value_when_end_device_property_is_called(self):
        # GIVEN
        expected = any_scan_mask_end_device()
        mask = mle.ScanMask(any_scan_mask_router(), expected)

        # WHEN / THEN
        self.assertEqual(expected, mask.end_device)
| |
| |
class TestScanMaskFactory(unittest.TestCase):
    """Checks that mle.ScanMaskFactory decodes a scan-mask byte into its two flags."""

    def test_should_create_ScanMask_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        scan_mask = any_scan_mask()

        factory = mle.ScanMaskFactory()

        data = bytearray([scan_mask])

        # WHEN
        actual_scan_mask = factory.parse(io.BytesIO(data), {})

        # THEN
        # assertIsInstance reports the actual type when the check fails.
        self.assertIsInstance(actual_scan_mask, mle.ScanMask)

        # scan_mask_map holds the expected flag values for every scan-mask byte.
        expected = scan_mask_map[scan_mask]
        self.assertEqual(expected["router"], actual_scan_mask.router)
        self.assertEqual(expected["end_device"], actual_scan_mask.end_device)
| |
| |
class TestConnectivity(unittest.TestCase):
    """Checks that each mle.Connectivity property returns its constructor argument."""

    def _connectivity_with(self, field, value):
        # Build a Connectivity whose `field` argument is `value`; every other
        # argument comes from its any_*() generator, called in positional
        # order so the sequence of random draws matches an inline call.
        generators = (
            ("pp", any_pp),
            ("link_quality_3", any_link_quality_3),
            ("link_quality_2", any_link_quality_2),
            ("link_quality_1", any_link_quality_1),
            ("leader_cost", any_leader_cost),
            ("id_sequence", any_id_sequence),
            ("active_routers", any_active_routers),
            ("sed_buffer_size", any_sed_buffer_size),
            ("sed_datagram_count", any_sed_datagram_count),
        )
        args = [value if name == field else generate() for name, generate in generators]
        return mle.Connectivity(*args)

    def test_should_return_pp_value_when_pp_property_is_called(self):
        # GIVEN
        expected = any_pp()
        connectivity = self._connectivity_with("pp", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.pp)

    def test_should_return_link_quality_3_value_when_link_quality_3_property_is_called(self):
        # GIVEN
        expected = any_link_quality_3()
        connectivity = self._connectivity_with("link_quality_3", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_3)

    def test_should_return_link_quality_2_value_when_link_quality_2_property_is_called(self):
        # GIVEN
        expected = any_link_quality_2()
        connectivity = self._connectivity_with("link_quality_2", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_2)

    def test_should_return_link_quality_1_value_when_link_quality_1_property_is_called(self):
        # GIVEN
        expected = any_link_quality_1()
        connectivity = self._connectivity_with("link_quality_1", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.link_quality_1)

    def test_should_return_leader_cost_value_when_leader_cost_property_is_called(self):
        # GIVEN
        expected = any_leader_cost()
        connectivity = self._connectivity_with("leader_cost", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.leader_cost)

    def test_should_return_id_sequence_value_when_id_sequence_property_is_called(self):
        # GIVEN
        expected = any_id_sequence()
        connectivity = self._connectivity_with("id_sequence", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.id_sequence)

    def test_should_return_active_routers_value_when_active_routers_property_is_called(self):
        # GIVEN
        expected = any_active_routers()
        connectivity = self._connectivity_with("active_routers", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.active_routers)

    def test_should_return_sed_buffer_size_value_when_sed_buffer_size_property_is_called(self):
        # GIVEN
        expected = any_sed_buffer_size()
        connectivity = self._connectivity_with("sed_buffer_size", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.sed_buffer_size)

    def test_should_return_sed_datagram_count_value_when_sed_datagram_count_property_is_called(self):
        # GIVEN
        expected = any_sed_datagram_count()
        connectivity = self._connectivity_with("sed_datagram_count", expected)

        # WHEN / THEN
        self.assertEqual(expected, connectivity.sed_datagram_count)
| |
| |
class TestConnectivityFactory(unittest.TestCase):
    """Parsing tests for mle.ConnectivityFactory.

    Each test serialises randomly generated field values into a byte stream,
    passes it to ``parse`` and compares every attribute of the parsed object
    with the value that was serialised.
    """

    def test_should_create_Connectivity_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        pp = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()
        sed_buffer_size = any_sed_buffer_size()
        sed_datagram_count = any_sed_datagram_count()

        factory = mle.ConnectivityFactory()

        # Seven one-byte fields, a big-endian 16-bit SED buffer size, then a
        # one-byte SED datagram count.
        data = bytearray([pp, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence,
                          active_routers]) + struct.pack(">H", sed_buffer_size) + bytearray([sed_datagram_count])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        self.assertEqual(pp, actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        self.assertEqual(sed_buffer_size, actual_connectivity.sed_buffer_size)
        self.assertEqual(sed_datagram_count, actual_connectivity.sed_datagram_count)

    def test_should_create_Connectivity_without_sed_data_when_parse_method_is_called(self):
        # GIVEN
        pp = any_pp()
        link_quality_3 = any_link_quality_3()
        link_quality_2 = any_link_quality_2()
        link_quality_1 = any_link_quality_1()
        leader_cost = any_leader_cost()
        id_sequence = any_id_sequence()
        active_routers = any_active_routers()

        factory = mle.ConnectivityFactory()

        # The stream stops after the seven one-byte fields: no SED buffer size
        # and no SED datagram count are serialised.
        data = bytearray([pp, link_quality_3, link_quality_2, link_quality_1, leader_cost, id_sequence,
                          active_routers])

        # WHEN
        actual_connectivity = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_connectivity, mle.Connectivity)
        self.assertEqual(pp, actual_connectivity.pp)
        self.assertEqual(link_quality_3, actual_connectivity.link_quality_3)
        self.assertEqual(link_quality_2, actual_connectivity.link_quality_2)
        self.assertEqual(link_quality_1, actual_connectivity.link_quality_1)
        self.assertEqual(leader_cost, actual_connectivity.leader_cost)
        self.assertEqual(id_sequence, actual_connectivity.id_sequence)
        self.assertEqual(active_routers, actual_connectivity.active_routers)
        # The absent SED fields must surface as None.
        self.assertIsNone(actual_connectivity.sed_buffer_size)
        self.assertIsNone(actual_connectivity.sed_datagram_count)
| |
| |
class TestLinkMargin(unittest.TestCase):
    """Accessor test for mle.LinkMargin."""

    def test_should_return_link_margin_value_when_link_margin_property_is_called(self):
        # GIVEN
        expected_link_margin = any_link_margin()
        tlv = mle.LinkMargin(expected_link_margin)

        # WHEN
        observed_link_margin = tlv.link_margin

        # THEN
        self.assertEqual(expected_link_margin, observed_link_margin)
| |
| |
class TestLinkMarginFactory(unittest.TestCase):
    """Parsing test for mle.LinkMarginFactory."""

    def test_should_create_LinkMargin_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        link_margin = any_link_margin()

        factory = mle.LinkMarginFactory()

        # The whole payload is the single link-margin byte.
        data = bytearray([link_margin])

        # WHEN
        actual_link_margin = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_link_margin, mle.LinkMargin)
        self.assertEqual(link_margin, actual_link_margin.link_margin)
| |
| |
class TestStatus(unittest.TestCase):
    """Accessor test for mle.Status."""

    def test_should_return_status_value_when_status_property_is_called(self):
        # GIVEN
        expected_status = any_status()
        tlv = mle.Status(expected_status)

        # WHEN
        observed_status = tlv.status

        # THEN
        self.assertEqual(expected_status, observed_status)
| |
| |
class TestStatusFactory(unittest.TestCase):
    """Parsing test for mle.StatusFactory."""

    def test_should_create_Status_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        status = any_status()

        factory = mle.StatusFactory()

        # The whole payload is the single status byte.
        data = bytearray([status])

        # WHEN
        actual_status = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_status, mle.Status)
        self.assertEqual(status, actual_status.status)
| |
| |
class TestVersion(unittest.TestCase):
    """Accessor test for mle.Version."""

    def test_should_return_version_value_when_version_property_is_called(self):
        # GIVEN
        expected_version = any_version()
        tlv = mle.Version(expected_version)

        # WHEN
        observed_version = tlv.version

        # THEN
        self.assertEqual(expected_version, observed_version)
| |
| |
class TestVersionFactory(unittest.TestCase):
    """Parsing test for mle.VersionFactory."""

    def test_should_create_Version_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        version = any_version()

        factory = mle.VersionFactory()

        # The version is serialised as a big-endian unsigned 16-bit integer.
        data = struct.pack(">H", version)

        # WHEN
        actual_version = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_version, mle.Version)
        self.assertEqual(version, actual_version.version)
| |
| |
class TestAddressRegistrationFull(unittest.TestCase):
    """Accessor test for mle.AddressFull."""

    def test_should_return_ipv6_address_value_when_ipv6_address_property_is_called(self):
        # GIVEN
        expected_ipv6_address = any_ipv6_address()
        entry = mle.AddressFull(expected_ipv6_address)

        # WHEN
        observed_ipv6_address = entry.ipv6_address

        # THEN
        self.assertEqual(expected_ipv6_address, observed_ipv6_address)
| |
| |
class TestAddressRegistrationFullFactory(unittest.TestCase):
    """Parsing test for mle.AddressFullFactory."""

    def test_should_create_AddressFull_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        ipv6_address = any_ipv6_address()

        factory = mle.AddressFullFactory()

        # A zero leading byte followed by the raw address bytes.
        data = bytearray([0x00]) + bytearray(ipv6_address)

        # WHEN
        actual_addr_reg_full = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_addr_reg_full, mle.AddressFull)
        self.assertEqual(ipv6_address, actual_addr_reg_full.ipv6_address)
| |
| |
class TestAddressRegistrationCompressed(unittest.TestCase):
    """Accessor tests for mle.AddressCompressed (cid and iid)."""

    def test_should_return_cid_value_when_cid_property_is_called(self):
        # GIVEN
        cid = any_cid()

        addr_reg_compressed_obj = mle.AddressCompressed(cid, any_iid())

        # WHEN
        actual_cid = addr_reg_compressed_obj.cid

        # THEN
        self.assertEqual(cid, actual_cid)

    # Previously named "..._return_cid_value_when_iid_property_is_called",
    # although it has always exercised iid; the name now matches the check.
    def test_should_return_iid_value_when_iid_property_is_called(self):
        # GIVEN
        iid = any_iid()

        addr_reg_compressed_obj = mle.AddressCompressed(any_cid(), iid)

        # WHEN
        actual_iid = addr_reg_compressed_obj.iid

        # THEN
        self.assertEqual(iid, actual_iid)
| |
| |
class TestAddressRegistrationCompressedFactory(unittest.TestCase):
    """Parsing test for mle.AddressCompressedFactory."""

    def test_should_create_AddressRegistrationCompressed_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        cid = any_cid()
        iid = any_iid()

        factory = mle.AddressCompressedFactory()

        # Leading byte has its top bit set and carries cid in the low bits;
        # the iid bytes follow.
        data = bytearray([(1 << 7) | cid]) + iid

        # WHEN
        actual_addr_reg_compressed = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_addr_reg_compressed, mle.AddressCompressed)
        self.assertEqual(cid, actual_addr_reg_compressed.cid)
        self.assertEqual(iid, actual_addr_reg_compressed.iid)
| |
| |
class TestAddressRegistration(unittest.TestCase):
    """Accessor test for mle.AddressRegistration."""

    def test_should_return_addresses_value_when_addresses_property_is_called(self):
        # GIVEN
        expected_addresses = any_addresses()
        registration = mle.AddressRegistration(expected_addresses)

        # WHEN
        observed_addresses = registration.addresses

        # THEN
        self.assertEqual(expected_addresses, observed_addresses)
| |
| |
class TestAddressRegistrationFactory(unittest.TestCase):
    """Parsing test for mle.AddressRegistrationFactory.

    The stream holds one compressed entry followed by one full entry; the
    test checks that both come back, in that order, with their fields intact.
    """

    def test_should_create_AddressRegistration_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        cid = any_cid()
        iid = any_iid()
        ipv6_address = any_ipv6_address()

        addresses = [
            mle.AddressCompressed(cid, iid),
            mle.AddressFull(ipv6_address)
        ]

        factory = mle.AddressRegistrationFactory(mle.AddressCompressedFactory(),
                                                 mle.AddressFullFactory())

        # Compressed entry: leading byte with the top bit set plus cid, then iid.
        # Full entry: a zero leading byte, then the address bytes.
        data = bytearray([(1 << 7) | cid]) + iid + bytearray([0]) + ipv6_address

        # WHEN
        actual_addr_reg = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_addr_reg, mle.AddressRegistration)
        self.assertEqual(addresses[0].cid, actual_addr_reg.addresses[0].cid)
        self.assertEqual(addresses[0].iid, actual_addr_reg.addresses[0].iid)
        self.assertEqual(addresses[1].ipv6_address, actual_addr_reg.addresses[1].ipv6_address)
| |
| |
class TestChannel(unittest.TestCase):
    """Accessor tests for mle.Channel (channel_page and channel)."""

    def test_should_return_channel_page_value_when_channel_page_property_is_called(self):
        # GIVEN
        expected_channel_page = any_channel_page()
        filler_channel = any_channel()
        tlv = mle.Channel(expected_channel_page, filler_channel)

        # WHEN
        observed_channel_page = tlv.channel_page

        # THEN
        self.assertEqual(expected_channel_page, observed_channel_page)

    def test_should_return_channel_value_when_channel_property_is_called(self):
        # GIVEN
        expected_channel = any_channel()
        filler_channel_page = any_channel_page()
        tlv = mle.Channel(filler_channel_page, expected_channel)

        # WHEN
        observed_channel = tlv.channel

        # THEN
        self.assertEqual(expected_channel, observed_channel)
| |
| |
class TestChannelFactory(unittest.TestCase):
    """Parsing test for mle.ChannelFactory."""

    def test_should_create_Channel_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        channel_page = any_channel_page()
        channel = any_channel()

        factory = mle.ChannelFactory()

        # One channel-page byte followed by the channel as a big-endian
        # unsigned 16-bit integer.
        data = bytearray([channel_page]) + struct.pack(">H", channel)

        # WHEN
        actual_channel = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_channel, mle.Channel)
        self.assertEqual(channel_page, actual_channel.channel_page)
        self.assertEqual(channel, actual_channel.channel)
| |
| |
class TestPanId(unittest.TestCase):
    """Accessor test for mle.PanId."""

    def test_should_return_pan_id_value_when_pan_id_property_is_called(self):
        # GIVEN
        expected_pan_id = any_pan_id()
        tlv = mle.PanId(expected_pan_id)

        # WHEN
        observed_pan_id = tlv.pan_id

        # THEN
        self.assertEqual(expected_pan_id, observed_pan_id)
| |
| |
class TestPanIdFactory(unittest.TestCase):
    """Parsing test for mle.PanIdFactory."""

    def test_should_create_PanId_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        pan_id = any_pan_id()

        factory = mle.PanIdFactory()

        # The PAN ID is serialised as a big-endian unsigned 16-bit integer.
        data = struct.pack(">H", pan_id)

        # WHEN
        actual_pan_id = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(actual_pan_id, mle.PanId)
        self.assertEqual(pan_id, actual_pan_id.pan_id)
| |
| |
class TestActiveTimestamp(unittest.TestCase):
    """Accessor tests for mle.ActiveTimestamp (timestamp_seconds, timestamp_ticks, u)."""

    def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self):
        # GIVEN
        expected_seconds = any_timestamp_seconds()
        filler_ticks = any_timestamp_ticks()
        filler_u = any_u()
        timestamp = mle.ActiveTimestamp(expected_seconds, filler_ticks, filler_u)

        # WHEN
        observed_seconds = timestamp.timestamp_seconds

        # THEN
        self.assertEqual(expected_seconds, observed_seconds)

    def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self):
        # GIVEN
        expected_ticks = any_timestamp_ticks()
        filler_seconds = any_timestamp_seconds()
        filler_u = any_u()
        timestamp = mle.ActiveTimestamp(filler_seconds, expected_ticks, filler_u)

        # WHEN
        observed_ticks = timestamp.timestamp_ticks

        # THEN
        self.assertEqual(expected_ticks, observed_ticks)

    def test_should_return_u_value_when_u_property_is_called(self):
        # GIVEN
        expected_u = any_u()
        filler_seconds = any_timestamp_seconds()
        filler_ticks = any_timestamp_ticks()
        timestamp = mle.ActiveTimestamp(filler_seconds, filler_ticks, expected_u)

        # WHEN
        observed_u = timestamp.u

        # THEN
        self.assertEqual(expected_u, observed_u)
| |
| |
class TestActiveTimestampFactory(unittest.TestCase):
    """Parsing test for mle.ActiveTimestampFactory."""

    def test_should_create_ActiveTimestamp_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timestamp_seconds = any_timestamp_seconds()
        timestamp_ticks = any_timestamp_ticks()
        u = any_u()

        factory = mle.ActiveTimestampFactory()

        # Seconds occupy six bytes: pack as a big-endian 64-bit value and drop
        # the two most significant bytes. The trailing big-endian 16-bit word
        # holds the ticks shifted left by one, with u in the lowest bit.
        data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u)

        # WHEN
        active_timestamp = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(active_timestamp, mle.ActiveTimestamp)
        self.assertEqual(timestamp_seconds, active_timestamp.timestamp_seconds)
        self.assertEqual(timestamp_ticks, active_timestamp.timestamp_ticks)
        self.assertEqual(u, active_timestamp.u)
| |
| |
class TestPendingTimestamp(unittest.TestCase):
    """Accessor tests for mle.PendingTimestamp (timestamp_seconds, timestamp_ticks, u)."""

    def test_should_return_timestamp_seconds_value_when_timestamp_seconds_property_is_called(self):
        # GIVEN
        expected_seconds = any_timestamp_seconds()
        filler_ticks = any_timestamp_ticks()
        filler_u = any_u()
        timestamp = mle.PendingTimestamp(expected_seconds, filler_ticks, filler_u)

        # WHEN
        observed_seconds = timestamp.timestamp_seconds

        # THEN
        self.assertEqual(expected_seconds, observed_seconds)

    def test_should_return_timestamp_ticks_value_when_timestamp_ticks_property_is_called(self):
        # GIVEN
        expected_ticks = any_timestamp_ticks()
        filler_seconds = any_timestamp_seconds()
        filler_u = any_u()
        timestamp = mle.PendingTimestamp(filler_seconds, expected_ticks, filler_u)

        # WHEN
        observed_ticks = timestamp.timestamp_ticks

        # THEN
        self.assertEqual(expected_ticks, observed_ticks)

    def test_should_return_u_value_when_u_property_is_called(self):
        # GIVEN
        expected_u = any_u()
        filler_seconds = any_timestamp_seconds()
        filler_ticks = any_timestamp_ticks()
        timestamp = mle.PendingTimestamp(filler_seconds, filler_ticks, expected_u)

        # WHEN
        observed_u = timestamp.u

        # THEN
        self.assertEqual(expected_u, observed_u)
| |
| |
class TestPendingTimestampFactory(unittest.TestCase):
    """Parsing test for mle.PendingTimestampFactory."""

    def test_should_create_PendingTimestamp_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        timestamp_seconds = any_timestamp_seconds()
        timestamp_ticks = any_timestamp_ticks()
        u = any_u()

        factory = mle.PendingTimestampFactory()

        # Seconds occupy six bytes: pack as a big-endian 64-bit value and drop
        # the two most significant bytes. The trailing big-endian 16-bit word
        # holds the ticks shifted left by one, with u in the lowest bit.
        data = struct.pack(">Q", timestamp_seconds)[2:] + struct.pack(">H", (timestamp_ticks << 1) | u)

        # WHEN
        pending_timestamp = factory.parse(io.BytesIO(data), dict())

        # THEN
        self.assertIsInstance(pending_timestamp, mle.PendingTimestamp)
        self.assertEqual(timestamp_seconds, pending_timestamp.timestamp_seconds)
        self.assertEqual(timestamp_ticks, pending_timestamp.timestamp_ticks)
        self.assertEqual(u, pending_timestamp.u)
| |
| |
class TestMleCommandFactory(unittest.TestCase):
    """Parsing test for mle.MleCommandFactory using a fixed byte sequence.

    The factory is built from the default TLV factories supplied by
    ``config``; the test checks the command type and each of the eight
    parsed TLVs against hand-written expected objects.
    """

    def test_should_create_MleCommand_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        # The first byte (0x0b) is the command type asserted below; the TLVs
        # asserted below are parsed from the bytes that follow it.
        data = bytearray([0x0b, 0x04, 0x08, 0xa5, 0xf2, 0x9b, 0xde, 0xe3,
                          0xd8, 0xbe, 0xb9, 0x05, 0x04, 0x00, 0x00, 0x00,
                          0x00, 0x08, 0x04, 0x00, 0x00, 0x00, 0x01, 0x01,
                          0x01, 0x0d, 0x02, 0x04, 0x00, 0x00, 0x00, 0xf0,
                          0x12, 0x02, 0x00, 0x02, 0x13, 0x09, 0x80, 0x86,
                          0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8, 0x0d,
                          0x03, 0x0a, 0x0c, 0x09])

        factory = mle.MleCommandFactory(config.create_default_mle_tlvs_factories())

        # WHEN
        actual_mle_command = factory.parse(io.BytesIO(data), None)

        # THEN
        self.assertIsInstance(actual_mle_command, mle.MleCommand)

        self.assertEqual(11, actual_mle_command.type)

        self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])),
                         actual_mle_command.tlvs[0])

        self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_command.tlvs[1])

        self.assertEqual(mle.MleFrameCounter(1), actual_mle_command.tlvs[2])

        self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1),
                         actual_mle_command.tlvs[3])

        self.assertEqual(mle.Timeout(240), actual_mle_command.tlvs[4])

        self.assertEqual(mle.Version(2), actual_mle_command.tlvs[5])

        self.assertEqual(mle.AddressRegistration(addresses=[
            mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8]))]),
            actual_mle_command.tlvs[6])

        self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_command.tlvs[7])
| |
| |
class TestMleMessageFactory(unittest.TestCase):
    """Parsing tests for the default MLE message factory on fixed frames.

    Each test builds a ``common.MessageInfo`` with source/destination IPv6
    and MAC addresses, parses a fixed byte sequence with a factory created
    from ``config.DEFAULT_MASTER_KEY`` and compares the resulting command
    type, TLVs and MIC with hand-written expectations.
    """

    def test_should_create_MleMessageSecured_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        message_info = common.MessageInfo()
        message_info.source_ipv6 = "fe80::10cf:d38b:3b61:5558"
        message_info.destination_ipv6 = "fe80::383e:9eed:7a01:36a5"

        message_info.source_mac_address = common.MacAddress.from_eui64(
            bytearray([0x12, 0xcf, 0xd3, 0x8b, 0x3b, 0x61, 0x55, 0x58]))
        message_info.destination_mac_address = common.MacAddress.from_eui64(
            bytearray([0x3a, 0x3e, 0x9e, 0xed, 0x7a, 0x01, 0x36, 0xa5]))

        data = bytearray([0x00, 0x15, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                          0x00, 0x01, 0x14, 0x03, 0xe3, 0x72, 0x50, 0x4f,
                          0x8c, 0x5c, 0x42, 0x81, 0x68, 0xe2, 0x11, 0xfc,
                          0xf5, 0x8c, 0x62, 0x8e, 0x83, 0x99, 0xe7, 0x26,
                          0x86, 0x34, 0x3b, 0xa7, 0x68, 0xc7, 0x93, 0xfb,
                          0x72, 0xd9, 0xcc, 0x13, 0x5e, 0x5b, 0x96, 0x0e,
                          0xf1, 0x80, 0x03, 0x55, 0x4f, 0x27, 0xc2, 0x96,
                          0xf4, 0x9c, 0x65, 0x82, 0x97, 0xcf, 0x97, 0x35,
                          0x89, 0xc2])

        factory = config.create_default_mle_message_factory(master_key=config.DEFAULT_MASTER_KEY)

        # WHEN
        actual_mle_message = factory.parse(io.BytesIO(data), message_info)

        # THEN
        self.assertIsInstance(actual_mle_message, mle.MleMessageSecured)

        self.assertEqual(11, actual_mle_message.command.type)

        self.assertEqual(mle.Response(bytearray([0xa5, 0xf2, 0x9b, 0xde, 0xe3, 0xd8, 0xbe, 0xb9])),
                         actual_mle_message.command.tlvs[0])

        self.assertEqual(mle.LinkLayerFrameCounter(0), actual_mle_message.command.tlvs[1])

        self.assertEqual(mle.MleFrameCounter(1), actual_mle_message.command.tlvs[2])

        self.assertEqual(mle.Mode(receiver=1, secure=1, device_type=0, network_data=1),
                         actual_mle_message.command.tlvs[3])

        self.assertEqual(mle.Timeout(240), actual_mle_message.command.tlvs[4])

        self.assertEqual(mle.Version(2), actual_mle_message.command.tlvs[5])

        self.assertEqual(mle.AddressRegistration(addresses=[
            mle.AddressCompressed(cid=0, iid=bytearray([0x86, 0xa2, 0x1b, 0x81, 0x6d, 0xb8, 0xb5, 0xe8]))]),
            actual_mle_message.command.tlvs[6])

        self.assertEqual(mle.TlvRequest(tlvs=[10, 12, 9]), actual_mle_message.command.tlvs[7])

        # The MIC must equal the last four bytes of the frame; slicing a
        # bytearray already yields a bytearray, so no extra copy is needed.
        self.assertEqual(data[-4:], actual_mle_message.mic)

    def test_should_create_MleMessageSecured_with_MLE_Data_Response_from_bytearray_when_parse_method_is_called(self):
        # GIVEN
        message_info = common.MessageInfo()
        message_info.source_ipv6 = "fe80::241c:b11b:7b62:caf1"
        message_info.destination_ipv6 = "ff02::1"

        message_info.source_mac_address = common.MacAddress.from_eui64(
            bytearray([0x26, 0x1c, 0xb1, 0x1b, 0x7b, 0x62, 0xca, 0xf1]))
        message_info.destination_mac_address = common.MacAddress.from_eui64(
            bytearray([0x3a, 0xba, 0xad, 0xca, 0xfe, 0xde, 0xff, 0xa5]))

        data = bytearray([0x00, 0x15, 0x15, 0x00, 0x00, 0x00, 0x00, 0x00,
                          0x00, 0x00, 0x01, 0xca, 0xd3, 0x45, 0xe2, 0x35,
                          0x1d, 0x00, 0x2d, 0x72, 0x71, 0xb1, 0x19, 0xaf,
                          0x8b, 0x05, 0xd9, 0x52, 0x74, 0xce, 0xe6, 0x36,
                          0x53, 0xeb, 0xc6, 0x25, 0x94, 0x01, 0x6d, 0x20,
                          0xdf, 0x30, 0x82, 0xf8, 0xbb, 0x34, 0x47, 0x42,
                          0x50, 0xe9, 0x41, 0xa7, 0x33, 0xa5])

        factory = config.create_default_mle_message_factory(master_key=config.DEFAULT_MASTER_KEY)

        # WHEN
        actual_mle_message = factory.parse(io.BytesIO(data), message_info)

        # THEN
        self.assertIsInstance(actual_mle_message, mle.MleMessageSecured)

        self.assertEqual(8, actual_mle_message.command.type)

        self.assertEqual(mle.SourceAddress(address=0x9400), actual_mle_message.command.tlvs[0])

        self.assertEqual(mle.LeaderData(
            partition_id=0x06d014ca,
            weighting=64,
            data_version=131,
            stable_data_version=168,
            leader_router_id=37
        ), actual_mle_message.command.tlvs[1])

        self.assertEqual(mle.NetworkData(tlvs=[
            network_data.Prefix(
                domain_id=0,
                prefix_length=64,
                prefix=bytearray([0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34]),
                sub_tlvs=[
                    network_data.LowpanId(c=1, cid=1, context_length=64, stable=1),
                    network_data.BorderRouter(border_router_16=37888, prf=0, p=1,
                                              s=1, d=0, c=0, r=1, o=1, n=0, stable=1)
                ],
                stable=1
            )
        ]), actual_mle_message.command.tlvs[2])

        # The MIC must equal the last four bytes of the frame; slicing a
        # bytearray already yields a bytearray, so no extra copy is needed.
        self.assertEqual(data[-4:], actual_mle_message.mic)
| |
| |
# Run the unittest command-line runner only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    unittest.main()