diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..0d20b6487c61e7d1bde93acf4a14b7a89083a16d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/autoca.fcgi b/autoca.fcgi index b5333b5febd5849c632a77646c3f5b3fe6972b8f..8e07f4f5f51bd978707052b0edad2cfeab453efd 100755 --- a/autoca.fcgi +++ b/autoca.fcgi @@ -1,41 +1,27 @@ #!/usr/bin/python # A minimal RESTful Certification Authority # by <ale@incal.net>, 2010 -# $Id$ import os -from autoca import certutil -from OpenSSL import crypto -from bottle import route, run, request, response, abort, send_file +import yaml +from autoca import ca +from autoca import ca_app -X509_MIME = 'application/x-x509-ca-cert' -ca_conf_file = os.path.join(os.path.dirname(__file__), "ca.yml") -ca = certutil.CA(ca_conf_file) -cwd = os.getcwd() +ca_conf_file = os.path.join(os.path.dirname(__file__), 'ca.yml') -@route('/sign', method='POST') -def sign(): - if 'cert' not in request.POST: - abort(404) - try: - csr_data = request.POST['cert'].value - csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_data) - except Exception, e: - print "Exception while decoding CSR data:", e - abort(400) - signed_cert = ca.sign_certificate(csr) - response.content_type = X509_MIME - return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert) -@route('/ca.pem') -def get_ca_cert(): - return send_file(ca.cacrtpath, cwd, mimetype=X509_MIME) +def create_app(): + with open(ca_conf_file, 'r') as fd: + ca_config = yaml.load(fd) + ca_instance = ca.CA(ca_config['root'], + ca_config['ca_subj'], + int(ca_config.get('bits', '4096')), + ca_config.get('digest', 'sha1')) + app = ca_app.make_app(ca_instance) -@route('/get/:cn') -def get_cert(cn): - if cn.endswith('.pem'): - cn = cn[:-4] - return send_file(ca.path_to_cert(cn), cwd, mimetype=X509_MIME) if __name__ == "__main__": - run() + from flup.server.fcgi import WSGIServer + WSGIServer(create_app()).run() + + diff --git a/autoca/ca.py b/autoca/ca.py new file mode 100644 index 0000000000000000000000000000000000000000..d46c6b4cf57adc30c820215ae4fb0dd89b6c99c4 --- /dev/null +++ b/autoca/ca.py @@ -0,0 +1,123 @@ +from OpenSSL import crypto +import logging +import os +import subprocess +import time + +from autoca import ca_storage +from autoca import certutil + +log = logging.getLogger(__name__) + + +class CA(object): + + def __init__(self, root, subject, bits=1024, digest='sha1'): + self.ca_subject = subject + self.bits = bits + self.digest = digest + self.storage = ca_storage.FileStorage(root) + self._init_ca() + self._update_crl() + + def _init_ca(self): + key_str, crt_str = self.storage.get_ca() + if key_str: + self.ca_key = crypto.load_privatekey( + crypto.FILETYPE_PEM, key_str) + self.ca_crt = crypto.load_certificate( + crypto.FILETYPE_PEM, crt_str) + self.public_ca_pem = crt_str + else: + log.info('initializing CA certificate and private key') + self.ca_key = certutil.create_rsa_key_pair(self.bits) + ca_req = certutil.create_cert_request( + self.ca_key, **(self.ca_subject)) + self.ca_crt = certutil.sign_certificate( + ca_req, self.ca_key, ca_req, 1, 3650, + extensions=[ + crypto.X509Extension('basicConstraints', True, + 'CA:TRUE, pathlen:0'), + crypto.X509Extension('keyUsage', True, + 'keyCertSign, cRLSign'), + #crypto.X509Extension('subjectKeyIdentifier', False, + # 'hash', subject=ca_req), + ], + digest=self.digest) + + crt_str = crypto.dump_certificate( + crypto.FILETYPE_PEM, self.ca_crt) + self.storage.set_ca( + crypto.dump_privatekey(crypto.FILETYPE_PEM, self.ca_key), + crt_str) + self.public_ca_pem = crt_str + + def sign_certificate(self, req, days=365, server=False): + cn = req.get_subject().CN + log.info('sign request for cn=%s', cn) + cert = self.get_certificate(cn) + if cert: + log.info('a valid certificate already exists for cn=%s, ' + 'revoking it', cn) + self._revoke_certificate(cn, cert.get_serial_number()) + new_serial = self.storage.get_next_serial() + extensions = [ + crypto.X509Extension('basicConstraints', True, 'CA:FALSE'), + crypto.X509Extension('keyUsage', True, + '%sdigitalSignature, keyEncipherment' % ( + server and '' or 'nonRepudiation, ')), + crypto.X509Extension('extendedKeyUsage', True, + server and 'serverAuth' or 'clientAuth'), + crypto.X509Extension('nsCertType', True, + server and 'server' or 'client'), + ] + cert = certutil.sign_certificate( + req, self.ca_key, self.ca_crt, new_serial, days, + extensions=extensions, digest=self.digest) + self.storage.store_certificate( + cn, crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) + return cert + + def _update_crl(self): + self.crl_data_der = '' + self.crl_data_pem = self.storage.get_crl() or '' + if self.crl_data_pem: + # Re-read the CRL data in DER and PEM formats. + pipe = subprocess.Popen( + ['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'], + stdout=subprocess.PIPE) + self.crl_data_der = pipe.communicate(self.crl_data_pem)[0] + + def _revoke_certificate(self, cn, serial_num): + log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num) + if self.crl_data_pem: + crl = crypto.load_crl(crypto.FILETYPE_PEM, self.crl_data_pem) + else: + crl = crypto.CRL() + revoked_set = set(x.get_serial() for x in crl.get_revoked()) + if serial_num in revoked_set: + return + + r = crypto.Revoked() + r.set_serial(str(serial_num)) + crl.add_revoked(r) + + self.storage.delete_certificate(cn) + self.storage.set_crl(crl.export(self.ca_crt, self.ca_key, + crypto.FILETYPE_PEM, 7)) + self._update_crl() + + def revoke_certificate(self, cn): + serial_num = self.get_serial(cn) + if serial_num: + self._revoke_certificate(cn, serial_num) + + def get_certificate(self, cn): + data = self.storage.get_certificate(cn) + if data: + return crypto.load_certificate(crypto.FILETYPE_PEM, data) + + def get_serial(self, cn): + crt = self.get_certificate(cn) + if crt: + return crt.get_serial_number() diff --git a/autoca/ca_app.py b/autoca/ca_app.py new file mode 100644 index 0000000000000000000000000000000000000000..5b217efd2aafcf029012c2dcb7ce942c46858f57 --- /dev/null +++ b/autoca/ca_app.py @@ -0,0 +1,83 @@ +import functools +import logging +from OpenSSL import crypto +from flask import Flask, abort, redirect, request, make_response + +app = Flask(__name__) +log = logging.getLogger(__name__) + + +def init_app(ca, config): + app.config.update(config) + app.config.from_envvar('CA_SETTINGS', silent=True) + app.ca = ca + + +def content_type(ctype): + """Decorator to correctly return X509 certificates.""" + def _ctype_decorator(fn): + @functools.wraps(fn) + def _ctype_wrapper(*args, **kwargs): + resp = fn(*args, **kwargs) + if isinstance(resp, basestring): + resp = make_response(resp, 200) + resp.headers['Content-Type'] = ctype + return resp + return _ctype_wrapper + return _ctype_decorator + + +@app.route('/ca.pem') +@content_type('application/x-x509-ca-cert') +def get_ca(): + return app.ca.public_ca_pem + + +@app.route('/crl.pem') +@content_type('application/x-x509-ca-cert') +def get_crl_pem(): + return app.ca.crl_data_pem + + +@app.route('/ca.crl') +@content_type('application/x-pkcs7-crl') +def get_crl_der(): + return app.ca.crl_data_der + + +@app.route('/get/<cn>') +@content_type('application/x-x509-user-cert') +def get_certificate(cn): + cert = app.ca.get_certificate(cn) + if not cert: + abort(404) + return crypto.dump_certificate(crypto.FILETYPE_PEM, cert) + + +@app.route('/revoke/<cn>', methods=['POST']) +def revoke(cn): + app.ca.revoke_certificate(cn) + return 'ok' + + +@app.route('/sign', methods=['POST']) +@content_type('application/x-x509-user-cert') +def sign(): + if not request.form.get('csr'): + abort(400) + try: + csr_data = request.form['csr'] + csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_data) + except Exception, e: + log.exception('error decoding CSR data: %s', e) + abort(400) + server = (request.form.get('server', 'y') == 'y') + days = int(request.form.get('days', 15)) + signed_cert = app.ca.sign_certificate(csr, days=days, server=server) + return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert) + + +def make_app(ca_instance): + init_app(ca_instance, {}) + return app.wsgi_app + diff --git a/autoca/ca_storage.py b/autoca/ca_storage.py new file mode 100644 index 0000000000000000000000000000000000000000..c0aec0fd1a40483fcb184e7b5fd5a13c944d10c4 --- /dev/null +++ b/autoca/ca_storage.py @@ -0,0 +1,77 @@ +import fcntl +import os +import time + +import certutil + + +class FileStorage(object): + + def __init__(self, root): + self.root = root + self.certs_dir = os.path.join(root, 'certs') + self.key_dir = os.path.join(root, 'private') + for path in (self.root, self.certs_dir, self.key_dir): + if not os.path.isdir(path): + os.mkdir(path, 0700) + + # Special files. + self.serial_path = os.path.join(root, 'serial') + self.ca_crt_path = os.path.join(root, 'ca.pem') + self.ca_key_path = os.path.join(self.key_dir, 'ca.key') + self.crl_path = os.path.join(root, 'crl.pem') + + def _cert_path(self, cn): + return os.path.join(self.certs_dir, + cn.replace('/', '_') + '.pem') + + def get_ca(self): + if (os.path.exists(self.ca_crt_path) + and os.path.exists(self.ca_key_path)): + return (certutil.readfrom(self.ca_key_path), + certutil.readfrom(self.ca_crt_path)) + else: + return (None, None) + + def set_ca(self, key_str, cert_str): + certutil.writeto(self.ca_crt_path, cert_str) + certutil.writeto(self.ca_key_path, key_str) + os.chmod(self.ca_key_path, 0400) + + def get_crl(self): + if os.path.exists(self.crl_path): + return certutil.readfrom(self.crl_path) + + def set_crl(self, crl_str): + certutil.writeto(self.crl_path, crl_str) + + def get_certificate(self, cn): + path = self._cert_path(cn) + if os.path.exists(path): + return certutil.readfrom(path) + + def store_certificate(self, cn, cert_data): + certutil.writeto(self._cert_path(cn), cert_data) + + def delete_certificate(self, cn): + os.unlink(self._cert_path(cn)) + + def get_next_serial(self): + try: + fd = open(self.serial_path, 'r+') + except IOError: + fd = open(self.serial_path, 'w+') + fcntl.lockf(fd, fcntl.LOCK_EX) + try: + contents = fd.read() + fd.seek(0) + if contents: + fd.truncate() + serial = int(contents.strip()) + 1 + else: + serial = int(time.time()) + fd.write('%d\n' % serial) + return serial + finally: + fcntl.lockf(fd, fcntl.LOCK_UN) + fd.close() diff --git a/autoca/ca_stub.py b/autoca/ca_stub.py new file mode 100644 index 0000000000000000000000000000000000000000..4bfdb4e723c6eeffaedc8a27a0510bb687c3600b --- /dev/null +++ b/autoca/ca_stub.py @@ -0,0 +1,76 @@ +import logging +import threading +import urllib +import urllib2 +from OpenSSL import crypto + +from autoca import certutil + +log = logging.getLogger(__name__) + + +class Error(Exception): + pass + + +class CaStub(object): + + def __init__(self, url): + self.url = url.rstrip('/') + self.ca_pem = None + self._cache_lock = threading.Lock() + + def _request(self, path, method='GET', args=None, parse=True): + data = None + if args: + if method == 'GET': + path = '%s?%s' % (path, urllib.urlencode(args)) + else: + data = urllib.urlencode(args) + request = urllib2.Request(self.url + path, data) + try: + response = urllib2.urlopen(request) + response_data = response.read() + except urllib2.URLError, e: + log.error('error accessing %s: %s', path, e) + raise Error(str(e)) + if parse: + ctype = response.headers['Content-Type'] + if ctype in ('application/x-x509-user-cert', + 'application/x-x509-ca-cert'): + return crypto.load_certificate( + crypto.FILETYPE_PEM, response_data) + elif ctype == 'application/x-pkcs7-crl': + return crypto.load_crl( + crypto.FILETYPE_ASN1, response_data) + return response_data + + def get_ca(self, parse=True): + with self._cache_lock: + cached_attr = parse and '_ca_parsed' or '_ca_obj' + value = getattr(self, cached_attr, None) + if not value: + value = self._request('/ca.pem', parse=parse) + setattr(self, cached_attr, value) + return value + + def get_crl(self, parse=True): + return self._request('/ca.crl', parse=parse) + + def get_certificate(self, cn, parse=True): + try: + return self._request('/get/%s' % cn, parse=parse) + except Error: + return None + + def sign_certificate(self, cn, **subject_attrs): + pkey = certutil.create_rsa_key_pair() + subject_attrs['CN'] = cn + csr = certutil.create_cert_request(pkey, **subject_attrs) + csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr) + signed_cert = self._request('/sign', 'POST', {'csr': csr_data}) + return pkey, signed_cert + + def revoke_certificate(self, cn): + self._request('/revoke/%s' % cn, 'POST') + diff --git a/autoca/ca_tool.py b/autoca/ca_tool.py new file mode 100644 index 0000000000000000000000000000000000000000..68aaaaaa7afa8abb1d0ec2a02a969aa4189e5a4b --- /dev/null +++ b/autoca/ca_tool.py @@ -0,0 +1,63 @@ +import optparse +import logging +import sys +from OpenSSL import crypto +from autoca import ca_stub + + +def writeout(output, data): + if output: + outfd = open(output, 'w') + else: + outfd = sys.stdout + outfd.write(data) + if output: + outfd.close() + + +def parse_subject(subjstr): + return dict(x.split('=', 1) for x in subjstr.split(',')) + + +def main(): + parser = optparse.OptionParser() + parser.add_option('--url', dest='url', + help='autoca API endpoint') + parser.add_option('--output', dest='output', metavar='FILE', + help='write output to this file') + parser.add_option('--outkey', dest='outkey', metavar='FILE', + help='write private key to this file (only ' + 'useful with the "sign" command)') + parser.add_option('--server', dest='server', action='store_true', + help='create a server certificate (for "sign")') + parser.add_option('--subject', dest='subject', + help='specify the X.509 subject as a set of ' + 'comma-separated ATTR=VALUE assignments') + opts, args = parser.parse_args() + if len(args) < 1: + parser.error('No command specified') + if not opts.url: + parser.error('Must specify --url') + + ca = ca_stub.CaStub(opts.url) + + cmd, args = args[0], args[1:] + if cmd == 'get-ca': + writeout(opts.output, ca.get_ca(parse=False)) + elif cmd == 'get-crl': + writeout(opts.output, ca.get_crl(parse=False)) + elif cmd == 'sign': + if not opts.subject: + parser.error('Must specify --subject') + subject = parse_subject(opts.subject) + pkey, cert = ca.sign_certificate(subject['CN'], **subject) + writeout(opts.output, crypto.dump_certificate( + crypto.FILETYPE_PEM, cert)) + writeout(opts.outkey, crypto.dump_privatekey( + crypto.FILETYPE_PEM, pkey)) + else: + parser.error('Unknown command') + + +if __name__ == '__main__': + main() diff --git a/autoca/certutil.py b/autoca/certutil.py index 07a7a39292a737d368d4ff359664d1dbd523b7b4..650f0a6741225f9bfc81de82dc0dbb5438107513 100644 --- a/autoca/certutil.py +++ b/autoca/certutil.py @@ -1,116 +1,77 @@ -# $Id$ - from OpenSSL import crypto -import os import socket -import time -import yaml -DEFAULT_CONFIG_FILE = "/etc/autoca.conf" -DIGEST = "sha1" +CSR_DIGEST = 'sha1' + + +def writeto(filename, contents): + with open(filename, 'w') as fd: + fd.write(contents) -def _writeto(file, contents): - fd = open(file, "w") - fd.write(contents) - fd.close() -def _readfrom(file): - fd = open(file, "r") - try: +def readfrom(filename): + with open(filename, 'r') as fd: return fd.read() - finally: - fd.close() - - -class CA(object): - - def __init__(self, cf_file=DEFAULT_CONFIG_FILE, load=True): - self._parse_config(cf_file) - self.basedir = self.cf["basedir"] - self.cakeypath = os.path.join(self.basedir, ".private", "ca.key") - self.cacrtpath = os.path.join(self.basedir, "ca.pem") - if load: - self._load_ca_keys() - - def _parse_config(self, file): - fd = open(file, "r") - self.cf = yaml.load(fd) - fd.close() - - def _incr_serial(self): - serialpath = os.path.join(self.basedir, "serial") - try: - fd = open(serialpath, "r+") - cur_serial = int(fd.read().strip()) - cur_serial += 1 - except IOError: - fd = open(serialpath, "w") - cur_serial = int(time.time()) - fd.seek(0) - fd.truncate() - fd.write("%d\n" % cur_serial) - fd.close() - return cur_serial - - def _setup_ca(self): - if not os.path.exists(self.basedir): - os.mkdir(self.basedir, 0700) - for dir in (".private", "certs"): - fulldir = os.path.join(self.basedir, dir) - if not os.path.exists(fulldir): - os.mkdir(fulldir, 0700) - if not os.path.exists(self.cakeypath) or not os.path.exists(self.cacrtpath): - ca_key = self.create_rsa_key_pair(4096) - ca_req = self.create_cert_request(ca_key, **(self.cf["ca_subj"])) - ca_crt = self._sign_certificate(ca_req, ca_key, ca_req) - - _writeto(self.cakeypath, crypto.dump_privatekey(crypto.FILETYPE_PEM, ca_key)) - os.chmod(self.cakeypath, 0400) - _writeto(self.cacrtpath, crypto.dump_certificate(crypto.FILETYPE_PEM, ca_crt)) - - def _load_ca_keys(self): - self._setup_ca() - self.ca_key = crypto.load_privatekey( - crypto.FILETYPE_PEM, _readfrom(self.cakeypath)) - self.public_ca_pem = _readfrom(self.cacrtpath) - self.ca_crt = crypto.load_certificate( - crypto.FILETYPE_PEM, self.public_ca_pem) - - def create_rsa_key_pair(self, bits=1024): - pkey = crypto.PKey() - pkey.generate_key(crypto.TYPE_RSA, bits) - return pkey - - def create_cert_request(self, pkey, **attrs): - if "CN" not in attrs: - attrs["CN"] = socket.gethostname() - req = crypto.X509Req() - subj = req.get_subject() - tattrs = self.cf["default_subj"].copy() - tattrs.update(attrs) - for key, value in tattrs.items(): - setattr(subj, key, value) - req.set_pubkey(pkey) - req.sign(pkey, DIGEST) - return req - - def _sign_certificate(self, req, ca_key, ca_crt): - cert = crypto.X509() - cert.set_serial_number(self._incr_serial()) - cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(86400 * 365) - cert.set_issuer(ca_crt.get_subject()) - cert.set_subject(req.get_subject()) - cert.set_pubkey(req.get_pubkey()) - cert.sign(ca_key, DIGEST) - return cert - - def sign_certificate(self, req): - cert = self._sign_certificate(req, self.ca_key, self.ca_crt) - local_crt = self.path_to_cert(req.get_subject().CN) - _writeto(local_crt, - crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) - return cert - - def path_to_cert(self, cn): - return os.path.join(self.cf["basedir"], "certs", "%s.pem" % cn.lower()) + + +def create_rsa_key_pair(bits=1024): + """Generate a new RSA key pair.""" + pkey = crypto.PKey() + pkey.generate_key(crypto.TYPE_RSA, bits) + return pkey + + +def create_cert_request(pkey, **attrs): + """Generate a new CSR using the given key.""" + if 'CN' not in attrs: + attrs['CN'] = socket.gethostname() + req = crypto.X509Req() + subj = req.get_subject() + for key, value in attrs.items(): + setattr(subj, key, value) + req.set_pubkey(pkey) + req.sign(pkey, CSR_DIGEST) + return req + + +def sign_certificate(req, ca_key, ca_crt, serial_num, days, + extensions=None, digest='sha1'): + cert = crypto.X509() + cert.set_serial_number(serial_num) + cert.gmtime_adj_notBefore(0) + cert.gmtime_adj_notAfter(86400 * days) + cert.set_issuer(ca_crt.get_subject()) + cert.set_subject(req.get_subject()) + cert.set_pubkey(req.get_pubkey()) + if extensions: + cert.add_extensions(extensions) + cert.sign(ca_key, digest) + return cert + + +class FakeRevoked(object): + + def set_serial(self, serial): + self.serial = serial + + +class FakeCRL(object): + + def __init__(self): + self.entries = set() + + def get_revoked(self): + return self.entries + + def add_revoked(self, r): + self.entries.add(r) + + def export(self, cert, key, filetype, days): + return '' + + +# Work around missing CRL implementation in PyOpenSSL < 0.11. +if not hasattr(crypto, 'CRL'): + crypto.Revoked = FakeRevoked + crypto.CRL = FakeCRL + crypto.load_crl = lambda x, y: FakeCRL() diff --git a/autoca/test/__init__.py b/autoca/test/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4c49e84c4e703b90a5a0eddbf68accb1c6dc5aa4 --- /dev/null +++ b/autoca/test/__init__.py @@ -0,0 +1,20 @@ +from OpenSSL import crypto +import unittest +import tempfile +import shutil +import os + +from autoca import ca + + +class CaTestBase(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.ca_subject = {'CN': 'Test CA', + 'O': 'Test Corp.', + 'L': 'IE'} + self.ca = ca.CA(self.tmpdir, self.ca_subject, bits=1024) + + def tearDown(self): + shutil.rmtree(self.tmpdir) diff --git a/autoca/test/test_ca.py b/autoca/test/test_ca.py new file mode 100644 index 0000000000000000000000000000000000000000..2da8c0954aaf9430da2be531a72acbf4748f8da8 --- /dev/null +++ b/autoca/test/test_ca.py @@ -0,0 +1,95 @@ +from OpenSSL import crypto +import unittest +import os + +from autoca import ca +from autoca import certutil +from autoca.test import * + + +class CaTest(CaTestBase): + + def test_sanity_checks(self): + self.assertTrue(os.path.exists(self.ca.storage.ca_key_path)) + self.assertTrue(os.path.exists(self.ca.storage.ca_crt_path)) + + # Test subject attributes of the CA certificate. + with open(self.ca.storage.ca_crt_path, 'r') as fd: + cacrt = crypto.load_certificate(crypto.FILETYPE_PEM, fd.read()) + subj = cacrt.get_subject() + self.assertEquals('Test CA', subj.CN) + self.assertEquals('Test Corp.', subj.O) + self.assertEquals('IE', subj.L) + + # Test fetching a non-existing certificate. + self.assertEquals(None, self.ca.get_certificate('missing')) + + def test_reinit(self): + ca2 = ca.CA(self.tmpdir, self.ca_subject, bits=1024) + self.assertEquals(self.ca.public_ca_pem, ca2.public_ca_pem) + + def test_sign_certificate(self): + pkey = certutil.create_rsa_key_pair() + request = certutil.create_cert_request(pkey, CN='testcn') + result = self.ca.sign_certificate(request) + self.assertTrue(result is not None) + self.assertEquals('testcn', result.get_subject().CN) + + # Check that the certificate is now stored in the CA db. + result2 = self.ca.get_certificate('testcn') + self.assertTrue(result2 is not None) + + # Check the serial number. + serial_no = self.ca.get_serial('testcn') + self.assertTrue(int(serial_no) > 1) + + def test_sign_certificate_twice(self): + pkey = certutil.create_rsa_key_pair() + request = certutil.create_cert_request(pkey, CN='testcn') + result = self.ca.sign_certificate(request) + self.assertTrue(result is not None) + serial_no = int(result.get_serial_number()) + + pkey2 = certutil.create_rsa_key_pair() + request2 = certutil.create_cert_request(pkey2, CN='testcn') + result2 = self.ca.sign_certificate(request) + self.assertTrue(result2 is not None) + serial_no2 = int(result2.get_serial_number()) + + # Check that we have the same CN. + self.assertEquals(result.get_subject().CN, + result2.get_subject().CN) + + # Check that the serial numbers are monotonically incrementing. + self.assertNotEquals(serial_no, serial_no2) + self.assertTrue(serial_no2 > serial_no) + + # Check that get_certificate() returns the latest cert. + self.assertEquals(serial_no2, + self.ca.get_serial('testcn')) + + # Check that a CRL file has been generated. + self.assertTrue(os.path.exists(self.ca.storage.crl_path)) + + def test_revoke_certificate(self): + pkey = certutil.create_rsa_key_pair() + request = certutil.create_cert_request(pkey, CN='testcn') + result = self.ca.sign_certificate(request) + self.assertTrue(result is not None) + self.assertEquals('testcn', result.get_subject().CN) + + # Check that the certificate is now stored in the CA db. + result2 = self.ca.get_certificate('testcn') + self.assertTrue(result2 is not None) + + self.ca.revoke_certificate('testcn') + + result3 = self.ca.get_certificate('testcn') + self.assertEquals(None, result3) + + # Try to revoke it twice, expect no errors. + self.ca.revoke_certificate('testcn') + + +if __name__ == '__main__': + unittest.main() diff --git a/autoca/test/test_ca_app.py b/autoca/test/test_ca_app.py new file mode 100644 index 0000000000000000000000000000000000000000..0ddb60e1441e50c17a0f001d1411a9e5d33f264c --- /dev/null +++ b/autoca/test/test_ca_app.py @@ -0,0 +1,107 @@ +import unittest +import urllib +import StringIO +from OpenSSL import crypto + +from autoca import certutil +from autoca import ca_app +from autoca.test import * + + +class CaAppTest(CaTestBase): + + def setUp(self): + CaTestBase.setUp(self) + self.app = ca_app.make_app(self.ca) + + def _request(self, url, method='GET', data=None): + if '?' not in url: + url = url + '?' + path, qs = url.split('?', 1) + environ = {'wsgi.url_scheme': 'http', + 'PATH_INFO': path, + 'QUERY_STRING': qs, + 'REQUEST_URI': url, + 'REQUEST_METHOD': method, + 'SERVER_NAME': 'test', + 'SERVER_PORT': '80'} + if data: + enc_data = urllib.urlencode(data) + environ['wsgi.input'] = StringIO.StringIO(enc_data) + environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded' + environ['CONTENT_LENGTH'] = str(len(enc_data)) + + def start_response(status, headers): + self._response_status = int(status[:3]) + self._response_headers = dict(headers) + output = ''.join(self.app(environ, start_response)) + return (self._response_status, self._response_headers, output) + + def _sign_cert(self): + pkey = certutil.create_rsa_key_pair() + csr = certutil.create_cert_request(pkey, CN='testcn') + csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr) + status, hdrs, output = self._request( + '/sign', 'POST', {'csr': csr_data}) + self.assertEquals(200, status) + self.assertEquals('application/x-x509-user-cert', + hdrs['Content-Type']) + crt = crypto.load_certificate(crypto.FILETYPE_PEM, output) + return crt + + def test_get_ca_pem(self): + status, hdrs, output = self._request('/ca.pem') + self.assertTrue(200, status) + self.assertEquals('application/x-x509-ca-cert', + hdrs['Content-Type']) + self.assertEquals(self.ca.public_ca_pem, output) + + def test_get_crl_pem(self): + status, hdrs, output = self._request('/crl.pem') + self.assertTrue(200, status) + self.assertEquals('application/x-x509-ca-cert', + hdrs['Content-Type']) + self.assertEquals(self.ca.crl_data_pem, output) + + def test_get_crl_der(self): + status, hdrs, output = self._request('/ca.crl') + self.assertTrue(200, status) + self.assertEquals('application/x-pkcs7-crl', + hdrs['Content-Type']) + self.assertEquals(self.ca.crl_data_der, output) + + def test_sign(self): + crt = self._sign_cert() + self.assertTrue(crt is not None) + + def test_sign_bad_csr(self): + status, hdrs, output = self._request( + '/sign', 'POST', {'csr': 'bad'}) + self.assertEquals(400, status) + + status, hdrs, output = self._request( + '/sign', 'POST', {'fuffa': 'true'}) + self.assertEquals(400, status) + + def test_get_certificate(self): + crt = self._sign_cert() + crt_data = crypto.dump_certificate(crypto.FILETYPE_PEM, crt) + status, hdrs, output = self._request('/get/testcn') + self.assertEquals(200, status) + self.assertEquals('application/x-x509-user-cert', + hdrs['Content-Type']) + self.assertEquals(crt_data, output) + + def test_get_nonexisting_certificate(self): + status, hdrs, output = self._request('/get/nonexisting') + self.assertEquals(404, status) + + def test_revoke(self): + status, hdrs, output = self._request('/revoke/testcn', 'POST') + self.assertEquals(200, status) + self.assertEquals(None, self.ca.get_certificate('testcn')) + + def test_revoke_nonexisting_certificate(self): + status, hdrs, output = self._request('/revoke/fuffa', 'POST') + self.assertEquals(200, status) + diff --git a/autoca/test/test_ca_stub.py b/autoca/test/test_ca_stub.py new file mode 100644 index 0000000000000000000000000000000000000000..835c9f60874f95fbf2175730251e2601a6bcde58 --- /dev/null +++ b/autoca/test/test_ca_stub.py @@ -0,0 +1,132 @@ +from OpenSSL import crypto +import unittest +import urllib2 +import os +import mox + +from autoca import ca_app +from autoca import ca_stub +from autoca import certutil +from autoca.test import * + +TEST_URL = 'https://my.ca.org' + + +class FakeResponse(object): + + def __init__(self, data, content_type): + self.data = data + self.headers = {'Content-Type': content_type} + + def read(self): + return self.data + + +def IsUrl(url): + def _comparator(x): + return (isinstance(x, urllib2.Request) and + x.get_full_url() == (TEST_URL + url)) + return mox.Func(_comparator) + + +class CaStubTest(mox.MoxTestBase, CaTestBase): + + def setUp(self): + mox.MoxTestBase.setUp(self) + CaTestBase.setUp(self) + self.mox.StubOutWithMock(urllib2, 'urlopen') + self.stub = ca_stub.CaStub(TEST_URL) + + def tearDown(self): + CaTestBase.tearDown(self) + mox.MoxTestBase.tearDown(self) + + def test_get_ca(self): + urllib2.urlopen(IsUrl('/ca.pem')).AndReturn( + FakeResponse(self.ca.public_ca_pem, + 'application/x-x509-ca-cert')) + + self.mox.ReplayAll() + result = self.stub.get_ca() + self.assertEquals(self.ca.ca_subject['CN'], + result.get_subject().CN) + + # Test caching (mox will catch a second urllib call). + result2 = self.stub.get_ca() + self.assertEquals(result, result2) + + def test_get_ca_raw(self): + urllib2.urlopen(IsUrl('/ca.pem')).AndReturn( + FakeResponse(self.ca.public_ca_pem, + 'application/x-x509-ca-cert')) + + self.mox.ReplayAll() + result = self.stub.get_ca(parse=False) + self.assertTrue(isinstance(result, basestring)) + self.assertEquals(self.ca.public_ca_pem, result) + + def test_get_crl(self): + urllib2.urlopen(IsUrl('/ca.crl')).AndReturn( + FakeResponse(self.ca.crl_data_der, + 'application/x-pkcs7-crl')) + + self.mox.ReplayAll() + result = self.stub.get_crl() + self.assertTrue(result is not None) + + def test_get_certificate(self): + pkey = certutil.create_rsa_key_pair() + csr = certutil.create_cert_request(pkey, CN='testcn') + cert = self.ca.sign_certificate(csr) + cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) + + urllib2.urlopen(IsUrl('/get/testcn')).AndReturn( + FakeResponse(cert_str, 'application/x-x509-user-cert')) + + self.mox.ReplayAll() + result = self.stub.get_certificate('testcn') + self.assertEquals('testcn', result.get_subject().CN) + + def test_get_certificate_raw(self): + pkey = certutil.create_rsa_key_pair() + csr = certutil.create_cert_request(pkey, CN='testcn') + cert = self.ca.sign_certificate(csr) + cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) + + urllib2.urlopen(IsUrl('/get/testcn')).AndReturn( + FakeResponse(cert_str, 'application/x-x509-user-cert')) + + self.mox.ReplayAll() + result2 = self.stub.get_certificate('testcn', parse=False) + self.assertTrue(isinstance(result2, basestring)) + self.assertEquals(cert_str, result2) + + def test_get_nonexisting_certificate(self): + urllib2.urlopen(IsUrl('/get/missingcn')).AndRaise( + urllib2.URLError('not found')) + + self.mox.ReplayAll() + result = self.stub.get_certificate('missingcn') + self.assertEquals(None, result) + + def test_sign_certificate(self): + pkey = certutil.create_rsa_key_pair() + csr = certutil.create_cert_request(pkey, CN='testcn') + cert = self.ca.sign_certificate(csr) + cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) + + urllib2.urlopen(IsUrl('/sign')).AndReturn( + FakeResponse(cert_str, 'application/x-x509-user-cert')) + self.mox.ReplayAll() + + key, result = self.stub.sign_certificate('testcn') + self.assertTrue(key is not None) + self.assertEquals('testcn', result.get_subject().CN) + + def test_revoke_certificate(self): + urllib2.urlopen(IsUrl('/revoke/testcn')).AndReturn( + FakeResponse('ok', 'text/html')) + + self.mox.ReplayAll() + self.stub.revoke_certificate('testcn') + diff --git a/setup.py b/setup.py index a849d176810e1edce0534b0cad24014a867281fb..835a78202dd7ac794cea5e02b46b528675552ef3 100755 --- a/setup.py +++ b/setup.py @@ -4,18 +4,18 @@ from setuptools import setup, find_packages setup( name = "autoca", - version = "0.1.1", + version = "0.2", description = "Automated CA management.", author = "Ale", author_email = "ale@incal.net", - url = "http://code.autistici.org/p/autoca", + url = "http://git.autistici.org/autoca/", license = "MIT", packages = find_packages(), platforms = ["any"], zip_safe = False, entry_points = { "console_scripts": [ - "autoca-newcert = autoca.newcert:main", + "autoca = autoca.ca_tool:main", ], }, )