"""Small certificate authority built on pyOpenSSL and autoca storage."""

import logging
import os
import subprocess
import time

from OpenSSL import crypto

from autoca import ca_storage
from autoca import certutil

log = logging.getLogger(__name__)


class CA(object):
    """Certificate authority whose state is kept in a FileStorage.

    The CA key and certificate are loaded from storage on construction,
    or created and stored if the storage has no key yet.  The current
    CRL is cached on the instance in both PEM and DER encodings.
    """

    def __init__(self, root, subject, bits=1024, digest='sha1'):
        """Load (or create) the CA material and the CRL.

        Args:
          root: argument handed to ca_storage.FileStorage.
          subject: mapping expanded as keyword arguments to
            certutil.create_cert_request for the CA certificate.
          bits: size passed to certutil.create_rsa_key_pair when a new
            CA key has to be created.
          digest: digest name passed to certutil.sign_certificate.
        """
        self.ca_subject = subject
        self.bits = bits
        self.digest = digest
        self.storage = ca_storage.FileStorage(root)
        self._init_ca()
        self._load_crl()

    def _generate_ca_cert(self):
        """Create a self-signed CA certificate from self.ca_key and store it.

        Sets self.ca_crt and self.public_ca_pem, and writes the PEM key
        and certificate to storage.
        """
        ca_req = certutil.create_cert_request(
            self.ca_key, **(self.ca_subject))
        ca_extensions = [
            crypto.X509Extension('basicConstraints', True,
                                 'CA:TRUE, pathlen:0'),
            crypto.X509Extension('keyUsage', True,
                                 'keyCertSign, cRLSign'),
            #crypto.X509Extension('subjectKeyIdentifier', False,
            #                     'hash', subject=ca_req),
        ]
        # The request doubles as the issuer argument: the CA certificate
        # is signed with its own key.  The literals 1 and 3650 occupy the
        # same positions as the serial and days in sign_certificate().
        self.ca_crt = certutil.sign_certificate(
            ca_req, self.ca_key, ca_req, 1, 3650,
            extensions=ca_extensions,
            digest=self.digest)
        crt_str = crypto.dump_certificate(
            crypto.FILETYPE_PEM, self.ca_crt)
        self.storage.set_ca(
            crypto.dump_privatekey(crypto.FILETYPE_PEM, self.ca_key),
            crt_str)
        self.public_ca_pem = crt_str

    def _init_ca(self):
        """Load the CA key/certificate from storage, or create them."""
        key_str, crt_str = self.storage.get_ca()
        if key_str:
            self.ca_key = crypto.load_privatekey(
                crypto.FILETYPE_PEM, key_str)
            self.ca_crt = crypto.load_certificate(
                crypto.FILETYPE_PEM, crt_str)
            self.public_ca_pem = crt_str
        else:
            log.info('initializing CA certificate and private key')
            self.ca_key = certutil.create_rsa_key_pair(self.bits)
            self._generate_ca_cert()

    def renew_ca(self):
        """Issue a new CA certificate for the existing CA private key.

        Logs an error and does nothing if no private key is available.
        """
        if not self.ca_key:
            log.error('CA private key not available')
            return
        log.info('renewing CA certificate')
        self._generate_ca_cert()

    def get_ca(self):
        """Return the CA certificate in PEM format."""
        return self.public_ca_pem

    def make_certificate(self, subject_attrs, days=7, server=False):
        """Create a new key pair and a certificate signed by this CA.

        Args:
          subject_attrs: mapping expanded as keyword arguments to
            certutil.create_cert_request.
          days: forwarded to sign_certificate.
          server: forwarded to sign_certificate.

        Returns:
          A (private key, certificate) tuple.
        """
        pkey = certutil.create_rsa_key_pair()
        csr = certutil.create_cert_request(pkey, **subject_attrs)
        return pkey, self.sign_certificate(csr, days, server)

    def sign_certificate(self, req, days=365, server=False):
        """Sign a certificate request and store the resulting certificate.

        If a certificate is already stored for the request's CN, it is
        revoked before the new one is issued.

        Args:
          req: certificate request; its subject CN is used as the
            storage key.
          days: passed to certutil.sign_certificate.
          server: selects the server flavour of the keyUsage,
            extendedKeyUsage and nsCertType extensions.

        Returns:
          The signed certificate.
        """
        cn = req.get_subject().CN
        log.info('sign request for cn=%s', cn)
        cert = self.get_certificate(cn)
        if cert:
            log.info('a valid certificate already exists for cn=%s, '
                     'revoking it', cn)
            self._revoke_certificate(cn, cert.get_serial_number())
        new_serial = self.storage.get_next_serial()

        # Conditional expressions are required here: the old
        # "server and '' or ..." form always picked the non-server
        # prefix because the empty string is falsy, so server
        # certificates were issued with nonRepudiation as well.
        key_usage_prefix = '' if server else 'nonRepudiation, '
        extended_usage = 'serverAuth' if server else 'clientAuth'
        ns_cert_type = 'client, server' if server else 'client'
        extensions = [
            crypto.X509Extension('basicConstraints', True, 'CA:FALSE'),
            crypto.X509Extension(
                'keyUsage', True,
                '%sdigitalSignature, keyEncipherment' % key_usage_prefix),
            crypto.X509Extension('extendedKeyUsage', False, extended_usage),
            crypto.X509Extension('nsCertType', False, ns_cert_type),
        ]
        cert = certutil.sign_certificate(
            req, self.ca_key, self.ca_crt, new_serial, days,
            extensions=extensions, digest=self.digest)
        self.storage.store_certificate(
            cn, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        return cert

    def _update_crl(self):
        """Rebuild the CRL from the stored revocations and reload it."""
        crl = crypto.CRL()
        for serial, stamp in self.storage.get_revoked():
            revoked = crypto.Revoked()
            # Revoked.set_serial() parses its argument as a hexadecimal
            # number, so a decimal string would record the wrong serial
            # (e.g. 10 would become 0x10).
            # NOTE(review): assumes storage returns integer (or decimal
            # string) serials -- confirm against ca_storage.
            revoked.set_serial('%x' % int(serial))
            revoked.set_rev_date(
                time.strftime('%Y%m%d%H%M%SZ', time.gmtime(stamp)))
            crl.add_revoked(revoked)
        self.storage.set_crl(
            crl.export(self.ca_crt, self.ca_key,
                       crypto.FILETYPE_PEM, 30))
        self._load_crl()

    def _load_crl(self):
        """Read the CRL from storage and cache it as PEM and DER.

        An empty CRL is generated and stored if storage has none.
        """
        self.crl_data_pem = self.storage.get_crl()
        if not self.crl_data_pem:
            # Create an empty CRL.
            crl = crypto.CRL()
            self.crl_data_pem = crl.export(self.ca_crt, self.ca_key,
                                           crypto.FILETYPE_PEM, 30)
            self.storage.set_crl(self.crl_data_pem)
        # Re-read the CRL data in DER and PEM formats.
        pipe = subprocess.Popen(
            ['openssl', 'crl', '-inform', 'PEM', '-outform', 'DER'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.crl_data_der = pipe.communicate(self.crl_data_pem)[0]
        if pipe.returncode != 0:
            # Report the failure instead of silently serving whatever
            # (possibly empty) output the command produced.
            log.error('openssl crl conversion failed with exit status %s',
                      pipe.returncode)

    def get_crl(self, format='der'):
        """Return the cached CRL: DER if format is 'der', otherwise PEM."""
        if format == 'der':
            return self.crl_data_der
        return self.crl_data_pem

    def _revoke_certificate(self, cn, serial_num):
        """Revoke a serial number and drop the stored certificate for cn.

        Does nothing (beyond logging) if the serial is already revoked.
        """
        log.debug('revoking certificate: cn=%s, serial=%s', cn, serial_num)
        revoked = set(x[0] for x in self.storage.get_revoked())
        if serial_num in revoked:
            return
        self.storage.delete_certificate(cn)
        self.storage.add_revoked(serial_num)
        self._update_crl()

    def revoke_certificate(self, cn):
        """Revoke the certificate currently stored for cn, if any."""
        serial_num = self.get_serial(cn)
        if serial_num:
            self._revoke_certificate(cn, serial_num)

    def get_certificate(self, cn, raw=False):
        """Return the stored certificate for cn.

        Args:
          cn: name the certificate is stored under.
          raw: if true, return the stored PEM data unparsed.

        Returns:
          The parsed certificate, the raw stored data when raw is true,
          or whatever falsy value storage returned when nothing is
          stored for cn.
        """
        data = self.storage.get_certificate(cn)
        if data and not raw:
            data = crypto.load_certificate(crypto.FILETYPE_PEM, data)
        return data

    def get_serial(self, cn):
        """Return the serial number of the certificate stored for cn.

        Returns None if there is no stored certificate.
        """
        crt = self.get_certificate(cn, raw=False)
        if crt:
            return crt.get_serial_number()
        return None