# ca_stub.py
import logging
import threading
import urllib
import urllib2
from OpenSSL import crypto

from autoca import certutil

log = logging.getLogger(__name__)


class Error(Exception):
    """Raised when a request to the remote CA endpoint fails."""


class CaStub(object):
    """HTTP client for a remote CA endpoint.

    Every method issues a request relative to the base ``url`` given at
    construction time. Transport and HTTP failures surface as ``Error``
    unless a method documents otherwise.
    """

    def __init__(self, url):
        """Remember the base URL, dropping any trailing slashes.

        The request paths used by this class all start with '/', so
        stripping here avoids a double slash when they are concatenated.
        """
        self.url = url.rstrip('/')
        # Not read anywhere in this class; kept because it is a public
        # attribute that code outside this class may rely on.
        self.ca_pem = None
        # Guards the per-instance cache attributes used by get_ca().
        self._cache_lock = threading.Lock()

    @staticmethod
    def _prepare(path, method, args):
        """Return the ``(path, body)`` pair to send for a request.

        For GET, ``args`` are appended to the path as a query string and
        the body is None. For any other method ``args`` become a
        form-encoded body; the body is an empty string (never None) when
        there are no arguments, because urllib2 chooses the HTTP verb
        from the presence of a body and would otherwise send a GET.
        """
        if method == 'GET':
            if args:
                path = '%s?%s' % (path, urllib.urlencode(args))
            return path, None
        return path, (urllib.urlencode(args) if args else '')

    @staticmethod
    def _media_type(content_type):
        """Return the bare, lower-cased media type of a Content-Type value.

        Parameters such as ``; charset=...`` are dropped. A missing or
        empty header value yields an empty string.
        """
        if not content_type:
            return ''
        return content_type.split(';', 1)[0].strip().lower()

    def _request(self, path, method='GET', args=None, parse=True):
        """Perform an HTTP request against the CA and return the result.

        Args:
          path: path appended to the base URL.
          method: 'GET' puts ``args`` in the query string; any other
            value sends a request with a form-encoded body (a POST, as
            far as urllib2 is concerned).
          args: optional dict of request parameters.
          parse: when true, decode the body according to the response
            Content-Type: certificate types are loaded as PEM, the CRL
            type as ASN.1. With any other content type, or when false,
            the raw body is returned.

        Returns:
          The object produced by ``crypto.load_certificate`` or
          ``crypto.load_crl``, or the raw response body.

        Raises:
          Error: if urllib2 raises URLError (HTTPError is a subclass, so
            HTTP error statuses are included).
        """
        path, data = self._prepare(path, method, args)
        request = urllib2.Request(self.url + path, data)
        try:
            response = urllib2.urlopen(request)
            try:
                response_data = response.read()
                ctype = self._media_type(response.headers.get('Content-Type'))
            finally:
                # Release the connection even when read() fails.
                response.close()
        except urllib2.URLError as e:
            log.error('error accessing %s: %s', path, e)
            raise Error(str(e))
        if parse:
            if ctype in ('application/x-x509-user-cert',
                         'application/x-x509-ca-cert'):
                return crypto.load_certificate(
                    crypto.FILETYPE_PEM, response_data)
            if ctype == 'application/x-pkcs7-crl':
                return crypto.load_crl(
                    crypto.FILETYPE_ASN1, response_data)
        return response_data

    def get_ca(self, parse=True):
        """Return the result of fetching '/ca.pem', cached per instance.

        Parsed and raw results are cached under separate attributes. The
        lock is held for the whole lookup, including the fetch. A falsy
        cached value is treated as absent and fetched again.
        """
        cached_attr = '_ca_parsed' if parse else '_ca_obj'
        with self._cache_lock:
            value = getattr(self, cached_attr, None)
            if not value:
                value = self._request('/ca.pem', parse=parse)
                setattr(self, cached_attr, value)
        return value

    def get_crl(self, parse=True):
        """Fetch '/ca.crl'; the result is not cached.

        Raises:
          Error: if the request fails.
        """
        return self._request('/ca.crl', parse=parse)

    def get_certificate(self, cn, parse=True):
        """Fetch '/get/<cn>', returning None if the request fails.

        NOTE: ``cn`` is interpolated into the URL path as-is, without
        percent-encoding; callers must pass a URL-safe value.
        """
        try:
            return self._request('/get/%s' % cn, parse=parse)
        except Error:
            return None

    def sign_certificate(self, cn, **subject_attrs):
        """Create a key pair and have the CA sign a request for ``cn``.

        A new key and a certificate request are built with ``certutil``
        (``CN`` is set to ``cn``, overriding any ``CN`` passed in
        ``subject_attrs``); the PEM-encoded request is POSTed to '/sign'
        as the ``csr`` form field.

        Returns:
          A ``(pkey, signed_cert)`` tuple: the generated key and the
          '/sign' response, decoded according to its Content-Type.

        Raises:
          Error: if the request fails.
        """
        pkey = certutil.create_rsa_key_pair()
        subject_attrs['CN'] = cn
        csr = certutil.create_cert_request(pkey, **subject_attrs)
        csr_data = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
        signed_cert = self._request('/sign', 'POST', {'csr': csr_data})
        return pkey, signed_cert

    def revoke_certificate(self, cn):
        """POST to '/revoke/<cn>'; the response is discarded.

        NOTE: ``cn`` is interpolated into the URL path as-is, without
        percent-encoding; callers must pass a URL-safe value.

        Raises:
          Error: if the request fails.
        """
        self._request('/revoke/%s' % cn, 'POST')