Commit 112c04e3 authored by ale's avatar ale

upgrade to CAM v2.0

parent de74ecfb
cam - minimal X509 Certification Authority management
=====================================================
`cam` is a tiny Python program that can be used to manage a X509
certification authority for a small organization. It can only create
server certificates, so this is not going to be useful to manage an
X509-based client authentication infrastructure.
The intended usage involves describing the list of certificates to
generate in a configuration file, and using the `cam' tool to create
and renew them.
Configuration
-------------
The configuration file uses INI-like syntax, consisting of a number of
sections. There are two special sections: `ca` and `global`, any other
section is interpreted as a certificate definition.
The `ca` section contains the attributes of the CA itself, see the
example configuration file to see which attributes are supported.
The `global` section contains configuration parameters for `cam`. The
only configuration parameter supported is `root_dir`, which is where all
the CA private data will be stored. If you leave this parameter empty,
or if you don't define a `global` section at all, this will default to
the directory containing the configuration file.
Certificates are intentified by a ''tag'', (the section name), so for
example given the following configuration snippet::
[web]
cn = www.domain.org
you would use the following command to generate it::
$ cam --config=my.config gen web
Certificates and private keys are saved within the CA data directory,
you can obtain their path with::
$ cam --config=my.config files web
/your/ca/dir/public/certs/web.pem
/your/ca/dir/private/web.key
Installation
------------
The CA private keys are very sensitive information, so you'll want to
store them in some encrypted removable storage. You can bundle the `cam`
application itself with the CA data by using `virtualenv`::
$ virtualenv --no-site-packages /secure/cam
$ virtualenv --relocatable /secure/cam
$ (cd /tmp ; git clone http://git.autistici.org/cam.git \
&& /secure/cam/bin/python setup.py install)
Then you can simply mount your encrypted image wherever there is a
Python interpreter available (well, with the same architecture/OS too)
and run::
$ /secure/cam/bin/cam --config=/secure/ca/my.config ...
#!/usr/bin/python
import os, sys
import logging
sys.path.insert(0, os.path.join(os.path.dirname(sys.argv[0]), 'lib'))
from utils import *
from cfg import *
# commands
from gen import gen
from newca import newca
from list import list
from files import files
from initfs import initfs
from check import check
def Usage():
print '''
CAM v0.1 - (c)2006 by <ale@incal.net>
A Certification Authority manager for complex situations.
Usage: %s <COMMAND> [<ARG>...]
Known commands:
init
Initialize the environment by creating the necessary
directory structure
newca [<RSA_CRT> [<DSA_CRT>]]
Create a new CA certificate (otherwise you can import
your own certificates)
gen <TAG>...
Create (or re-create) the certificates corresponding
to TAG
list
List all known certificates
files <TAG>...
Dump all the certificate-related files of this TAG
check
Should be run weekly from a cron job to warn you if
some certificates are about to expire (controlled by
the 'warning_days' parameter in the 'global' section
of the configuration)
The configuration will be read from '%s'.
It consists of a ini-style file, with one 'ca' section that
specifies global CA parameters, and more sections for each
tag with certificate-specific information. See the examples
for more details on how to write your own configuration.
''' % (sys.argv[0], config_file_path)
sys.exit(0)
if len(sys.argv) < 2 or sys.argv[1] == 'help':
Usage()
cmd = sys.argv[1]
if cmd == 'init':
initfs()
elif cmd == 'gen':
for tag in sys.argv[2:]:
gen(tag)
elif cmd == 'newca':
newca()
elif cmd == 'list':
list(sys.argv[2:])
elif cmd == 'files':
for tag in sys.argv[2:]:
files(tag)
elif cmd == 'check':
check()
else:
print 'Unknown command \'%s\'.' % cmd
Usage()
import errno
import fcntl
import logging
import re
import os
import getpass
import random
import shutil
import tempfile
import time
from cam import openssl_wrap
from cam import utils
log = logging.getLogger(__name__)
class _CAFiles(object):
def __init__(self, basedir, **attrs):
for key, value in attrs.items():
setattr(self, key, os.path.join(basedir, value))
class CA(object):
def __init__(self, basedir, config, password=None):
self._pw = password
self.basedir = basedir
self.config = {'basedir': basedir, 'default_days': '365', 'ou': 'CA',
'days': '3650', 'country': 'XX', 'crl_url': '',
'bits': '4096'}
self.config.update(config)
self.files = _CAFiles(basedir,
conf='conf/ca.conf',
public_key='public/ca.pem',
private_key='private/ca.key',
crl='public/crl.pem',
serial='serial',
crlnumber='crlnumber',
index='index')
self._lock()
def _getpw(self):
if self._pw is None:
self._pw = getpass.getpass(prompt='CA Password: ')
return self._pw
def _lock(self):
self._lockfd = open(os.path.join(self.basedir, '_lock'), 'w+')
n = 3
while True:
try:
fcntl.lockf(self._lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError, e:
if e.errno in (errno.EACCES, errno.EAGAIN):
n -= 1
if n == 0:
log.error('another instance is running')
raise
time.sleep(1)
continue
raise
def _unlock(self):
fcntl.lockf(self._lockfd, fcntl.LOCK_UN)
self._lockfd.close()
def close(self):
self._unlock()
def create(self):
old_umask = os.umask(077)
for pathext in ('', 'conf', 'public', 'public/certs',
'public/crl', 'private', 'newcerts'):
fullpath = os.path.join(self.basedir, pathext)
if not os.path.isdir(fullpath):
os.mkdir(fullpath)
if not os.path.exists(self.files.index):
log.info('creating new index file')
open(self.files.index, 'w').close()
if not os.path.exists(self.files.serial):
serial = random.randint(1, 1000000000)
log.info('initializing serial number (%d)', serial)
with open(self.files.serial, 'w') as fd:
fd.write('%08X\n' % serial)
if not os.path.exists(self.files.crlnumber):
with open(self.files.crlnumber, 'w') as fd:
fd.write('01\n')
# Create the OpenSSL configuration file.
utils.render(self.files.conf, 'openssl_config', self.config)
# Generate keys if they do not exist.
if not os.path.exists(self.files.public_key):
tmpdir = tempfile.mkdtemp()
csr_file = os.path.join(tmpdir, 'ca.csr')
log.info('creating new RSA CA CSR')
openssl_wrap.run_with_config(
self.files.conf, 'req', '-new',
'-passout', 'pass:%s' % self._getpw(),
'-keyout', self.files.private_key, '-out', csr_file)
log.info('self-signing RSA CA certificate')
openssl_wrap.run_with_config(
self.files.conf, 'ca', '-keyfile', self.files.private_key,
'-key', self._getpw(),
'-extensions', 'v3_ca', '-out', self.files.public_key,
'-days', self.config.get('days', self.config['default_days']),
'-selfsign', '-infiles', csr_file)
shutil.rmtree(tmpdir)
os.umask(old_umask)
# Make some files public.
for path in (os.path.join(self.basedir, 'public'),
os.path.join(self.basedir, 'public/certs'),
self.files.public_key):
if os.path.isdir(path):
os.chmod(path, 0755)
else:
os.chmod(path, 0644)
def gencrl(self):
log.info('generating CRL')
openssl_wrap.run_with_config(
self.files.conf, 'ca', '-gencrl', '-out', self.files.crl,
'-key', self._getpw())
os.chmod(self.files.crl, 0644)
def revoke(self, cert):
log.info('revoking certificate %s', cert.name)
openssl_wrap.run_with_config(
self.files.conf, 'ca', '-revoke', cert.public_key_file,
'-key', self._getpw())
self.gencrl()
def generate(self, cert):
expiry = cert.get_expiration_date()
if expiry and expiry > time.time():
log.warn('certificate is still valid, revoking previous version')
self.revoke(cert)
log.info('generating new certificate %s', cert.name)
tmpdir = tempfile.mkdtemp()
try:
csr_file = os.path.join(tmpdir, '%s.csr' % cert.name)
conf_file = os.path.join(tmpdir, '%s.conf' % cert.name)
ext_file = os.path.join(tmpdir, '%s-ext.conf' % cert.name)
conf = {}
conf.update(self.config)
conf['cn'] = cert.cn
conf['days'] = cert.days or self.config['default_days']
if cert.ou:
conf['ou'] = cert.ou
conf['alt_names'] = ''.join(
['DNS.%d=%s\n' % (idx + 1, x)
for idx, x in enumerate(cert.alt_names)])
utils.render(conf_file, 'openssl_config', conf)
utils.render(ext_file, 'ext_config', conf)
openssl_wrap.run_with_config(
conf_file, 'req', '-new', '-keyout', cert.private_key_file,
'-nodes', '-out', csr_file)
os.chmod(cert.private_key_file, 0600)
openssl_wrap.run_with_config(
conf_file, 'ca', '-days', conf['days'],
'-key', self._getpw(),
'-policy', 'policy_anything', '-out', cert.public_key_file,
'-extfile', ext_file, '-infiles', csr_file)
finally:
shutil.rmtree(tmpdir)
import os
import re
import string
import time
from cam import openssl_wrap
def _parse_alt_names(s):
if not s:
return []
if ',' in s:
parts = s.split(',')
else:
parts = s.split()
return [x.strip() for x in parts if x]
class Cert(object):
def __init__(self, ca, name, config):
self.name = name
self.ca = ca
self.cn = config['cn']
self.ou = config.get('ou', '')
self.days = config.get('days')
self.alt_names = _parse_alt_names(config.get('alt_names'))
if self.cn not in self.alt_names:
self.alt_names.insert(0, self.cn)
self.public_key_file = os.path.join(ca.basedir, 'public', 'certs',
'%s.pem' % name)
self.private_key_file = os.path.join(ca.basedir, 'private',
'%s.key' % name)
def get_fingerprint(self, digest='sha1'):
if os.path.exists(self.public_key_file):
output = openssl_wrap.run('x509', '-in', self.public_key_file,
'-noout', '-fingerprint', '-%s' % digest)
m = re.search(r'=(.*)$', output)
if m:
return m.group(1)
return None
def get_expiration_date(self):
if os.path.exists(self.public_key_file):
output = openssl_wrap.run('x509', '-in', self.public_key_file,
'-noout', '-dates')
m = re.search(r'notAfter=(.*)', output)
if m:
return time.mktime(time.strptime(m.group(1),
'%b %d %H:%M:%S %Y %Z'))
return None
def expired(self):
now = time.time()
return self.get_expiration_date() > now
import ConfigParser
import os
from cam import cert
from cam import ca
class ConfigError(Exception):
pass
def read_config(filename):
parser = ConfigParser.ConfigParser()
if not parser.read(filename):
raise ConfigError('File not found: %s' % filename)
root_dir = os.path.dirname(os.path.abspath(filename))
global_config = {}
if parser.has_section('global'):
global_config = dict(parser.items('global'))
root_dir = global_config.get('root_dir', root_dir)
ca_obj = ca.CA(root_dir, dict(parser.items('ca')))
certs = []
for section in parser.sections():
if section in ('ca', 'global'):
continue
certs.append(cert.Cert(ca_obj, section, dict(parser.items(section))))
return global_config, ca_obj, certs
#!/usr/bin/python
import logging
import optparse
import os
import sys
import time
from cam import config
USAGE = '''cam [<OPTIONS>] <COMMAND> [<ARG>...]
CAM v2.0 - (c)2012 by <ale@incal.net>
A Certification Authority manager for complex situations.
Known commands:
init [<RSA_CRT> [<DSA_CRT>]]
Initialize the environment and create a new CA certificate
(you can also import your own existing certificates)
gen <TAG>...
Create (or re-create) the certificates corresponding
to TAG
gencrl
Update the CRL
list
List all known certificates
files <TAG>...
Dump all the certificate-related files of this TAG
check
Should be run weekly from a cron job to warn you if some
certificates are about to expire (controlled by the 'warning_days'
parameter in the 'global' section of the configuration)
The configuration file consists of a ini-style file, with one 'ca'
section that specifies global CA parameters, and more sections for
each tag with certificate-specific information. See the examples for
more details on how to write your own configuration.
'''
def find_cert(certs, name):
for c in certs:
if c.name == name:
return c
raise Exception('Certificate "%s" not found' % name)
def main():
parser = optparse.OptionParser(usage=USAGE)
parser.add_option('-d', '--debug', dest='debug', help='Be verbose',
action='store_true')
parser.add_option('-c', '--config', dest='config', help='Config file')
opts, args = parser.parse_args()
if not opts.config:
parser.error('Must specify --config')
if len(args) < 1:
parser.error('Must specify a command')
logging.basicConfig()
logging.getLogger().setLevel(opts.debug and logging.DEBUG or logging.INFO)
global_config, ca, certs = config.read_config(opts.config)
cmd, args = args[0], args[1:]
try:
if cmd == 'init':
ca.create()
elif cmd == 'gen':
if len(args) != 1:
parser.error('Wrong number of arguments')
ca.generate(find_cert(certs, args[0]))
elif cmd == 'gencrl':
ca.gencrl()
elif cmd == 'files':
if len(args) != 1:
parser.error('Wrong number of arguments')
c = find_cert(certs, args[0])
print c.public_key_file
print c.private_key_file
elif cmd == 'list':
for cert in sorted(certs, key=lambda x: x.name):
print cert.name, cert.cn, cert.get_expiration_date()
elif cmd == 'check':
now = time.time()
warning_time = 8640000 * int(global_config.get('warning_days', 15))
for cert in certs:
exp = cert.get_expiration_date()
if exp and (exp - now) < warning_time:
print '%s (%s) is about to expire.' % (cert.name, cert.cn)
else:
parser.error('unknown command "%s"' % cmd)
finally:
ca.close()
def main_wrapper():
try:
main()
return 0
except Exception, e:
logging.exception('uncaught exception')
return 1
if __name__ == '__main__':
sys.exit(main_wrapper())
import logging
import subprocess
log = logging.getLogger(__name__)
class CommandError(Exception):
pass
def run(*args):
cmd = ['openssl']
cmd.extend(args)
log.debug('executing "%s"' % ' '.join(cmd))
pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE)
stdout, _ = pipe.communicate()
if pipe.returncode != 0:
raise CommandError('openssl exited with status %d' % (
pipe.returncode,))
return stdout
def run_with_config(config_file, *args):
cmd = args[0]
args = args[1:]
return run(cmd, '-config', config_file, '-batch', *args)
basicConstraints = CA:false
nsCertType = client, server
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
nsComment = "%(cn)s"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid, issuer:always
subjectAltName = @subject_alt_name
issuerAltName = issuer:copy
nsCaRevocationUrl = %(crl_url)s
nsRevocationUrl = %(crl_url)s
crlDistributionPoints = @cdp_section
[ subject_alt_name ]
%(alt_names)s
email = copy
[ cdp_section ]
URI.1 = %(crl_url)s
openssl_conf_template = '''
RANDFILE = %(ca_dir)s/.random
RANDFILE = %(basedir)s/.random
[ ca ]
default_ca = CA_default
[ CA_default ]
dir = %(ca_dir)s
dir = %(basedir)s
certs = $dir/public/certs
crl_dir = $dir/public/crl
crl = $dir/public/crl.pem
crlnumber = $dir/crlnumber
database = $dir/index
serial = $dir/serial
new_certs_dir = $dir/newcerts
......@@ -38,7 +37,7 @@ commonName = supplied
emailAddress = optional
[ req ]
default_bits = 4096
default_bits = %(bits)s
default_md = sha1
distinguished_name = req_distinguished_name
attributes = req_attributes
......@@ -76,27 +75,3 @@ nsComment = "%(cn)s"
subjectAltName = email:copy
issuerAltName = issuer:copy
'''
ext_template = '''
basicConstraints = CA:false
nsCertType = client, server
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
nsComment = "%(ca_name)s"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid, issuer:always
subjectAltName = @subject_alt_name
issuerAltName = issuer:copy
nsCaRevocationUrl = %(ca_base_url)s/crl.pem
nsRevocationUrl = %(ca_base_url)s/crl.pem
crlDistributionPoints = @cdp_section
[ subject_alt_name ]
%(alt_names)s
email = copy
[ cdp_section ]
URI.1 = %(ca_base_url)s/crl.pem
'''
import logging
import os
import tempfile
import shutil
import unittest
from cam import ca
from cam import openssl_wrap
logging.basicConfig(level=logging.DEBUG)
class CertStub(object):
def __init__(self, name, cn, tmpdir):