blob: 250501077f463547f2921135e76aa14f426281f5 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import binascii
import bisect
import os
import socket
import struct
import traceback
import time
import io
import config
import mesh_cop
import message
import pcap
import wpan
def dbg_print(*args):
if False:
print(args)
class BaseSimulator(object):
def __init__(self):
self._nodes = {}
self.commissioning_messages = {}
self._payload_parse_factory = mesh_cop.MeshCopCommandFactory(mesh_cop.create_default_mesh_cop_tlv_factories())
self._mesh_cop_msg_set = mesh_cop.create_mesh_cop_message_type_set()
def __del__(self):
self._nodes = None
def add_node(self, node):
self._nodes[node.nodeid] = node
self.commissioning_messages[node.nodeid] = []
def set_lowpan_context(self, cid, prefix):
raise NotImplementedError
def get_messages_sent_by(self, nodeid):
raise NotImplementedError
def go(self, duration, nodeid=None):
raise NotImplementedError
def stop(self):
raise NotImplementedError
def read_cert_messages_in_commissioning_log(self, nodeids):
for nodeid in nodeids:
node = self._nodes[nodeid]
if not node:
continue
for (
direction,
type,
payload,
) in node.read_cert_messages_in_commissioning_log():
msg = self._payload_parse_factory.parse(type.decode("utf-8"), io.BytesIO(payload))
self.commissioning_messages[nodeid].append(msg)
class RealTime(BaseSimulator):
def __init__(self, use_message_factory=True):
super(RealTime, self).__init__()
self._sniffer = config.create_default_thread_sniffer(use_message_factory=use_message_factory)
self._sniffer.start()
def set_lowpan_context(self, cid, prefix):
self._sniffer.set_lowpan_context(cid, prefix)
def get_messages_sent_by(self, nodeid):
messages = self._sniffer.get_messages_sent_by(nodeid).messages
ret = message.MessagesSet(messages, self.commissioning_messages[nodeid])
self.commissioning_messages[nodeid] = []
return ret
def now(self):
return time.time()
def go(self, duration, nodeid=None):
time.sleep(duration)
def stop(self):
if self.is_running:
# self._sniffer.stop() # FIXME: seems it blocks forever
self._sniffer = None
@property
def is_running(self):
return self._sniffer is not None
class VirtualTime(BaseSimulator):
OT_SIM_EVENT_ALARM_FIRED = 0
OT_SIM_EVENT_RADIO_RECEIVED = 1
OT_SIM_EVENT_UART_WRITE = 2
OT_SIM_EVENT_RADIO_SPINEL_WRITE = 3
OT_SIM_EVENT_POSTCMD = 4
EVENT_TIME = 0
EVENT_SEQUENCE = 1
EVENT_ADDR = 2
EVENT_TYPE = 3
EVENT_DATA_LENGTH = 4
EVENT_DATA = 5
BASE_PORT = 9000
MAX_NODES = 33
MAX_MESSAGE = 1024
END_OF_TIME = float('inf')
PORT_OFFSET = int(os.getenv('PORT_OFFSET', '0'))
BLOCK_TIMEOUT = 10
NCP_SIM = os.getenv('NODE_TYPE', 'sim') == 'ncp-sim'
_message_factory = None
def __init__(self, use_message_factory=True):
super(VirtualTime, self).__init__()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2 * 1024 * 1024)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2 * 1024 * 1024)
ip = '127.0.0.1'
self.port = self.BASE_PORT + (self.PORT_OFFSET * (self.MAX_NODES + 1))
self.sock.bind((ip, self.port))
self.devices = {}
self.event_queue = []
# there could be events scheduled at exactly the same time
self.event_sequence = 0
self.current_time = 0
self.current_event = None
self.awake_devices = set()
self._nodes_by_ack_seq = {}
self._node_ack_seq = {}
self._pcap = pcap.PcapCodec(os.getenv('TEST_NAME', 'current'))
# the addr for spinel-cli sending OT_SIM_EVENT_POSTCMD
self._spinel_cli_addr = (ip, self.BASE_PORT + self.port)
self.current_nodeid = None
self._pause_time = 0
if use_message_factory:
self._message_factory = config.create_default_thread_message_factory()
else:
self._message_factory = None
def __del__(self):
if self.sock:
self.stop()
def stop(self):
if self.sock:
self.sock.close()
self.sock = None
@property
def is_running(self):
return self.sock is not None
def _add_message(self, nodeid, message_obj):
addr = ('127.0.0.1', self.port + nodeid)
# Ignore any exceptions
try:
if self._message_factory is not None:
messages = self._message_factory.create(io.BytesIO(message_obj))
self.devices[addr]['msgs'] += messages
except message.DropPacketException:
print('Drop current packet because it cannot be handled in test scripts')
except Exception as e:
# Just print the exception to the console
print("EXCEPTION: %s" % e)
traceback.print_exc()
def set_lowpan_context(self, cid, prefix):
if self._message_factory is not None:
self._message_factory.set_lowpan_context(cid, prefix)
def get_messages_sent_by(self, nodeid):
""" Get sniffed messages.
Note! This method flushes the message queue so calling this
method again will return only the newly logged messages.
Args:
nodeid (int): node id
Returns:
MessagesSet: a set with received messages.
"""
addr = ('127.0.0.1', self.port + nodeid)
messages = self.devices[addr]['msgs']
self.devices[addr]['msgs'] = []
ret = message.MessagesSet(messages, self.commissioning_messages[nodeid])
self.commissioning_messages[nodeid] = []
return ret
def _is_radio(self, addr):
return addr[1] < self.BASE_PORT * 2
def _to_core_addr(self, addr):
assert self._is_radio(addr)
return (addr[0], addr[1] + self.BASE_PORT)
def _to_radio_addr(self, addr):
assert not self._is_radio(addr)
return (addr[0], addr[1] - self.BASE_PORT)
def _core_addr_from(self, nodeid):
if self._nodes[nodeid].is_posix:
return ('127.0.0.1', self.BASE_PORT + self.port + nodeid)
else:
return ('127.0.0.1', self.port + nodeid)
def _next_event_time(self):
if len(self.event_queue) == 0:
return self.END_OF_TIME
else:
return self.event_queue[0][0]
def receive_events(self):
""" Receive events until all devices are asleep. """
while True:
if (self.current_event or len(self.awake_devices) or
(self._next_event_time() > self._pause_time and self.current_nodeid)):
self.sock.settimeout(self.BLOCK_TIMEOUT)
try:
msg, addr = self.sock.recvfrom(self.MAX_MESSAGE)
except socket.error:
# print debug information on failure
print('Current nodeid:')
print(self.current_nodeid)
print('Current awake:')
print(self.awake_devices)
print('Current time:')
print(self.current_time)
print('Current event:')
print(self.current_event)
print('Events:')
for event in self.event_queue:
print(event)
raise
else:
self.sock.settimeout(0)
try:
msg, addr = self.sock.recvfrom(self.MAX_MESSAGE)
except socket.error:
break
if addr != self._spinel_cli_addr and addr not in self.devices:
self.devices[addr] = {}
self.devices[addr]['alarm'] = None
self.devices[addr]['msgs'] = []
self.devices[addr]['time'] = self.current_time
self.awake_devices.discard(addr)
# print "New device:", addr, self.devices
delay, type, datalen = struct.unpack('=QBH', msg[:11])
data = msg[11:]
event_time = self.current_time + delay
if data:
dbg_print(
"New event: ",
event_time,
addr,
type,
datalen,
binascii.hexlify(data),
)
else:
dbg_print("New event: ", event_time, addr, type, datalen)
if type == self.OT_SIM_EVENT_ALARM_FIRED:
# remove any existing alarm event for device
if self.devices[addr]['alarm']:
self.event_queue.remove(self.devices[addr]['alarm'])
# print "-- Remove\t", self.devices[addr]['alarm']
# add alarm event to event queue
event = (event_time, self.event_sequence, addr, type, datalen)
self.event_sequence += 1
# print "-- Enqueue\t", event, delay, self.current_time
bisect.insort(self.event_queue, event)
self.devices[addr]['alarm'] = event
self.awake_devices.discard(addr)
if (self.current_event and self.current_event[self.EVENT_ADDR] == addr):
# print "Done\t", self.current_event
self.current_event = None
elif type == self.OT_SIM_EVENT_RADIO_RECEIVED:
assert self._is_radio(addr)
# add radio receive events event queue
frame_info = wpan.dissect(data)
recv_devices = None
if frame_info.frame_type == wpan.FrameType.ACK:
recv_devices = self._nodes_by_ack_seq.get(frame_info.seq_no)
recv_devices = recv_devices or self.devices.keys()
for device in recv_devices:
if device != addr and self._is_radio(device):
event = (
event_time,
self.event_sequence,
device,
type,
datalen,
data,
)
self.event_sequence += 1
# print "-- Enqueue\t", event
bisect.insort(self.event_queue, event)
self._pcap.append(data, (event_time // 1000000, event_time % 1000000))
self._add_message(addr[1] - self.port, data)
# add radio transmit done events to event queue
event = (
event_time,
self.event_sequence,
addr,
type,
datalen,
data,
)
self.event_sequence += 1
bisect.insort(self.event_queue, event)
if frame_info.frame_type != wpan.FrameType.ACK and not frame_info.is_broadcast:
self._on_ack_seq_change(addr, frame_info.seq_no)
self.awake_devices.add(addr)
elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE:
assert not self._is_radio(addr)
radio_addr = self._to_radio_addr(addr)
if radio_addr not in self.devices:
self.awake_devices.add(radio_addr)
event = (
event_time,
self.event_sequence,
radio_addr,
self.OT_SIM_EVENT_UART_WRITE,
datalen,
data,
)
self.event_sequence += 1
bisect.insort(self.event_queue, event)
self.awake_devices.add(addr)
elif type == self.OT_SIM_EVENT_UART_WRITE:
assert self._is_radio(addr)
core_addr = self._to_core_addr(addr)
if core_addr not in self.devices:
self.awake_devices.add(core_addr)
event = (
event_time,
self.event_sequence,
core_addr,
self.OT_SIM_EVENT_RADIO_SPINEL_WRITE,
datalen,
data,
)
self.event_sequence += 1
bisect.insort(self.event_queue, event)
self.awake_devices.add(addr)
elif type == self.OT_SIM_EVENT_POSTCMD:
assert self.current_time == self._pause_time
nodeid = struct.unpack('=B', data)[0]
if self.current_nodeid == nodeid:
self.current_nodeid = None
def _on_ack_seq_change(self, device: tuple, seq_no: int):
old_seq = self._node_ack_seq.pop(device, None)
if old_seq is not None:
self._nodes_by_ack_seq[old_seq].remove(device)
self._node_ack_seq[device] = seq_no
self._nodes_by_ack_seq.setdefault(seq_no, set()).add(device)
def _send_message(self, message, addr):
while True:
try:
sent = self.sock.sendto(message, addr)
except socket.error:
traceback.print_exc()
time.sleep(0)
else:
break
assert sent == len(message)
def process_next_event(self):
assert self.current_event is None
assert self._next_event_time() < self.END_OF_TIME
# process next event
event = self.event_queue.pop(0)
if len(event) == 5:
event_time, sequence, addr, type, datalen = event
dbg_print("Pop event: ", event_time, addr, type, datalen)
else:
event_time, sequence, addr, type, datalen, data = event
dbg_print(
"Pop event: ",
event_time,
addr,
type,
datalen,
binascii.hexlify(data),
)
self.current_event = event
assert event_time >= self.current_time
self.current_time = event_time
elapsed = event_time - self.devices[addr]['time']
self.devices[addr]['time'] = event_time
message = struct.pack('=QBH', elapsed, type, datalen)
if type == self.OT_SIM_EVENT_ALARM_FIRED:
self.devices[addr]['alarm'] = None
self._send_message(message, addr)
elif type == self.OT_SIM_EVENT_RADIO_RECEIVED:
message += data
self._send_message(message, addr)
elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE:
message += data
self._send_message(message, addr)
elif type == self.OT_SIM_EVENT_UART_WRITE:
message += data
self._send_message(message, addr)
def sync_devices(self):
self.current_time = self._pause_time
for addr in self.devices:
elapsed = self.current_time - self.devices[addr]['time']
if elapsed == 0:
continue
dbg_print('syncing', addr, elapsed)
self.devices[addr]['time'] = self.current_time
message = struct.pack('=QBH', elapsed, self.OT_SIM_EVENT_ALARM_FIRED, 0)
self._send_message(message, addr)
self.awake_devices.add(addr)
self.receive_events()
self.awake_devices.clear()
def now(self):
return self.current_time / 1000000
def go(self, duration, nodeid=None):
assert self.current_time == self._pause_time
duration = int(duration * 1000000)
dbg_print('running for %d us' % duration)
self._pause_time += duration
if nodeid:
if self.NCP_SIM:
self.current_nodeid = nodeid
self.awake_devices.add(self._core_addr_from(nodeid))
self.receive_events()
while self._next_event_time() <= self._pause_time:
self.process_next_event()
self.receive_events()
if duration > 0:
self.sync_devices()
dbg_print('current time %d us' % self.current_time)
if __name__ == '__main__':
simulator = VirtualTime()
while True:
simulator.go(0)