Commit 951f622b authored by ale's avatar ale
Browse files

version 0.2: CA/cert generation fixes, better API, client stub interface, proper CRL support, tests

parent 2d077a78
#!/usr/bin/python #!/usr/bin/python
# A minimal RESTful Certification Authority # A minimal RESTful Certification Authority
# by <>, 2010 # by <>, 2010
# $Id$
import os import os
from autoca import certutil import yaml
from OpenSSL import crypto from autoca import ca
from bottle import route, run, request, response, abort, send_file from autoca import ca_app
X509_MIME = 'application/x-x509-ca-cert' ca_conf_file = os.path.join(os.path.dirname(__file__), 'ca.yml')
ca_conf_file = os.path.join(os.path.dirname(__file__), "ca.yml")
ca = certutil.CA(ca_conf_file)
cwd = os.getcwd() def create_app():
with open(ca_conf_file, 'r') as fd:
@route('/sign', method='POST') ca_config = yaml.load(fd)
def sign(): ca_instance = ca.CA(ca_config['root'],
if 'cert' not in request.POST: ca_config['ca_subj'],
abort(404) int(ca_config.get('bits', '4096')),
try: ca_config.get('digest', 'sha1'))
csr_data = request.POST['cert'].value app = ca_app.make_app(ca_instance)
csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_data)
except Exception, e:
print "Exception while decoding CSR data:", e
signed_cert = ca.sign_certificate(csr)
response.content_type = X509_MIME
return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert)
def get_ca_cert():
return send_file(ca.cacrtpath, cwd, mimetype=X509_MIME)
def get_cert(cn):
if cn.endswith('.pem'):
cn = cn[:-4]
return send_file(ca.path_to_cert(cn), cwd, mimetype=X509_MIME)
if __name__ == "__main__": if __name__ == "__main__":
run() from flup.server.fcgi import WSGIServer
from OpenSSL import crypto
import logging
import os
import subprocess
import time
from autoca import ca_storage
from autoca import certutil
log = logging.getLogger(__name__)
class CA(object):
def __init__(self, root, subject, bits=1024, digest='sha1'):
self.ca_subject = subject
self.bits = bits
self.digest = digest = ca_storage.FileStorage(root)
def _init_ca(self):
key_str, crt_str =
if key_str:
self.ca_key = crypto.load_privatekey(
crypto.FILETYPE_PEM, key_str)
self.ca_crt = crypto.load_certificate(
crypto.FILETYPE_PEM, crt_str)
self.public_ca_pem = crt_str
else:'initializing CA certificate and private key')
self.ca_key = certutil.create_rsa_key_pair(self.bits)
ca_req = certutil.create_cert_request(
self.ca_key, **(self.ca_subject))
self.ca_crt = certutil.sign_certificate(
ca_req, self.ca_key, ca_req, 1, 3650,
crypto.X509Extension('basicConstraints', True,
'CA:TRUE, pathlen:0'),
crypto.X509Extension('keyUsage', True,
'keyCertSign, cRLSign'),
#crypto.X509Extension('subjectKeyIdentifier', False,
# 'hash', subject=ca_req),
crt_str = crypto.dump_certificate(
crypto.FILETYPE_PEM, self.ca_crt)
crypto.dump_privatekey(crypto.FILETYPE_PEM, self.ca_key),
self.public_ca_pem = crt_str
def sign_certificate(self, req, days=365, server=False):
cn = req.get_subject().CN'sign request for cn=%s', cn)
cert = self.get_certificate(cn)
if cert:'a valid certificate already exists for cn=%s, '
'revoking it', cn)
self._revoke_certificate(cn, cert.get_serial_number())
new_serial =
extensions = [
crypto.X509Extension('basicConstraints', True, 'CA:FALSE'),
crypto.X509Extension('keyUsage', True,
'%sdigitalSignature, keyEncipherment' % (
server and '' or 'nonRepudiation, ')),
crypto.X509Extension('extendedKeyUsage', True,
server and 'serverAuth' or 'clientAuth'),
crypto.X509Extension('nsCertType', True,
server and 'server' or 'client'),
cert = certutil.sign_certificate(
req, self.ca_key, self.ca_crt, new_serial, days,
extensions=extensions, digest=self.digest)
cn, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
return cert
def _update_crl(self):
self.crl_data_der = ''
self.crl_data_pem = or ''
if self.crl_data_pem:
# Re-read the CRL data in DER and PEM formats.
pipe = subprocess.Popen(
['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'],
self.crl_data_der = pipe.communicate(self.crl_data_pem)[0]
def _revoke_certificate(self, cn, serial_num):
log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num)
if self.crl_data_pem:
crl = crypto.load_crl(crypto.FILETYPE_PEM, self.crl_data_pem)
crl = crypto.CRL()
revoked_set = set(x.get_serial() for x in crl.get_revoked())
if serial_num in revoked_set:
r = crypto.Revoked()
crl.add_revoked(r), self.ca_key,
crypto.FILETYPE_PEM, 7))
def revoke_certificate(self, cn):
serial_num = self.get_serial(cn)
if serial_num:
self._revoke_certificate(cn, serial_num)
def get_certificate(self, cn):
data =
if data:
return crypto.load_certificate(crypto.FILETYPE_PEM, data)
def get_serial(self, cn):
crt = self.get_certificate(cn)
if crt:
return crt.get_serial_number()
import functools
import logging
from OpenSSL import crypto
from flask import Flask, abort, redirect, request, make_response
app = Flask(__name__)
log = logging.getLogger(__name__)
def init_app(ca, config):
app.config.from_envvar('CA_SETTINGS', silent=True) = ca
def content_type(ctype):
"""Decorator to correctly return X509 certificates."""
def _ctype_decorator(fn):
def _ctype_wrapper(*args, **kwargs):
resp = fn(*args, **kwargs)
if isinstance(resp, basestring):
resp = make_response(resp, 200)
resp.headers['Content-Type'] = ctype
return resp
return _ctype_wrapper
return _ctype_decorator
def get_ca():
def get_crl_pem():
def get_crl_der():
def get_certificate(cn):
cert =
if not cert:
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
@app.route('/revoke/<cn>', methods=['POST'])
def revoke(cn):
return 'ok'
@app.route('/sign', methods=['POST'])
def sign():
if not request.form.get('csr'):
csr_data = request.form['csr']
csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_data)
except Exception, e:
log.exception('error decoding CSR data: %s', e)
server = (request.form.get('server', 'y') == 'y')
days = int(request.form.get('days', 15))
signed_cert =, days=days, server=server)
return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert)
def make_app(ca_instance):
init_app(ca_instance, {})
return app.wsgi_app
import fcntl
import os
import time
import certutil
class FileStorage(object):
def __init__(self, root):
self.root = root
self.certs_dir = os.path.join(root, 'certs')
self.key_dir = os.path.join(root, 'private')
for path in (self.root, self.certs_dir, self.key_dir):
if not os.path.isdir(path):
os.mkdir(path, 0700)
# Special files.
self.serial_path = os.path.join(root, 'serial')
self.ca_crt_path = os.path.join(root, 'ca.pem')
self.ca_key_path = os.path.join(self.key_dir, 'ca.key')
self.crl_path = os.path.join(root, 'crl.pem')
def _cert_path(self, cn):
return os.path.join(self.certs_dir,
cn.replace('/', '_') + '.pem')
def get_ca(self):
if (os.path.exists(self.ca_crt_path)
and os.path.exists(self.ca_key_path)):
return (certutil.readfrom(self.ca_key_path),
return (None, None)
def set_ca(self, key_str, cert_str):
certutil.writeto(self.ca_crt_path, cert_str)
certutil.writeto(self.ca_key_path, key_str)
os.chmod(self.ca_key_path, 0400)
def get_crl(self):
if os.path.exists(self.crl_path):
return certutil.readfrom(self.crl_path)
def set_crl(self, crl_str):
certutil.writeto(self.crl_path, crl_str)
def get_certificate(self, cn):
path = self._cert_path(cn)
if os.path.exists(path):
return certutil.readfrom(path)
def store_certificate(self, cn, cert_data):
certutil.writeto(self._cert_path(cn), cert_data)
def delete_certificate(self, cn):
def get_next_serial(self):
fd = open(self.serial_path, 'r+')
except IOError:
fd = open(self.serial_path, 'w+')
fcntl.lockf(fd, fcntl.LOCK_EX)
contents =
if contents:
serial = int(contents.strip()) + 1
serial = int(time.time())
fd.write('%d\n' % serial)
return serial
fcntl.lockf(fd, fcntl.LOCK_UN)
import logging
import threading
import urllib
import urllib2
from OpenSSL import crypto
from autoca import certutil
log = logging.getLogger(__name__)
class Error(Exception):
class CaStub(object):
def __init__(self, url):
self.url = url.rstrip('/')
self.ca_pem = None
self._cache_lock = threading.Lock()
def _request(self, path, method='GET', args=None, parse=True):
data = None
if args:
if method == 'GET':
path = '%s?%s' % (path, urllib.urlencode(args))
data = urllib.urlencode(args)
request = urllib2.Request(self.url + path, data)
response = urllib2.urlopen(request)
response_data =
except urllib2.URLError, e:
log.error('error accessing %s: %s', path, e)
raise Error(str(e))
if parse:
ctype = response.headers['Content-Type']
if ctype in ('application/x-x509-user-cert',
return crypto.load_certificate(
crypto.FILETYPE_PEM, response_data)
elif ctype == 'application/x-pkcs7-crl':
return crypto.load_crl(
crypto.FILETYPE_ASN1, response_data)
return response_data
def get_ca(self, parse=True):
with self._cache_lock:
cached_attr = parse and '_ca_parsed' or '_ca_obj'
value = getattr(self, cached_attr, None)
if not value:
value = self._request('/ca.pem', parse=parse)
setattr(self, cached_attr, value)
return value
def get_crl(self, parse=True):
return self._request('/ca.crl', parse=parse)
def get_certificate(self, cn, parse=True):
return self._request('/get/%s' % cn, parse=parse)
except Error:
return None
def sign_certificate(self, cn, **subject_attrs):
pkey = certutil.create_rsa_key_pair()
subject_attrs['CN'] = cn
csr = certutil.create_cert_request(pkey, **subject_attrs)
csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
signed_cert = self._request('/sign', 'POST', {'csr': csr_data})
return pkey, signed_cert
def revoke_certificate(self, cn):
self._request('/revoke/%s' % cn, 'POST')
import optparse
import logging
import sys
from OpenSSL import crypto
from autoca import ca_stub
def writeout(output, data):
if output:
outfd = open(output, 'w')
outfd = sys.stdout
if output:
def parse_subject(subjstr):
return dict(x.split('=', 1) for x in subjstr.split(','))
def main():
parser = optparse.OptionParser()
parser.add_option('--url', dest='url',
help='autoca API endpoint')
parser.add_option('--output', dest='output', metavar='FILE',
help='write output to this file')
parser.add_option('--outkey', dest='outkey', metavar='FILE',
help='write private key to this file (only '
'useful with the "sign" command)')
parser.add_option('--server', dest='server', action='store_true',
help='create a server certificate (for "sign")')
parser.add_option('--subject', dest='subject',
help='specify the X.509 subject as a set of '
'comma-separated ATTR=VALUE assignments')
opts, args = parser.parse_args()
if len(args) < 1:
parser.error('No command specified')
if not opts.url:
parser.error('Must specify --url')
ca = ca_stub.CaStub(opts.url)
cmd, args = args[0], args[1:]
if cmd == 'get-ca':
writeout(opts.output, ca.get_ca(parse=False))
elif cmd == 'get-crl':
writeout(opts.output, ca.get_crl(parse=False))
elif cmd == 'sign':
if not opts.subject:
parser.error('Must specify --subject')
subject = parse_subject(opts.subject)
pkey, cert = ca.sign_certificate(subject['CN'], **subject)
writeout(opts.output, crypto.dump_certificate(
crypto.FILETYPE_PEM, cert))
writeout(opts.outkey, crypto.dump_privatekey(
crypto.FILETYPE_PEM, pkey))
parser.error('Unknown command')
if __name__ == '__main__':
# $Id$
from OpenSSL import crypto from OpenSSL import crypto
import os
import socket import socket
import time
import yaml
DEFAULT_CONFIG_FILE = "/etc/autoca.conf" CSR_DIGEST = 'sha1'
DIGEST = "sha1"
def _writeto(file, contents): def writeto(filename, contents):
fd = open(file, "w") with open(filename, 'w') as fd:
fd.write(contents) fd.write(contents)
def _readfrom(file):
fd = open(file, "r") def readfrom(filename):
try: with open(filename, 'r') as fd:
return return
def create_rsa_key_pair(bits=1024):
"""Generate a new RSA key pair."""
class CA(object):
def __init__(self, cf_file=DEFAULT_CONFIG_FILE, load=True):
self.basedir =["basedir"]
self.cakeypath = os.path.join(self.basedir, ".private", "ca.key")
self.cacrtpath = os.path.join(self.basedir, "ca.pem")
if load:
def _parse_config(self, file):
fd = open(file, "r") = yaml.load(fd)
def _incr_serial(self):
serialpath = os.path.join(self.basedir, "serial")
fd = open(serialpath, "r+")
cur_serial = int(
cur_serial += 1
except IOError:
fd = open(serialpath, "w")
cur_serial = int(time.time())
fd.write("%d\n" % cur_serial)
return cur_serial
def _setup_ca(self):
if not os.path.exists(self.basedir):
os.mkdir(self.basedir, 0700)
for dir in (".private", "certs"):
fulldir = os.path.join(self.basedir, dir)
if not os.path.exists(fulldir):
os.mkdir(fulldir, 0700)
if not os.path.exists(self.cakeypath) or not os.path.exists(self.cacrtpath):
ca_key = self.create_rsa_key_pair(4096)
ca_req = self.create_cert_request(ca_key, **(["ca_subj"]))
ca_crt = self._sign_certificate(ca_req, ca_key, ca_req)
_writeto(self.cakeypath, crypto.dump_privatekey(crypto.FILETYPE_PEM, ca_key))
os.chmod(self.cakeypath, 0400)
_writeto(self.cacrtpath, crypto.dump_certificate(crypto.FILETYPE_PEM, ca_crt))
def _load_ca_keys(self):
self.ca_key = crypto.load_privatekey(
crypto.FILETYPE_PEM, _readfrom(self.cakeypath))
self.public_ca_pem = _readfrom(self.cacrtpath)
self.ca_crt = crypto.load_certificate(
crypto.FILETYPE_PEM, self.public_ca_pem)
def create_rsa_key_pair(self, bits=1024):
pkey = crypto.PKey() pkey = crypto.PKey()
pkey.generate_key(crypto.TYPE_RSA, bits) pkey.generate_key(crypto.TYPE_RSA, bits)
return pkey return pkey
def create_cert_request(self, pkey, **attrs):
if "CN" not in attrs: def create_cert_request(pkey, **attrs):
attrs["CN"] = socket.gethostname() """Generate a new CSR using the given key."""
if 'CN' not in attrs:
attrs['CN'] = socket.gethostname()
req = crypto.X509Req() req = crypto.X509Req()
subj = req.get_subject() subj = req.get_subject()
tattrs =["default_subj"].copy() for key, value in attrs.items():
for key, value in tattrs.items():
setattr(subj, key, value) setattr(subj, key, value)
req.set_pubkey(pkey) req.set_pubkey(pkey)
req.sign(pkey, DIGEST) req.sign(pkey