Commit 27aa4b81 authored by ale's avatar ale

made the "ca_stub" and "ca" interfaces match; rationalize the API

parent 545f0ecd
......@@ -13,15 +13,13 @@ ca_conf_file = os.path.join(os.path.dirname(__file__), 'ca.yml')
def create_app():
with open(ca_conf_file, 'r') as fd:
ca_config = yaml.load(fd)
ca_instance = ca.CA(ca_config['root'],
ca_instance = ca.CA(ca_config['basedir'],
ca_config['ca_subj'],
int(ca_config.get('bits', '4096')),
ca_config.get('digest', 'sha1'))
app = ca_app.make_app(ca_instance)
if __name__ == "__main__":
if __name__ == '__main__':
from flup.server.fcgi import WSGIServer
WSGIServer(create_app()).run()
......@@ -52,6 +52,14 @@ class CA(object):
crt_str)
self.public_ca_pem = crt_str
def get_ca(self):
return self.public_ca_pem
def make_certificate(self, subject_attrs, days=7, server=False):
pkey = certutil.create_rsa_key_pair()
csr = certutil.create_cert_request(pkey, **subject_attrs)
return pkey, self.sign_certificate(csr, days, server)
def sign_certificate(self, req, days=365, server=False):
cn = req.get_subject().CN
log.info('sign request for cn=%s', cn)
......@@ -98,18 +106,23 @@ class CA(object):
self.crl_data_pem = crl.export(self.ca_crt, self.ca_key,
crypto.FILETYPE_PEM, 30)
self.storage.set_crl(self.crl_data_pem)
log.debug('CRL PEM data: %s', self.crl_data_pem)
# Re-read the CRL data in DER and PEM formats.
pipe = subprocess.Popen(
['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.crl_data_der = pipe.communicate(self.crl_data_pem)[0]
def get_crl(self, format='der'):
if format == 'der':
return self.crl_data_der
else:
return self.crl_data_pem
def _revoke_certificate(self, cn, serial_num):
log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num)
#serial_hex = '%X' % int(serial_num)
revoked = self.storage.get_revoked()
revoked = set(x[0] for x in self.storage.get_revoked())
if serial_num in revoked:
return
......@@ -122,12 +135,13 @@ class CA(object):
if serial_num:
self._revoke_certificate(cn, serial_num)
def get_certificate(self, cn):
def get_certificate(self, cn, raw=False):
data = self.storage.get_certificate(cn)
if data:
return crypto.load_certificate(crypto.FILETYPE_PEM, data)
if data and not raw:
data = crypto.load_certificate(crypto.FILETYPE_PEM, data)
return data
def get_serial(self, cn):
crt = self.get_certificate(cn)
crt = self.get_certificate(cn, raw=False)
if crt:
return crt.get_serial_number()
import functools
import logging
from OpenSSL import crypto
from flask import Flask, abort, redirect, request, make_response
from flask import Blueprint, Flask, abort, redirect, request, make_response, \
g, current_app
from autoca import ca
from autoca import certutil
app = Flask(__name__)
ca_app = Blueprint('ca', __name__)
log = logging.getLogger(__name__)
def init_app(ca, config):
app.config.update(config)
app.config.from_envvar('CA_SETTINGS', silent=True)
app.ca = ca
def content_type(ctype):
"""Decorator to correctly return X509 certificates."""
def _ctype_decorator(fn):
......@@ -27,40 +24,45 @@ def content_type(ctype):
return _ctype_decorator
@app.route('/ca.pem')
@ca_app.before_request
def set_ca_wrapper():
g.ca = current_app.ca
@ca_app.route('/ca.pem')
@content_type('application/x-x509-ca-cert')
def get_ca():
return app.ca.public_ca_pem
return g.ca.public_ca_pem
@app.route('/crl.pem')
@ca_app.route('/crl.pem')
@content_type('application/x-x509-ca-cert')
def get_crl_pem():
return app.ca.crl_data_pem
return g.ca.crl_data_pem
@app.route('/ca.crl')
@ca_app.route('/ca.crl')
@content_type('application/x-pkcs7-crl')
def get_crl_der():
return app.ca.crl_data_der
return g.ca.crl_data_der
@app.route('/get/<cn>')
@ca_app.route('/get/<cn>')
@content_type('application/x-x509-user-cert')
def get_certificate(cn):
cert = app.ca.get_certificate(cn)
cert = g.ca.get_certificate(cn)
if not cert:
abort(404)
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
@app.route('/revoke/<cn>', methods=['POST'])
@ca_app.route('/revoke/<cn>', methods=['POST'])
def revoke(cn):
app.ca.revoke_certificate(cn)
g.ca.revoke_certificate(cn)
return 'ok'
@app.route('/sign', methods=['POST'])
@ca_app.route('/sign', methods=['POST'])
@content_type('application/x-x509-user-cert')
def sign():
if not request.form.get('csr'):
......@@ -71,13 +73,44 @@ def sign():
except Exception, e:
log.exception('error decoding CSR data: %s', e)
abort(400)
server = (request.form.get('server', 'y') == 'y')
days = int(request.form.get('days', 15))
signed_cert = app.ca.sign_certificate(csr, days=days, server=server)
server = (request.form.get('server', 'n') == 'y')
days = int(request.form.get('days', 7))
signed_cert = g.ca.sign_certificate(
csr, days=days, server=server)
return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert)
def make_app(ca_instance):
init_app(ca_instance, {})
return app.wsgi_app
app = Flask(__name__)
app.register_blueprint(ca_app)
app.config.from_envvar('CA_SETTINGS', silent=True)
app.ca = ca_instance
return app
def main():
import optparse
parser = optparse.OptionParser()
parser.add_option('--root', dest='root')
parser.add_option('-p', '--port', dest='port', type='int', default=4000)
parser.add_option('--bits', dest='bits', type='int', default=4096)
parser.add_option('--subject', dest='subject')
opts, args = parser.parse_args()
if args:
parser.error('Too many arguments')
if not opts.root:
parser.error('--root is mandatory')
if not opts.subject:
parser.error('--subject is mandatory')
ca_instance = ca.CA(opts.root,
certutil.parse_subject(opts.subject),
opts.bits)
app = make_app(ca_instance)
app.debug = True
app.run(port=opts.port)
if __name__ == '__main__':
main()
......@@ -62,7 +62,6 @@ class FileStorage(object):
return certutil.readfrom(self.crl_path)
def set_crl(self, crl_str):
print 'setting CRL:', crl_str
certutil.writeto(self.crl_path, crl_str)
def get_certificate(self, cn):
......@@ -91,11 +90,9 @@ class FileStorage(object):
def get_revoked(self):
with LockedFile(self.revoked_path) as fd:
revoked = [map(int, x.strip().split()) for x in fd if x]
print 'get_revoked():', revoked
return revoked
def add_revoked(self, serial):
print 'add_revoked(%s)' % serial
with LockedFile(self.revoked_path) as fd:
fd.seek(0, 2)
fd.write('%s %i\n' % (serial, time.time()))
......
......@@ -20,7 +20,7 @@ class CaStub(object):
self.ca_pem = None
self._cache_lock = threading.Lock()
def _request(self, path, method='GET', args=None, parse=True):
def _request(self, path, method='GET', args=None, raw=True):
data = None
if args:
if method == 'GET':
......@@ -34,7 +34,7 @@ class CaStub(object):
except urllib2.URLError, e:
log.error('error accessing %s: %s', path, e)
raise Error(str(e))
if parse:
if not raw:
ctype = response.headers['Content-Type']
if ctype in ('application/x-x509-user-cert',
'application/x-x509-ca-cert'):
......@@ -45,31 +45,37 @@ class CaStub(object):
crypto.FILETYPE_ASN1, response_data)
return response_data
def get_ca(self, parse=True):
def get_ca(self):
with self._cache_lock:
cached_attr = parse and '_ca_parsed' or '_ca_obj'
value = getattr(self, cached_attr, None)
value = getattr(self, '_ca_pem', None)
if not value:
value = self._request('/ca.pem', parse=parse)
setattr(self, cached_attr, value)
value = self._request('/ca.pem', raw=True)
self._ca_pem = value
return value
def get_crl(self, parse=True):
return self._request('/ca.crl', parse=parse)
def get_crl(self, format='der'):
url = (format == 'der') and '/ca.crl' or '/crl.pem'
return self._request(url, raw=True)
def get_certificate(self, cn, parse=True):
def get_certificate(self, cn, raw=True):
try:
return self._request('/get/%s' % cn, parse=parse)
return self._request('/get/%s' % cn, raw=raw)
except Error:
return None
def sign_certificate(self, cn, **subject_attrs):
def make_certificate(self, subject_attrs, days=7, server=False):
pkey = certutil.create_rsa_key_pair()
subject_attrs['CN'] = cn
csr = certutil.create_cert_request(pkey, **subject_attrs)
csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
signed_cert = self._request('/sign', 'POST', {'csr': csr_data})
return pkey, signed_cert
return pkey, self.sign_certificate(csr, days, server)
def sign_certificate(self, req, days=365, server=False):
csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
signed_cert = self._request('/sign', 'POST',
{'csr': csr_data,
'days': str(days),
'server': (server and 'y' or 'n')},
raw=False)
return signed_cert
def revoke_certificate(self, cn):
self._request('/revoke/%s' % cn, 'POST')
......
......@@ -3,6 +3,7 @@ import logging
import sys
from OpenSSL import crypto
from autoca import ca_stub
from autoca import certutil
def writeout(output, data):
......@@ -15,10 +16,6 @@ def writeout(output, data):
outfd.close()
def parse_subject(subjstr):
return dict(x.split('=', 1) for x in subjstr.split(','))
def main():
parser = optparse.OptionParser()
parser.add_option('--url', dest='url',
......@@ -49,8 +46,9 @@ def main():
elif cmd == 'sign':
if not opts.subject:
parser.error('Must specify --subject')
subject = parse_subject(opts.subject)
pkey, cert = ca.sign_certificate(subject['CN'], **subject)
subject = certutil.parse_subject(opts.subject)
pkey, cert = ca.sign_certificate(subject, days=7,
server=opts.server)
writeout(opts.output, crypto.dump_certificate(
crypto.FILETYPE_PEM, cert))
writeout(opts.outkey, crypto.dump_privatekey(
......
......@@ -49,6 +49,10 @@ def sign_certificate(req, ca_key, ca_crt, serial_num, days,
return cert
def parse_subject(subjstr):
return dict(x.split('=', 1) for x in subjstr.split(','))
class FakeRevoked(object):
def set_serial(self, serial):
......
......@@ -2,6 +2,7 @@ from OpenSSL import crypto
import unittest
import tempfile
import shutil
import subprocess
import os
from autoca import ca
......@@ -18,3 +19,11 @@ class CaTestBase(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _run_openssl(self, cmdline, input_data):
pipe = subprocess.Popen(['openssl'] + cmdline.split(),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
output = pipe.communicate(input_data)[0]
return pipe.returncode, output
......@@ -28,6 +28,38 @@ class CaTest(CaTestBase):
ca2 = ca.CA(self.tmpdir, self.ca_subject, bits=1024)
self.assertEquals(self.ca.public_ca_pem, ca2.public_ca_pem)
def test_get_ca(self):
self.assertEquals(self.ca.public_ca_pem,
self.ca.get_ca())
def test_get_crl_pem(self):
crl_pem = self.ca.get_crl(format='pem')
status, output = self._run_openssl(
'crl -inform PEM -noout -text', crl_pem)
self.assertEquals(0, status)
self.assertTrue('No Revoked Certificates.' in output)
def test_get_crl_der(self):
crl_pem = self.ca.get_crl(format='der')
status, output = self._run_openssl(
'crl -inform DER -noout -text', crl_pem)
self.assertEquals(0, status)
self.assertTrue('No Revoked Certificates.' in output)
def test_get_missing_certificate(self):
result = self.ca.get_certificate('missing', raw=False)
self.assertEquals(None, result)
result = self.ca.get_certificate('missing', raw=True)
self.assertEquals(None, result)
def test_get_missing_serial(self):
result = self.ca.get_serial('missing')
self.assertEquals(None, result)
def test_make_certificate(self):
pkey, crt = self.ca.make_certificate({'CN': 'testcn'})
self.assertEquals('testcn', crt.get_subject().CN)
def test_sign_certificate(self):
pkey = certutil.create_rsa_key_pair()
request = certutil.create_cert_request(pkey, CN='testcn')
......@@ -77,6 +109,7 @@ class CaTest(CaTestBase):
result = self.ca.sign_certificate(request)
self.assertTrue(result is not None)
self.assertEquals('testcn', result.get_subject().CN)
serial = result.get_serial_number()
# Check that the certificate is now stored in the CA db.
result2 = self.ca.get_certificate('testcn')
......@@ -87,9 +120,24 @@ class CaTest(CaTestBase):
result3 = self.ca.get_certificate('testcn')
self.assertEquals(None, result3)
# Try to revoke it twice, expect no errors.
# Check that the serial appears in the revocation list.
revoked = set(x[0] for x in self.ca.storage.get_revoked())
self.assertTrue(serial in revoked)
# Try to revoke it twice, expect no errors. Detect unexpected
# calls to _update_crl and raise an exception.
def unexpected(x):
raise Exception('unexpected call')
self.ca._update_crl = unexpected
self.ca.revoke_certificate('testcn')
# Examine the CRL and verify that it contains the serial.
crl = self.ca.get_crl(format='pem')
status, output = self._run_openssl(
'crl -inform PEM -noout -text', crl)
self.assertEquals(0, status)
self.assertTrue(str(serial) in output)
if __name__ == '__main__':
unittest.main()
......@@ -41,30 +41,20 @@ class CaStubTest(mox.MoxTestBase, CaTestBase):
CaTestBase.tearDown(self)
mox.MoxTestBase.tearDown(self)
def test_get_ca(self):
def test_get_ca_raw(self):
urllib2.urlopen(IsUrl('/ca.pem')).AndReturn(
FakeResponse(self.ca.public_ca_pem,
'application/x-x509-ca-cert'))
self.mox.ReplayAll()
result = self.stub.get_ca()
self.assertEquals(self.ca.ca_subject['CN'],
result.get_subject().CN)
self.assertTrue(isinstance(result, basestring))
self.assertEquals(self.ca.public_ca_pem, result)
# Test caching (mox will catch a second urllib call).
result2 = self.stub.get_ca()
self.assertEquals(result, result2)
def test_get_ca_raw(self):
urllib2.urlopen(IsUrl('/ca.pem')).AndReturn(
FakeResponse(self.ca.public_ca_pem,
'application/x-x509-ca-cert'))
self.mox.ReplayAll()
result = self.stub.get_ca(parse=False)
self.assertTrue(isinstance(result, basestring))
self.assertEquals(self.ca.public_ca_pem, result)
def test_get_crl(self):
urllib2.urlopen(IsUrl('/ca.crl')).AndReturn(
FakeResponse(self.ca.crl_data_der,
......@@ -84,7 +74,7 @@ class CaStubTest(mox.MoxTestBase, CaTestBase):
FakeResponse(cert_str, 'application/x-x509-user-cert'))
self.mox.ReplayAll()
result = self.stub.get_certificate('testcn')
result = self.stub.get_certificate('testcn', raw=False)
self.assertEquals('testcn', result.get_subject().CN)
def test_get_certificate_raw(self):
......@@ -97,7 +87,7 @@ class CaStubTest(mox.MoxTestBase, CaTestBase):
FakeResponse(cert_str, 'application/x-x509-user-cert'))
self.mox.ReplayAll()
result2 = self.stub.get_certificate('testcn', parse=False)
result2 = self.stub.get_certificate('testcn', raw=True)
self.assertTrue(isinstance(result2, basestring))
self.assertEquals(cert_str, result2)
......@@ -109,7 +99,7 @@ class CaStubTest(mox.MoxTestBase, CaTestBase):
result = self.stub.get_certificate('missingcn')
self.assertEquals(None, result)
def test_sign_certificate(self):
def test_make_certificate(self):
pkey = certutil.create_rsa_key_pair()
csr = certutil.create_cert_request(pkey, CN='testcn')
cert = self.ca.sign_certificate(csr)
......@@ -119,7 +109,7 @@ class CaStubTest(mox.MoxTestBase, CaTestBase):
FakeResponse(cert_str, 'application/x-x509-user-cert'))
self.mox.ReplayAll()
key, result = self.stub.sign_certificate('testcn')
key, result = self.stub.make_certificate({'CN': 'testcn'})
self.assertTrue(key is not None)
self.assertEquals('testcn', result.get_subject().CN)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment