| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| #*************************************************************************** |
| # _ _ ____ _ |
| # Project ___| | | | _ \| | |
| # / __| | | | |_) | | |
| # | (__| |_| | _ <| |___ |
| # \___|\___/|_| \_\_____| |
| # |
| # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. |
| # |
| # This software is licensed as described in the file COPYING, which |
| # you should have received as part of this distribution. The terms |
| # are also available at https://curl.se/docs/copyright.html. |
| # |
| # You may opt to use, copy, modify, merge, publish, distribute and/or sell |
| # copies of the Software, and permit persons to whom the Software is |
| # furnished to do so, under the terms of the COPYING file. |
| # |
| # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY |
| # KIND, either express or implied. |
| # |
| # SPDX-License-Identifier: curl |
| # |
| ########################################################################### |
| # |
| import gzip |
| import logging |
| import os |
| import re |
| import shutil |
| import subprocess |
| import tempfile |
| from configparser import ConfigParser, ExtendedInterpolation |
| from datetime import timedelta |
| from typing import Optional, Dict, List |
| |
| import pytest |
| from filelock import FileLock |
| |
| from .certs import CertificateSpec, Credentials, TestCA |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| def init_config_from(conf_path): |
| if os.path.isfile(conf_path): |
| config = ConfigParser(interpolation=ExtendedInterpolation()) |
| config.read(conf_path) |
| return config |
| return None |
| |
| |
| TESTS_HTTPD_PATH = os.path.dirname(os.path.dirname(__file__)) |
| PROJ_PATH = os.path.dirname(os.path.dirname(TESTS_HTTPD_PATH)) |
| TOP_PATH = os.path.join(os.getcwd(), os.path.pardir) |
| CONFIG_PATH = os.path.join(TOP_PATH, 'tests', 'http', 'config.ini') |
| if not os.path.exists(CONFIG_PATH): |
| ALT_CONFIG_PATH = os.path.join(PROJ_PATH, 'tests', 'http', 'config.ini') |
| if not os.path.exists(ALT_CONFIG_PATH): |
| raise Exception(f'unable to find config.ini in {CONFIG_PATH} nor {ALT_CONFIG_PATH}') |
| TOP_PATH = PROJ_PATH |
| CONFIG_PATH = ALT_CONFIG_PATH |
| DEF_CONFIG = init_config_from(CONFIG_PATH) |
| CURL = os.path.join(TOP_PATH, 'src', 'curl') |
| CURLINFO = os.path.join(TOP_PATH, 'src', 'curlinfo') |
| |
| |
| class NghttpxUtil: |
| |
| CMD = None |
| VERSION_FULL = None |
| |
| @classmethod |
| def version(cls, cmd): |
| if cmd is None: |
| return None |
| if cls.VERSION_FULL is None or cmd != cls.CMD: |
| p = subprocess.run(args=[cmd, '--version'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| raise RuntimeError(f'{cmd} --version failed with exit code: {p.returncode}') |
| cls.CMD = cmd |
| for line in p.stdout.splitlines(keepends=False): |
| if line.startswith('nghttpx '): |
| cls.VERSION_FULL = line |
| if cls.VERSION_FULL is None: |
| raise RuntimeError(f'{cmd}: unable to determine version') |
| return cls.VERSION_FULL |
| |
| @staticmethod |
| def version_with_h3(version): |
| return re.match(r'.* ngtcp2/\d+\.\d+\.\d+.*', version) is not None |
| |
| |
| class EnvConfig: |
| |
| def __init__(self, pytestconfig: Optional[pytest.Config] = None, |
| testrun_uid=None, |
| worker_id=None): |
| self.pytestconfig = pytestconfig |
| self.testrun_uid = testrun_uid |
| self.worker_id = worker_id if worker_id is not None else 'master' |
| self.tests_dir = TESTS_HTTPD_PATH |
| self.gen_root = self.gen_dir = os.path.join(self.tests_dir, 'gen') |
| if self.worker_id != 'master': |
| self.gen_dir = os.path.join(self.gen_dir, self.worker_id) |
| self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir)) |
| self.build_dir = TOP_PATH |
| self.config = DEF_CONFIG |
| # check cur and its features |
| self.curl = CURL |
| self.curlinfo = CURLINFO |
| if 'CURL' in os.environ: |
| self.curl = os.environ['CURL'] |
| self.curl_props = { |
| 'version_string': '', |
| 'version': '', |
| 'os': '', |
| 'fullname': '', |
| 'features_string': '', |
| 'features': set(), |
| 'protocols_string': '', |
| 'protocols': set(), |
| 'libs': set(), |
| 'lib_versions': set(), |
| } |
| self.curl_is_debug = False |
| self.curl_protos = [] |
| p = subprocess.run(args=[self.curl, '-V'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| raise RuntimeError(f'{self.curl} -V failed with exit code: {p.returncode}') |
| if p.stderr.startswith('WARNING:'): |
| self.curl_is_debug = True |
| for line in p.stdout.splitlines(keepends=False): |
| if line.startswith('curl '): |
| self.curl_props['version_string'] = line |
| m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line) |
| if m: |
| self.curl_props['fullname'] = m.group(0) |
| self.curl_props['version'] = m.group('version') |
| self.curl_props['os'] = m.group('os') |
| self.curl_props['lib_versions'] = { |
| lib.lower() for lib in m.group('libs').split(' ') |
| } |
| self.curl_props['libs'] = { |
| re.sub(r'/[a-z0-9.-]*', '', lib) for lib in self.curl_props['lib_versions'] |
| } |
| if line.startswith('Features: '): |
| self.curl_props['features_string'] = line[10:] |
| self.curl_props['features'] = { |
| feat.lower() for feat in line[10:].split(' ') |
| } |
| if line.startswith('Protocols: '): |
| self.curl_props['protocols_string'] = line[11:] |
| self.curl_props['protocols'] = { |
| prot.lower() for prot in line[11:].split(' ') |
| } |
| |
| p = subprocess.run(args=[self.curlinfo], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| raise RuntimeError(f'{self.curlinfo} failed with exit code: {p.returncode}') |
| self.curl_is_verbose = 'verbose-strings: ON' in p.stdout |
| self.curl_can_cert_status = 'cert-status: ON' in p.stdout |
| |
| self.ports = {} |
| |
| self.httpd = self.config['httpd']['httpd'] |
| self.apxs = self.config['httpd']['apxs'] |
| if len(self.apxs) == 0: |
| self.apxs = None |
| self._httpd_version = None |
| |
| self.examples_pem = { |
| 'key': 'xxx', |
| 'cert': 'xxx', |
| } |
| self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs') |
| self.tld = 'http.curl.se' |
| self.domain1 = f"one.{self.tld}" |
| self.domain1brotli = f"brotli.one.{self.tld}" |
| self.domain2 = f"two.{self.tld}" |
| self.ftp_domain = f"ftp.{self.tld}" |
| self.proxy_domain = f"proxy.{self.tld}" |
| self.expired_domain = f"expired.{self.tld}" |
| self.cert_specs = [ |
| CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'), |
| CertificateSpec(name='domain1-no-ip', domains=[self.domain1, self.domain1brotli], key_type='rsa2048'), |
| CertificateSpec(name='domain1-very-bad', domains=[self.domain1, 'dns:127.0.0.1'], key_type='rsa2048'), |
| CertificateSpec(domains=[self.domain2], key_type='rsa2048'), |
| CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'), |
| CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'), |
| CertificateSpec(domains=[self.expired_domain], key_type='rsa2048', |
| valid_from=timedelta(days=-100), valid_to=timedelta(days=-10)), |
| CertificateSpec(name="clientsX", sub_specs=[ |
| CertificateSpec(name="user1", client=True), |
| ]), |
| ] |
| |
| self.openssl = 'openssl' |
| p = subprocess.run(args=[self.openssl, 'version'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| # no openssl in path |
| self.openssl = None |
| self.openssl_version = None |
| else: |
| self.openssl_version = p.stdout.strip() |
| |
| self.nghttpx = self.config['nghttpx']['nghttpx'] |
| if len(self.nghttpx.strip()) == 0: |
| self.nghttpx = None |
| self._nghttpx_version = None |
| self.nghttpx_with_h3 = False |
| if self.nghttpx is not None: |
| self._nghttpx_version = NghttpxUtil.version(self.nghttpx) |
| self.nghttpx_with_h3 = NghttpxUtil.version_with_h3(self._nghttpx_version) |
| |
| self.caddy = self.config['caddy']['caddy'] |
| self._caddy_version = None |
| if len(self.caddy.strip()) == 0: |
| self.caddy = None |
| if self.caddy is not None: |
| p = subprocess.run(args=[self.caddy, 'version'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| # not a working caddy |
| self.caddy = None |
| m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout) |
| if m: |
| self._caddy_version = m.group(1) |
| else: |
| raise RuntimeError(f'Unable to determine cadd version from: {p.stdout}') |
| |
| self.vsftpd = self.config['vsftpd']['vsftpd'] |
| if self.vsftpd == '': |
| self.vsftpd = None |
| self._vsftpd_version = None |
| if self.vsftpd is not None: |
| with tempfile.TemporaryFile('w+') as tmp: |
| p = subprocess.run(args=[self.vsftpd, '-v'], |
| capture_output=True, text=True, stdin=tmp) |
| if p.returncode != 0: |
| # not a working vsftpd |
| self.vsftpd = None |
| if p.stderr: |
| ver_text = p.stderr |
| else: |
| # Oddly, some versions of vsftpd write to stdin (!) |
| # instead of stderr, which is odd but works. If there |
| # is nothing on stderr, read the file on stdin and use |
| # any data there instead. |
| tmp.seek(0) |
| ver_text = tmp.read() |
| m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', ver_text) |
| if m: |
| self._vsftpd_version = m.group(1) |
| elif len(p.stderr) == 0: |
| # vsftp does not use stdout or stderr for printing its version... -.- |
| self._vsftpd_version = 'unknown' |
| else: |
| raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}') |
| |
| self.danted = self.config['danted']['danted'] |
| if self.danted == '': |
| self.danted = None |
| self._danted_version = None |
| if self.danted is not None: |
| p = subprocess.run(args=[self.danted, '-v'], |
| capture_output=True, text=True) |
| assert p.returncode == 0 |
| if p.returncode != 0: |
| # not a working vsftpd |
| self.danted = None |
| m = re.match(r'^Dante v(\d+\.\d+\.\d+).*', p.stdout) |
| if not m: |
| m = re.match(r'^Dante v(\d+\.\d+\.\d+).*', p.stderr) |
| if m: |
| self._danted_version = m.group(1) |
| else: |
| self.danted = None |
| raise Exception(f'Unable to determine danted version from: {p.stderr}') |
| |
| self.sshd = self.config['sshd']['sshd'] |
| if self.sshd == '': |
| self.sshd = None |
| self._sshd_version = None |
| if self.sshd is not None: |
| p = subprocess.run(args=[self.sshd, '-V'], |
| capture_output=True, text=True) |
| assert p.returncode == 0 |
| if p.returncode != 0: |
| self.sshd = None |
| else: |
| m = re.match(r'^OpenSSH_(\d+\.\d+.*),.*', p.stderr) |
| assert m, f'version: {p.stderr}' |
| if m: |
| self._sshd_version = m.group(1) |
| else: |
| self.sshd = None |
| raise Exception(f'Unable to determine sshd version from: {p.stderr}') |
| |
| if self.sshd: |
| self.sftpd = self.config['sshd']['sftpd'] |
| if self.sftpd == '': |
| self.sftpd = None |
| else: |
| self.sftpd = None |
| |
| self._tcpdump = shutil.which('tcpdump') |
| |
| @property |
| def httpd_version(self): |
| if self._httpd_version is None and self.apxs is not None: |
| try: |
| p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'], |
| capture_output=True, text=True) |
| if p.returncode != 0: |
| log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}') |
| else: |
| self._httpd_version = p.stdout.strip() |
| except Exception: |
| log.exception(f'{self.apxs} failed to run') |
| return self._httpd_version |
| |
| def versiontuple(self, v): |
| v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v) |
| return tuple(map(int, v.split('.'))) |
| |
| def httpd_is_at_least(self, minv): |
| if self.httpd_version is None: |
| return False |
| hv = self.versiontuple(self.httpd_version) |
| return hv >= self.versiontuple(minv) |
| |
| def caddy_is_at_least(self, minv): |
| if self.caddy_version is None: |
| return False |
| hv = self.versiontuple(self.caddy_version) |
| return hv >= self.versiontuple(minv) |
| |
| def is_complete(self) -> bool: |
| return os.path.isfile(self.httpd) and \ |
| self.apxs is not None and \ |
| os.path.isfile(self.apxs) |
| |
| def get_incomplete_reason(self) -> Optional[str]: |
| if self.httpd is None or len(self.httpd.strip()) == 0: |
| return 'httpd not configured, see `--with-test-httpd=<path>`' |
| if not os.path.isfile(self.httpd): |
| return f'httpd ({self.httpd}) not found' |
| if self.apxs is None: |
| return "command apxs not found (commonly provided in apache2-dev)" |
| if not os.path.isfile(self.apxs): |
| return f"apxs ({self.apxs}) not found" |
| return None |
| |
| @property |
| def nghttpx_version(self): |
| return self._nghttpx_version |
| |
| @property |
| def caddy_version(self): |
| return self._caddy_version |
| |
| @property |
| def vsftpd_version(self): |
| return self._vsftpd_version |
| |
| @property |
| def tcpdmp(self) -> Optional[str]: |
| return self._tcpdump |
| |
| def clear_locks(self): |
| ca_lock = os.path.join(self.gen_root, 'ca/ca.lock') |
| if os.path.exists(ca_lock): |
| os.remove(ca_lock) |
| |
| |
| class Env: |
| |
| SERVER_TIMEOUT = 30 # seconds to wait for server to come up/reload |
| |
| CONFIG = EnvConfig() |
| |
| @staticmethod |
| def setup_incomplete() -> bool: |
| return not Env.CONFIG.is_complete() |
| |
| @staticmethod |
| def incomplete_reason() -> Optional[str]: |
| return Env.CONFIG.get_incomplete_reason() |
| |
| @staticmethod |
| def have_openssl() -> bool: |
| return Env.CONFIG.openssl is not None |
| |
| @staticmethod |
| def have_nghttpx() -> bool: |
| return Env.CONFIG.nghttpx is not None |
| |
| @staticmethod |
| def have_h3_server() -> bool: |
| return Env.CONFIG.nghttpx_with_h3 |
| |
| @staticmethod |
| def have_ssl_curl() -> bool: |
| return Env.curl_has_feature('ssl') or Env.curl_has_feature('multissl') |
| |
| @staticmethod |
| def have_h2_curl() -> bool: |
| return 'http2' in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def have_h3_curl() -> bool: |
| return 'http3' in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def have_compressed_curl() -> bool: |
| return 'brotli' in Env.CONFIG.curl_props['libs'] or \ |
| 'zlib' in Env.CONFIG.curl_props['libs'] or \ |
| 'zstd' in Env.CONFIG.curl_props['libs'] |
| |
| @staticmethod |
| def curl_uses_lib(libname: str) -> bool: |
| return libname.lower() in Env.CONFIG.curl_props['libs'] |
| |
| @staticmethod |
| def curl_uses_any_libs(libs: List[str]) -> bool: |
| for libname in libs: |
| if libname.lower() in Env.CONFIG.curl_props['libs']: |
| return True |
| return False |
| |
| @staticmethod |
| def curl_uses_ossl_quic() -> bool: |
| if Env.have_h3_curl(): |
| return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3') |
| return False |
| |
| @staticmethod |
| def curl_version_string() -> str: |
| return Env.CONFIG.curl_props['version_string'] |
| |
| @staticmethod |
| def curl_features_string() -> str: |
| return Env.CONFIG.curl_props['features_string'] |
| |
| @staticmethod |
| def curl_has_feature(feature: str) -> bool: |
| return feature.lower() in Env.CONFIG.curl_props['features'] |
| |
| @staticmethod |
| def curl_protocols_string() -> str: |
| return Env.CONFIG.curl_props['protocols_string'] |
| |
| @staticmethod |
| def curl_has_protocol(protocol: str) -> bool: |
| return protocol.lower() in Env.CONFIG.curl_props['protocols'] |
| |
| @staticmethod |
| def curl_lib_version(libname: str) -> str: |
| prefix = f'{libname.lower()}/' |
| for lversion in Env.CONFIG.curl_props['lib_versions']: |
| if lversion.startswith(prefix): |
| return lversion[len(prefix):] |
| return 'unknown' |
| |
| @staticmethod |
| def curl_lib_version_at_least(libname: str, min_version) -> bool: |
| lversion = Env.curl_lib_version(libname) |
| if lversion != 'unknown': |
| return Env.CONFIG.versiontuple(min_version) <= \ |
| Env.CONFIG.versiontuple(lversion) |
| return False |
| |
| @staticmethod |
| def curl_lib_version_before(libname: str, lib_version) -> bool: |
| lversion = Env.curl_lib_version(libname) |
| if lversion != 'unknown': |
| if m := re.match(r'(\d+\.\d+\.\d+).*', lversion): |
| lversion = m.group(1) |
| return Env.CONFIG.versiontuple(lib_version) > \ |
| Env.CONFIG.versiontuple(lversion) |
| return False |
| |
| @staticmethod |
| def curl_os() -> str: |
| return Env.CONFIG.curl_props['os'] |
| |
| @staticmethod |
| def curl_fullname() -> str: |
| return Env.CONFIG.curl_props['fullname'] |
| |
| @staticmethod |
| def curl_version() -> str: |
| return Env.CONFIG.curl_props['version'] |
| |
| @staticmethod |
| def curl_is_debug() -> bool: |
| return Env.CONFIG.curl_is_debug |
| |
| @staticmethod |
| def curl_is_verbose() -> bool: |
| return Env.CONFIG.curl_is_verbose |
| |
| @staticmethod |
| def curl_can_cert_status() -> bool: |
| return Env.CONFIG.curl_can_cert_status |
| |
| @staticmethod |
| def curl_can_early_data() -> bool: |
| if Env.curl_uses_lib('gnutls'): |
| return Env.curl_lib_version_at_least('gnutls', '3.6.13') |
| return Env.curl_uses_any_libs(['wolfssl', 'quictls', 'openssl']) |
| |
| @staticmethod |
| def curl_can_h3_early_data() -> bool: |
| return Env.curl_can_early_data() and \ |
| Env.curl_uses_lib('ngtcp2') |
| |
| @staticmethod |
| def http_protos() -> List[str]: |
| # http protocols we can test |
| if Env.have_h2_curl(): |
| if Env.have_h3(): |
| return ['http/1.1', 'h2', 'h3'] |
| else: |
| return ['http/1.1', 'h2'] |
| else: |
| return ['http/1.1'] |
| |
| @staticmethod |
| def http_h1_h2_protos() -> List[str]: |
| # http 1+2 protocols we can test |
| if Env.have_h2_curl(): |
| return ['http/1.1', 'h2'] |
| else: |
| return ['http/1.1'] |
| |
| @staticmethod |
| def http_mplx_protos() -> List[str]: |
| # http multiplexing protocols we can test |
| if Env.have_h2_curl(): |
| if Env.have_h3(): |
| return ['h2', 'h3'] |
| else: |
| return ['h2'] |
| else: |
| return [] |
| |
| @staticmethod |
| def have_h3() -> bool: |
| return Env.have_h3_curl() and Env.have_h3_server() |
| |
| @staticmethod |
| def httpd_version() -> str: |
| return Env.CONFIG.httpd_version |
| |
| @staticmethod |
| def nghttpx_version() -> str: |
| return Env.CONFIG.nghttpx_version |
| |
| @staticmethod |
| def caddy_version() -> str: |
| return Env.CONFIG.caddy_version |
| |
| @staticmethod |
| def caddy_is_at_least(minv) -> bool: |
| return Env.CONFIG.caddy_is_at_least(minv) |
| |
| @staticmethod |
| def httpd_is_at_least(minv) -> bool: |
| return Env.CONFIG.httpd_is_at_least(minv) |
| |
| @staticmethod |
| def has_caddy() -> bool: |
| return Env.CONFIG.caddy is not None |
| |
| @staticmethod |
| def has_vsftpd() -> bool: |
| return Env.CONFIG.vsftpd is not None |
| |
| @staticmethod |
| def vsftpd_version() -> str: |
| return Env.CONFIG.vsftpd_version |
| |
| @staticmethod |
| def has_danted() -> bool: |
| return Env.CONFIG.danted is not None |
| |
| @staticmethod |
| def has_sshd() -> bool: |
| return Env.CONFIG.sshd is not None |
| |
| @staticmethod |
| def has_sftpd() -> bool: |
| return Env.has_sshd() and Env.CONFIG.sftpd is not None |
| |
| @staticmethod |
| def tcpdump() -> Optional[str]: |
| return Env.CONFIG.tcpdmp |
| |
| def __init__(self, pytestconfig=None, env_config=None): |
| if env_config: |
| Env.CONFIG = env_config |
| self._verbose = pytestconfig.option.verbose \ |
| if pytestconfig is not None else 0 |
| self._ca = None |
| self._test_timeout = 300.0 if self._verbose > 1 else 60.0 # seconds |
| |
| def issue_certs(self): |
| if self._ca is None: |
| # ca_dir = os.path.join(self.CONFIG.gen_root, 'ca') |
| ca_dir = os.path.join(self.gen_dir, 'ca') |
| os.makedirs(ca_dir, exist_ok=True) |
| lock_file = os.path.join(ca_dir, 'ca.lock') |
| with FileLock(lock_file): |
| self._ca = TestCA.create_root(name=self.CONFIG.tld, |
| store_dir=ca_dir, |
| key_type="rsa2048") |
| self._ca.issue_certs(self.CONFIG.cert_specs) |
| if self.have_openssl(): |
| self._ca.create_hashdir(self.openssl) |
| |
| def setup(self): |
| os.makedirs(self.gen_dir, exist_ok=True) |
| os.makedirs(self.htdocs_dir, exist_ok=True) |
| self.issue_certs() |
| |
| def get_credentials(self, domain) -> Optional[Credentials]: |
| creds = self.ca.get_credentials_for_name(domain) |
| if len(creds) > 0: |
| return creds[0] |
| return None |
| |
| @property |
| def verbose(self) -> int: |
| return self._verbose |
| |
| @property |
| def test_timeout(self) -> Optional[float]: |
| return self._test_timeout |
| |
| @test_timeout.setter |
| def test_timeout(self, val: Optional[float]): |
| self._test_timeout = val |
| |
| @property |
| def gen_dir(self) -> str: |
| return self.CONFIG.gen_dir |
| |
| @property |
| def gen_root(self) -> str: |
| return self.CONFIG.gen_root |
| |
| @property |
| def project_dir(self) -> str: |
| return self.CONFIG.project_dir |
| |
| @property |
| def build_dir(self) -> str: |
| return self.CONFIG.build_dir |
| |
| @property |
| def ca(self): |
| return self._ca |
| |
| @property |
| def htdocs_dir(self) -> str: |
| return self.CONFIG.htdocs_dir |
| |
| @property |
| def tld(self) -> str: |
| return self.CONFIG.tld |
| |
| @property |
| def domain1(self) -> str: |
| return self.CONFIG.domain1 |
| |
| @property |
| def domain1brotli(self) -> str: |
| return self.CONFIG.domain1brotli |
| |
| @property |
| def domain2(self) -> str: |
| return self.CONFIG.domain2 |
| |
| @property |
| def ftp_domain(self) -> str: |
| return self.CONFIG.ftp_domain |
| |
| @property |
| def proxy_domain(self) -> str: |
| return self.CONFIG.proxy_domain |
| |
| @property |
| def expired_domain(self) -> str: |
| return self.CONFIG.expired_domain |
| |
| @property |
| def ports(self) -> Dict[str, int]: |
| return self.CONFIG.ports |
| |
| def update_ports(self, ports: Dict[str, int]): |
| self.CONFIG.ports.update(ports) |
| |
| @property |
| def http_port(self) -> int: |
| return self.CONFIG.ports.get('http', 0) |
| |
| @property |
| def https_port(self) -> int: |
| return self.CONFIG.ports['https'] |
| |
| @property |
| def https_only_tcp_port(self) -> int: |
| return self.CONFIG.ports['https-tcp-only'] |
| |
| @property |
| def nghttpx_https_port(self) -> int: |
| return self.CONFIG.ports['nghttpx_https'] |
| |
| @property |
| def h3_port(self) -> int: |
| return self.https_port |
| |
| @property |
| def proxy_port(self) -> int: |
| return self.CONFIG.ports['proxy'] |
| |
| @property |
| def proxys_port(self) -> int: |
| return self.CONFIG.ports['proxys'] |
| |
| @property |
| def ftp_port(self) -> int: |
| return self.CONFIG.ports['ftp'] |
| |
| @property |
| def ftps_port(self) -> int: |
| return self.CONFIG.ports['ftps'] |
| |
| @property |
| def h2proxys_port(self) -> int: |
| return self.CONFIG.ports['h2proxys'] |
| |
| def pts_port(self, proto: str = 'http/1.1') -> int: |
| # proxy tunnel port |
| return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys'] |
| |
| @property |
| def caddy(self) -> str: |
| return self.CONFIG.caddy |
| |
| @property |
| def caddy_https_port(self) -> int: |
| return self.CONFIG.ports['caddys'] |
| |
| @property |
| def caddy_http_port(self) -> int: |
| return self.CONFIG.ports['caddy'] |
| |
| @property |
| def danted(self) -> str: |
| return self.CONFIG.danted |
| |
| @property |
| def vsftpd(self) -> str: |
| return self.CONFIG.vsftpd |
| |
| @property |
| def ws_port(self) -> int: |
| return self.CONFIG.ports['ws'] |
| |
| @property |
| def curl(self) -> str: |
| return self.CONFIG.curl |
| |
| @property |
| def openssl(self) -> Optional[str]: |
| return self.CONFIG.openssl |
| |
| @property |
| def httpd(self) -> str: |
| return self.CONFIG.httpd |
| |
| @property |
| def apxs(self) -> str: |
| return self.CONFIG.apxs |
| |
| @property |
| def nghttpx(self) -> Optional[str]: |
| return self.CONFIG.nghttpx |
| |
| @property |
| def slow_network(self) -> bool: |
| return "CURL_DBG_SOCK_WBLOCK" in os.environ or \ |
| "CURL_DBG_SOCK_WPARTIAL" in os.environ |
| |
| @property |
| def ci_run(self) -> bool: |
| return "CURL_CI" in os.environ |
| |
| def port_for(self, alpn_proto: Optional[str] = None): |
| if alpn_proto is None or \ |
| alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']: |
| return self.https_port |
| if alpn_proto in ['h3']: |
| return self.h3_port |
| return self.http_port |
| |
| def authority_for(self, domain: str, alpn_proto: Optional[str] = None): |
| return f'{domain}:{self.port_for(alpn_proto=alpn_proto)}' |
| |
| def make_data_file(self, indir: str, fname: str, fsize: int, |
| line_length: int = 1024) -> str: |
| if line_length < 11: |
| raise RuntimeError('line_length less than 11 not supported') |
| fpath = os.path.join(indir, fname) |
| s10 = "0123456789" |
| s = round((line_length / 10) + 1) * s10 |
| s = s[0:line_length-11] |
| with open(fpath, 'w') as fd: |
| for i in range(int(fsize / line_length)): |
| fd.write(f"{i:09d}-{s}\n") |
| remain = int(fsize % line_length) |
| if remain != 0: |
| i = int(fsize / line_length) + 1 |
| fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n") |
| return fpath |
| |
| def make_data_gzipbomb(self, indir: str, fname: str, fsize: int) -> str: |
| fpath = os.path.join(indir, fname) |
| gzpath = f'{fpath}.gz' |
| varpath = f'{fpath}.var' |
| |
| with open(fpath, 'w') as fd: |
| fd.write('not what we are looking for!\n') |
| count = int(fsize / 1024) |
| zero1k = bytearray(1024) |
| with gzip.open(gzpath, 'wb') as fd: |
| for _ in range(count): |
| fd.write(zero1k) |
| with open(varpath, 'w') as fd: |
| fd.write(f'URI: {fname}\n') |
| fd.write('\n') |
| fd.write(f'URI: {fname}.gz\n') |
| fd.write('Content-Type: text/plain\n') |
| fd.write('Content-Encoding: x-gzip\n') |
| fd.write('\n') |
| return fpath |