| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| #*************************************************************************** |
| # _ _ ____ _ |
| # Project ___| | | | _ \| | |
| # / __| | | | |_) | | |
| # | (__| |_| | _ <| |___ |
| # \___|\___/|_| \_\_____| |
| # |
| # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. |
| # |
| # This software is licensed as described in the file COPYING, which |
| # you should have received as part of this distribution. The terms |
| # are also available at https://curl.se/docs/copyright.html. |
| # |
| # You may opt to use, copy, modify, merge, publish, distribute and/or sell |
| # copies of the Software, and permit persons to whom the Software is |
| # furnished to do so, under the terms of the COPYING file. |
| # |
| # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY |
| # KIND, either express or implied. |
| # |
| # SPDX-License-Identifier: curl |
| # |
| ########################################################################### |
| # |
| import logging |
| import os |
| import re |
| import shutil |
| import socket |
| import subprocess |
| import tempfile |
| from configparser import ConfigParser, ExtendedInterpolation |
| from typing import Optional |
| |
| from .certs import CertificateSpec, Credentials, TestCA |
| from .ports import alloc_ports |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| def init_config_from(conf_path): |
| if os.path.isfile(conf_path): |
| config = ConfigParser(interpolation=ExtendedInterpolation()) |
| config.read(conf_path) |
| return config |
| return None |
| |
| |
| TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__)) |
| TOP_PATH = os.path.join(os.getcwd(), os.path.pardir) |
| DEF_CONFIG = init_config_from(os.path.join(TOP_PATH, 'tests', 'http', 'config.ini')) |
| CURL = os.path.join(TOP_PATH, 'src', 'curl') |
| |
| |
| class EnvConfig: |
| |
| def __init__(self): |
| self.tests_dir = TESTS_HTTPD_PATH |
| self.gen_dir = os.path.join(self.tests_dir, 'gen') |
| self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir)) |
| self.build_dir = TOP_PATH |
| self.config = DEF_CONFIG |
| # check cur and its features |
| self.curl = CURL |
| if 'CURL' in os.environ: |
| self.curl = os.environ['CURL'] |
| self.curl_props = { |
| 'version_string': '', |
| 'version': '', |
| 'os': '', |
| 'fullname': '', |
| 'features_string': '', |
| 'features': set(), |
| 'protocols_string': '', |
| 'protocols': set(), |
| 'libs': set(), |
| 'lib_versions': set(), |
| } |
| self.curl_is_debug = False |
| self.curl_protos = [] |
| p = subprocess.run(args=[self.curl, '-V'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}') |
| if p.stderr.startswith('WARNING:'): |
| self.curl_is_debug = True |
| for line in p.stdout.splitlines(keepends=False): |
| if line.startswith('curl '): |
| self.curl_props['version_string'] = line |
| m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line) |
| if m: |
| self.curl_props['fullname'] = m.group(0) |
| self.curl_props['version'] = m.group('version') |
| self.curl_props['os'] = m.group('os') |
| self.curl_props['lib_versions'] = { |
| lib.lower() for lib in m.group('libs').split(' ') |
| } |
| self.curl_props['libs'] = { |
| re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions'] |
| } |
| if line.startswith('Features: '): |
| self.curl_props['features_string'] = line[10:] |
| self.curl_props['features'] = { |
| feat.lower() for feat in line[10:].split(' ') |
| } |
| if line.startswith('Protocols: '): |
| self.curl_props['protocols_string'] = line[11:] |
| self.curl_props['protocols'] = { |
| prot.lower() for prot in line[11:].split(' ') |
| } |
| |
| self.ports = alloc_ports(port_specs={ |
| 'ftp': socket.SOCK_STREAM, |
| 'ftps': socket.SOCK_STREAM, |
| 'http': socket.SOCK_STREAM, |
| 'https': socket.SOCK_STREAM, |
| 'nghttpx_https': socket.SOCK_STREAM, |
| 'proxy': socket.SOCK_STREAM, |
| 'proxys': socket.SOCK_STREAM, |
| 'h2proxys': socket.SOCK_STREAM, |
| 'caddy': socket.SOCK_STREAM, |
| 'caddys': socket.SOCK_STREAM, |
| 'ws': socket.SOCK_STREAM, |
| }) |
| self.httpd = self.config['httpd']['httpd'] |
| self.apachectl = self.config['httpd']['apachectl'] |
| self.apxs = self.config['httpd']['apxs'] |
| if len(self.apxs) == 0: |
| self.apxs = None |
| self._httpd_version = None |
| |
| self.examples_pem = { |
| 'key': 'xxx', |
| 'cert': 'xxx', |
| } |
| self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs') |
| self.tld = 'http.curl.se' |
| self.domain1 = f"one.{self.tld}" |
| self.domain1brotli = f"brotli.one.{self.tld}" |
| self.domain2 = f"two.{self.tld}" |
| self.ftp_domain = f"ftp.{self.tld}" |
| self.proxy_domain = f"proxy.{self.tld}" |
| self.cert_specs = [ |
| CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'), |
| CertificateSpec(domains=[self.domain2], key_type='rsa2048'), |
| CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'), |
| CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'), |
| CertificateSpec(name="clientsX", sub_specs=[ |
| CertificateSpec(name="user1", client=True), |
| ]), |
| ] |
| |
| self.nghttpx = self.config['nghttpx']['nghttpx'] |
| if len(self.nghttpx.strip()) == 0: |
| self.nghttpx = None |
| self._nghttpx_version = None |
| self.nghttpx_with_h3 = False |
| if self.nghttpx is not None: |
| p = subprocess.run(args=[self.nghttpx, '-v'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| # not a working nghttpx |
| self.nghttpx = None |
| else: |
| self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip()) |
| self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None |
| log.debug(f'nghttpx -v: {p.stdout}') |
| |
| self.caddy = self.config['caddy']['caddy'] |
| self._caddy_version = None |
| if len(self.caddy.strip()) == 0: |
| self.caddy = None |
| if self.caddy is not None: |
| try: |
| p = subprocess.run(args=[self.caddy, 'version'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| # not a working caddy |
| self.caddy = None |
| m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout) |
| if m: |
| self._caddy_version = m.group(1) |
| else: |
| raise RuntimeError(f'Unable to determine cadd version from: {p.stdout}') |
| # TODO: specify specific exceptions here |
| except: # noqa: E722 |
| self.caddy = None |
| |
| self.vsftpd = self.config['vsftpd']['vsftpd'] |
| self._vsftpd_version = None |
| if self.vsftpd is not None: |
| try: |
| with tempfile.TemporaryFile('w+') as tmp: |
| p = subprocess.run(args=[self.vsftpd, '-v'], |
| capture_output=True, text=True, stdin=tmp) |
| if p.returncode != 0: |
| # not a working vsftpd |
| self.vsftpd = None |
| if p.stderr: |
| ver_text = p.stderr |
| else: |
| # Oddly, some versions of vsftpd write to stdin (!) |
| # instead of stderr, which is odd but works. If there |
| # is nothing on stderr, read the file on stdin and use |
| # any data there instead. |
| tmp.seek(0) |
| ver_text = tmp.read() |
| m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text) |
| if m: |
| self._vsftpd_version = m.group(1) |
| elif len(p.stderr) == 0: |
| # vsftp does not use stdout or stderr for printing its version... -.- |
| self._vsftpd_version = 'unknown' |
| else: |
| raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}') |
| except Exception: |
| self.vsftpd = None |
| |
| self._tcpdump = shutil.which('tcpdump') |
| |
| @property |
| def httpd_version(self): |
| if self._httpd_version is None and self.apxs is not None: |
| try: |
| p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}') |
| else: |
| self._httpd_version = p.stdout.strip() |
| except Exception: |
| log.exception(f'{self.apxs} failed to run') |
| return self._httpd_version |
| |
| def versiontuple(self, v): |
| v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v) |
| return tuple(map(int, v.split('.'))) |
| |
| def httpd_is_at_least(self, minv): |
| if self.httpd_version is None: |
| return False |
| hv = self.versiontuple(self.httpd_version) |
| return hv >= self.versiontuple(minv) |
| |
| def caddy_is_at_least(self, minv): |
| if self.caddy_version is None: |
| return False |
| hv = self.versiontuple(self.caddy_version) |
| return hv >= self.versiontuple(minv) |
| |
| def is_complete(self) -> bool: |
| return os.path.isfile(self.httpd) and \ |
| os.path.isfile(self.apachectl) and \ |
| self.apxs is not None and \ |
| os.path.isfile(self.apxs) |
| |
| def get_incomplete_reason(self) -> Optional[str]: |
| if self.httpd is None or len(self.httpd.strip()) == 0: |
| return 'httpd not configured, see `--with-test-httpd=<path>`' |
| if not os.path.isfile(self.httpd): |
| return f'httpd ({self.httpd}) not found' |
| if not os.path.isfile(self.apachectl): |
| return f'apachectl ({self.apachectl}) not found' |
| if self.apxs is None: |
| return "command apxs not found (commonly provided in apache2-dev)" |
| if not os.path.isfile(self.apxs): |
| return f"apxs ({self.apxs}) not found" |
| return None |
| |
| @property |
| def nghttpx_version(self): |
| return self._nghttpx_version |
| |
| @property |
| def caddy_version(self): |
| return self._caddy_version |
| |
| @property |
| def vsftpd_version(self): |
| return self._vsftpd_version |
| |
| @property |
| def tcpdmp(self) -> Optional[str]: |
| return self._tcpdump |
| |
| |
| class Env: |
| |
| CONFIG = EnvConfig() |
| |
| @staticmethod |
| def setup_incomplete() -> bool: |
| return not Env.CONFIG.is_complete() |
| |
| @staticmethod |
| def incomplete_reason() -> Optional[str]: |
| return Env.CONFIG.get_incomplete_reason() |
| |
| @staticmethod |
| def have_nghttpx() -> bool: |
| return Env.CONFIG.nghttpx is not None |
| |
| @staticmethod |
| def have_h3_server() -> bool: |
| return Env.CONFIG.nghttpx_with_h3 |
| |
| @staticmethod |
| def have_ssl_curl() -> bool: |
| return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl') |
| |
| @staticmethod |
| def have_h2_curl() -> bool: |
| return 'http2' in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def have_h3_curl() -> bool: |
| return 'http3' in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def curl_uses_lib(libname: str) -> bool: |
| return libname.lower() in Env.CONFIG.curl_props['libs'] |
| |
| @staticmethod |
| def curl_uses_ossl_quic() -> bool: |
| if Env.have_h3_curl(): |
| return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3') |
| return False |
| |
| @staticmethod |
| def curl_version_string() -> str: |
| return Env.CONFIG.curl_props['version_string'] |
| |
| @staticmethod |
| def curl_features_string() -> str: |
| return Env.CONFIG.curl_props['features_string'] |
| |
| @staticmethod |
| def curl_has_feature(feature: str) -> bool: |
| return feature.lower() in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def curl_protocols_string() -> str: |
| return Env.CONFIG.curl_props['protocols_string'] |
| |
| @staticmethod |
| def curl_has_protocol(protocol: str) -> bool: |
| return protocol.lower() in Env.CONFIG.curl_props['protocols'] |
| |
| @staticmethod |
| def curl_lib_version(libname: str) -> str: |
| prefix = f'{libname.lower()}/' |
| for lversion in Env.CONFIG.curl_props['lib_versions']: |
| if lversion.startswith(prefix): |
| return lversion[len(prefix):] |
| return 'unknown' |
| |
| @staticmethod |
| def curl_lib_version_at_least(libname: str, min_version) -> bool: |
| lversion = Env.curl_lib_version(libname) |
| if lversion != 'unknown': |
| return Env.CONFIG.versiontuple(min_version) <= \ |
| Env.CONFIG.versiontuple(lversion) |
| return False |
| |
| @staticmethod |
| def curl_os() -> str: |
| return Env.CONFIG.curl_props['os'] |
| |
| @staticmethod |
| def curl_fullname() -> str: |
| return Env.CONFIG.curl_props['fullname'] |
| |
| @staticmethod |
| def curl_version() -> str: |
| return Env.CONFIG.curl_props['version'] |
| |
| @staticmethod |
| def curl_is_debug() -> bool: |
| return Env.CONFIG.curl_is_debug |
| |
| @staticmethod |
| def have_h3() -> bool: |
| return Env.have_h3_curl() and Env.have_h3_server() |
| |
| @staticmethod |
| def httpd_version() -> str: |
| return Env.CONFIG.httpd_version |
| |
| @staticmethod |
| def nghttpx_version() -> str: |
| return Env.CONFIG.nghttpx_version |
| |
| @staticmethod |
| def caddy_version() -> str: |
| return Env.CONFIG.caddy_version |
| |
| @staticmethod |
| def caddy_is_at_least(minv) -> bool: |
| return Env.CONFIG.caddy_is_at_least(minv) |
| |
| @staticmethod |
| def httpd_is_at_least(minv) -> bool: |
| return Env.CONFIG.httpd_is_at_least(minv) |
| |
| @staticmethod |
| def has_caddy() -> bool: |
| return Env.CONFIG.caddy is not None |
| |
| @staticmethod |
| def has_vsftpd() -> bool: |
| return Env.CONFIG.vsftpd is not None |
| |
| @staticmethod |
| def vsftpd_version() -> str: |
| return Env.CONFIG.vsftpd_version |
| |
| @staticmethod |
| def tcpdump() -> Optional[str]: |
| return Env.CONFIG.tcpdmp |
| |
| def __init__(self, pytestconfig=None): |
| self._verbose = pytestconfig.option.verbose \ |
| if pytestconfig is not None else 0 |
| self._ca = None |
| self._test_timeout = 300.0 if self._verbose > 1 else 60.0 # seconds |
| |
| def issue_certs(self): |
| if self._ca is None: |
| ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca') |
| self._ca = TestCA.create_root(name=self.CONFIG.tld, |
| store_dir=ca_dir, |
| key_type="rsa2048") |
| self._ca.issue_certs(self.CONFIG.cert_specs) |
| |
| def setup(self): |
| os.makedirs(self.gen_dir, exist_ok=True) |
| os.makedirs(self.htdocs_dir, exist_ok=True) |
| self.issue_certs() |
| |
| def get_credentials(self, domain) -> Optional[Credentials]: |
| creds = self.ca.get_credentials_for_name(domain) |
| if len(creds) > 0: |
| return creds[0] |
| return None |
| |
| @property |
| def verbose(self) -> int: |
| return self._verbose |
| |
| @property |
| def test_timeout(self) -> Optional[float]: |
| return self._test_timeout |
| |
| @test_timeout.setter |
| def test_timeout(self, val: Optional[float]): |
| self._test_timeout = val |
| |
| @property |
| def gen_dir(self) -> str: |
| return self.CONFIG.gen_dir |
| |
| @property |
| def project_dir(self) -> str: |
| return self.CONFIG.project_dir |
| |
| @property |
| def build_dir(self) -> str: |
| return self.CONFIG.build_dir |
| |
| @property |
| def ca(self): |
| return self._ca |
| |
| @property |
| def htdocs_dir(self) -> str: |
| return self.CONFIG.htdocs_dir |
| |
| @property |
| def tld(self) -> str: |
| return self.CONFIG.tld |
| |
| @property |
| def domain1(self) -> str: |
| return self.CONFIG.domain1 |
| |
| @property |
| def domain1brotli(self) -> str: |
| return self.CONFIG.domain1brotli |
| |
| @property |
| def domain2(self) -> str: |
| return self.CONFIG.domain2 |
| |
| @property |
| def ftp_domain(self) -> str: |
| return self.CONFIG.ftp_domain |
| |
| @property |
| def proxy_domain(self) -> str: |
| return self.CONFIG.proxy_domain |
| |
| @property |
| def http_port(self) -> int: |
| return self.CONFIG.ports['http'] |
| |
| @property |
| def https_port(self) -> int: |
| return self.CONFIG.ports['https'] |
| |
| @property |
| def nghttpx_https_port(self) -> int: |
| return self.CONFIG.ports['nghttpx_https'] |
| |
| @property |
| def h3_port(self) -> int: |
| return self.https_port |
| |
| @property |
| def proxy_port(self) -> int: |
| return self.CONFIG.ports['proxy'] |
| |
| @property |
| def proxys_port(self) -> int: |
| return self.CONFIG.ports['proxys'] |
| |
| @property |
| def ftp_port(self) -> int: |
| return self.CONFIG.ports['ftp'] |
| |
| @property |
| def ftps_port(self) -> int: |
| return self.CONFIG.ports['ftps'] |
| |
| @property |
| def h2proxys_port(self) -> int: |
| return self.CONFIG.ports['h2proxys'] |
| |
| def pts_port(self, proto: str = 'http/1.1') -> int: |
| # proxy tunnel port |
| return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys'] |
| |
| @property |
| def caddy(self) -> str: |
| return self.CONFIG.caddy |
| |
| @property |
| def caddy_https_port(self) -> int: |
| return self.CONFIG.ports['caddys'] |
| |
| @property |
| def caddy_http_port(self) -> int: |
| return self.CONFIG.ports['caddy'] |
| |
| @property |
| def vsftpd(self) -> str: |
| return self.CONFIG.vsftpd |
| |
| @property |
| def ws_port(self) -> int: |
| return self.CONFIG.ports['ws'] |
| |
| @property |
| def curl(self) -> str: |
| return self.CONFIG.curl |
| |
| @property |
| def httpd(self) -> str: |
| return self.CONFIG.httpd |
| |
| @property |
| def apachectl(self) -> str: |
| return self.CONFIG.apachectl |
| |
| @property |
| def apxs(self) -> str: |
| return self.CONFIG.apxs |
| |
| @property |
| def nghttpx(self) -> Optional[str]: |
| return self.CONFIG.nghttpx |
| |
| @property |
| def slow_network(self) -> bool: |
| return "CURL_DBG_SOCK_WBLOCK" in os.environ or \ |
| "CURL_DBG_SOCK_WPARTIAL" in os.environ |
| |
| @property |
| def ci_run(self) -> bool: |
| return "CURL_CI" in os.environ |
| |
| def port_for(self, alpn_proto: Optional[str] = None): |
| if alpn_proto is None or \ |
| alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']: |
| return self.https_port |
| if alpn_proto in ['h3']: |
| return self.h3_port |
| return self.http_port |
| |
| def authority_for(self, domain: str, alpn_proto: Optional[str] = None): |
| return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}' |
| |
| def make_data_file(self, indir: str, fname: str, fsize: int, |
| line_length: int = 1024) -> str: |
| if line_length < 11: |
| raise RuntimeError('line_length less than 11 not supported') |
| fpath = os.path.join(indir, fname) |
| s10 = "0123456789" |
| s = round((line_length / 10) + 1) * s10 |
| s = s[0:line_length-11] |
| with open(fpath, 'w') as fd: |
| for i in range(int(fsize / line_length)): |
| fd.write(f"{i:09d}-{s}\n") |
| remain = int(fsize % line_length) |
| if remain != 0: |
| i = int(fsize / line_length) + 1 |
| fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n") |
| return fpath |