tests: More WPS ER HTTP protocol testing
Signed-off-by: Jouni Malinen <j@w1.fi>
diff --git a/tests/hwsim/test_ap_wps.py b/tests/hwsim/test_ap_wps.py
index b7f6948..16441c3 100644
--- a/tests/hwsim/test_ap_wps.py
+++ b/tests/hwsim/test_ap_wps.py
@@ -3002,21 +3002,8 @@
wps_event_url = None
-class WPSAPHTTPServer(SocketServer.StreamRequestHandler):
- def handle(self):
- data = self.rfile.readline().strip()
- logger.info("HTTP server received: " + data)
- while True:
- hdr = self.rfile.readline().strip()
- if len(hdr) == 0:
- break
- logger.info("HTTP header: " + hdr)
- if "CALLBACK:" in hdr:
- global wps_event_url
- wps_event_url = hdr.split(' ')[1].strip('<>')
-
- if "GET /foo.xml" in data:
- payload = '''<?xml version="1.0"?>
+def gen_upnp_info(eventSubURL='wps_event'):
+ payload = '''<?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0">
<specVersion>
<major>1</major>
@@ -3036,22 +3023,24 @@
<serviceId>urn:wifialliance-org:serviceId:WFAWLANConfig1</serviceId>
<SCPDURL>wps_scpd.xml</SCPDURL>
<controlURL>wps_control</controlURL>
-<eventSubURL>wps_event</eventSubURL>
-</service>
+'''
+ if eventSubURL:
+ payload += '<eventSubURL>' + eventSubURL + '</eventSubURL>'
+ payload += '''</service>
</serviceList>
</device>
</root>
'''
- hdr = 'HTTP/1.1 200 OK\r\n' + \
- 'Content-Type: text/xml; charset="utf-8"\r\n' + \
- 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
- 'Connection: close\r\n' + \
- 'Content-Length: ' + str(len(payload)) + '\r\n' + \
- 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
- self.wfile.write(hdr + payload)
+ hdr = 'HTTP/1.1 200 OK\r\n' + \
+ 'Content-Type: text/xml; charset="utf-8"\r\n' + \
+ 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
+ 'Connection: close\r\n' + \
+ 'Content-Length: ' + str(len(payload)) + '\r\n' + \
+ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
+ return hdr + payload
- if "POST /wps_control" in data:
- payload = '''<?xml version="1.0"?>
+def gen_wps_control():
+ payload = '''<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:GetDeviceInfoResponse xmlns:u="urn:schemas-wifialliance-org:service:WFAWLANConfig:1">
@@ -3068,25 +3057,89 @@
</s:Body>
</s:Envelope>
'''
- hdr = 'HTTP/1.1 200 OK\r\n' + \
- 'Content-Type: text/xml; charset="utf-8"\r\n' + \
- 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
- 'Connection: close\r\n' + \
- 'Content-Length: ' + str(len(payload)) + '\r\n' + \
- 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
- self.wfile.write(hdr + payload)
+ hdr = 'HTTP/1.1 200 OK\r\n' + \
+ 'Content-Type: text/xml; charset="utf-8"\r\n' + \
+ 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
+ 'Connection: close\r\n' + \
+ 'Content-Length: ' + str(len(payload)) + '\r\n' + \
+ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
+ return hdr + payload
+
+def gen_wps_event():
+ payload = ""
+ hdr = 'HTTP/1.1 200 OK\r\n' + \
+ 'Content-Type: text/xml; charset="utf-8"\r\n' + \
+ 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
+ 'Connection: close\r\n' + \
+ 'Content-Length: ' + str(len(payload)) + '\r\n' + \
+ 'SID: uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a\r\n' + \
+ 'Timeout: Second-1801\r\n' + \
+ 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
+ return hdr + payload
+
+class WPSAPHTTPServer(SocketServer.StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.readline().strip()
+ logger.info("HTTP server received: " + data)
+ while True:
+ hdr = self.rfile.readline().strip()
+ if len(hdr) == 0:
+ break
+ logger.info("HTTP header: " + hdr)
+ if "CALLBACK:" in hdr:
+ global wps_event_url
+ wps_event_url = hdr.split(' ')[1].strip('<>')
+
+ if "GET /foo.xml" in data:
+ self.wfile.write(gen_upnp_info())
+
+ if "POST /wps_control" in data:
+ self.wfile.write(gen_wps_control())
if "SUBSCRIBE /wps_event" in data:
- payload = ""
- hdr = 'HTTP/1.1 200 OK\r\n' + \
- 'Content-Type: text/xml; charset="utf-8"\r\n' + \
- 'Server: Unspecified, UPnP/1.0, Unspecified\r\n' + \
- 'Connection: close\r\n' + \
- 'Content-Length: ' + str(len(payload)) + '\r\n' + \
- 'SID: uuid:7eb3342a-8a5f-47fe-a585-0785bfec6d8a\r\n' + \
- 'Timeout: Second-1801\r\n' + \
- 'Date: Sat, 15 Aug 2015 18:55:08 GMT\r\n\r\n'
- self.wfile.write(hdr + payload)
+ self.wfile.write(gen_wps_event())
+
+class MyTCPServer(SocketServer.TCPServer):
+ def __init__(self, addr, handler):
+ self.allow_reuse_address = True
+ SocketServer.TCPServer.__init__(self, addr, handler)
+
+def wps_er_start(dev, http_server):
+ socket.setdefaulttimeout(1)
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(("239.255.255.250", 1900))
+ dev.request("WPS_ER_START ifname=lo")
+ (msg,addr) = sock.recvfrom(1000)
+ logger.debug("Received SSDP message from %s: %s" % (str(addr), msg))
+ if "M-SEARCH" not in msg:
+ raise Exception("Not an M-SEARCH")
+
+ # Add an AP with a valid URL and server listing to it
+ server = MyTCPServer(("127.0.0.1", 12345), http_server)
+ sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr)
+ server.timeout = 1
+ return server,sock
+
+def wps_er_stop(dev, sock, server, on_alloc_fail=False):
+ sock.close()
+ server.server_close()
+
+ if on_alloc_fail:
+ done = False
+ for i in range(50):
+ res = dev.request("GET_ALLOC_FAIL")
+ if res.startswith("0:"):
+ done = True
+ break
+ time.sleep(0.1)
+ if not done:
+ raise Exception("No allocation failure reported")
+ else:
+ ev = dev.wait_event(["WPS-ER-AP-REMOVE"], timeout=5)
+ if ev is None:
+ raise Exception("No WPS-ER-AP-REMOVE event on max-age timeout")
+ dev.request("WPS_ER_STOP")
def test_ap_wps_er_http_proto(dev, apdev):
"""WPS ER HTTP protocol testing"""
@@ -3097,20 +3150,7 @@
def _test_ap_wps_er_http_proto(dev, apdev):
uuid = '27ea801a-9e5c-4e73-bd82-f89cbcd10d7e'
- socket.setdefaulttimeout(1)
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(("239.255.255.250", 1900))
- dev[0].request("WPS_ER_START ifname=lo")
- (msg,addr) = sock.recvfrom(1000)
- logger.debug("Received SSDP message from %s: %s" % (str(addr), msg))
- if "M-SEARCH" not in msg:
- raise Exception("Not an M-SEARCH")
-
- # Add an AP with a valid URL and server listing to it
- server = SocketServer.TCPServer(("127.0.0.1", 12345), WPSAPHTTPServer)
- sock.sendto("HTTP/1.1 200 OK\r\nST: urn:schemas-wifialliance-org:device:WFADevice:1\r\nlocation:http://127.0.0.1:12345/foo.xml\r\ncache-control:max-age=1\r\n\r\n", addr)
- server.timeout = 1
+ server,sock = wps_er_start(dev[0], WPSAPHTTPServer)
global wps_event_url
wps_event_url = None
server.handle_request()
@@ -3197,3 +3237,81 @@
conn = httplib.HTTPConnection(url.netloc)
conn.request("FOOBAR", '/foobar/123456789/123', payload, headers)
time.sleep(0.5)
+
+class WPSAPHTTPServer2(SocketServer.StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.readline().strip()
+ logger.info("HTTP server received: " + data)
+ while True:
+ hdr = self.rfile.readline().strip()
+ if len(hdr) == 0:
+ break
+ logger.info("HTTP header: " + hdr)
+
+ if "GET /foo.xml" in data:
+ self.wfile.write(gen_upnp_info(eventSubURL=None))
+
+ if "POST /wps_control" in data:
+ self.wfile.write(gen_wps_control())
+
+def test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev):
+ """WPS ER HTTP protocol testing - no eventSubURL"""
+ try:
+ _test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev)
+ finally:
+ dev[0].request("WPS_ER_STOP")
+
+def _test_ap_wps_er_http_proto_no_event_sub_url(dev, apdev):
+ server,sock = wps_er_start(dev[0], WPSAPHTTPServer2)
+ server.handle_request()
+ server.handle_request()
+ wps_er_stop(dev[0], sock, server)
+
+class WPSAPHTTPServer3(SocketServer.StreamRequestHandler):
+ def handle(self):
+ data = self.rfile.readline().strip()
+ logger.info("HTTP server received: " + data)
+ while True:
+ hdr = self.rfile.readline().strip()
+ if len(hdr) == 0:
+ break
+ logger.info("HTTP header: " + hdr)
+
+ if "GET /foo.xml" in data:
+ self.wfile.write(gen_upnp_info(eventSubURL='http://example.com/wps_event'))
+
+ if "POST /wps_control" in data:
+ self.wfile.write(gen_wps_control())
+
+def test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev):
+ """WPS ER HTTP protocol testing - DNS name in eventSubURL"""
+ try:
+ _test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev)
+ finally:
+ dev[0].request("WPS_ER_STOP")
+
+def _test_ap_wps_er_http_proto_event_sub_url_dns(dev, apdev):
+ server,sock = wps_er_start(dev[0], WPSAPHTTPServer3)
+ server.handle_request()
+ server.handle_request()
+ wps_er_stop(dev[0], sock, server)
+
+def test_ap_wps_er_http_proto_subscribe_oom(dev, apdev):
+ """WPS ER HTTP protocol testing - subscribe OOM"""
+ try:
+ _test_ap_wps_er_http_proto_subscribe_oom(dev, apdev)
+ finally:
+ dev[0].request("WPS_ER_STOP")
+
+def _test_ap_wps_er_http_proto_subscribe_oom(dev, apdev):
+ tests = [ (1, "http_client_url_parse"),
+ (1, "wpabuf_alloc;wps_er_subscribe"),
+ (1, "http_client_addr"),
+ (1, "eloop_register_sock;http_client_addr"),
+ (1, "eloop_register_timeout;http_client_addr") ]
+ for count,func in tests:
+ with alloc_fail(dev[0], count, func):
+ server,sock = wps_er_start(dev[0], WPSAPHTTPServer)
+ server.handle_request()
+ server.handle_request()
+ wps_er_stop(dev[0], sock, server, on_alloc_fail=True)