# ca.py
from OpenSSL import crypto
import logging
import os
import subprocess
import time

from autoca import ca_storage
from autoca import certutil

log = logging.getLogger(__name__)


class CA(object):

    def __init__(self, root, subject, bits=1024, digest='sha1'):
        """Set up the CA on top of a file-based storage root.

        Args:
          root: handed to ``ca_storage.FileStorage`` as the storage root.
          subject: mapping of subject attributes; it is expanded into
            keyword arguments when the CA certificate request is built.
          bits: RSA key size used if a new CA key has to be generated.
          digest: digest name used when signing certificates.
        """
        self.storage = ca_storage.FileStorage(root)
        self.ca_subject, self.bits, self.digest = subject, bits, digest
        # The CA key/certificate must be available first: bootstrapping
        # an empty CRL signs it with them.
        self._init_ca()
        self._load_crl()
    def _generate_ca_cert(self):
        """Create, self-sign and persist a fresh CA certificate.

        Relies on ``self.ca_key`` being set already. The new certificate
        ends up in ``self.ca_crt``, is saved together with the key via
        the storage backend, and its PEM dump is kept in
        ``self.public_ca_pem``.
        """
        request = certutil.create_cert_request(
            self.ca_key, **self.ca_subject)

        # NOTE: a 'subjectKeyIdentifier' extension ('hash', with the
        # request as subject) was drafted here but is left disabled.
        ca_extensions = [
            crypto.X509Extension('basicConstraints', True,
                                 'CA:TRUE, pathlen:0'),
            crypto.X509Extension('keyUsage', True,
                                 'keyCertSign, cRLSign'),
        ]

        # Self-signed: the request is also passed where regular
        # certificates get the issuer certificate.
        self.ca_crt = certutil.sign_certificate(
            request, self.ca_key, request, 1, 3650,
            extensions=ca_extensions, digest=self.digest)

        cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca_crt)
        key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, self.ca_key)
        self.storage.set_ca(key_pem, cert_pem)
        self.public_ca_pem = cert_pem

    def _init_ca(self):
        """Load the CA key and certificate from storage, or create them.

        When storage has no key, a new RSA key of ``self.bits`` bits is
        generated and a CA certificate is issued for it.
        """
        key_pem, cert_pem = self.storage.get_ca()
        if not key_pem:
            log.info('initializing CA certificate and private key')
            self.ca_key = certutil.create_rsa_key_pair(self.bits)
            self._generate_ca_cert()
            return

        self.ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
        self.ca_crt = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
        self.public_ca_pem = cert_pem
    def renew_ca(self):
        """Re-issue the CA certificate with the existing CA private key.

        Only logs an error when no CA private key is loaded.
        """
        if self.ca_key:
            log.info('renewing CA certificate')
            self._generate_ca_cert()
        else:
            log.error('CA private key not available')
    def get_ca(self):
        return self.public_ca_pem

    def make_certificate(self, subject_attrs, days=7, server=False):
        """Generate a new key pair and a certificate for it signed by the CA.

        Args:
          subject_attrs: mapping of subject attributes, expanded into
            keyword arguments for the certificate request.
          days: forwarded to ``sign_certificate``.
          server: forwarded to ``sign_certificate``.

        Returns:
          A ``(private_key, certificate)`` tuple.
        """
        private_key = certutil.create_rsa_key_pair()
        request = certutil.create_cert_request(private_key, **subject_attrs)
        signed = self.sign_certificate(request, days, server)
        return private_key, signed

    def sign_certificate(self, req, days=365, server=False):
        """Sign a certificate request with the CA key and store the result.

        If a certificate is already stored under the request's CN, it is
        revoked before the new one is issued.

        Args:
          req: certificate request; the CN of its subject is the key
            under which the certificate is stored.
          days: validity argument handed to ``certutil.sign_certificate``.
          server: when true, issue a server certificate ('serverAuth'
            extended key usage, no 'nonRepudiation' key usage);
            otherwise issue a client certificate.

        Returns:
          The newly signed certificate.
        """
        cn = req.get_subject().CN
        log.info('sign request for cn=%s', cn)
        existing = self.get_certificate(cn)
        if existing:
            log.info('a valid certificate already exists for cn=%s, '
                     'revoking it', cn)
            self._revoke_certificate(cn, existing.get_serial_number())
        new_serial = self.storage.get_next_serial()

        # An explicit branch is used on purpose: the former
        # "server and '' or 'nonRepudiation, '" idiom can never produce
        # the empty string, so server certificates were always issued
        # with nonRepudiation as well.
        if server:
            key_usage = 'digitalSignature, keyEncipherment'
            extended_key_usage = 'serverAuth'
            ns_cert_type = 'client, server'
        else:
            key_usage = 'nonRepudiation, digitalSignature, keyEncipherment'
            extended_key_usage = 'clientAuth'
            ns_cert_type = 'client'

        extensions = [
            crypto.X509Extension('basicConstraints', True, 'CA:FALSE'),
            crypto.X509Extension('keyUsage', True, key_usage),
            crypto.X509Extension('extendedKeyUsage', False,
                                 extended_key_usage),
            crypto.X509Extension('nsCertType', False, ns_cert_type),
            ]
        cert = certutil.sign_certificate(
            req, self.ca_key, self.ca_crt, new_serial, days,
            extensions=extensions, digest=self.digest)
        self.storage.store_certificate(
            cn, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        return cert

    def _update_crl(self):
        """Rebuild the CRL from the stored revocation list and persist it.

        Every ``(serial, timestamp)`` pair reported by the storage backend
        becomes one CRL entry. The CRL is signed with the CA key, written
        back to storage, and the in-memory copies are refreshed.
        """
        crl = crypto.CRL()
        for serial, stamp in self.storage.get_revoked():
            revoked = crypto.Revoked()
            # Revoked.set_serial() parses its argument as a hexadecimal
            # number; passing the decimal form would record the wrong
            # serial for every value from 10 upwards.
            revoked.set_serial('%x' % int(serial))
            revoked.set_rev_date(
                time.strftime('%Y%m%d%H%M%SZ', time.gmtime(stamp)))
            crl.add_revoked(revoked)
        self.storage.set_crl(
            crl.export(self.ca_crt, self.ca_key, crypto.FILETYPE_PEM, 30))
        self._load_crl()

    def _load_crl(self):
        """Load the PEM CRL from storage and derive its DER encoding.

        When storage holds no CRL, an empty one is created, signed with
        the CA key and stored. The DER form is obtained by piping the PEM
        data through the ``openssl crl`` command-line tool.
        """
        self.crl_data_pem = self.storage.get_crl()
        if not self.crl_data_pem:
            # Create an empty CRL.
            crl = crypto.CRL()
            self.crl_data_pem = crl.export(self.ca_crt, self.ca_key,
                                           crypto.FILETYPE_PEM, 30)
            self.storage.set_crl(self.crl_data_pem)

        # Convert the PEM CRL to DER with the openssl command-line tool.
        pipe = subprocess.Popen(
            ['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.crl_data_der = pipe.communicate(self.crl_data_pem)[0]
        if pipe.returncode != 0:
            # Keep going (the PEM CRL is still available), but make the
            # failure visible: the DER data may be empty or incomplete.
            log.error('openssl crl conversion failed with exit status %s',
                      pipe.returncode)
    def get_crl(self, format='der'):
        if format == 'der':
            return self.crl_data_der
        else:
            return self.crl_data_pem

    def _revoke_certificate(self, cn, serial_num):
        """Revoke a serial number and drop the certificate stored for cn.

        Nothing besides the debug log line happens when the serial number
        is already present in the stored revocation list.
        """
        log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num)

        # Collect the serials (first field of every revocation entry).
        known_serials = set()
        for entry in self.storage.get_revoked():
            known_serials.add(entry[0])

        if serial_num not in known_serials:
            self.storage.delete_certificate(cn)
            self.storage.add_revoked(serial_num)
            self._update_crl()

    def revoke_certificate(self, cn):
        serial_num = self.get_serial(cn)
        if serial_num:
           self._revoke_certificate(cn, serial_num) 

    def get_certificate(self, cn, raw=False):
149
        data = self.storage.get_certificate(cn)
150 151 152
        if data and not raw:
            data = crypto.load_certificate(crypto.FILETYPE_PEM, data)
        return data
    def get_serial(self, cn):
155
        crt = self.get_certificate(cn, raw=False)
156 157
        if crt:
            return crt.get_serial_number()