ca.py 7.06 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
from OpenSSL import crypto
import logging
import os
import subprocess
import time

from autoca import ca_storage
from autoca import certutil

log = logging.getLogger(__name__)


class CA(object):
ale's avatar
ale committed
14
    """A Certification Authority stored on the local filesystem."""
15 16 17 18 19 20 21

    def __init__(self, root, subject, bits=1024, digest='sha1'):
        self.ca_subject = subject
        self.bits = bits
        self.digest = digest
        self.storage = ca_storage.FileStorage(root)
        self._init_ca()
ale's avatar
ale committed
22
        self._load_crl()
23

ale's avatar
ale committed
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
    def _generate_ca_cert(self):
        ca_req = certutil.create_cert_request(
            self.ca_key, **(self.ca_subject))
        self.ca_crt = certutil.sign_certificate(
            ca_req, self.ca_key, ca_req, 1, 3650,
            extensions=[
                crypto.X509Extension('basicConstraints', True,
                                     'CA:TRUE, pathlen:0'),
                crypto.X509Extension('keyUsage', True,
                                     'keyCertSign, cRLSign'),
                #crypto.X509Extension('subjectKeyIdentifier', False,
                #                     'hash', subject=ca_req),
                ],
            digest=self.digest)

        crt_str = crypto.dump_certificate(
            crypto.FILETYPE_PEM, self.ca_crt)
        self.storage.set_ca(
            crypto.dump_privatekey(crypto.FILETYPE_PEM, self.ca_key),
            crt_str)
        self.public_ca_pem = crt_str

46 47 48 49 50 51 52 53 54 55 56
    def _init_ca(self):
        key_str, crt_str = self.storage.get_ca()
        if key_str:
            self.ca_key = crypto.load_privatekey(
                crypto.FILETYPE_PEM, key_str)
            self.ca_crt = crypto.load_certificate(
                crypto.FILETYPE_PEM, crt_str)
            self.public_ca_pem = crt_str
        else:
            log.info('initializing CA certificate and private key')
            self.ca_key = certutil.create_rsa_key_pair(self.bits)
ale's avatar
ale committed
57
            self._generate_ca_cert()
58

ale's avatar
ale committed
59
    def renew_ca(self):
ale's avatar
ale committed
60
        """Renew the CA certificate itself (keeping the private key)."""
ale's avatar
ale committed
61 62 63 64 65
        if not self.ca_key:
            log.error('CA private key not available')
            return
        log.info('renewing CA certificate')
        self._generate_ca_cert()
66

67
    def get_ca(self):
ale's avatar
ale committed
68
        """Return the CA certificate (PEM-encoded)."""
69 70 71
        return self.public_ca_pem

    def make_certificate(self, subject_attrs, days=7, server=False):
ale's avatar
ale committed
72 73 74 75 76 77 78 79 80 81
        """Create a new certificate and private key.

        Args:
          subject_attrs: dict of X509 Subject attributes
          days: days of validity of the new certificate
          server: if True, create a server certificate, otherwise a
                  client one.
        Returns:
          A (private_key, certificate) pair.
        """
82 83 84 85
        pkey = certutil.create_rsa_key_pair()
        csr = certutil.create_cert_request(pkey, **subject_attrs)
        return pkey, self.sign_certificate(csr, days, server)

ale's avatar
ale committed
86 87 88 89 90 91 92 93 94 95 96
    def sign_certificate(self, req, days=7, server=False):
        """Sign a certificate request.

        Args:
          req: CSR object
          days: days of validity of the new certificate
          server: if True, create a server certificate, otherwise a
                  client one.
        Returns:
          A X509 certificate, PEM-encoded.
        """
97 98 99 100 101 102 103 104 105 106 107 108 109
        cn = req.get_subject().CN
        log.info('sign request for cn=%s', cn)
        cert = self.get_certificate(cn)
        if cert:
            log.info('a valid certificate already exists for cn=%s, '
                     'revoking it', cn)
            self._revoke_certificate(cn, cert.get_serial_number())
        new_serial = self.storage.get_next_serial()
        extensions = [
            crypto.X509Extension('basicConstraints', True, 'CA:FALSE'),
            crypto.X509Extension('keyUsage', True,
                                 '%sdigitalSignature, keyEncipherment' % (
                    server and '' or 'nonRepudiation, ')),
110
            crypto.X509Extension('extendedKeyUsage', False,
111
                                 server and 'clientAuth, serverAuth' or 'clientAuth'),
112
            crypto.X509Extension('nsCertType', False,
ale's avatar
ale committed
113
                                 server and 'client, server' or 'client'),
114 115 116 117 118 119 120 121
            ]
        cert = certutil.sign_certificate(
            req, self.ca_key, self.ca_crt, new_serial, days,
            extensions=extensions, digest=self.digest)
        self.storage.store_certificate(
            cn, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        return cert

ale's avatar
ale committed
122 123 124 125 126 127
    def update_crl(self):
        """Update the CRL.

        This function will generate a newly signed CRL every time it
        is called.
        """
ale's avatar
ale committed
128 129 130 131 132 133 134 135
        crl = crypto.CRL()
        for serial, stamp in self.storage.get_revoked():
            revoked = crypto.Revoked()
            revoked.set_serial(str(serial))
            revoked.set_rev_date(
                time.strftime('%Y%m%d%H%M%SZ', time.gmtime(stamp)))
            crl.add_revoked(revoked)
        self.storage.set_crl(
ale's avatar
ale committed
136 137
            crl.export(self.ca_crt, self.ca_key, crypto.FILETYPE_PEM, 30,
                       digest='sha256'))
ale's avatar
ale committed
138 139 140 141 142 143 144 145
        self._load_crl()

    def _load_crl(self):
        self.crl_data_pem = self.storage.get_crl()
        if not self.crl_data_pem:
            # Create an empty CRL.
            crl = crypto.CRL()
            self.crl_data_pem = crl.export(self.ca_crt, self.ca_key,
ale's avatar
ale committed
146 147
                                           crypto.FILETYPE_PEM, 30,
                                           digest='sha256')
ale's avatar
ale committed
148
            self.storage.set_crl(self.crl_data_pem)
149

ale's avatar
ale committed
150 151 152 153 154
        # Re-read the CRL data in DER and PEM formats.
        pipe = subprocess.Popen(
            ['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.crl_data_der = pipe.communicate(self.crl_data_pem)[0]
155

156
    def get_crl(self, format='der'):
ale's avatar
ale committed
157
        """Return the CRL (PEM- or DER-encoded)."""
158 159 160 161 162
        if format == 'der':
            return self.crl_data_der
        else:
            return self.crl_data_pem

163 164 165
    def _revoke_certificate(self, cn, serial_num):
        log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num)

166
        revoked = set(x[0] for x in self.storage.get_revoked())
ale's avatar
ale committed
167 168
        if serial_num in revoked:
            return
169 170

        self.storage.delete_certificate(cn)
ale's avatar
ale committed
171
        self.storage.add_revoked(serial_num)
ale's avatar
ale committed
172
        self.update_crl()
173 174

    def revoke_certificate(self, cn):
ale's avatar
ale committed
175
        """Revoke a certificate."""
176 177 178 179
        serial_num = self.get_serial(cn)
        if serial_num:
           self._revoke_certificate(cn, serial_num) 

180
    def get_certificate(self, cn, raw=False):
ale's avatar
ale committed
181 182 183 184 185
        """Return a certificate given its subject.

        If raw is True, the returned data will be in DER format,
        otherwise it will be PEM-encoded.
        """
186
        data = self.storage.get_certificate(cn)
187 188 189
        if data and not raw:
            data = crypto.load_certificate(crypto.FILETYPE_PEM, data)
        return data
190 191

    def get_serial(self, cn):
ale's avatar
ale committed
192
        """Return the serial number of the certificate given its subject."""
193
        crt = self.get_certificate(cn, raw=False)
194 195
        if crt:
            return crt.get_serial_number()