| # FST tests related classes |
| # Copyright (c) 2015, Qualcomm Atheros, Inc. |
| # |
| # This software may be distributed under the terms of the BSD license. |
| # See README for more details. |
| |
| import logging |
| import subprocess |
| import os |
| import signal |
| import time |
| import re |
| |
| import hostapd |
| import wpaspy |
| import utils |
| from wpasupplicant import WpaSupplicant |
| |
| import fst_test_common |
| |
| logger = logging.getLogger() |
| |
def parse_fst_iface_event(ev):
    """Parse an FST interface event that comes as a string, e.g.
    "<3>FST-EVENT-IFACE attached ifname=wlan9 group=fstg0"

    Returns a dictionary that always holds "event_type" ('attached' or
    'detached') and, when the corresponding field is present in the string,
    "ifname" and "group". Returns None if the string does not contain
    "FST-EVENT-IFACE" or names neither 'attached' nor 'detached'."""
    if "FST-EVENT-IFACE" not in ev:
        return None
    # 'attached' is tested first; "detached" does not contain "attached", so
    # the two branches cannot be confused for a plain event string.
    if "attached" in ev:
        event = {'event_type': 'attached'}
    elif "detached" in ev:
        event = {'event_type': 'detached'}
    else:
        return None
    # Raw strings keep "\S" a regex escape rather than an (invalid) string
    # escape.
    for key, pattern in (('ifname', r"ifname=(\S+)"),
                         ('group', r"group=(\S+)")):
        found = re.search(pattern, ev)
        if found is not None:
            event[key] = found.group(1)
    return event
| |
def parse_fst_session_event(ev):
    """Parse an FST session event that comes as a string, e.g.
    "<3>FST-EVENT-SESSION event_type=EVENT_FST_SESSION_STATE session_id=0 reason=REASON_STT"

    Returns a dictionary with:
      "type"      - value of event_type= (always present)
      "new_state" - value of new_state=, or '' when the field is absent
                    (always present)
      "id", "old_state", "reason" - present only when session_id=,
                    old_state= and reason= appear in the string
    All values are strings. Returns None if the string does not contain
    "FST-EVENT-SESSION" or has no event_type= field."""
    if "FST-EVENT-SESSION" not in ev:
        return None
    found = re.search(r"event_type=(\S+)", ev)
    if found is None:
        return None
    # 'new_state' always exists in the dictionary so callers may index it
    # without checking.
    event = {'new_state': '', 'type': found.group(1)}
    # Raw strings keep "\S"/"\d" regex escapes rather than (invalid) string
    # escapes.
    optional_fields = (
        ('id', r"session_id=(\d+)"),
        ('old_state', r"old_state=(\S+)"),
        ('new_state', r"new_state=(\S+)"),
        ('reason', r"reason=(\S+)"),
    )
    for key, pattern in optional_fields:
        found = re.search(pattern, ev)
        if found is not None:
            event[key] = found.group(1)
    return event
| |
def start_two_ap_sta_pairs(apdev, rsn=False):
    """Create and start two FST APs and two FST stations.

    The first AP ('fst_11a', mode 'a') and the first station ('wlan5') use
    the low default priority; the second AP ('fst_11g', mode 'g') and the
    second station ('wlan6') use the high one. All four share the default
    FST group and LLT from fst_test_common.

    apdev - sequence of at least two dictionaries with an 'ifname' key
    rsn   - passed through to every FstAP/FstSTA constructor
    Returns the tuple (ap1, ap2, sta1, sta2)."""
    defs = fst_test_common
    group = defs.fst_test_def_group
    llt = defs.fst_test_def_llt
    prio_low = defs.fst_test_def_prio_low
    prio_high = defs.fst_test_def_prio_high

    # Each device is started right after it is created, in the order
    # AP 1, AP 2, STA 1, STA 2.
    ap1 = FstAP(apdev[0]['ifname'], 'fst_11a', 'a',
                defs.fst_test_def_chan_a, group, prio_low, llt, rsn=rsn)
    ap1.start()
    ap2 = FstAP(apdev[1]['ifname'], 'fst_11g', 'g',
                defs.fst_test_def_chan_g, group, prio_high, llt, rsn=rsn)
    ap2.start()

    sta1 = FstSTA('wlan5', group, prio_low, llt, rsn=rsn)
    sta1.start()
    sta2 = FstSTA('wlan6', group, prio_high, llt, rsn=rsn)
    sta2.start()

    return ap1, ap2, sta1, sta2
| |
def stop_two_ap_sta_pairs(ap1, ap2, sta1, sta2):
    """Stop both stations first, then both APs, by calling stop() on each
    object in the order sta1, sta2, ap1, ap2."""
    for device in (sta1, sta2, ap1, ap2):
        device.stop()
| |
def connect_two_ap_sta_pairs(ap1, ap2, dev1, dev2, rsn=False):
    """Connect a pair of stations, each one to a separate AP.

    dev1 is scanned and connected to ap1 on the default 'a' frequency and
    dev2 to ap2 on the default 'g' frequency. Both scans are issued before
    the first connection. With rsn set the stations connect using the PSK
    "12345678"; otherwise they connect with key_mgmt "NONE"."""
    freq_a = fst_test_common.fst_test_def_freq_a
    freq_g = fst_test_common.fst_test_def_freq_g
    pairs = ((dev1, ap1, freq_a), (dev2, ap2, freq_g))

    for sta, _, freq in pairs:
        sta.scan(freq=freq)

    security = {'psk': "12345678"} if rsn else {'key_mgmt': "NONE"}
    for sta, ap, freq in pairs:
        sta.connect(ap, scan_freq=freq, **security)
| |
def disconnect_two_ap_sta_pairs(ap1, ap2, dev1, dev2):
    """Disconnect dev1 and then dev2 by calling disconnect() on each.

    ap1 and ap2 are accepted for symmetry with connect_two_ap_sta_pairs()
    but are not used."""
    for station in (dev1, dev2):
        station.disconnect()
| |
def external_sta_connect(sta, ap, **kwargs):
    """Connect the external station to the given AP.

    sta    - WpaSupplicant object of the external (non-FST) station
    ap     - FstAP object to connect to
    kwargs - passed unchanged to sta.connect() after the AP's SSID
    Raises Exception if either object is of the wrong type."""
    type_checks = (
        (sta, WpaSupplicant, "Bad STA object"),
        (ap, FstAP, "Bad AP object to connect to"),
    )
    for obj, expected, message in type_checks:
        if not isinstance(obj, expected):
            raise Exception(message)
    # The returned object is not needed here; the call is kept because
    # FstAP.get_instance() creates the Hostapd object on first use.
    ap.get_instance()
    ssid = ap.get_ssid()
    sta.connect(ssid, **kwargs)
| |
def disconnect_external_sta(sta, ap, check_disconnect=True):
    """Disconnect the external station from the AP.

    sta - WpaSupplicant object of the external station
    ap  - FstAP object the station is connected to
    check_disconnect - when true, wait up to 10 seconds for the AP to report
        AP-STA-DISCONNECTED and raise Exception if it does not
    Raises Exception if either object is of the wrong type."""
    type_checks = (
        (sta, WpaSupplicant, "Bad STA object"),
        (ap, FstAP, "Bad AP object to connect to"),
    )
    for obj, expected, message in type_checks:
        if not isinstance(obj, expected):
            raise Exception(message)
    sta.request("DISCONNECT")
    if not check_disconnect:
        return
    reported = ap.get_instance().wait_event(["AP-STA-DISCONNECTED"],
                                            timeout=10)
    if reported is None:
        raise Exception("No disconnection event received from %s" % ap.get_ssid())
| |
| # |
| # FstDevice class |
| # This is the parent class for the AP (FstAP) and STA (FstSTA) that implements |
| # FST functionality. |
| # |
class FstDevice:
    """Parent class for the AP (FstAP) and STA (FstSTA) that implements FST
    functionality.

    It stores the FST parameters (group, priority, LLT), the peer
    bookkeeping, and wrappers around the FST-MANAGER / FST-ATTACH /
    FST-DETACH control interface commands. Subclasses supply the actual
    control interface access by overriding get_instance(),
    get_own_mac_address(), get_actual_peer_addr(), grequest() and
    wait_gevent()."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
        """Record the FST parameters and check that FST is supported.

        Raises utils.HwsimSkip when the IS_SUPPORTED test request sent through
        grequest() is not answered with a string starting with 'OK'."""
        self.iface = iface
        self.fst_group = fst_group
        self.fst_pri = fst_pri
        self.fst_llt = fst_llt      # None llt means no llt parameter will be set
        self.instance = None        # Hostapd/WpaSupplicant instance
        self.peer_obj = None        # Peer object, must be a FstDevice child object
        self.new_peer_addr = None   # Peer MAC address for new session iface
        self.old_peer_addr = None   # Peer MAC address for old session iface
        self.role = 'initiator'     # Role: initiator/responder
        self.rsn = rsn
        s = self.grequest("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not s.startswith('OK'):
            raise utils.HwsimSkip("FST not supported")

    def ifname(self):
        """Returns the interface name given to the constructor"""
        return self.iface

    def get_instance(self):
        """Gets the Hostapd/WpaSupplicant instance"""
        raise Exception("Virtual get_instance() called!")

    def get_own_mac_address(self):
        """Gets the device's own MAC address"""
        raise Exception("Virtual get_own_mac_address() called!")

    def get_new_peer_addr(self):
        """Returns the peer MAC address stored for the new session iface"""
        return self.new_peer_addr

    def get_old_peer_addr(self):
        """Returns the peer MAC address stored for the old session iface"""
        return self.old_peer_addr

    def get_actual_peer_addr(self):
        """Gets the peer address. A connected AP/station address is returned."""
        raise Exception("Virtual get_actual_peer_addr() called!")

    def grequest(self, req):
        """Send request on the global control interface"""
        raise Exception("Virtual grequest() called!")

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface"""
        raise Exception("Virtual wait_gevent() called!")

    def request(self, req):
        """Issue a request to the control interface"""
        h = self.get_instance()
        return h.request(req)

    def wait_event(self, events, timeout=None):
        """Wait for an event from the control interface. When timeout is
        None the instance's own default timeout applies."""
        h = self.get_instance()
        if timeout is not None:
            return h.wait_event(events, timeout=timeout)
        return h.wait_event(events)

    def set_old_peer_addr(self, peer_addr=None):
        """Sets the peer address for the old session iface. If peer_addr is
        None, the address returned by get_actual_peer_addr() is used."""
        if peer_addr is not None:
            self.old_peer_addr = peer_addr
        else:
            self.old_peer_addr = self.get_actual_peer_addr()

    def set_new_peer_addr(self, peer_addr=None):
        """Sets the peer address for the new session iface. If peer_addr is
        None, the address returned by get_actual_peer_addr() is used."""
        if peer_addr is not None:
            self.new_peer_addr = peer_addr
        else:
            self.new_peer_addr = self.get_actual_peer_addr()

    def add_peer(self, obj, old_peer_addr=None, new_peer_addr=None):
        """Add peer for FST session(s). 'obj' is a FstDevice subclass object.
        The method must be called before add_session().
        If old_peer_addr/new_peer_addr is not specified, the address of the
        currently connected peer (get_actual_peer_addr()) is used.
        Raises Exception if 'obj' is not a FstDevice."""
        if not isinstance(obj, FstDevice):
            raise Exception("Peer must be a FstDevice object")
        self.peer_obj = obj
        self.set_old_peer_addr(old_peer_addr)
        self.set_new_peer_addr(new_peer_addr)

    def get_peer(self):
        """Returns peer object"""
        return self.peer_obj

    def set_fst_parameters(self, group_id=None, pri=None, llt=None):
        """Change/set new FST parameters. Can be used to start FST sessions with
        different FST parameters than defined in the configuration file.
        Parameters left as None keep their current value."""
        if group_id is not None:
            self.fst_group = group_id
        if pri is not None:
            self.fst_pri = pri
        if llt is not None:
            self.fst_llt = llt

    def get_local_mbies(self, ifname=None):
        """Returns the reply to the GET_LOCAL_MBIES test request for 'ifname'
        (this device's own interface when ifname is None)."""
        if_name = ifname if ifname is not None else self.iface
        return self.grequest("FST-MANAGER TEST_REQUEST GET_LOCAL_MBIES " + if_name)

    def add_session(self):
        """Adds an FST session. add_peer() must be called before calling this
        function.
        Returns the stripped reply of SESSION_ADD (the session id string).
        Raises Exception if no peer was added or the reply starts with FAIL."""
        if self.peer_obj is None:
            raise Exception("Peer wasn't added before starting session")
        grp = ' ' + self.fst_group if self.fst_group != '' else ''
        sid = self.grequest("FST-MANAGER SESSION_ADD" + grp).strip()
        if sid.startswith("FAIL"):
            raise Exception("Cannot add FST session with groupid ==" + grp)
        return sid

    def set_session_param(self, params):
        """Sends SESSION_SET with the given parameter string (omitted when
        None or empty) and returns the raw reply."""
        request = "FST-MANAGER SESSION_SET"
        if params is not None and params != '':
            request = request + ' ' + params
        return self.grequest(request)

    def get_session_params(self, sid):
        """Returns the SESSION_GET reply for 'sid' as a name -> value
        dictionary, or None if the reply starts with FAIL."""
        res = self.grequest("FST-MANAGER SESSION_GET " + sid)
        if res.startswith("FAIL"):
            return None
        params = {}
        for line in res.splitlines():
            # Split on the first '=' only so a value may itself contain '=';
            # lines without '=' (e.g. blank ones) carry no parameter.
            name, sep, value = line.partition('=')
            if not sep:
                continue
            params[name] = value
        return params

    def iface_peers(self, ifname):
        """Returns the lines of the IFACE_PEERS reply for this device's group
        and 'ifname', or None if the reply starts with FAIL."""
        res = self.grequest("FST-MANAGER IFACE_PEERS " + self.fst_group +
                            ' ' + ifname)
        if res.startswith("FAIL"):
            return None
        return res.splitlines()

    def get_peer_mbies(self, ifname, peer_addr):
        """Returns the raw reply to GET_PEER_MBIES for 'ifname'/'peer_addr'"""
        return self.grequest("FST-MANAGER GET_PEER_MBIES %s %s" % (ifname, peer_addr))

    def list_ifaces(self):
        """Returns the LIST_IFACES reply for this device's group as a list of
        dictionaries with 'name', 'priority' and 'llt' (the first three
        ':'-separated fields of each line), or None if the reply starts with
        FAIL."""
        res = self.grequest("FST-MANAGER LIST_IFACES " + self.fst_group)
        if res.startswith("FAIL"):
            return None
        ifaces = []
        for line in res.splitlines():
            fields = line.split(':')
            ifaces.append({'name': fields[0],
                           'priority': fields[1],
                           'llt': fields[2]})
        return ifaces

    def list_groups(self):
        """Returns the lines of the LIST_GROUPS reply, or None if the reply
        starts with FAIL."""
        res = self.grequest("FST-MANAGER LIST_GROUPS")
        if res.startswith("FAIL"):
            return None
        return res.splitlines()

    def configure_session(self, sid, new_iface, old_iface = None):
        """Calls session_set for a number of parameters some of which are stored
        in "self" while others are passed to this function explicitly.

        old_ifname is always set: to this device's iface when old_iface is
        None, otherwise to old_iface as given (an empty string is sent as
        an empty value). new_ifname is set unless new_iface is None. The
        stored peer addresses and llt are set only when they are neither
        None nor empty; they are concatenated into the request, so they must
        be strings.
        Raises Exception on the first SESSION_SET that is not answered OK."""
        oldiface = old_iface if old_iface is not None else self.iface
        s = self.set_session_param(sid + ' old_ifname=' + oldiface)
        if not s.startswith("OK"):
            raise Exception("Cannot set FST session old_ifname: " + s)
        if new_iface is not None:
            s = self.set_session_param(sid + " new_ifname=" + new_iface)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session new_ifname:" + s)
        if self.new_peer_addr is not None and self.new_peer_addr != '':
            s = self.set_session_param(sid + " new_peer_addr=" + self.new_peer_addr)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session peer address:" + s + " (new)")
        if self.old_peer_addr is not None and self.old_peer_addr != '':
            s = self.set_session_param(sid + " old_peer_addr=" + self.old_peer_addr)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session peer address:" + s + " (old)")
        if self.fst_llt is not None and self.fst_llt != '':
            s = self.set_session_param(sid + " llt=" + self.fst_llt)
            if not s.startswith("OK"):
                raise Exception("Cannot set FST session llt:" + s)

    def send_iface_attach_request(self, ifname, group, llt, priority):
        """Sends FST-ATTACH for 'ifname' and 'group'; llt and priority are
        appended only when not None (they must be strings).
        Raises Exception if the reply does not start with OK."""
        request = "FST-ATTACH " + ifname + ' ' + group
        if llt is not None:
            request += " llt=" + llt
        if priority is not None:
            request += " priority=" + priority
        res = self.grequest(request)
        if not res.startswith("OK"):
            raise Exception("Cannot attach FST iface: " + res)

    def send_iface_detach_request(self, ifname):
        """Sends FST-DETACH for 'ifname'.
        Raises Exception if the reply does not start with OK."""
        res = self.grequest("FST-DETACH " + ifname)
        if not res.startswith("OK"):
            raise Exception("Cannot detach FST iface: " + res)

    def send_session_setup_request(self, sid):
        """Sends SESSION_INITIATE for 'sid' and returns the reply.
        Raises Exception if the reply does not start with OK."""
        s = self.grequest("FST-MANAGER SESSION_INITIATE " + sid)
        if not s.startswith('OK'):
            raise Exception("Cannot send setup request: %s" % s)
        return s

    def send_session_setup_response(self, sid, response):
        """Sends SESSION_RESPOND for 'sid' with 'response' and returns the
        reply. Raises Exception if the reply does not start with OK."""
        request = "FST-MANAGER SESSION_RESPOND " + sid + " " + response
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send setup response: %s" % s)
        return s

    def send_test_session_setup_request(self, fsts_id,
                                        additional_parameter = None):
        """Sends the SEND_SETUP_REQUEST test request for 'fsts_id', with
        'additional_parameter' appended when not None, and returns the reply.
        Raises Exception if the reply does not start with OK."""
        request = "FST-MANAGER TEST_REQUEST SEND_SETUP_REQUEST " + fsts_id
        if additional_parameter is not None:
            request += " " + additional_parameter
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST setup request: %s" % s)
        return s

    def send_test_session_setup_response(self, fsts_id,
                                         response, additional_parameter = None):
        """Sends the SEND_SETUP_RESPONSE test request for 'fsts_id' with
        'response', with 'additional_parameter' appended when not None, and
        returns the reply.
        Raises Exception if the reply does not start with OK."""
        request = "FST-MANAGER TEST_REQUEST SEND_SETUP_RESPONSE " + fsts_id + " " + response
        if additional_parameter is not None:
            request += " " + additional_parameter
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST setup response: %s" % s)
        return s

    def send_test_ack_request(self, fsts_id):
        """Sends the SEND_ACK_REQUEST test request for 'fsts_id' and returns
        the reply. Raises Exception if the reply does not start with OK."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_REQUEST " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST ack request: %s" % s)
        return s

    def send_test_ack_response(self, fsts_id):
        """Sends the SEND_ACK_RESPONSE test request for 'fsts_id' and returns
        the reply. Raises Exception if the reply does not start with OK."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_ACK_RESPONSE " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST ack response: %s" % s)
        return s

    def send_test_tear_down(self, fsts_id):
        """Sends the SEND_TEAR_DOWN test request for 'fsts_id' and returns
        the reply. Raises Exception if the reply does not start with OK."""
        s = self.grequest("FST-MANAGER TEST_REQUEST SEND_TEAR_DOWN " + fsts_id)
        if not s.startswith('OK'):
            raise Exception("Cannot send FST tear down: %s" % s)
        return s

    def get_fsts_id_by_sid(self, sid):
        """Returns, as an int, the reply to the GET_FSTS_ID test request for
        session 'sid'. Raises Exception if the reply is a single space or
        starts with FAIL."""
        s = self.grequest("FST-MANAGER TEST_REQUEST GET_FSTS_ID " + sid)
        if s == ' ' or s.startswith('FAIL'):
            raise Exception("Cannot get fsts_id for sid == %s" % sid)
        return int(s)

    def wait_for_iface_event(self, timeout):
        """Waits on the global interface for an FST-EVENT-IFACE event that
        parse_fst_iface_event() can parse and returns the parsed dictionary.
        'timeout' applies to each individual wait.
        Raises Exception if a wait returns no event."""
        while True:
            ev = self.wait_gevent(["FST-EVENT-IFACE"], timeout)
            if ev is None:
                raise Exception("No FST-EVENT-IFACE received")
            event = parse_fst_iface_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            return event

    def wait_for_session_event(self, timeout, events_to_ignore=(),
                               events_to_count=()):
        """Waits on the global interface for an FST-EVENT-SESSION event and
        returns it parsed by parse_fst_session_event().

        events_to_ignore - event types to skip. When non-empty it takes
            precedence and events_to_count is not consulted.
        events_to_count  - when events_to_ignore is empty and this is
            non-empty, only these event types are returned.
        'timeout' applies to each individual wait.
        Raises Exception if a wait returns no event."""
        while True:
            ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout)
            if ev is None:
                raise Exception("No FST-EVENT-SESSION received")
            event = parse_fst_session_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            if len(events_to_ignore) > 0:
                if event['type'] in events_to_ignore:
                    continue
            elif len(events_to_count) > 0:
                if event['type'] not in events_to_count:
                    continue
            return event

    def initiate_session(self, sid, response="accept"):
        """Initiates FST session with given session id 'sid'.
        'response' is the session respond answer: "accept", "reject", or a
        special "timeout" value to skip the response in order to test session
        timeouts. An empty 'response' returns 'OK' as soon as the peer has
        reported the setup, without responding or waiting further.
        add_peer() must have been called: the setup events are read from the
        peer object.
        Returns: "OK" - session has been initiated, otherwise the reason for the
        reset: REASON_REJECT, REASON_STT.
        Raises Exception on a failed request or a missing/unexpected event."""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_INITIATE"+ strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot initiate fst session: %s" % s)
        ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        # We got FST event
        event = parse_fst_session_event(ev)
        if event is None:
            raise Exception("Unrecognized FST event: %s" % ev)
        if event['type'] != 'EVENT_FST_SETUP':
            raise Exception("Expected FST_SETUP event, got: " + event['type'])
        ev = self.peer_obj.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        event = parse_fst_session_event(ev)
        if event is None:
            raise Exception("Unrecognized FST event: %s" % ev)
        if event['type'] != 'EVENT_FST_SESSION_STATE':
            raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
        if event['new_state'] != "SETUP_COMPLETION":
            raise Exception("Expected new state SETUP_COMPLETION, got: " + event['new_state'])
        if response == '':
            return 'OK'
        if response != "timeout":
            # The peer answers using the session id it reported in its own
            # state event.
            s = self.peer_obj.grequest("FST-MANAGER SESSION_RESPOND "+ event['id'] + " " + response)  # Or reject
            if not s.startswith('OK'):
                raise Exception("Error session_respond: %s" % s)
        # Wait for EVENT_FST_SESSION_STATE events. We should get at least 2
        # events. The 1st event will be EVENT_FST_SESSION_STATE
        # old_state=INITIAL new_state=SETUP_COMPLETED. The 2nd event will be
        # either EVENT_FST_ESTABLISHED with the session id or
        # EVENT_FST_SESSION_STATE with new_state=INITIAL if the session was
        # reset, the reason field will tell why.
        result = ''
        while result == '':
            ev = self.wait_gevent(["FST-EVENT-SESSION"], timeout=5)
            if ev is None:
                break # No session event received
            event = parse_fst_session_event(ev)
            if event is None:
                # We can't parse so it's not our event, wait for next one
                continue
            if event['type'] == 'EVENT_FST_ESTABLISHED':
                result = "OK"
                break
            elif event['type'] == "EVENT_FST_SESSION_STATE":
                if event['new_state'] == "INITIAL":
                    # Session was reset, the only reason to get back to initial
                    # state.
                    result = event['reason']
                    break
        if result == '':
            raise Exception("No event for session respond")
        return result

    def transfer_session(self, sid):
        """Transfers the session. 'sid' is the session id. add_peer() must
        have been called: the resulting events are read from the peer object.
        Returns: REASON_SWITCH - the session has been transferred successfully
        or a REASON_... reported by the reset event.
        Raises Exception on a failed request or a missing/unparsable event."""
        request = "FST-MANAGER SESSION_TRANSFER"
        if sid != '':
            request += ' ' + sid
        s = self.grequest(request)
        if not s.startswith('OK'):
            raise Exception("Cannot transfer fst session: %s" % s)
        result = ''
        while result == '':
            ev = self.peer_obj.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
            if ev is None:
                raise Exception("Missing session transfer event")
            # We got FST event. We expect TRANSITION_CONFIRMED state and then
            # INITIAL (reset) with the reason (e.g. "REASON_SWITCH").
            # Right now we'll be waiting for the reset event and record the
            # reason.
            event = parse_fst_session_event(ev)
            if event is None:
                raise Exception("Unrecognized FST event: %s" % ev)
            if event['new_state'] == 'INITIAL':
                result = event['reason']
        return result

    def wait_for_tear_down(self):
        """Waits up to 5 seconds for the next FST-EVENT-SESSION event on the
        global interface and checks that it is an EVENT_FST_SESSION_STATE
        with new_state=INITIAL and reason=REASON_TEARDOWN.
        Raises Exception if no event arrives or any check fails."""
        ev = self.wait_gevent([ "FST-EVENT-SESSION" ], timeout=5)
        if ev is None:
            raise Exception("No FST-EVENT-SESSION received")
        # We got FST event
        event = parse_fst_session_event(ev)
        if event is None:
            raise Exception("Unrecognized FST event: %s" % ev)
        if event['type'] != 'EVENT_FST_SESSION_STATE':
            raise Exception("Expected EVENT_FST_SESSION_STATE event, got: " + event['type'])
        if event['new_state'] != "INITIAL":
            raise Exception("Expected new state INITIAL, got: " + event['new_state'])
        if event['reason'] != 'REASON_TEARDOWN':
            raise Exception("Expected reason REASON_TEARDOWN, got: " + event['reason'])

    def teardown_session(self, sid):
        """Tears down FST session with a given session id ('sid') and waits
        for the peer to report the tear down.
        Raises Exception if the request is not answered OK."""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_TEARDOWN" + strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot tear down fst session: %s" % s)
        self.peer_obj.wait_for_tear_down()

    def remove_session(self, sid, wait_for_tear_down=True):
        """Removes FST session with a given session id ('sid'). When
        wait_for_tear_down is true, also waits for the peer to report the
        tear down.
        Raises Exception if the request is not answered OK."""
        strsid = ' ' + sid if sid != '' else ''
        s = self.grequest("FST-MANAGER SESSION_REMOVE" + strsid)
        if not s.startswith('OK'):
            raise Exception("Cannot remove fst session: %s" % s)
        if wait_for_tear_down:
            self.peer_obj.wait_for_tear_down()

    def remove_all_sessions(self):
        """Removes every FST session listed by LIST_SESSIONS for this
        device's group, without waiting for tear down events. Does nothing
        if the LIST_SESSIONS reply starts with FAIL."""
        grp = ' ' + self.fst_group if self.fst_group != '' else ''
        s = self.grequest("FST-MANAGER LIST_SESSIONS" + grp)
        if s.startswith('FAIL'):
            return
        for sid in s.splitlines():
            sid = sid.strip()
            if len(sid) != 0:
                self.remove_session(sid, wait_for_tear_down=False)
| |
| |
| # |
| # FstAP class |
| # |
class FstAP (FstDevice):
    """FST-capable AP. Per-interface commands go through a hostapd.Hostapd
    object and global (FST) commands through a hostapd.HostapdGlobal
    object."""

    def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
                 fst_llt=None, rsn=False):
        """If fst_group is empty, then FST parameters will not be set
        (start() does not attach the interface to a group).
        If fst_llt is None, the llt parameter will not be set and the default
        value is expected to be configured."""
        self.ssid = ssid
        self.mode = mode
        self.chan = chan
        self.reg_ctrl = fst_test_common.HapdRegCtrl()
        self.reg_ctrl.add_ap(iface, self.chan)
        # The global instance must exist before the base constructor runs:
        # FstDevice.__init__ sends a request through grequest().
        self.global_instance = hostapd.HostapdGlobal()
        FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)

    def start(self, return_early=False):
        """Starts AP the "standard" way as it was intended by hostapd tests.
        This will work only when FST supports fully dynamically loading
        parameters in hostapd.

        With return_early set, the AP is added and the regulatory control
        started, but the interface is not attached to the FST group.
        Returns the object returned by hostapd.add_ap().
        Raises Exception if the new hostapd instance does not answer ping."""
        params = {}
        params['ssid'] = self.ssid
        params['hw_mode'] = self.mode
        params['channel'] = self.chan
        params['country_code'] = 'US'
        if self.rsn:
            params['wpa'] = '2'
            params['wpa_key_mgmt'] = 'WPA-PSK'
            params['rsn_pairwise'] = 'CCMP'
            params['wpa_passphrase'] = '12345678'
        self.hapd = hostapd.add_ap(self.iface, params)
        if not self.hapd.ping():
            raise Exception("Could not ping FST hostapd")
        self.reg_ctrl.start()
        if return_early:
            return self.hapd
        if len(self.fst_group) != 0:
            self.send_iface_attach_request(self.iface, self.fst_group,
                                           self.fst_llt, self.fst_pri)
        return self.hapd

    def stop(self):
        """Stops FST on the AP: removes all sessions and detaches the
        interface from its FST group (when a group is configured), then
        stops the regulatory control and drops the global control interface
        object. The AP itself is not removed here.

        The cleanup runs even when session removal or the detach request
        raises; the exception is still propagated."""
        try:
            if len(self.fst_group) != 0:
                self.remove_all_sessions()
                self.send_iface_detach_request(self.iface)
        finally:
            self.reg_ctrl.stop()
            self.global_instance = None

    def get_instance(self):
        """Return the hostapd.Hostapd instance for this interface, creating
        it on first use"""
        if self.instance is None:
            self.instance = hostapd.Hostapd(self.iface)
        return self.instance

    def get_global_instance(self):
        """Return the hostapd.HostapdGlobal instance (None after stop())"""
        return self.global_instance

    def get_own_mac_address(self):
        """Gets the device's own MAC address (the 'bssid[0]' status field)"""
        h = self.get_instance()
        status = h.get_status()
        return status['bssid[0]']

    def get_actual_peer_addr(self):
        """Gets the peer address. A connected station address is returned, or
        None if no station entry with an 'addr' field is available."""
        # Use the device instance, the global control interface doesn't have
        # station address
        h = self.get_instance()
        sta = h.get_sta(None)
        if sta is None or 'addr' not in sta:
            # Maybe station is not connected?
            return None
        return sta['addr']

    def grequest(self, req):
        """Send request on the global control interface"""
        logger.debug("FstAP::grequest: " + req)
        h = self.get_global_instance()
        return h.request(req)

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface. When timeout is
        None the instance's own default timeout applies."""
        h = self.get_global_instance()
        if timeout is not None:
            return h.wait_event(events, timeout=timeout)
        return h.wait_event(events)

    def get_ssid(self):
        """Return the SSID given to the constructor"""
        return self.ssid
| |
| # |
| # FstSTA class |
| # |
class FstSTA (FstDevice):
    """FST-capable station driven through a WpaSupplicant object that talks
    to the global control interface at /tmp/wpas-wlan5."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None, rsn=False):
        """If fst_group is empty, then FST parameters will not be set
        (start() does not attach the interface to a group).
        If fst_llt is None, the llt parameter will not be set and the default
        value is expected to be configured."""
        FstDevice.__init__(self, iface, fst_group, fst_pri, fst_llt, rsn)
        self.connected = None # FstAP object the station is connected to

    def start(self):
        """Current implementation involves running another instance of
        wpa_supplicant with fixed FST STAs configurations. When any type of
        dynamic STA loading is implemented, rewrite the function similarly to
        FstAP.

        Adds the interface (with force_connect_cmd=1) and, when an FST group
        is configured, attaches it to that group.
        Raises Exception if the wpa_supplicant global interface does not
        answer ping."""
        h = self.get_instance()
        h.interface_add(self.iface, drv_params="force_connect_cmd=1")
        if not h.global_ping():
            raise Exception("Could not ping FST wpa_supplicant")
        if len(self.fst_group) != 0:
            self.send_iface_attach_request(self.iface, self.fst_group,
                                           self.fst_llt, self.fst_pri)
        return None

    def stop(self):
        """Stops the STA: removes all FST sessions and detaches the interface
        from its FST group (when a group is configured), then removes the
        interface, closes the control connection and drops the
        WpaSupplicant object.

        The interface removal and close run even when session removal or
        the detach request raises; the exception is still propagated."""
        h = self.get_instance()
        try:
            if len(self.fst_group) != 0:
                self.remove_all_sessions()
                self.send_iface_detach_request(self.iface)
        finally:
            h.interface_remove(self.iface)
            h.close_ctrl()
            self.instance = None

    def get_instance(self):
        """Return the WpaSupplicant instance, creating it on first use"""
        if self.instance is None:
            self.instance = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        return self.instance

    def get_own_mac_address(self):
        """Gets the device's own MAC address (the 'address' status field)"""
        h = self.get_instance()
        status = h.get_status()
        return status['address']

    def get_actual_peer_addr(self):
        """Gets the peer address: the 'bssid' status field, i.e. the address
        of the AP the station is associated with"""
        h = self.get_instance()
        status = h.get_status()
        return status['bssid']

    def grequest(self, req):
        """Send request on the global control interface"""
        logger.debug("FstSTA::grequest: " + req)
        h = self.get_instance()
        return h.global_request(req)

    def wait_gevent(self, events, timeout=None):
        """Wait for a list of events on the global interface. When timeout is
        None the instance's own default timeout applies."""
        h = self.get_instance()
        if timeout is not None:
            return h.wait_global_event(events, timeout=timeout)
        return h.wait_global_event(events)

    def scan(self, freq=None, no_wait=False, only_new=False):
        """Issue Scan with given parameters. Returns the BSS dictionary for the
        AP found (the 1st BSS found. TODO: What if the AP required is not the
        1st in list?) or None if no BSS found. None can also be a result of
        no_wait=True. Note, request("SCAN_RESULTS") can be used to get all the
        results at once."""
        h = self.get_instance()
        h.scan(None, freq, no_wait, only_new)
        return h.get_bss('0')

    def connect(self, ap, **kwargs):
        """Connects to the given AP (a FstAP object); kwargs are passed
        unchanged to WpaSupplicant.connect() after the AP's SSID.
        Raises Exception if 'ap' is not a FstAP."""
        if not isinstance(ap, FstAP):
            raise Exception("Bad AP object to connect to")
        h = self.get_instance()
        # The returned object is not needed here; the call is kept because
        # FstAP.get_instance() creates the Hostapd object on first use.
        ap.get_instance()
        h.connect(ap.get_ssid(), **kwargs)
        self.connected = ap

    def connect_to_external_ap(self, ap, ssid, check_connection=True, **kwargs):
        """Connects to the given external AP (a hostapd.Hostapd object) using
        'ssid'; kwargs are passed unchanged to WpaSupplicant.connect().
        When check_connection is true, waits up to 10 seconds for the AP to
        report AP-STA-CONNECTED and raises Exception if it does not.
        Raises Exception if 'ap' is not a hostapd.Hostapd."""
        if not isinstance(ap, hostapd.Hostapd):
            raise Exception("Bad AP object to connect to")
        h = self.get_instance()
        h.connect(ssid, **kwargs)
        self.connected = ap
        if check_connection:
            ev = ap.wait_event([ "AP-STA-CONNECTED" ], timeout=10)
            if ev is None:
                self.connected = None
                raise Exception("No connection event received from %s" % ssid)

    def disconnect(self, check_disconnect=True):
        """Disconnects from the AP (FstAP) the station is currently connected
        to; does nothing when no AP is recorded.
        When check_disconnect is true, waits up to 10 seconds for the AP to
        report AP-STA-DISCONNECTED and raises Exception if it does not; in
        that case the AP stays recorded as connected."""
        if self.connected is None:
            return
        h = self.get_instance()
        h.request("DISCONNECT")
        if check_disconnect:
            hap = self.connected.get_instance()
            ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
            if ev is None:
                raise Exception("No disconnection event received from %s" % self.connected.get_ssid())
        self.connected = None

    def disconnect_from_external_ap(self, check_disconnect=True):
        """Disconnects from the external AP the station is currently connected
        to; does nothing when no AP is recorded.
        When check_disconnect is true, waits up to 10 seconds for the AP to
        report AP-STA-DISCONNECTED and raises Exception if it does not; in
        that case the AP stays recorded as connected."""
        if self.connected is None:
            return
        h = self.get_instance()
        h.request("DISCONNECT")
        if check_disconnect:
            # For an external AP the recorded object is the Hostapd instance
            # itself (see connect_to_external_ap()).
            hap = self.connected
            ev = hap.wait_event([ "AP-STA-DISCONNECTED" ], timeout=10)
            if ev is None:
                raise Exception("No disconnection event received from AP")
        self.connected = None