blob: b2c577a903d8393ccb0bb4a16f9bc01402aac1e1 [file] [log] [blame]
#!/usr/bin/env python3
""" swtpm_localca.py
A tool for creating TPM 1.2 and TPM 2 certificates localy or using pkcs11
"""
# Disable a couple of warnings:
# R0911: Too many return statements (10/6) (too-many-return-statements)
# R0912: Too many branches (15/12) (too-many-branches)
# R0913: Too many arguments (14/5) (too-many-arguments)
# R0914: Too many local variables (21/15) (too-many-locals)
# R0915: Too many statements (57/50) (too-many-statements)
# W0703: Catching too general exception Exception (broad-except)
# pylint: disable=W0703,R0911,R0912,R0913,R0914,R0915
#
# swtpm_localca.py
#
# Authors: Stefan Berger <stefanb@linux.ibm.com>
#
# (c) Copyright IBM Corporation 2020
#
import codecs
import fcntl
import getopt
import getpass
import os
import re
import stat
import subprocess
import sys
import tempfile
from py_swtpm_localca.swtpm_localca_conf import SYSCONFDIR
from py_swtpm_localca.swtpm_utils import logit, logerr
# Some flags
SETUP_TPM2_F = 1
# for TPM 2 EK
ALLOW_SIGNING_F = 2
DECRYPTION_F = 4
XCH = os.getenv("XDG_CONFIG_HOME")
HOME = os.getenv("HOME")
LOCALCA_OPTIONS = "swtpm-localca.options"
if XCH and os.access(os.path.join(XCH, LOCALCA_OPTIONS), os.R_OK):
DEFAULT_LOCALCA_OPTIONS = os.path.join(XCH, LOCALCA_OPTIONS)
elif HOME and os.access(os.path.join(HOME, ".config", LOCALCA_OPTIONS), os.R_OK):
DEFAULT_LOCALCA_OPTIONS = os.path.join(HOME, ".config", LOCALCA_OPTIONS)
else:
DEFAULT_LOCALCA_OPTIONS = os.path.join(os.sep + SYSCONFDIR, LOCALCA_OPTIONS)
LOCALCA_CONFIG = "swtpm-localca.conf"
if XCH and os.access(os.path.join(XCH, LOCALCA_CONFIG), os.R_OK):
DEFAULT_LOCALCA_CONFIG = os.path.join(XCH, LOCALCA_CONFIG)
elif HOME and os.access(os.path.join(HOME, ".config", LOCALCA_CONFIG), os.R_OK):
DEFAULT_LOCALCA_CONFIG = os.path.join(HOME, ".config", LOCALCA_CONFIG)
else:
DEFAULT_LOCALCA_CONFIG = os.path.join(os.sep + SYSCONFDIR, LOCALCA_CONFIG)
# Default logging goes to stderr
LOGFILE = ""
def resolve_string(inp):
""" resolve environment variables in a string """
result = ""
sidx = 0
while True:
idx = inp.find("${", sidx)
if idx < 0:
if sidx == 0:
return inp
result += inp[sidx:]
return result
result += inp[sidx:idx]
eidx = inp.find("}", idx + 2)
if eidx < 0:
result += inp[idx:]
return result
result += os.getenv(inp[idx + 2:eidx], '')
sidx = eidx + 1
def get_config_value(lines, configname, default=None):
""" Get a config value from a list of strings """
regex = r'^' + configname + r"\s*=\s*([^#\n]*).*"
for line in lines:
match = re.match(regex, line)
if match:
return resolve_string(match.groups()[0])
return default
def get_config_envvars(lines):
""" Extract all environment variables from the config file and return a map.
Environment variable lines must start with 'env:' and must not contain
trailing spaces or a comment starting with '#' """
res = {}
regex = r"^env:([a-zA-Z_][a-zA-Z_0-9]*)\s*=\s*([^\n]*).*"
for line in lines:
match = re.match(regex, line)
if match:
try:
encoded = codecs.encode(match.group(2), "latin-1", "backslashreplace")
res[match.group(1)] = codecs.decode(encoded, "unicode_escape")
except Exception as err:
logerr(LOGFILE, "Invalid character in value of %s environment variable: %s\n" %
(match.group(1), str(err)))
return {}, 1
return res, 0
def write_file(filename, text):
""" Write some text to a file """
try:
fileobj = open(filename, "w")
fileobj.write(text)
fileobj.close()
return 0
except Exception as err:
logerr(LOGFILE, "Could not write to file %s: %s\n" % (filename, str(err)))
return 1
def read_file(filename):
""" read contents from a file """
try:
fobj = open(filename, mode='rb')
result = fobj.read()
fobj.close()
return result, 0
except Exception as err:
logerr(LOGFILE, "Could not read from file %s: %s\n" % (filename, str(err)))
return "", 1
def read_file_lines(filename):
""" Read the lines from a file and return a list of the lines """
try:
fobj = open(filename, 'r')
lines = fobj.readlines()
fobj.close()
return lines, 0
except Exception as err:
logerr(LOGFILE, "Could not read from file %s : %s\n" & (filename, str(err)))
return [], 1
def makedir(dirname, purpose):
""" Create a directory if it does not exist """
if not os.path.exists(dirname):
logit(LOGFILE, "Creating swtpm-local dir '%s'.\n" % dirname)
try:
os.makedirs(dirname)
except OSError as err:
logerr(LOGFILE, "Could not create directory for '%s': %s\n" % (purpose, str(err)))
return 1
return 0
def remove_file(filename, verbose=True):
""" remove a file """
if not os.path.exists(filename):
return 0
try:
os.remove(filename)
return 0
except Exception as err:
if verbose:
logerr(LOGFILE, "Could not remove file %s: %s\n" % (filename, str(err)))
return 1
def remove_files(filename_list):
""" remove files in a list of filenames """
for filename in filename_list:
remove_file(filename, verbose=False)
def get_certtool():
""" Get the name of the certtool to use """
if os.uname().sysname == "Darwin":
return "gnutls-certtool"
return "certtool"
def create_localca_cert(lockfile, statedir, signkey, signkey_password, issuercert):
""" Create the local CA's certificate if it doesn't already exist. """
try:
filedes = os.open(lockfile, os.O_RDWR|os.O_CREAT)
except Exception as err:
logerr(LOGFILE, "Could not open lockfile %s: %s\n" % (lockfile, str(err)))
return 1
try:
fcntl.flock(filedes, fcntl.LOCK_EX)
if not os.path.exists(statedir):
if makedir(statedir, "statedir") != 0:
return 1
if not os.access(signkey, os.R_OK) or not os.access(issuercert, os.R_OK):
directory = os.path.dirname(signkey)
cakey = os.path.join(directory, "swtpm-localca-rootca-privkey.pem")
cacert = os.path.join(directory, "swtpm-localca-rootca-cert.pem")
swtpm_rootca_password = os.getenv("SWTPM_ROOTCA_PASSWORD")
certtool = get_certtool()
# First the root CA
cmd = [certtool, "--generate-privkey", "--outfile", cakey]
if swtpm_rootca_password:
# neither env. variable nor template file work...
cmd.extend(["--password", swtpm_rootca_password])
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = proc.communicate()[0]
if proc.returncode:
logerr(LOGFILE, "Could not create root-CA key %s\n" % cakey)
logerr(LOGFILE, "%s" % output.decode())
return 1
except Exception as err:
logerr(LOGFILE, "Could not create root-CA key %s: %s\n" % (cakey, str(err)))
return 1
os.chmod(cakey, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP)
temp = tempfile.NamedTemporaryFile()
try:
filecontent = \
"cn=swtpm-localca-rootca\n" \
"ca\n" \
"cert_signing_key\n" \
"expiration_days = 3650\n"
temp.write(filecontent.encode())
temp.seek(0)
cmd = [certtool,
"--generate-self-signed",
"--template", temp.name,
"--outfile", cacert,
"--load-privkey", cakey]
certtool_env = {
"PATH": os.getenv("PATH")
}
if swtpm_rootca_password:
certtool_env["GNUTLS_PIN"] = swtpm_rootca_password
try:
proc = subprocess.Popen(cmd, env=certtool_env,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = proc.communicate()[0]
if proc.returncode:
logerr(LOGFILE, "Could not create root-CA\n")
logerr(LOGFILE, "%s" % output.decode())
remove_files([cakey, cacert])
return 1
except Exception as err:
logerr(LOGFILE, "Could not create root-CA: %s\n" % str(err))
remove_files([cakey, cacert])
return 1
finally:
temp.close()
# intermediate CA
cmd = [certtool, "--generate-privkey", "--outfile", signkey]
if signkey_password:
cmd.extend(["--password", signkey_password])
try:
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = proc.communicate()[0]
if proc.returncode:
logerr(LOGFILE, "Could not create local-CA key %s\n" % signkey)
logerr(LOGFILE, "certtool failed: %s\n" % output.decode())
remove_files([cakey, cacert, signkey])
return 1
except Exception as err:
logerr(LOGFILE, "Could not create local-CA key %s: %s\n" % (signkey, str(err)))
remove_files([cakey, cacert, signkey])
return 1
os.chmod(signkey, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP)
temp = tempfile.NamedTemporaryFile()
try:
filecontent = \
"cn=swtpm-localca\n" \
"ca\n" \
"cert_signing_key\n" \
"expiration_days = 3650\n"
if swtpm_rootca_password and signkey_password:
filecontent += "password = %s\n" % swtpm_rootca_password
temp.write(filecontent.encode())
temp.seek(0)
cmd = [certtool,
"--generate-certificate",
"--template", temp.name,
"--outfile", issuercert,
"--load-privkey", signkey,
"--load-ca-privkey", cakey,
"--load-ca-certificate", cacert]
certtool_env = {
"PATH": os.getenv("PATH")
}
if signkey_password:
certtool_env["GNUTLS_PIN"] = signkey_password
elif swtpm_rootca_password:
certtool_env["GNUTLS_PIN"] = swtpm_rootca_password
try:
proc = subprocess.Popen(cmd, env=certtool_env,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = proc.communicate()[0]
if proc.returncode:
logerr(LOGFILE, "Could not create local CA\n")
logerr(LOGFILE, "%s" % output.decode())
remove_files([cakey, cacert, signkey, issuercert])
return 1
except Exception as err:
logerr(LOGFILE, "Could not create local CA: %s\n" % str(err))
remove_files([cakey, cacert, signkey, issuercert])
return 1
finally:
temp.close()
finally:
os.close(filedes)
return 0
def get_next_cert_serial(certserial, lockfile):
""" Get the next serial number for a certificate """
try:
filedes = os.open(lockfile, os.O_RDWR|os.O_CREAT)
except Exception as err:
logerr(LOGFILE, "Could not open lockfile %s: %s\n" % (lockfile, str(err)))
return 1
try:
fcntl.flock(filedes, fcntl.LOCK_EX)
if not os.access(certserial, os.R_OK):
_ = write_file(certserial, "1")
serial, ret = read_file(certserial)
if ret != 0:
return "", 1
if not serial.decode().isnumeric():
serial_n = 1
else:
serial_n = int(serial) + 1
ret = write_file(certserial, "%d" % serial_n)
if ret != 0:
return "", 1
finally:
os.close(filedes)
return "%d" % serial_n, 0
def create_cert(flags, typ, directory, ekparams, vmid, tpm_spec_params, tpm_attr_params,
signkey, signkey_password, issuercert, parentkey_password, swtpm_cert_env,
certserial, lockfile, optsfile):
""" Create the certificate """
serial, ret = get_next_cert_serial(certserial, lockfile)
if ret != 0:
return 1
options = []
lines, _ = read_file_lines(optsfile)
for line in lines:
if not line.strip():
continue
options.extend([x.strip() for x in line.split(" ", 1)])
if vmid:
subj = "CN=%s" % vmid
else:
subj = "CN=unknown"
if flags & SETUP_TPM2_F:
options.append("--tpm2")
else:
options.append("--add-header")
if typ == "ek":
if flags & ALLOW_SIGNING_F:
options.append("--allow-signing")
if flags & DECRYPTION_F:
options.append("--decryption")
match = re.search(r'x=([0-9A-Fa-f]+),y=([0-9A-Fa-f]+)(,id=([^,]+))?', ekparams)
if match:
keyparams = ["--ecc-x", match.group(1), "--ecc-y", match.group(2)]
if match.group(4):
keyparams.extend(["--ecc-curveid", match.group(4)])
else:
keyparams = ["--modulus", ekparams]
cmd = ["swtpm_cert",
"--subject", subj]
cmd.extend(options)
temp1 = None
temp2 = None
if signkey_password:
temp1 = tempfile.NamedTemporaryFile()
temp1.write(signkey_password.encode())
temp1.seek(0)
cmd.extend(["--signkey-pwd", "file:%s" % temp1.name])
if parentkey_password:
temp2 = tempfile.NamedTemporaryFile()
temp2.write(parentkey_password.encode())
temp2.seek(0)
cmd.extend(["--parentkey-pwd", "file:%s" % temp2.name])
if typ == "ek":
cmd.extend(tpm_spec_params)
cmd.extend(tpm_attr_params)
if typ == "platform":
cmd.extend(["--type", "platform",
"--out-cert", os.path.join(directory, "platform.cert")])
else:
cmd.extend(["--out-cert", os.path.join(directory, "ek.cert")])
cmd.extend(keyparams)
cmd.extend(["--signkey", signkey,
"--issuercert", issuercert,
"--days", "3650",
"--serial", serial])
if typ == "ek":
certtype = "EK"
else:
certtype = "platform"
try:
proc = subprocess.Popen(cmd, env=swtpm_cert_env,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = proc.communicate()[0]
if proc.returncode:
logerr(LOGFILE, "Could not create %s certificate locally\n" % certtype)
logerr(LOGFILE, "%s" % output.decode())
return 1
except Exception as err:
logerr(LOGFILE, "Could not run swtpm_cert: %s\n" % str(err))
return 1
finally:
if temp1:
temp1.close()
if temp2:
temp2.close()
logit(LOGFILE, "Successfully created %s certificate locally.\n" % certtype)
return 0
def usage(prgname):
""" Display usage """
print(
"Usage: {prgname} [options]\n"
"\n"
"The following options are supported:\n"
"\n"
"--type type The type of certificate to create: 'ek' or 'platform'\n"
"--ek key-param The modulus of an RSA key or x=...,y=,... for an EC key\n"
"--dir directory The directory to write the resulting certificate into\n"
"--vmid vmid The ID of the virtual machine\n"
"--optsfile file A file containing options to pass to swtpm_cert\n"
"--configfile file A file containing configuration parameters for directory,\n"
" signing key and password and certificate to use\n"
"--logfile file A file to write a log into\n"
"--tpm-spec-family s The implemented spec family, e.g., '2.0'\n"
"--tpm-spec-revision i The spec revision of the TPM as integer; e.g., 146\n"
"--tpm-spec-level i The spec level of the TPM; must be an integer; e.g. 0\n"
"--tpm-manufacturer s The manufacturer of the TPM; e.g., id:00001014\n"
"--tpm-model s The model of the TPM; e.g., 'swtpm'\n"
"--tpm-version i The (firmware) version of the TPM; e.g., id:20160511\n"
"--tpm2 Generate a certificate for a TPM 2\n"
"--allow-signing The TPM 2's EK can be used for signing\n"
"--decryption The TPM 2's EK can be used for decryption\n"
"--help, -h, -? Display this help screen and exit\n"
"\n"
"\n"
"The following environment variables are supported:\n"
"\n"
"SWTPM_ROOTCA_PASSWORD The root CA's private key password\n"
"\n".format_map({
'prgname': prgname,
}))
def main():
""" main function - parses command line parameters and low level dealing with them """
global LOGFILE # pylint: disable=W0603
try:
opts, _ = getopt.getopt(sys.argv[1:], "h?",
["type=",
"ek=",
"dir=",
"vmid=",
"optsfile=",
"configfile=",
"logfile=",
"tpm-spec-family=",
"tpm-spec-revision=",
"tpm-spec-level=",
"tpm-manufacturer=",
"tpm-model=",
"tpm-version=",
"tpm2",
"allow-signing",
"decryption",
"help"])
except getopt.GetoptError as err:
print(err)
usage(sys.argv[0])
sys.exit(1)
flags = 0
typ = ""
ekparams = ""
directory = ""
vmid = ""
optsfile = DEFAULT_LOCALCA_OPTIONS
configfile = DEFAULT_LOCALCA_CONFIG
tpm_spec_params = []
tpm_attr_params = []
for opt, arg in opts:
if opt == '--type':
typ = arg
elif opt == '--ek':
ekparams = arg
elif opt == '--dir':
directory = arg
elif opt == '--vmid':
vmid = arg
elif opt == '--optsfile':
optsfile = arg
elif opt == '--configfile':
configfile = arg
elif opt == '--logfile':
LOGFILE = arg
elif opt in ['--tpm-spec-family', '--tpm-spec-revision', '--tpm-spec-level']:
tpm_spec_params.extend([opt, arg])
elif opt in ['--tpm-manufacturer', '--tpm-model', '--tpm-version']:
tpm_attr_params.extend([opt, arg])
elif opt == '--tpm2':
flags |= SETUP_TPM2_F
elif opt == '--allow-signing':
flags |= ALLOW_SIGNING_F
elif opt == '--decryption':
flags |= DECRYPTION_F
elif opt in ['--help', '-h', '-?']:
usage(sys.argv[0])
sys.exit(0)
if len(LOGFILE) > 0:
if os.path.islink(LOGFILE):
sys.stderr.write("Logfile must not be a symlink.\n")
sys.exit(1)
try:
fobj = open(LOGFILE, "a") # do not truncate
fobj.close()
except PermissionError:
sys.stderr.write("Cannot write to logfile %s.\n", LOGFILE)
sys.exit(1)
if not os.access(optsfile, os.R_OK):
logerr(LOGFILE, "Need read rights on options file %s for user %s.\n" %
(optsfile, getpass.getuser()))
sys.exit(1)
if not os.access(configfile, os.R_OK):
logerr(LOGFILE, "Need read rights on options file %s for user %s.\n" %
(configfile, getpass.getuser()))
sys.exit(1)
lines, ret = read_file_lines(configfile)
if ret != 0:
sys.exit(1)
statedir = get_config_value(lines, "statedir")
if not statedir:
logerr(LOGFILE, "Missing 'statedir' config value in config file %s.\n" % configfile)
sys.exit(1)
if not os.access(statedir, os.W_OK | os.R_OK):
logerr(LOGFILE, "Need read/write rights on statedir %s for user %s.\n" %
(statedir, getpass.getuser()))
if makedir(statedir, "statedir") != 0:
sys.exit(1)
lockfile = os.path.join(statedir, ".lock.swtpm-localca")
if os.path.exists(lockfile) and not os.access(lockfile, os.W_OK | os.R_OK):
logerr(LOGFILE, "Need read/write rights on %s for user %s.\n" %
(lockfile, getpass.getuser()))
sys.exit(1)
signkey = get_config_value(lines, "signingkey")
if not signkey:
logerr(LOGFILE, "Missing 'signingkey' config value in config file %s.\n" % configfile)
sys.exit(1)
# SIGNKEY may be a GNUTLS url like tpmkey:file= or tpmkey:uuid=
if not signkey.startswith("tpmkey:file=") and \
not signkey.startswith("tpmkey:uuid=") and \
not signkey.startswith("pkcs11:"):
if makedir(os.path.dirname(signkey), "signkey") != 0:
sys.exit(1)
signkey_password = get_config_value(lines, "signingkey_password")
parentkey_password = get_config_value(lines, "parentkey_password")
issuercert = get_config_value(lines, 'issuercert')
if not issuercert:
logerr(LOGFILE, "Missing 'issuercert' config value in config file %s.\n" % configfile)
sys.exit(1)
if makedir(os.path.dirname(issuercert), "issuercert") != 0:
sys.exit(1)
# environment needed for calling swtpm_cert
swtpm_cert_env = os.environ
# TPM keys are GNUTLS URIs...
if signkey.startswith("tpmkey:file=") or signkey.startswith("tpmkey:uuid="):
tss_tcsd_hostname = get_config_value(lines, "TSS_TCSD_HOSTNAME", "localhost")
tss_tcsd_port = get_config_value(lines, "TSS_TCSD_PORT", 30003)
swtpm_cert_env["TSS_TCSD_HOSTNAME"] = tss_tcsd_hostname
swtpm_cert_env["TSS_TCSD_PORT"] = tss_tcsd_port
logit(LOGFILE, "CA uses a GnuTLS TPM key; using TSS_TCSD_HOSTNAME=%s " \
"TSS_TCSD_PORT=%s\n" % (tss_tcsd_hostname, tss_tcsd_port))
elif signkey.startswith("pkcs11:"):
signkey = signkey.replace(r"\;", ";")
if signkey_password:
swtpm_cert_env["SWTPM_PKCS11_PIN"] = signkey_password
logit(LOGFILE, "CA uses a PKCS#11 key; using password from 'signingkey_password'\n")
else:
swtpm_pkcs11_pin = get_config_value(lines, "SWTPM_PKCS11_PIN", "swtpm-tpmca")
swtpm_cert_env["SWTPM_PKCS11_PIN"] = swtpm_pkcs11_pin
logit(LOGFILE, "CA uses a PKCS#11 key; using SWTPM_PKCS11_PIN\n")
# Get additional environment variables pkcs11 modules may need
envvars, ret = get_config_envvars(lines)
if ret != 0:
sys.exit(1)
swtpm_cert_env.update(envvars)
else:
create_certs = False
# create certificate if either the signing key or issuer cert are missing
if not os.access(signkey, os.R_OK):
if os.path.exists(signkey):
logerr(LOGFILE, "Need read rights on signing key %s for user %s.\n" %
(signkey, getpass.getuser()))
sys.exit(1)
create_certs = True
if not os.access(issuercert, os.R_OK):
if os.path.exists(issuercert):
logerr(LOGFILE, "Need read rights on issuer certficate %s for user %s.\n" %
(issuercert, getpass.getuser()))
sys.exit(1)
create_certs = True
if create_certs:
logit(LOGFILE, "Creating root CA and a local CA's signing key and issuer cert.\n")
if create_localca_cert(lockfile, statedir, signkey, signkey_password,
issuercert) != 0:
logerr(LOGFILE, "Error creating local CA's signing key and cert.\n")
sys.exit(1)
if not os.access(signkey, os.R_OK):
logerr(LOGFILE, "Need read rights on signing key %s for user %s.\n" %
(signkey, getpass.getuser()))
sys.exit(1)
if not os.access(issuercert, os.R_OK):
logerr(LOGFILE, "Need read rights on issuer certificate %s for user %s.\n" %
(issuercert, getpass.getuser()))
sys.exit(1)
certserial = get_config_value(lines, "certserial", os.path.join(statedir, "certserial"))
if makedir(os.path.dirname(certserial), "certserial") != 0:
sys.exit(1)
ret = create_cert(flags, typ, directory, ekparams, vmid, tpm_spec_params, tpm_attr_params,
signkey, signkey_password, issuercert, parentkey_password, swtpm_cert_env,
certserial, lockfile, optsfile)
sys.exit(ret)
if __name__ == "__main__":
main()