blob: c9c949a463bc600a91999a0cb2f6e4458fae74e8 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2020, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 'AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import struct
from collections import namedtuple
from enum import Enum
FCF_FRAME_BEACON = 0 << 0
FCF_FRAME_DATA = 1 << 0
FCF_FRAME_ACK = 2 << 0
FCF_FRAME_MAC_CMD = 3 << 0
FCF_FRAME_TYPE_MASK = 7 << 0
FCF_SECURITY_ENABLED = 1 << 3
FCF_FRAME_PENDING = 1 << 4
FCF_ACK_REQUEST = 1 << 5
FCF_PANID_COMPRESSION = 1 << 6
FCF_IE_PRESENT = 1 << 9
FCF_DST_ADDR_NONE = 0 << 10
FCF_DST_ADDR_SHORT = 2 << 10
FCF_DST_ADDR_EXT = 3 << 10
FCF_DST_ADDR_MASK = 3 << 10
FCF_FRAME_VERSION_2006 = 1 << 12
FCF_FRAME_VERSION_2015 = 2 << 12
FCF_FRAME_VERSION_MASK = 3 << 12
FCF_SRC_ADDR_NONE = 0 << 14
FCF_SRC_ADDR_SHORT = 2 << 14
FCF_SRC_ADDR_EXT = 3 << 14
FCD_SRC_ADDR_MASK = 3 << 14
class DstAddrMode(Enum):
NONE = 0
RESERVED = 1
SHORT = 2
EXTENDED = 3
class FrameType(Enum):
BEACON = 0
DATA = 1
ACK = 2
COMMAND = 3
class WpanFrameInfo(namedtuple('WpanFrameInfo', ['fcf', 'seq_no', 'dst_extaddr', 'dst_short'])):
@property
def frame_type(self) -> FrameType:
return FrameType(self.fcf & 0x7)
@property
def dst_addr_mode(self) -> DstAddrMode:
return DstAddrMode((self.fcf & 0x0c00) >> 10)
@property
def is_broadcast(self) -> bool:
return self.dst_addr_mode == DstAddrMode.SHORT and self.dst_short == 0xffff
def _is_version_2015(fcf: int) -> bool:
return (fcf & FCF_FRAME_VERSION_MASK) == FCF_FRAME_VERSION_2015
def _is_dst_addr_present(fcf: int) -> bool:
dst_addr_mode = DstAddrMode((fcf & 0x0c00) >> 10)
return dst_addr_mode != DstAddrMode.NONE
_DST_PAN_ID_NOT_PRESENT_SET = {
FCF_DST_ADDR_NONE | FCF_SRC_ADDR_NONE,
FCF_DST_ADDR_EXT | FCF_SRC_ADDR_NONE | FCF_PANID_COMPRESSION,
FCF_DST_ADDR_SHORT | FCF_SRC_ADDR_NONE | FCF_PANID_COMPRESSION,
FCF_DST_ADDR_NONE | FCF_SRC_ADDR_EXT,
FCF_DST_ADDR_NONE | FCF_SRC_ADDR_SHORT,
FCF_DST_ADDR_NONE | FCF_SRC_ADDR_EXT | FCF_PANID_COMPRESSION,
FCF_DST_ADDR_NONE | FCF_SRC_ADDR_SHORT | FCF_PANID_COMPRESSION,
FCF_DST_ADDR_EXT | FCF_SRC_ADDR_EXT | FCF_PANID_COMPRESSION,
}
def _is_dst_pan_id_present(fcf: int) -> bool:
if _is_version_2015(fcf):
v = fcf & (FCF_DST_ADDR_MASK | FCD_SRC_ADDR_MASK | FCF_PANID_COMPRESSION)
present = v not in _DST_PAN_ID_NOT_PRESENT_SET
else:
present = _is_dst_addr_present(fcf)
return present
def dissect(frame: bytes) -> WpanFrameInfo:
fcf = struct.unpack("<H", frame[1:3])[0]
seq_no = frame[3]
dst_addr_mode = (fcf & 0x0c00) >> 10
dst_extaddr, dst_short = None, None
if dst_addr_mode in [DstAddrMode.SHORT.value, DstAddrMode.EXTENDED.value]:
offset = 4
if _is_dst_pan_id_present(fcf):
offset += 2
if dst_addr_mode == DstAddrMode.SHORT.value:
dst_short = struct.unpack('<H', frame[offset:offset + 2])[0]
else:
dst_extaddr = '%016x' % struct.unpack('<Q', frame[offset:offset + 8])[0]
return WpanFrameInfo(fcf=fcf, seq_no=seq_no, dst_extaddr=dst_extaddr, dst_short=dst_short)