ca_app.py 2.11 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
import functools
import logging
from OpenSSL import crypto
from flask import Flask, abort, redirect, request, make_response

app = Flask(__name__)
log = logging.getLogger(__name__)


def init_app(ca, config):
    app.config.update(config)
    app.config.from_envvar('CA_SETTINGS', silent=True)
    app.ca = ca


def content_type(ctype):
    """Decorator to correctly return X509 certificates."""
    def _ctype_decorator(fn):
        @functools.wraps(fn)
        def _ctype_wrapper(*args, **kwargs):
            resp = fn(*args, **kwargs)
            if isinstance(resp, basestring):
                resp = make_response(resp, 200)
                resp.headers['Content-Type'] = ctype
            return resp
        return _ctype_wrapper
    return _ctype_decorator


@app.route('/ca.pem')
@content_type('application/x-x509-ca-cert')
def get_ca():
    return app.ca.public_ca_pem


@app.route('/crl.pem')
@content_type('application/x-x509-ca-cert')
def get_crl_pem():
    return app.ca.crl_data_pem


@app.route('/ca.crl')
@content_type('application/x-pkcs7-crl')
def get_crl_der():
    return app.ca.crl_data_der


@app.route('/get/<cn>')
@content_type('application/x-x509-user-cert')
def get_certificate(cn):
    cert = app.ca.get_certificate(cn)
    if not cert:
        abort(404)
    return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)


@app.route('/revoke/<cn>', methods=['POST'])
def revoke(cn):
    app.ca.revoke_certificate(cn)
    return 'ok'


@app.route('/sign', methods=['POST'])
@content_type('application/x-x509-user-cert')
def sign():
    if not request.form.get('csr'):
        abort(400)
    try:
        csr_data = request.form['csr']
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_data)
    except Exception, e:
        log.exception('error decoding CSR data: %s', e)
        abort(400)
    server = (request.form.get('server', 'y') == 'y')
    days = int(request.form.get('days', 15))
    signed_cert = app.ca.sign_certificate(csr, days=days, server=server)
    return crypto.dump_certificate(crypto.FILETYPE_PEM, signed_cert)


def make_app(ca_instance):
    init_app(ca_instance, {})
    return app.wsgi_app