| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2016, The OpenThread Authors. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are met: |
| # 1. Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # 2. Redistributions in binary form must reproduce the above copyright |
| # notice, this list of conditions and the following disclaimer in the |
| # documentation and/or other materials provided with the distribution. |
| # 3. Neither the name of the copyright holder nor the |
| # names of its contributors may be used to endorse or promote products |
| # derived from this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| # POSSIBILITY OF SUCH DAMAGE. |
| # |
| |
| """ |
| This module provides simple 802.15.4 MAC parser. |
| """ |
| |
| import io |
| import struct |
| |
| import config |
| from common import MacAddress, MacAddressType, MessageInfo |
| from net_crypto import ( |
| AuxiliarySecurityHeader, |
| CryptoEngine, |
| MacCryptoMaterialCreator, |
| ) |
| |
| |
| class DeviceDescriptors: |
| |
| """Class representing 802.15.4 Device Descriptors.""" |
| |
| device_descriptors = {} |
| |
| @classmethod |
| def add(cls, short_address, extended_address): |
| short_address = cls._get_short_address_value(short_address) |
| cls.device_descriptors[short_address] = extended_address |
| |
| @classmethod |
| def get_extended(cls, short_address): |
| short_address = cls._get_short_address_value(short_address) |
| return cls.device_descriptors[short_address] |
| |
| @staticmethod |
| def _get_short_address_value(short_address): |
| if isinstance(short_address, MacAddress): |
| short_address = short_address.rloc |
| return short_address |
| |
| |
| class InformationElement: |
| |
| """Class representing 802.15.4 MAC Information Element.""" |
| def __init__(self, id, length, content): |
| self._id = id |
| self._length = length |
| self._content = content |
| |
| @property |
| def id(self): |
| return self._id |
| |
| @property |
| def length(self): |
| return self._length |
| |
| @property |
| def content(self): |
| return self._content |
| |
| |
| class MacHeader: |
| |
| """Class representing 802.15.4 MAC header.""" |
| |
| class FrameType: |
| BEACON = 0 |
| DATA = 1 |
| ACK = 2 |
| COMMAND = 3 |
| |
| class AddressMode: |
| NOT_PRESENT = 0 |
| SHORT = 2 |
| EXTENDED = 3 |
| |
| class CommandIdentifier: |
| DATA_REQUEST = 4 |
| |
| def __init__( |
| self, |
| frame_type, |
| frame_pending, |
| ack_request, |
| frame_version, |
| seq, |
| dest_pan_id=None, |
| dest_address=None, |
| src_pan_id=None, |
| src_address=None, |
| command_type=None, |
| aux_sec_header=None, |
| mic=None, |
| fcs=None, |
| ): |
| |
| self.frame_type = frame_type |
| self.frame_pending = frame_pending |
| self.ack_request = ack_request |
| self.frame_version = frame_version |
| self.seq = seq |
| |
| self.dest_pan_id = dest_pan_id |
| self.dest_address = dest_address |
| self.src_pan_id = src_pan_id |
| self.src_address = src_address |
| self.command_type = command_type |
| |
| self.aux_sec_header = aux_sec_header |
| self.mic = mic |
| |
| self.fcs = fcs |
| |
| |
| class MacPayload: |
| |
| """Class representing 802.15.4 MAC payload.""" |
| |
| def __init__(self, data): |
| self.data = bytearray(data) |
| |
| |
| class MacFrame: |
| |
| """Class representing 802.15.4 MAC frame.""" |
| |
| IEEE802154_HEADER_IE_TYPE_MASK = 0x8000 |
| IEEE802154_HEADER_IE_ID_MASK = 0x7F80 |
| IEEE802154_HEADER_IE_LENGTH_MASK = 0x007F |
| |
| IEEE802154_HEADER_IE_HT1 = 0x7E |
| IEEE802154_HEADER_IE_HT2 = 0x7F |
| |
| IEEE802154_VERSION_2015 = 0x02 |
| |
| def parse(self, data): |
| """Parse a MAC 802.15.4 frame |
| |
| Format of MAC 802.15.4 Frame: |
| | Frame Header | ASH | IE | Frame Payload | MIC | FCS | |
| |
| Frame Header: Frame Control | Sequence Number | Addressing Field |
| ASH: Auxiliary Security Header (optional) |
| IE: Header IE | Payload IE (optional) |
| Frame Payload: Data payload (optional) |
| MIC: To authenticate the frame (optional) |
| |
| In the parsing process, Frame Header is parsed first. Then it can be |
| determined if security is enabled for this frame. If it's enabled, |
| then ASH should be parsed. |
| After that, go directly to the end to parse FCS first instead |
| of parsing IE or Frame Payload. This is because sometimes, IE doesn't |
| have a termination IE and parsing needs to know the start of MIC to determine |
| where's the end of IEs(In such cases there are no payload). |
| After parsing FCS, MIC is parsed if security is enabled. |
| Then the end of IE(or payload) is known. And at last IEs and payload |
| are parsed. |
| And after parsing everything, build MacHeader and MacPayload with |
| the things parsed. |
| """ |
| mhr_start = data.tell() |
| |
| fc, seq = struct.unpack("<HB", data.read(3)) |
| |
| frame_type = fc & 0x0007 |
| security_enabled = bool(fc & 0x0008) |
| frame_pending = bool(fc & 0x0010) |
| ack_request = bool(fc & 0x0020) |
| panid_compression = bool(fc & 0x0040) |
| dest_addr_mode = (fc & 0x0C00) >> 10 |
| frame_version = (fc & 0x3000) >> 12 |
| source_addr_mode = (fc & 0xC000) >> 14 |
| ie_present = bool(fc & 0x0200) |
| |
| if frame_type == MacHeader.FrameType.ACK: |
| fcs = self._parse_fcs(data, data.tell()) |
| self.header = MacHeader( |
| frame_type, |
| frame_pending, |
| ack_request, |
| frame_version, |
| seq, |
| fcs=fcs, |
| ) |
| self.payload = None |
| return |
| |
| # Presence of PAN Ids is not fully implemented yet but should be enough |
| # for Thread. |
| dest_pan_id = struct.unpack("<H", data.read(2))[0] |
| |
| dest_address = self._parse_address(data, dest_addr_mode) |
| |
| src_pan_present = not panid_compression |
| while src_pan_present: |
| if frame_version < 2: |
| break |
| if frame_version == 3: |
| assert False |
| if dest_addr_mode == 0: |
| break |
| if dest_addr_mode == 2: |
| break |
| if dest_addr_mode == 3 and source_addr_mode == 2: |
| break |
| src_pan_present = False |
| |
| if src_pan_present: |
| src_pan_id = struct.unpack("<H", data.read(2))[0] |
| else: |
| src_pan_id = dest_pan_id |
| |
| src_address = self._parse_address(data, source_addr_mode) |
| |
| mhr_end = data.tell() |
| |
| if security_enabled: |
| aux_sec_header = self._parse_aux_sec_header(data) |
| aux_sec_header_end = data.tell() |
| else: |
| aux_sec_header = None |
| |
| # Record the beginning position of header IE. |
| # Or the position of payload if there is no header IE. |
| cur_pos = data.tell() |
| header_ie_start = cur_pos |
| |
| data.seek(-2, io.SEEK_END) |
| fcs_start = data.tell() |
| fcs = self._parse_fcs(data, fcs_start) |
| |
| data.seek(fcs_start) |
| if aux_sec_header and aux_sec_header.security_level: |
| mic, payload_end = self._parse_mic( |
| data, aux_sec_header.security_level |
| ) |
| else: |
| payload_end = fcs_start |
| mic = None |
| |
| # There may be no termination IE. In such case, there is no payload, get |
| # IE until the beginning position of MIC or a termination IE. |
| header_ie_list = [] |
| data.seek(cur_pos) |
| while ie_present and cur_pos + 2 < payload_end: |
| header_ie = struct.unpack("<H", data.read(2))[0] |
| id = ( |
| header_ie & MacFrame.IEEE802154_HEADER_IE_ID_MASK |
| ) >> 7 |
| # Currently, payload IE doesn't exist in the code. So only HT2 is required. |
| # TODO: support HT1 when there are Payload IEs in our code |
| assert id != MacFrame.IEEE802154_HEADER_IE_HT1, \ |
| 'Currently there should be no HT1!' |
| header_ie_length = ( |
| header_ie & MacFrame.IEEE802154_HEADER_IE_LENGTH_MASK |
| ) |
| assert cur_pos + 2 + header_ie_length <= payload_end, \ |
| 'Parsing Header IE error, IE id:{} length:{}'.format(id, header_ie_length) |
| header_ie_content = data.read(header_ie_length) |
| header_ie_list.append( |
| InformationElement(id, header_ie_length, header_ie_content) |
| ) |
| cur_pos += 2 + header_ie_length |
| if id == MacFrame.IEEE802154_HEADER_IE_HT2: |
| break |
| header_ie_end = cur_pos |
| |
| payload_pos = cur_pos |
| payload_len = payload_end - payload_pos |
| data.seek(payload_pos) |
| payload = data.read(payload_len) |
| |
| if security_enabled: |
| mhr_len = mhr_end - mhr_start |
| data.seek(mhr_start) |
| mhr_bytes = data.read(mhr_len) |
| |
| aux_sec_header_len = aux_sec_header_end - mhr_end |
| aux_sec_hdr_bytes = data.read(aux_sec_header_len) |
| |
| extra_open_fields = bytearray([]) |
| |
| if ie_present: |
| data.seek(header_ie_start) |
| extra_open_fields += data.read( |
| header_ie_end - header_ie_start |
| ) |
| |
| message_info = MessageInfo() |
| message_info.aux_sec_hdr = aux_sec_header |
| message_info.aux_sec_hdr_bytes = aux_sec_hdr_bytes |
| message_info.extra_open_fields = extra_open_fields |
| message_info.mhr_bytes = mhr_bytes |
| |
| open_payload = [] |
| private_payload = payload |
| |
| # Check end of MAC frame |
| if frame_type == MacHeader.FrameType.COMMAND: |
| if frame_version < MacFrame.IEEE802154_VERSION_2015: |
| extra_open_fields.append(payload[0]) |
| open_payload = payload[0:1] |
| private_payload = payload[1:] |
| message_info.open_payload_length = 1 |
| |
| if src_address.type == MacAddressType.SHORT: |
| message_info.source_mac_address = DeviceDescriptors.get_extended( |
| src_address).mac_address |
| else: |
| message_info.source_mac_address = src_address.mac_address |
| |
| sec_obj = CryptoEngine( |
| MacCryptoMaterialCreator(config.DEFAULT_MASTER_KEY) |
| ) |
| self.payload = MacPayload( |
| bytearray(open_payload) |
| + sec_obj.decrypt(private_payload, mic, message_info) |
| ) |
| |
| else: |
| self.payload = MacPayload(payload) |
| |
| if frame_type == MacHeader.FrameType.COMMAND: |
| command_type = self.payload.data[0] |
| else: |
| command_type = None |
| |
| # Create Header object |
| self.header = MacHeader( |
| frame_type, |
| frame_pending, |
| ack_request, |
| frame_version, |
| seq, |
| dest_pan_id, |
| dest_address, |
| src_pan_id, |
| src_address, |
| command_type, |
| aux_sec_header, |
| mic, |
| fcs, |
| ) |
| |
| def _parse_address(self, data, mode): |
| if mode == MacHeader.AddressMode.SHORT: |
| return MacAddress( |
| data.read(2), MacAddressType.SHORT, big_endian=False |
| ) |
| |
| if mode == MacHeader.AddressMode.EXTENDED: |
| return MacAddress( |
| data.read(8), MacAddressType.LONG, big_endian=False |
| ) |
| |
| else: |
| return None |
| |
| def _parse_aux_sec_header(self, data): |
| security_control, frame_counter = struct.unpack("<BL", data.read(5)) |
| |
| security_level = security_control & 0x07 |
| key_id_mode = (security_control & 0x18) >> 3 |
| |
| if key_id_mode == 0: |
| key_id = data.read(9) |
| elif key_id_mode == 1: |
| key_id = data.read(1) |
| elif key_id_mode == 2: |
| key_id = data.read(5) |
| else: |
| pass |
| |
| return AuxiliarySecurityHeader( |
| key_id_mode, |
| security_level, |
| frame_counter, |
| key_id, |
| big_endian=False, |
| ) |
| |
| def _parse_mic(self, data, security_level): |
| if security_level in (1, 5): |
| data.seek(-4, io.SEEK_CUR) |
| payload_end = data.tell() |
| mic = data.read(4) |
| elif security_level in (2, 6): |
| data.seek(-8, io.SEEK_CUR) |
| payload_end = data.tell() |
| mic = data.read(8) |
| elif security_level in (3, 7): |
| data.seek(-16, io.SEEK_CUR) |
| payload_end = data.tell() |
| mic = data.read(16) |
| else: |
| payload_end = data.tell() |
| |
| return mic, payload_end |
| |
| def _parse_fcs(self, data, fcs_start): |
| data.seek(fcs_start) |
| fcs = bytearray(data.read(2)) |
| return fcs |