| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| #*************************************************************************** |
| # _ _ ____ _ |
| # Project ___| | | | _ \| | |
| # / __| | | | |_) | | |
| # | (__| |_| | _ <| |___ |
| # \___|\___/|_| \_\_____| |
| # |
| # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. |
| # |
| # This software is licensed as described in the file COPYING, which |
| # you should have received as part of this distribution. The terms |
| # are also available at https://curl.se/docs/copyright.html. |
| # |
| # You may opt to use, copy, modify, merge, publish, distribute and/or sell |
| # copies of the Software, and permit persons to whom the Software is |
| # furnished to do so, under the terms of the COPYING file. |
| # |
| # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY |
| # KIND, either express or implied. |
| # |
| # SPDX-License-Identifier: curl |
| # |
| ########################################################################### |
| # |
| import logging |
| import re |
| import pytest |
| |
| from testenv import Env |
| from testenv import CurlClient |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| class TestBasic: |
| |
| # simple http: GET |
| def test_01_01_http_get(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'http://{env.domain1}:{env.http_port}/data.json' |
| r = curl.http_get(url=url) |
| r.check_response(http_status=200) |
| assert r.json['server'] == env.domain1 |
| |
| # simple https: GET, any http version |
| @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL") |
| def test_01_02_https_get(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'https://{env.domain1}:{env.https_port}/data.json' |
| r = curl.http_get(url=url) |
| r.check_response(http_status=200) |
| assert r.json['server'] == env.domain1 |
| |
| # simple https: GET, h2 wanted and got |
| @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL") |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_03_h2_get(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'https://{env.domain1}:{env.https_port}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http2']) |
| r.check_response(http_status=200, protocol='HTTP/2') |
| assert r.json['server'] == env.domain1 |
| |
| # simple https: GET, h2 unsupported, fallback to h1 |
| @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL") |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_04_h2_unsupported(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'https://{env.domain2}:{env.https_port}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http2']) |
| r.check_response(http_status=200, protocol='HTTP/1.1') |
| assert r.json['server'] == env.domain2 |
| |
| # simple h3: GET, want h3 and get it |
| @pytest.mark.skipif(condition=not Env.have_h3(), reason="h3 not supported") |
| def test_01_05_h3_get(self, env: Env, httpd, nghttpx): |
| curl = CurlClient(env=env) |
| url = f'https://{env.domain1}:{env.h3_port}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http3-only']) |
| r.check_response(http_status=200, protocol='HTTP/3') |
| assert r.json['server'] == env.domain1 |
| |
| # simple download, check connect/handshake timings |
| @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL") |
| @pytest.mark.parametrize("proto", Env.http_protos()) |
| def test_01_06_timings(self, env: Env, httpd, nghttpx, proto): |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}/data.json' |
| r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True) |
| r.check_stats(http_status=200, count=1, |
| remote_port=env.port_for(alpn_proto=proto), |
| remote_ip='127.0.0.1') |
| # there are cases where time_connect is reported as 0 |
| assert r.stats[0]['time_connect'] >= 0, f'{r.stats[0]}' |
| assert r.stats[0]['time_appconnect'] > 0, f'{r.stats[0]}' |
| # ports are reported correctly |
| assert r.stats[0]['remote_port'] == env.port_for(proto), f'{r.dump_logs()}' |
| assert r.stats[0]['local_port'] > 0, f'{r.dump_logs()}' |
| |
| # simple https: HEAD |
| @pytest.mark.parametrize("proto", Env.http_protos()) |
| @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL") |
| def test_01_07_head(self, env: Env, httpd, nghttpx, proto): |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}/data.json' |
| r = curl.http_download(urls=[url], with_stats=True, with_headers=True, |
| extra_args=['-I']) |
| r.check_stats(http_status=200, count=1, exitcode=0, |
| remote_port=env.port_for(alpn_proto=proto), |
| remote_ip='127.0.0.1') |
| # got the Content-Length: header, but did not download anything |
| assert r.responses[0]['header']['content-length'] == '30', f'{r.responses[0]}' |
| assert r.stats[0]['size_download'] == 0, f'{r.stats[0]}' |
| |
| # http: GET for HTTP/2, see Upgrade:, 101 switch |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_08_h2_upgrade(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'http://{env.domain1}:{env.http_port}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http2']) |
| r.check_exit_code(0) |
| assert len(r.responses) == 2, f'{r.responses}' |
| assert r.responses[0]['status'] == 101, f'{r.responses[0]}' |
| assert r.responses[1]['status'] == 200, f'{r.responses[1]}' |
| assert r.responses[1]['protocol'] == 'HTTP/2', f'{r.responses[1]}' |
| assert r.json['server'] == env.domain1 |
| |
| # http: GET for HTTP/2 with prior knowledge |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_09_h2_prior_knowledge(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'http://{env.domain1}:{env.http_port}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http2-prior-knowledge']) |
| r.check_exit_code(0) |
| assert len(r.responses) == 1, f'{r.responses}' |
| assert r.response['status'] == 200, f'{r.responsw}' |
| assert r.response['protocol'] == 'HTTP/2', f'{r.response}' |
| assert r.json['server'] == env.domain1 |
| |
| # http: strip TE header in HTTP/2 requests |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_10_te_strip(self, env: Env, httpd): |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, "h2")}/data.json' |
| r = curl.http_get(url=url, extra_args=['--http2', '-H', 'TE: gzip']) |
| r.check_exit_code(0) |
| assert len(r.responses) == 1, f'{r.responses}' |
| assert r.responses[0]['status'] == 200, f'{r.responses[1]}' |
| assert r.responses[0]['protocol'] == 'HTTP/2', f'{r.responses[1]}' |
| |
| # http: large response headers |
| # send 48KB+ sized response headers to check we handle that correctly |
| # larger than 64KB headers expose a bug in Apache HTTP/2 that is not |
| # RSTing the stream correctly when its internal limits are exceeded. |
| @pytest.mark.parametrize("proto", Env.http_protos()) |
| def test_01_11_large_resp_headers(self, env: Env, httpd, proto): |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}' \ |
| f'/curltest/tweak?x-hd={48 * 1024}' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[]) |
| r.check_exit_code(0) |
| assert len(r.responses) == 1, f'{r.responses}' |
| assert r.responses[0]['status'] == 200, f'{r.responses}' |
| |
| # http: response headers larger than what curl buffers for |
| @pytest.mark.skipif(condition=not Env.httpd_is_at_least('2.4.64'), |
| reason='httpd must be at least 2.4.64') |
| @pytest.mark.parametrize("proto", Env.http_h1_h2_protos()) |
| def test_01_12_xlarge_resp_headers(self, env: Env, httpd, configures_httpd, proto): |
| httpd.set_extra_config('base', [ |
| f'H2MaxHeaderBlockLen {130 * 1024}', |
| ]) |
| httpd.reload_if_config_changed() |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}' \ |
| f'/curltest/tweak?x-hd={128 * 1024}' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[]) |
| r.check_exit_code(0) |
| assert len(r.responses) == 1, f'{r.responses}' |
| assert r.responses[0]['status'] == 200, f'{r.responses}' |
| |
| # http: 1 response header larger than what curl buffers for |
| @pytest.mark.skipif(condition=not Env.httpd_is_at_least('2.4.64'), |
| reason='httpd must be at least 2.4.64') |
| @pytest.mark.parametrize("proto", Env.http_h1_h2_protos()) |
| def test_01_13_megalarge_resp_headers(self, env: Env, httpd, configures_httpd, proto): |
| httpd.set_extra_config('base', [ |
| 'LogLevel http2:trace2', |
| f'H2MaxHeaderBlockLen {130 * 1024}', |
| ]) |
| httpd.reload_if_config_changed() |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}' \ |
| f'/curltest/tweak?x-hd1={128 * 1024}' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[]) |
| if proto == 'h2': |
| r.check_exit_code(16) # CURLE_HTTP2 |
| else: |
| r.check_exit_code(100) # CURLE_TOO_LARGE |
| |
| # http: several response headers, together > 256 KB |
| # nghttp2 error -905: Too many CONTINUATION frames following a HEADER frame |
| @pytest.mark.skipif(condition=not Env.httpd_is_at_least('2.4.64'), |
| reason='httpd must be at least 2.4.64') |
| @pytest.mark.parametrize("proto", Env.http_h1_h2_protos()) |
| def test_01_14_gigalarge_resp_headers(self, env: Env, httpd, configures_httpd, proto): |
| httpd.set_extra_config('base', [ |
| 'LogLevel http2:trace2', |
| f'H2MaxHeaderBlockLen {1024 * 1024}', |
| ]) |
| httpd.reload_if_config_changed() |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}' \ |
| f'/curltest/tweak?x-hd={256 * 1024}' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[]) |
| if proto == 'h2': |
| r.check_exit_code(16) # CURLE_HTTP2 |
| else: |
| r.check_exit_code(0) # 1.1 can do |
| |
| # http: one response header > 256 KB |
| @pytest.mark.skipif(condition=not Env.httpd_is_at_least('2.4.64'), |
| reason='httpd must be at least 2.4.64') |
| @pytest.mark.parametrize("proto", Env.http_h1_h2_protos()) |
| def test_01_15_gigalarge_resp_headers(self, env: Env, httpd, configures_httpd, proto): |
| httpd.set_extra_config('base', [ |
| 'LogLevel http2:trace2', |
| f'H2MaxHeaderBlockLen {1024 * 1024}', |
| ]) |
| httpd.reload_if_config_changed() |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}' \ |
| f'/curltest/tweak?x-hd1={256 * 1024}' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[]) |
| if proto == 'h2': |
| r.check_exit_code(16) # CURLE_HTTP2 |
| else: |
| r.check_exit_code(100) # CURLE_TOO_LARGE |
| |
| # http: invalid request headers, GET, issue #16998 |
| @pytest.mark.parametrize("proto", Env.http_protos()) |
| def test_01_16_inv_req_get(self, env: Env, httpd, nghttpx, proto): |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo' |
| r = curl.http_get(url=url, alpn_proto=proto, extra_args=[ |
| '-H', "a: a\x0ab" |
| ]) |
| # on h1, request is sent, h2/h3 reject |
| if proto == 'http/1.1': |
| r.check_exit_code(0) |
| else: |
| r.check_exit_code(43) |
| |
| # http: special handling of TE request header |
| @pytest.mark.parametrize("te_in, te_out", [ |
| pytest.param('trailers', 'trailers', id='trailers'), |
| pytest.param('chunked', None, id='chunked'), |
| pytest.param('gzip, trailers', 'trailers', id='gzip+trailers'), |
| pytest.param('gzip ;q=0.2;x="y,x", trailers', 'trailers', id='gzip+q+x+trailers'), |
| pytest.param('gzip ;x="trailers", chunks', None, id='gzip+x+chunks'), |
| ]) |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_17_TE(self, env: Env, httpd, te_in, te_out): |
| proto = 'h2' |
| curl = CurlClient(env=env) |
| url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo' |
| r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True, |
| with_headers=True, |
| extra_args=['-H', f'TE: {te_in}']) |
| r.check_response(200) |
| if te_out is not None: |
| assert r.responses[0]['header']['request-te'] == te_out, f'{r.responses[0]}' |
| else: |
| assert 'request-te' not in r.responses[0]['header'], f'{r.responses[0]}' |
| |
| # check that an existing https: connection is not reused for http: |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_18_tls_reuse(self, env: Env, httpd): |
| proto = 'h2' |
| curl = CurlClient(env=env) |
| url1 = f'https://{env.authority_for(env.domain1, proto)}/data.json' |
| url2 = f'http://{env.authority_for(env.domain1, proto)}/data.json' |
| r = curl.http_download(urls=[url1, url2], alpn_proto=proto, with_stats=True) |
| assert len(r.stats) == 2 |
| assert r.total_connects == 2, f'{r.dump_logs()}' |
| |
| # check that an existing http: connection is not reused for https: |
| @pytest.mark.skipif(condition=not Env.have_h2_curl(), reason="curl without h2") |
| def test_01_19_plain_reuse(self, env: Env, httpd): |
| proto = 'h2' |
| curl = CurlClient(env=env) |
| url1 = f'http://{env.domain1}:{env.http_port}/data.json' |
| url2 = f'https://{env.domain1}:{env.http_port}/data.json' |
| r = curl.http_download(urls=[url1, url2], alpn_proto=proto, with_stats=True) |
| assert len(r.stats) == 2 |
| assert r.total_connects == 2, f'{r.dump_logs()}' |
| |
| # use a custom method containing a space |
| # check that h2/h3 did send that in the :method pseudo header. #19543 |
| @pytest.mark.skipif(condition=not Env.curl_is_verbose(), reason="needs verbosecurl") |
| @pytest.mark.parametrize("proto", Env.http_protos()) |
| def test_01_20_method_space(self, env: Env, proto, httpd): |
| curl = CurlClient(env=env) |
| method = 'IN SANE' |
| url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo' |
| r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True, |
| extra_args=['-X', method]) |
| if proto == 'h2' or proto == 'h3': |
| # h2+3 may close the connection for such invalid requests |
| re_m = re.compile(r'.*\[:method: ([^\]]+)\].*') |
| lines = [line for line in r.trace_lines if re_m.match(line)] |
| assert len(lines) == 1, f'{r.dump_logs()}' |
| m = re_m.match(lines[0]) |
| assert m.group(1) == method, f'{r.dump_logs()}' |
| else: |
| # h1 should give us a real response |
| assert len(r.stats) == 1 |
| r.check_response(http_status=400) |