blob: 46632cb712e3f4eaf3ae16c9a8dc229381f8bcb2 [file] [log] [blame]
# Copyright 2022 The Fuchsia Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import time
from antlion import signals, utils
from antlion.controllers.openwrt_lib import network_const
SERVICE_DNSMASQ = "dnsmasq"
SERVICE_STUNNEL = "stunnel"
SERVICE_NETWORK = "network"
SERVICE_PPTPD = "pptpd"
SERVICE_FIREWALL = "firewall"
SERVICE_IPSEC = "ipsec"
SERVICE_XL2TPD = "xl2tpd"
SERVICE_ODHCPD = "odhcpd"
SERVICE_OPENNDS = "opennds"
SERVICE_UHTTPD = "uhttpd"
PPTP_PACKAGE = "pptpd kmod-nf-nathelper-extra"
L2TP_PACKAGE = "strongswan-full openssl-util xl2tpd"
NAT6_PACKAGE = "ip6tables kmod-ipt-nat6"
CAPTIVE_PORTAL_PACKAGE = "opennds php7-cli php7-mod-openssl php7-cgi php7"
MDNS_PACKAGE = "avahi-utils avahi-daemon-service-http avahi-daemon-service-ssh libavahi-client avahi-dbus-daemon"
STUNNEL_CONFIG_PATH = "/etc/stunnel/DoTServer.conf"
HISTORY_CONFIG_PATH = "/etc/dirty_configs"
PPTPD_OPTION_PATH = "/etc/ppp/options.pptpd"
XL2TPD_CONFIG_PATH = "/etc/xl2tpd/xl2tpd.conf"
XL2TPD_OPTION_CONFIG_PATH = "/etc/ppp/options.xl2tpd"
FIREWALL_CUSTOM_OPTION_PATH = "/etc/firewall.user"
PPP_CHAP_SECRET_PATH = "/etc/ppp/chap-secrets"
IKEV2_VPN_CERT_KEYS_PATH = "/var/ikev2_cert.sh"
TCPDUMP_DIR = "/tmp/tcpdump/"
LOCALHOST = "192.168.1.1"
DEFAULT_PACKAGE_INSTALL_TIMEOUT = 200
class NetworkSettings(object):
"""Class for network settings.
Attributes:
ssh: ssh connection object.
ssh_settings: ssh settings for AccessPoint.
service_manager: Object manage service configuration.
user: username for ssh.
ip: ip address for AccessPoint.
log: Logging object for AccessPoint.
config: A list to store changes on network settings.
firewall_rules_list: A list of firewall rule name list.
l2tp: profile for vpn l2tp server.
"""
def __init__(self, ssh, ssh_settings, logger):
"""Initialize wireless settings.
Args:
ssh: ssh connection object.
ssh_settings: ssh settings for AccessPoint.
logger: Logging object for AccessPoint.
"""
self.ssh = ssh
self.service_manager = ServiceManager(ssh)
self.ssh_settings = ssh_settings
self.user = self.ssh_settings.username
self.ip = self.ssh_settings.hostname
self.log = logger
self.config = set()
self.firewall_rules_list = []
# This map contains cleanup functions to restore the configuration to
# its default state. We write these keys to HISTORY_CONFIG_PATH prior to
# making any changes to that subsystem.
# This makes it easier to recover after an aborted test.
self.update_firewall_rules_list()
self.cleanup_network_settings()
self.clear_tcpdump()
def cleanup_network_settings(self):
"""Reset all changes on Access point."""
# Detect if any changes that is not clean up.
if self.file_exists(HISTORY_CONFIG_PATH):
out = self.ssh.run(f"cat {HISTORY_CONFIG_PATH}").stdout
if out:
self.config = set(out.split("\n"))
if self.config:
temp = self.config.copy()
for change in temp:
change_list = change.split()
command = change_list[0]
args = change_list[1:]
if command == "setup_dns_server":
self.remove_dns_server()
elif command == "setup_vpn_pptp_server":
self.remove_vpn_pptp_server()
elif command == "setup_vpn_l2tp_server":
self.remove_vpn_l2tp_server()
elif command == "disable_ipv6":
self.enable_ipv6()
elif command == "setup_ipv6_bridge":
self.remove_ipv6_bridge()
elif command == "default_dns":
addr_list = str(change_list[1])
self.del_default_dns(addr_list)
elif command == "default_v6_dns":
addr_list = str(change_list[1])
self.del_default_v6_dns(addr_list)
elif command == "ipv6_prefer_option":
self.remove_ipv6_prefer_option()
elif command == "block_dns_response":
self.unblock_dns_response()
elif command == "setup_mdns":
self.remove_mdns()
elif command == "add_dhcp_rapid_commit":
self.remove_dhcp_rapid_commit()
elif command == "setup_captive_portal":
try:
fas_port = int(change_list[1])
except IndexError:
fas_port = 1000
self.remove_cpative_portal(fas_port)
else:
raise TypeError(f'Unknown command "{change}"')
self.config = set()
if self.file_exists(HISTORY_CONFIG_PATH):
out = self.ssh.run(f"cat {HISTORY_CONFIG_PATH}").stdout
if not out:
self.ssh.run(f"rm {HISTORY_CONFIG_PATH}")
def commit_changes(self):
"""Apply changes on Access point."""
self.ssh.run("uci commit")
self.service_manager.restart_services()
self.create_config_file("\n".join(self.config), HISTORY_CONFIG_PATH)
def package_install(self, package_list):
"""Install packages on OpenWrtAP via opkg If not installed.
Args:
package_list: package list to install.
e.g. "pptpd kmod-mppe kmod-nf-nathelper-extra"
"""
self.ssh.run("opkg update")
for package_name in package_list.split(" "):
if not self._package_installed(package_name):
self.ssh.run(
f"opkg install {package_name}",
timeout=DEFAULT_PACKAGE_INSTALL_TIMEOUT,
)
self.log.info(f"Package: {package_name} installed.")
else:
self.log.info(f"Package: {package_name} skipped (already installed).")
def package_remove(self, package_list):
"""Remove packages on OpenWrtAP via opkg If existed.
Args:
package_list: package list to remove.
"""
for package_name in package_list.split(" "):
if self._package_installed(package_name):
self.ssh.run(f"opkg remove {package_name}")
self.log.info(f"Package: {package_name} removed.")
else:
self.log.info(f"No exist package {package_name} found.")
def _package_installed(self, package_name):
"""Check if target package installed on OpenWrtAP.
Args:
package_name: package name want to check.
Returns:
True if installed.
"""
if self.ssh.run(f"opkg list-installed {package_name}").stdout:
return True
return False
def file_exists(self, abs_file_path):
"""Check if target file exist on specific path on OpenWrt.
Args:
abs_file_path: Absolute path for the file.
Returns:
True if Existed.
"""
path, file_name = abs_file_path.rsplit("/", 1)
if self.ssh.run(f"ls {path} | grep {file_name}", ignore_status=True).stdout:
return True
return False
def path_exists(self, abs_path):
"""Check if dir exist on OpenWrt.
Args:
abs_path: absolutely path for create folder.
"""
try:
self.ssh.run(f"ls {abs_path}")
except:
return False
return True
def create_folder(self, abs_path):
"""If dir not exist, create it.
Args:
abs_path: absolutely path for create folder.
"""
if not self.path_exists(abs_path):
self.ssh.run(f"mkdir {abs_path}")
else:
self.log.info(f"{abs_path} already existed.")
def count(self, config, key):
"""Count in uci config.
Args:
config: config or section to research
key: keywords to e.g. rule, domain
Returns:
Numbers of the count.
"""
count = self.ssh.run(
f"uci show {config} | grep ={key}", ignore_status=True
).stdout
return len(count.split("\n"))
def create_config_file(self, config, file_path):
"""Create config file. Overwrite if file already exist.
Args:
config: A string of content of config.
file_path: Config's abs_path.
"""
self.ssh.run(f'echo -e "{config}" > {file_path}')
def replace_config_option(self, old_option, new_option, file_path):
"""Replace config option if pattern match.
If find match pattern with old_option, then replace it with new_option.
Else add new_option to the file.
Args:
old_option: the regexp pattern to replace.
new_option: the option to add.
file_path: Config's abs_path.
"""
config = self.ssh.run(f"cat {file_path}").stdout
config, count = re.subn(old_option, new_option, config)
if not count:
config = f"{config}\n{new_option}"
self.create_config_file(config, file_path)
def remove_config_option(self, option, file_path):
"""Remove option from config file.
Args:
option: Option to remove. Support regular expression.
file_path: Config's abs_path.
Returns:
Boolean for find option to remove.
"""
config = self.ssh.run(f"cat {file_path}").stdout.split("\n")
for line in config:
count = re.subn(option, "", line)[1]
if count > 0:
config.remove(line)
self.create_config_file("\n".join(config), file_path)
return True
self.log.warning("No match option to remove.")
return False
def setup_dns_server(self, domain_name):
"""Setup DNS server on OpenWrtAP.
Args:
domain_name: Local dns domain name.
"""
self.config.add("setup_dns_server")
self.log.info(f"Setup DNS server with domain name {domain_name}")
self.ssh.run(f"uci set dhcp.@dnsmasq[0].local='/{domain_name}/'")
self.ssh.run(f"uci set dhcp.@dnsmasq[0].domain='{domain_name}'")
self.add_resource_record(domain_name, self.ip)
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
# Check stunnel package is installed
self.package_install("stunnel")
self.service_manager.stop(SERVICE_STUNNEL)
self.service_manager.disable(SERVICE_STUNNEL)
# Enable stunnel
self.create_stunnel_config()
self.ssh.run("stunnel /etc/stunnel/DoTServer.conf")
def remove_dns_server(self):
"""Remove DNS server on OpenWrtAP."""
if self.file_exists("/var/run/stunnel.pid"):
self.ssh.run("kill $(cat /var/run/stunnel.pid)")
self.ssh.run("uci set dhcp.@dnsmasq[0].local='/lan/'")
self.ssh.run("uci set dhcp.@dnsmasq[0].domain='lan'")
self.clear_resource_record()
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.config.discard("setup_dns_server")
self.commit_changes()
def add_resource_record(self, domain_name, domain_ip):
"""Add resource record.
Args:
domain_name: A string for domain name.
domain_ip: A string for domain ip.
"""
self.ssh.run("uci add dhcp domain")
self.ssh.run(f"uci set dhcp.@domain[-1].name='{domain_name}'")
self.ssh.run(f"uci set dhcp.@domain[-1].ip='{domain_ip}'")
self.service_manager.need_restart(SERVICE_DNSMASQ)
def del_resource_record(self):
"""Delete the last resource record."""
self.ssh.run("uci delete dhcp.@domain[-1]")
self.service_manager.need_restart(SERVICE_DNSMASQ)
def clear_resource_record(self):
"""Delete the all resource record."""
rr = self.ssh.run("uci show dhcp | grep =domain", ignore_status=True).stdout
if rr:
for _ in rr.split("\n"):
self.del_resource_record()
self.service_manager.need_restart(SERVICE_DNSMASQ)
def create_stunnel_config(self):
"""Create config for stunnel service."""
stunnel_config = [
"pid = /var/run/stunnel.pid",
"[dns]",
"accept = 853",
"connect = 127.0.0.1:53",
"cert = /etc/stunnel/fullchain.pem",
"key = /etc/stunnel/privkey.pem",
]
config_string = "\n".join(stunnel_config)
self.create_config_file(config_string, STUNNEL_CONFIG_PATH)
def setup_vpn_pptp_server(self, local_ip, user, password):
"""Setup pptp vpn server on OpenWrt.
Args:
local_ip: local pptp server ip address.
user: username for pptp user.
password: password for pptp user.
"""
# Install pptp service
self.package_install(PPTP_PACKAGE)
self.config.add("setup_vpn_pptp_server")
# Edit /etc/config/pptpd & /etc/ppp/options.pptpd
self.setup_pptpd(local_ip, user, password)
# Edit /etc/config/firewall & /etc/firewall.user
self.setup_firewall_rules_for_pptp()
# Enable service
self.service_manager.enable(SERVICE_PPTPD)
self.service_manager.need_restart(SERVICE_PPTPD)
self.service_manager.need_restart(SERVICE_FIREWALL)
self.commit_changes()
def remove_vpn_pptp_server(self):
"""Remove pptp vpn server on OpenWrt."""
# Edit /etc/config/pptpd
self.restore_pptpd()
# Edit /etc/config/firewall & /etc/firewall.user
self.restore_firewall_rules_for_pptp()
# Disable service
self.service_manager.disable(SERVICE_PPTPD)
self.service_manager.need_restart(SERVICE_PPTPD)
self.service_manager.need_restart(SERVICE_FIREWALL)
self.config.discard("setup_vpn_pptp_server")
self.commit_changes()
self.package_remove(PPTP_PACKAGE)
self.ssh.run("rm /etc/ppp/options.pptpd")
self.ssh.run("rm /etc/config/pptpd")
def setup_pptpd(self, local_ip, username, password, ms_dns="8.8.8.8"):
"""Setup pptpd config for ip addr and account.
Args:
local_ip: vpn server address
username: pptp vpn username
password: pptp vpn password
ms_dns: DNS server
"""
# Calculate remote ip address
# e.g. local_ip = 10.10.10.9
# remote_ip = 10.10.10.10 -250
remote_ip = local_ip.split(".")
remote_ip.append(str(int(remote_ip.pop(-1)) + 1))
remote_ip = ".".join(remote_ip)
# Enable pptp service and set ip addr
self.ssh.run("uci set pptpd.pptpd.enabled=1")
self.ssh.run(f"uci set pptpd.pptpd.localip='{local_ip}'")
self.ssh.run(f"uci set pptpd.pptpd.remoteip='{remote_ip}-250'")
# Setup pptp service account
self.ssh.run(f"uci set pptpd.@login[0].username='{username}'")
self.ssh.run(f"uci set pptpd.@login[0].password='{password}'")
self.service_manager.need_restart(SERVICE_PPTPD)
self.replace_config_option(
r"#*ms-dns \d+.\d+.\d+.\d+", f"ms-dns {ms_dns}", PPTPD_OPTION_PATH
)
self.replace_config_option("(#no)*proxyarp", "proxyarp", PPTPD_OPTION_PATH)
def restore_pptpd(self):
"""Disable pptpd."""
self.ssh.run("uci set pptpd.pptpd.enabled=0")
self.remove_config_option(r"\S+ pptp-server \S+ \*", PPP_CHAP_SECRET_PATH)
self.service_manager.need_restart(SERVICE_PPTPD)
def setup_vpn_l2tp_server(
self,
vpn_server_hostname,
vpn_server_address,
vpn_username,
vpn_password,
psk_secret,
server_name,
country,
org,
):
"""Setup l2tp vpn server on OpenWrt.
Args:
vpn_server_hostname: vpn server domain name
vpn_server_address: vpn server addr
vpn_username: vpn account
vpn_password: vpn password
psk_secret: psk for ipsec
server_name: vpn server name for register in OpenWrt
country: country code for generate cert keys.
org: Organization name for generate cert keys.
"""
self.l2tp = network_const.VpnL2tp(
vpn_server_hostname,
vpn_server_address,
vpn_username,
vpn_password,
psk_secret,
server_name,
)
self.package_install(L2TP_PACKAGE)
self.config.add("setup_vpn_l2tp_server")
# /etc/strongswan.conf: Strongswan configuration file
self.setup_strongswan()
# /etc/ipsec.conf /etc/ipsec.secrets
self.setup_ipsec()
# /etc/xl2tpd/xl2tpd.conf & /etc/ppp/options.xl2tpd
self.setup_xl2tpd()
# /etc/ppp/chap-secrets
self.setup_ppp_secret()
# /etc/config/firewall & /etc/firewall.user
self.setup_firewall_rules_for_l2tp()
# setup vpn server local ip
self.setup_vpn_local_ip()
# generate cert and key for rsa
if self.l2tp.name == "ikev2-server":
self.generate_ikev2_vpn_cert_keys(country, org)
self.add_resource_record(self.l2tp.hostname, LOCALHOST)
else:
self.generate_vpn_cert_keys(country, org)
# restart service
self.service_manager.need_restart(SERVICE_IPSEC)
self.service_manager.need_restart(SERVICE_XL2TPD)
self.service_manager.need_restart(SERVICE_FIREWALL)
self.commit_changes()
def remove_vpn_l2tp_server(self):
"""Remove l2tp vpn server on OpenWrt."""
self.config.discard("setup_vpn_l2tp_server")
self.restore_firewall_rules_for_l2tp()
self.remove_vpn_local_ip()
if self.l2tp.name == "ikev2-server":
self.clear_resource_record()
self.service_manager.need_restart(SERVICE_IPSEC)
self.service_manager.need_restart(SERVICE_XL2TPD)
self.service_manager.need_restart(SERVICE_FIREWALL)
self.commit_changes()
self.package_remove(L2TP_PACKAGE)
if hasattr(self, "l2tp"):
delattr(self, "l2tp")
def setup_strongswan(self, dns="8.8.8.8"):
"""Setup strongswan config."""
config = [
"charon {",
" load_modular = yes",
" plugins {",
" include strongswan.d/charon/*.conf",
" }",
f" dns1={dns}",
"}",
]
self.create_config_file("\n".join(config), "/etc/strongswan.conf")
def setup_ipsec(self):
"""Setup ipsec config."""
config: list[str] = []
def load_ipsec_config(data, rightsourceip=False):
for i in data.keys():
config.append(i)
for j in data[i].keys():
config.append(f"\t {j}={data[i][j]}")
if rightsourceip:
config.append(
f"\t rightsourceip={self.l2tp.address.rsplit('.', 1)[0]}.16/26"
)
config.append("")
load_ipsec_config(network_const.IPSEC_IKEV2_MSCHAPV2, True)
load_ipsec_config(network_const.IPSEC_IKEV2_PSK, True)
load_ipsec_config(network_const.IPSEC_IKEV2_RSA, True)
load_ipsec_config(network_const.IPSEC_IKEV2_MSCHAPV2_HOSTNAME, True)
load_ipsec_config(network_const.IPSEC_IKEV2_PSK_HOSTNAME, True)
load_ipsec_config(network_const.IPSEC_IKEV2_RSA_HOSTNAME, True)
load_ipsec_config(network_const.IPSEC_CONF)
load_ipsec_config(network_const.IPSEC_L2TP_PSK)
load_ipsec_config(network_const.IPSEC_L2TP_RSA)
load_ipsec_config(network_const.IPSEC_HYBRID_RSA, True)
load_ipsec_config(network_const.IPSEC_XAUTH_PSK, True)
load_ipsec_config(network_const.IPSEC_XAUTH_RSA, True)
self.create_config_file("\n".join(config), "/etc/ipsec.conf")
ipsec_secret = []
ipsec_secret.append(r": PSK \"%s\"" % self.l2tp.psk_secret)
ipsec_secret.append(r": RSA \"%s\"" % "serverKey.der")
ipsec_secret.append(
r"%s : XAUTH \"%s\"" % (self.l2tp.username, self.l2tp.password)
)
self.create_config_file("\n".join(ipsec_secret), "/etc/ipsec.secrets")
def setup_xl2tpd(self, ip_range=20):
"""Setup xl2tpd config."""
net_id, host_id = self.l2tp.address.rsplit(".", 1)
xl2tpd_conf = list(network_const.XL2TPD_CONF_GLOBAL)
xl2tpd_conf.append(f"auth file = {PPP_CHAP_SECRET_PATH}")
xl2tpd_conf.extend(network_const.XL2TPD_CONF_INS)
xl2tpd_conf.append(
f"ip range = {net_id}.{host_id}-{net_id}.{str(int(host_id) + ip_range)}"
)
xl2tpd_conf.append(f"local ip = {self.l2tp.address}")
xl2tpd_conf.append(f"name = {self.l2tp.name}")
xl2tpd_conf.append(f"pppoptfile = {XL2TPD_OPTION_CONFIG_PATH}")
self.create_config_file("\n".join(xl2tpd_conf), XL2TPD_CONFIG_PATH)
xl2tpd_option = list(network_const.XL2TPD_OPTION)
xl2tpd_option.append(f"name {self.l2tp.name}")
self.create_config_file("\n".join(xl2tpd_option), XL2TPD_OPTION_CONFIG_PATH)
def setup_ppp_secret(self):
self.replace_config_option(
r"\S+ %s \S+ \*" % self.l2tp.name,
f"{self.l2tp.username} {self.l2tp.name} {self.l2tp.password} *",
PPP_CHAP_SECRET_PATH,
)
def generate_vpn_cert_keys(self, country, org):
"""Generate cert and keys for vpn server."""
rsa = "--type rsa"
lifetime = "--lifetime 365"
size = "--size 4096"
self.ssh.run(f"ipsec pki --gen {rsa} {size} --outform der > caKey.der")
self.ssh.run(
"ipsec pki --self --ca %s --in caKey.der %s --dn "
'"C=%s, O=%s, CN=%s" --outform der > caCert.der'
% (lifetime, rsa, country, org, self.l2tp.hostname)
)
self.ssh.run(f"ipsec pki --gen {size} {rsa} --outform der > serverKey.der")
self.ssh.run(
"ipsec pki --pub --in serverKey.der %s | ipsec pki "
"--issue %s --cacert caCert.der --cakey caKey.der "
'--dn "C=%s, O=%s, CN=%s" --san %s --flag serverAuth'
" --flag ikeIntermediate --outform der > serverCert.der"
% (rsa, lifetime, country, org, self.l2tp.hostname, LOCALHOST)
)
self.ssh.run(f"ipsec pki --gen {size} {rsa} --outform der > clientKey.der")
self.ssh.run(
"ipsec pki --pub --in clientKey.der %s | ipsec pki "
"--issue %s --cacert caCert.der --cakey caKey.der "
'--dn "C=%s, O=%s, CN=%s@%s" --outform der > '
"clientCert.der"
% (rsa, lifetime, country, org, self.l2tp.username, self.l2tp.hostname)
)
self.ssh.run(
"openssl rsa -inform DER -in clientKey.der"
" -out clientKey.pem -outform PEM"
)
self.ssh.run(
"openssl x509 -inform DER -in clientCert.der"
" -out clientCert.pem -outform PEM"
)
self.ssh.run(
"openssl x509 -inform DER -in caCert.der" " -out caCert.pem -outform PEM"
)
self.ssh.run(
"openssl pkcs12 -in clientCert.pem -inkey clientKey.pem"
" -certfile caCert.pem -export -out clientPkcs.p12 -passout pass:"
)
self.ssh.run("mv caCert.pem /etc/ipsec.d/cacerts/")
self.ssh.run("mv *Cert* /etc/ipsec.d/certs/")
self.ssh.run("mv *Key* /etc/ipsec.d/private/")
if not self.path_exists("/www/downloads/"):
self.ssh.run("mkdir /www/downloads/")
self.ssh.run("mv clientPkcs.p12 /www/downloads/")
self.ssh.run("chmod 664 /www/downloads/clientPkcs.p12")
def generate_ikev2_vpn_cert_keys(self, country, org):
rsa = "--type rsa"
lifetime = "--lifetime 365"
size = "--size 4096"
if not self.path_exists("/www/downloads/"):
self.ssh.run("mkdir /www/downloads/")
ikev2_vpn_cert_keys = [
f"ipsec pki --gen {rsa} {size} --outform der > caKey.der",
"ipsec pki --self --ca %s --in caKey.der %s --dn "
'"C=%s, O=%s, CN=%s" --outform der > caCert.der'
% (lifetime, rsa, country, org, self.l2tp.hostname),
f"ipsec pki --gen {size} {rsa} --outform der > serverKey.der",
"ipsec pki --pub --in serverKey.der %s | ipsec pki --issue %s "
r"--cacert caCert.der --cakey caKey.der --dn \"C=%s, O=%s, CN=%s\" "
"--san %s --san %s --flag serverAuth --flag ikeIntermediate "
"--outform der > serverCert.der"
% (
rsa,
lifetime,
country,
org,
self.l2tp.hostname,
LOCALHOST,
self.l2tp.hostname,
),
f"ipsec pki --gen {size} {rsa} --outform der > clientKey.der",
"ipsec pki --pub --in clientKey.der %s | ipsec pki --issue %s "
r"--cacert caCert.der --cakey caKey.der --dn \"C=%s, O=%s, CN=%s@%s\" "
r"--san \"%s\" --san \"%s@%s\" --san \"%s@%s\" --outform der "
"> clientCert.der"
% (
rsa,
lifetime,
country,
org,
self.l2tp.username,
self.l2tp.hostname,
self.l2tp.username,
self.l2tp.username,
LOCALHOST,
self.l2tp.username,
self.l2tp.hostname,
),
"openssl rsa -inform DER -in clientKey.der "
"-out clientKey.pem -outform PEM",
"openssl x509 -inform DER -in clientCert.der "
"-out clientCert.pem -outform PEM",
"openssl x509 -inform DER -in caCert.der " "-out caCert.pem -outform PEM",
"openssl pkcs12 -in clientCert.pem -inkey clientKey.pem "
"-certfile caCert.pem -export -out clientPkcs.p12 -passout pass:",
"mv caCert.pem /etc/ipsec.d/cacerts/",
"mv *Cert* /etc/ipsec.d/certs/",
"mv *Key* /etc/ipsec.d/private/",
"mv clientPkcs.p12 /www/downloads/",
"chmod 664 /www/downloads/clientPkcs.p12",
]
file_string = "\n".join(ikev2_vpn_cert_keys)
self.create_config_file(file_string, IKEV2_VPN_CERT_KEYS_PATH)
self.ssh.run(f"chmod +x {IKEV2_VPN_CERT_KEYS_PATH}")
self.ssh.run(f"{IKEV2_VPN_CERT_KEYS_PATH}")
def update_firewall_rules_list(self):
"""Update rule list in /etc/config/firewall."""
new_rules_list = []
for i in range(self.count("firewall", "rule")):
rule = self.ssh.run(f"uci get firewall.@rule[{i}].name").stdout
new_rules_list.append(rule)
self.firewall_rules_list = new_rules_list
def setup_firewall_rules_for_pptp(self):
"""Setup firewall for vpn pptp server."""
self.update_firewall_rules_list()
if "pptpd" not in self.firewall_rules_list:
self.ssh.run("uci add firewall rule")
self.ssh.run("uci set firewall.@rule[-1].name='pptpd'")
self.ssh.run("uci set firewall.@rule[-1].target='ACCEPT'")
self.ssh.run("uci set firewall.@rule[-1].proto='tcp'")
self.ssh.run("uci set firewall.@rule[-1].dest_port='1723'")
self.ssh.run("uci set firewall.@rule[-1].family='ipv4'")
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
if "GRP" not in self.firewall_rules_list:
self.ssh.run("uci add firewall rule")
self.ssh.run("uci set firewall.@rule[-1].name='GRP'")
self.ssh.run("uci set firewall.@rule[-1].target='ACCEPT'")
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
self.ssh.run("uci set firewall.@rule[-1].proto='47'")
iptable_rules = list(network_const.FIREWALL_RULES_FOR_PPTP)
self.add_custom_firewall_rules(iptable_rules)
self.service_manager.need_restart(SERVICE_FIREWALL)
def restore_firewall_rules_for_pptp(self):
"""Restore firewall for vpn pptp server."""
self.update_firewall_rules_list()
if "pptpd" in self.firewall_rules_list:
self.ssh.run(
f"uci del firewall.@rule[{self.firewall_rules_list.index('pptpd')}]"
)
self.update_firewall_rules_list()
if "GRP" in self.firewall_rules_list:
self.ssh.run(
f"uci del firewall.@rule[{self.firewall_rules_list.index('GRP')}]"
)
self.remove_custom_firewall_rules()
self.service_manager.need_restart(SERVICE_FIREWALL)
def setup_firewall_rules_for_l2tp(self):
"""Setup firewall for vpn l2tp server."""
self.update_firewall_rules_list()
if "ipsec esp" not in self.firewall_rules_list:
self.ssh.run("uci add firewall rule")
self.ssh.run("uci set firewall.@rule[-1].name='ipsec esp'")
self.ssh.run("uci set firewall.@rule[-1].target='ACCEPT'")
self.ssh.run("uci set firewall.@rule[-1].proto='esp'")
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
if "ipsec nat-t" not in self.firewall_rules_list:
self.ssh.run("uci add firewall rule")
self.ssh.run("uci set firewall.@rule[-1].name='ipsec nat-t'")
self.ssh.run("uci set firewall.@rule[-1].target='ACCEPT'")
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
self.ssh.run("uci set firewall.@rule[-1].proto='udp'")
self.ssh.run("uci set firewall.@rule[-1].dest_port='4500'")
if "auth header" not in self.firewall_rules_list:
self.ssh.run("uci add firewall rule")
self.ssh.run("uci set firewall.@rule[-1].name='auth header'")
self.ssh.run("uci set firewall.@rule[-1].target='ACCEPT'")
self.ssh.run("uci set firewall.@rule[-1].src='wan'")
self.ssh.run("uci set firewall.@rule[-1].proto='ah'")
net_id = self.l2tp.address.rsplit(".", 1)[0]
iptable_rules = list(network_const.FIREWALL_RULES_FOR_L2TP)
iptable_rules.append(f"iptables -A FORWARD -s {net_id}.0/24 -j ACCEPT")
iptable_rules.append(
f"iptables -t nat -A POSTROUTING -s {net_id}.0/24 -o eth0.2 -j MASQUERADE"
)
self.add_custom_firewall_rules(iptable_rules)
self.service_manager.need_restart(SERVICE_FIREWALL)
def restore_firewall_rules_for_l2tp(self):
"""Restore firewall for vpn l2tp server."""
self.update_firewall_rules_list()
if "ipsec esp" in self.firewall_rules_list:
self.ssh.run(
f"uci del firewall.@rule[{self.firewall_rules_list.index('ipsec esp')}]"
)
self.update_firewall_rules_list()
if "ipsec nat-t" in self.firewall_rules_list:
self.ssh.run(
"uci del firewall.@rule[%s]"
% self.firewall_rules_list.index("ipsec nat-t")
)
self.update_firewall_rules_list()
if "auth header" in self.firewall_rules_list:
self.ssh.run(
"uci del firewall.@rule[%s]"
% self.firewall_rules_list.index("auth header")
)
self.remove_custom_firewall_rules()
self.service_manager.need_restart(SERVICE_FIREWALL)
def add_custom_firewall_rules(self, rules):
"""Backup current custom rules and replace with arguments.
Args:
rules: A list of iptable rules to apply.
"""
backup_file_path = f"{FIREWALL_CUSTOM_OPTION_PATH}.backup"
if not self.file_exists(backup_file_path):
self.ssh.run(f"mv {FIREWALL_CUSTOM_OPTION_PATH} {backup_file_path}")
for rule in rules:
self.ssh.run(f"echo {rule} >> {FIREWALL_CUSTOM_OPTION_PATH}")
def remove_custom_firewall_rules(self):
"""Clean up and recover custom firewall rules."""
backup_file_path = f"{FIREWALL_CUSTOM_OPTION_PATH}.backup"
if self.file_exists(backup_file_path):
self.ssh.run(f"mv {backup_file_path} {FIREWALL_CUSTOM_OPTION_PATH}")
else:
self.log.debug(f"Did not find {backup_file_path}")
self.ssh.run(f"echo > {FIREWALL_CUSTOM_OPTION_PATH}")
def disable_pptp_service(self):
"""Disable pptp service."""
self.package_remove(PPTP_PACKAGE)
def setup_vpn_local_ip(self):
"""Setup VPN Server local ip on OpenWrt for client ping verify."""
self.ssh.run("uci set network.lan2=interface")
self.ssh.run("uci set network.lan2.type=bridge")
self.ssh.run("uci set network.lan2.ifname=eth1.2")
self.ssh.run("uci set network.lan2.proto=static")
self.ssh.run(f'uci set network.lan2.ipaddr="{self.l2tp.address}"')
self.ssh.run("uci set network.lan2.netmask=255.255.255.0")
self.ssh.run("uci set network.lan2=interface")
self.service_manager.reload(SERVICE_NETWORK)
self.commit_changes()
def remove_vpn_local_ip(self):
"""Discard vpn local ip on OpenWrt."""
self.ssh.run("uci delete network.lan2")
self.service_manager.reload(SERVICE_NETWORK)
self.commit_changes()
def enable_ipv6(self):
"""Enable ipv6 on OpenWrt."""
self.ssh.run("uci set network.lan.ipv6=1")
self.ssh.run("uci set network.wan.ipv6=1")
self.service_manager.enable("odhcpd")
self.service_manager.reload(SERVICE_NETWORK)
self.config.discard("disable_ipv6")
self.commit_changes()
def disable_ipv6(self):
"""Disable ipv6 on OpenWrt."""
self.config.add("disable_ipv6")
self.ssh.run("uci set network.lan.ipv6=0")
self.ssh.run("uci set network.wan.ipv6=0")
self.service_manager.disable("odhcpd")
self.service_manager.reload(SERVICE_NETWORK)
self.commit_changes()
def setup_ipv6_bridge(self):
"""Setup ipv6 bridge for client have ability to access network."""
self.config.add("setup_ipv6_bridge")
self.ssh.run("uci set dhcp.lan.dhcpv6=relay")
self.ssh.run("uci set dhcp.lan.ra=relay")
self.ssh.run("uci set dhcp.lan.ndp=relay")
self.ssh.run("uci set dhcp.wan6=dhcp")
self.ssh.run("uci set dhcp.wan6.dhcpv6=relay")
self.ssh.run("uci set dhcp.wan6.ra=relay")
self.ssh.run("uci set dhcp.wan6.ndp=relay")
self.ssh.run("uci set dhcp.wan6.master=1")
self.ssh.run("uci set dhcp.wan6.interface=wan6")
# Enable service
self.service_manager.need_restart(SERVICE_ODHCPD)
self.commit_changes()
def remove_ipv6_bridge(self):
"""Discard ipv6 bridge on OpenWrt."""
if "setup_ipv6_bridge" in self.config:
self.config.discard("setup_ipv6_bridge")
self.ssh.run("uci set dhcp.lan.dhcpv6=server")
self.ssh.run("uci set dhcp.lan.ra=server")
self.ssh.run("uci delete dhcp.lan.ndp")
self.ssh.run("uci delete dhcp.wan6")
self.service_manager.need_restart(SERVICE_ODHCPD)
self.commit_changes()
def _add_dhcp_option(self, args):
self.ssh.run(f'uci add_list dhcp.lan.dhcp_option="{args}"')
def _remove_dhcp_option(self, args):
self.ssh.run(f'uci del_list dhcp.lan.dhcp_option="{args}"')
def add_default_dns(self, addr_list):
"""Add default dns server for client.
Args:
addr_list: dns ip address for Openwrt client.
"""
self._add_dhcp_option(f'6,{",".join(addr_list)}')
self.config.add(f"default_dns {addr_list}")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def del_default_dns(self, addr_list: str):
"""Remove default dns server for client.
Args:
addr_list: list of dns ip address for Openwrt client.
"""
self._remove_dhcp_option(f"6,{addr_list}")
self.config.discard(f"default_dns {addr_list}")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def add_default_v6_dns(self, addr_list: str):
"""Add default v6 dns server for client.
Args:
addr_list: list of dns ip address for Openwrt client.
"""
self.ssh.run(f'uci add_list dhcp.lan.dns="{addr_list}"')
self.config.add(f"default_v6_dns {addr_list}")
self.service_manager.need_restart(SERVICE_ODHCPD)
self.commit_changes()
def del_default_v6_dns(self, addr_list: str):
"""Del default v6 dns server for client.
Args:
addr_list: list of dns ip address for Openwrt client.
"""
self.ssh.run(f'uci del_list dhcp.lan.dns="{addr_list}"')
self.config.add(f"default_v6_dns {addr_list}")
self.service_manager.need_restart(SERVICE_ODHCPD)
self.commit_changes()
def add_ipv6_prefer_option(self):
self._add_dhcp_option("108,1800i")
self.config.add("ipv6_prefer_option")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def remove_ipv6_prefer_option(self):
self._remove_dhcp_option("108,1800i")
self.config.discard("ipv6_prefer_option")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def add_dhcp_rapid_commit(self):
self.create_config_file("dhcp-rapid-commit\n", "/etc/dnsmasq.conf")
self.config.add("add_dhcp_rapid_commit")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def remove_dhcp_rapid_commit(self):
self.create_config_file("", "/etc/dnsmasq.conf")
self.config.discard("add_dhcp_rapid_commit")
self.service_manager.need_restart(SERVICE_DNSMASQ)
self.commit_changes()
def start_tcpdump(self, test_name, args="", interface="br-lan"):
""" "Start tcpdump on OpenWrt.
Args:
test_name: Test name for create tcpdump file name.
args: Option args for tcpdump.
interface: Interface to logging.
Returns:
tcpdump_file_name: tcpdump file name on OpenWrt.
pid: tcpdump process id.
"""
self.package_install("tcpdump")
if not self.path_exists(TCPDUMP_DIR):
self.ssh.run(f"mkdir {TCPDUMP_DIR}")
now = (time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time())),)
tcpdump_file_name = f"openwrt_{test_name}_{now}.pcap"
tcpdump_file_path = f"{TCPDUMP_DIR}{tcpdump_file_name}"
cmd = f"tcpdump -i {interface} -s0 {args} -w {tcpdump_file_path}"
self.ssh.run_async(cmd)
pid = self._get_tcpdump_pid(tcpdump_file_name)
if not pid:
raise signals.TestFailure("Fail to start tcpdump on OpenWrt.")
# Set delay to prevent tcpdump fail to capture target packet.
time.sleep(15)
return tcpdump_file_name
def stop_tcpdump(self, tcpdump_file_name, pull_dir=None):
"""Stop tcpdump on OpenWrt and pull the pcap file.
Args:
tcpdump_file_name: tcpdump file name on OpenWrt.
pull_dir: Keep none if no need to pull.
Returns:
tcpdump abs_path on host.
"""
# Set delay to prevent tcpdump fail to capture target packet.
time.sleep(15)
pid = self._get_tcpdump_pid(tcpdump_file_name)
self.ssh.run(f"kill -9 {pid}", ignore_status=True)
if self.path_exists(TCPDUMP_DIR) and pull_dir:
tcpdump_path = f"{TCPDUMP_DIR}{tcpdump_file_name}"
tcpdump_remote_path = f"{pull_dir}/{tcpdump_file_name}"
tcpdump_local_path = f"{self.user}@{self.ip}:{tcpdump_path}"
utils.exe_cmd(f"scp {tcpdump_local_path} {tcpdump_remote_path}")
if self._get_tcpdump_pid(tcpdump_file_name):
raise signals.TestFailure("Failed to stop tcpdump on OpenWrt.")
if self.file_exists(tcpdump_path):
self.ssh.run(f"rm -f {tcpdump_path}")
return tcpdump_remote_path if pull_dir else None
def clear_tcpdump(self):
self.ssh.run("killall tcpdump", ignore_status=True)
if self.ssh.run("pgrep tcpdump", ignore_status=True).stdout:
raise signals.TestFailure("Failed to clean up tcpdump process.")
if self.path_exists(TCPDUMP_DIR):
self.ssh.run(f"rm -f {TCPDUMP_DIR}/*")
def _get_tcpdump_pid(self, tcpdump_file_name):
"""Check tcpdump process on OpenWrt."""
return self.ssh.run(f"pgrep -f {tcpdump_file_name}", ignore_status=True).stdout
def setup_mdns(self):
self.config.add("setup_mdns")
self.package_install(MDNS_PACKAGE)
self.commit_changes()
def remove_mdns(self):
self.config.discard("setup_mdns")
self.package_remove(MDNS_PACKAGE)
self.commit_changes()
def block_dns_response(self):
self.config.add("block_dns_response")
iptable_rules = list(network_const.FIREWALL_RULES_DISABLE_DNS_RESPONSE)
self.add_custom_firewall_rules(iptable_rules)
self.service_manager.need_restart(SERVICE_FIREWALL)
self.commit_changes()
def unblock_dns_response(self):
self.config.discard("block_dns_response")
self.remove_custom_firewall_rules()
self.service_manager.need_restart(SERVICE_FIREWALL)
self.commit_changes()
def setup_captive_portal(self, fas_fdqn, fas_port=2080):
"""Create captive portal with Forwarding Authentication Service.
Args:
fas_fdqn: String for captive portal page's fdqn add to local dns server.
fas_port: Port for captive portal page.
"""
self.package_install(CAPTIVE_PORTAL_PACKAGE)
self.config.add(f"setup_captive_portal {fas_port}")
self.ssh.run("uci set opennds.@opennds[0].fas_secure_enabled=2")
self.ssh.run("uci set opennds.@opennds[0].gatewayport=2050")
self.ssh.run(f"uci set opennds.@opennds[0].fasport={fas_port}")
self.ssh.run(f"uci set opennds.@opennds[0].fasremotefqdn={fas_fdqn}")
self.ssh.run('uci set opennds.@opennds[0].faspath="/nds/fas-aes.php"')
self.ssh.run("uci set opennds.@opennds[0].faskey=1234567890")
self.service_manager.need_restart(SERVICE_OPENNDS)
# Config uhttpd
self.ssh.run("uci set uhttpd.main.interpreter=.php=/usr/bin/php-cgi")
self.ssh.run(f"uci add_list uhttpd.main.listen_http=0.0.0.0:{fas_port}")
self.ssh.run(f"uci add_list uhttpd.main.listen_http=[::]:{fas_port}")
self.service_manager.need_restart(SERVICE_UHTTPD)
# cp fas-aes.php
self.create_folder("/www/nds/")
self.ssh.run("cp /etc/opennds/fas-aes.php /www/nds")
# Add fdqn
self.add_resource_record(fas_fdqn, LOCALHOST)
self.commit_changes()
def remove_cpative_portal(self, fas_port: int = 2080):
"""Remove captive portal.
Args:
fas_port: Port for captive portal page.
"""
# Remove package
self.package_remove(CAPTIVE_PORTAL_PACKAGE)
# Clean up config
self.ssh.run("rm /etc/config/opennds")
# Remove fdqn
self.clear_resource_record()
# Restore uhttpd
self.ssh.run("uci del uhttpd.main.interpreter")
self.ssh.run(f"uci del_list uhttpd.main.listen_http='0.0.0.0:{fas_port}'")
self.ssh.run(f"uci del_list uhttpd.main.listen_http='[::]:{fas_port}'")
self.service_manager.need_restart(SERVICE_UHTTPD)
# Clean web root
self.ssh.run("rm -r /www/nds")
self.config.discard(f"setup_captive_portal {fas_port}")
self.commit_changes()
class ServiceManager(object):
"""Class for service on OpenWrt.
Attributes:
ssh: ssh object for the AP.
_need_restart: Record service need to restart.
"""
def __init__(self, ssh):
self.ssh = ssh
self._need_restart = set()
def enable(self, service_name):
"""Enable service auto start."""
self.ssh.run(f"/etc/init.d/{service_name} enable")
def disable(self, service_name):
"""Disable service auto start."""
self.ssh.run(f"/etc/init.d/{service_name} disable")
def restart(self, service_name):
"""Restart the service."""
self.ssh.run(f"/etc/init.d/{service_name} restart")
def reload(self, service_name):
"""Restart the service."""
self.ssh.run(f"/etc/init.d/{service_name} reload")
def restart_services(self):
"""Restart all services need to restart."""
for service in self._need_restart:
if service == SERVICE_NETWORK:
self.reload(service)
self.restart(service)
self._need_restart = set()
def stop(self, service_name):
"""Stop the service."""
self.ssh.run(f"/etc/init.d/{service_name} stop")
def need_restart(self, service_name):
self._need_restart.add(service_name)