Commit e1c1725d authored by ale's avatar ale

import iniziale

parents
#!/usr/bin/python
import os, sys
import logging
sys.path.append(os.path.join(os.path.dirname(sys.argv[0]), 'lib'))
from utils import *
from cfg import *
# commands
from gen import gen
from newca import newca
from list import list
from files import files
from initfs import initfs
from check import check
def Usage():
print '''
CAM v0.1 - (c)2006 by <ale@incal.net>
A Certification Authority manager for complex situations.
Usage: %s <COMMAND> [<ARG>...]
Known commands:
init
Initialize the environment by creating the necessary
directory structure
newca [<RSA_CRT> [<DSA_CRT>]]
Create a new CA certificate (otherwise you can import
your own certificates)
gen <TAG>...
Create (or re-create) the certificates corresponding
to TAG
list
List all known certificates
files <TAG>...
Dump all the certificate-related files of this TAG
check
Should be run weekly from a cron job to warn you if
some certificates are about to expire (controlled by
the 'warning_days' parameter in the 'global' section
of the configuration)
The configuration will be read from '%s'.
It consists of a ini-style file, with one 'ca' section that
specifies global CA parameters, and more sections for each
tag with certificate-specific information. See the examples
for more details on how to write your own configuration.
''' % (sys.argv[0], config_file_path)
sys.exit(0)
if len(sys.argv) < 2 or sys.argv[1] == 'help':
Usage()
cmd = sys.argv[1]
if cmd == 'init':
initfs()
elif cmd == 'gen':
for tag in sys.argv[2:]:
gen(tag)
elif cmd == 'newca':
newca()
elif cmd == 'list':
list(sys.argv[2:])
elif cmd == 'files':
for tag in sys.argv[2:]:
files(tag)
elif cmd == 'check':
check()
else:
print 'Unknown command \'%s\'.' % cmd
Usage()
[global]
root = /src/cam/test
[ca]
name = Example Certification Authority
org = Example
country = AA
default_days = 365
email = ca@domain.org
base_url = http://ca.domain.org/public/
[web]
cn = www.domain.org
alt_names = www1.domain.org, www2.domain.org,
www3.domain.org, www4.domain.org, www5.domain.org
ou = Example web services
[imap]
cn = mail.domain.org
alt_names = mail.domain.org, imap.domain.org, domain.org
ou = Example mail services
[smtp]
cn = smtp.domain.org
alt_names = mx1.domain.org, mx2.domain.org, mx3.domain.org,
mx4.domain.org, mx5.domain.org, smtp.domain.org,
mx.domain.org
ou = Example mail services
__all__ = [ 'cfg', 'ca_base', 'ca', 'config_file_path' ]
import os, sys
import ConfigParser
from utils import *
cfg = ConfigParser.ConfigParser()
config_file_path = os.path.join(os.path.dirname(sys.argv[0]),'config')
cfg.read(config_file_path)
ca = cfg2dict(cfg, 'ca')
ca_base = cfg.get('global', 'root')
File added
import os
from time import time, strftime, localtime
from utils import *
from paths import *
from cfg import *
def check():
now = time()
warning_days = 15
if cfg.has_option('global', 'warning_days'):
warning_days = int(cfg.get('global', 'warning_days'))
for tag in cfg.sections():
if tag == 'global' or tag == 'ca' or tag == 'DEFAULT':
continue
crt_file = getpath('rsa_crt', tag)
crt_date = getcertdate(crt_file)
if crt_date:
days = int((now - crt_date)/86400)
if days > -warning_days:
print '''
The certificate '%s' should be renewed!
(Expiration date: %s)
''' % (tag, strftime('%d/%m/%Y', localtime(crt_date)))
import os
from cfg import *
from paths import *
fields = [
('conf', 'OpenSSL configuration file'),
('rsa_key', 'RSA private key'),
('rsa_crt', 'RSA certificate'),
('dsa_key', 'DSA private key'),
('dsa_crt', 'DSA certificate'),
('dsa_parms', 'DSA parameters'),
('public_crt', 'public certificates bundle'),
('singlefile', 'single cert. file (with keys)')
]
def files(tag):
print '''
Files related to the '%s' certificates:
''' % tag
for k, desc in fields:
p = getpath(k, tag)
star = ' '
if not os.path.exists(p):
star = '*'
print '%-30s %s%s' % (desc, star, p)
print '(* = not found)'
print
import os, sys
import logging
from utils import *
from templates import *
from paths import *
from cfg import *
def gen(tag):
info = cfg2dict(cfg, tag)
conf_file = getpath('conf', tag)
rsa_key_file = getpath('rsa_key', tag)
dsa_key_file = getpath('dsa_key', tag)
dsa_parms_file = getpath('dsa_parms', tag)
csr_file = getpath('rsa_csr', tag)
dsa_csr_file = getpath('dsa_csr', tag)
ext_file = getpath('ext', tag)
public_crt_file = getpath('public_crt', tag)
crt_file = getpath('rsa_crt', tag)
dsa_crt_file = getpath('dsa_crt', tag)
sf_file = getpath('singlefile', tag)
if os.path.exists(public_crt_file):
print
if expired(getcertdate(public_crt_file)):
print 'Certificate has expired. Ready to re-generate.'
else:
print
ans = raw_input('This certificate seems to exist already (in %s).\nAre you really sure that you want to re-create it? [y/N] ' % crt_file)
if not ans or ans[0].lower() != 'y':
sys.exit(0)
# create custom config file
template(conf_file,
openssl_conf_template,
dict(
ca_dir = ca_base,
default_days = ca['default_days'],
country = d2get(info, ca, 'country'),
org = d2get(info, ca, 'org'),
ou = d2get(info, ca, 'ou', ''),
cn = info['cn'],
email = d2get(info, ca, 'email')))
# create dsa parameters
openssl('dsaparam', '-out', dsa_parms_file, '1024')
# create rsa key
openssl('req', '-batch', '-new', '-keyout', rsa_key_file,
'-config', conf_file, '-nodes', '-out', csr_file)
openssl('req', '-batch', '-new', '-newkey', 'dsa:' + dsa_parms_file,
'-keyout', dsa_key_file, '-nodes',
'-config', conf_file, '-out', dsa_csr_file)
# create ext file
altnames = [ x.strip() for x in info['alt_names'].split(',') ]
altnames_s = ''
for i in range(len(altnames)):
altnames_s += 'DNS.%d=%s\n' % (i + 1, altnames[i])
template(ext_file,
ext_template,
dict(
ca_name = ca['name'],
ca_base_url = ca['base_url'],
alt_names = altnames_s))
# sign requests
openssl('ca', '-days', ca['default_days'],
'-config', conf_file, '-batch',
'-policy', 'policy_anything',
'-out', crt_file,
'-extfile', ext_file,
'-infiles', csr_file)
openssl('ca', '-days', ca['default_days'],
'-config', conf_file, '-batch',
'-policy', 'policy_anything',
'-out', dsa_crt_file,
'-extfile', ext_file,
'-infiles', dsa_csr_file)
f = open(public_crt_file, 'w')
f.write(open(crt_file, 'r').read())
f.write(open(dsa_crt_file, 'r').read())
f.close()
# create single-file file
f = open(sf_file, 'w')
f.write(open(crt_file, 'r').read())
f.write(open(dsa_crt_file, 'r').read())
f.write(open(rsa_key_file, 'r').read())
f.write(open(dsa_key_file, 'r').read())
f.close()
logging.info('created certificate %s [%s]' % (tag, info['cn']))
print '''
Certificate '%s':
CN: %s
AltNames: %s
RSA key: %s
DSA key: %s
public crt: %s
all-in-one: %s
''' % (tag, info['cn'], ', '.join(altnames),
rsa_key_file, dsa_key_file, public_crt_file, sf_file)
import os
import logging
from utils import *
from cfg import *
def initfs():
mkdir(ca_base)
for sub in [ 'archive', 'ext', 'conf',
'private', 'private/certs', 'private/single-file',
'public', 'public/certs', 'public/crl', 'newcerts' ]:
mkdir(os.path.join(ca_base, sub))
os.chmod(os.path.join(ca_base, 'private'), 0700)
logging.info('created directory structure')
import os, re
import commands
from time import time, strftime, localtime
from utils import *
from templates import *
from paths import *
from cfg import *
def list(args):
verbose = 0
if len(args) > 0 and args[0] == '-v':
verbose = 1
print '''
List of known certificates for '%s' CA:
''' % ca['name']
i = 0
sections = cfg.sections()
sections.sort()
for sec in sections:
if sec == 'ca' or sec == 'global':
continue
i += 1
crt_file = getpath('rsa_crt', sec)
dsa_crt_file = getpath('dsa_crt', sec)
if os.path.exists(crt_file):
crt_date = getcertdate(crt_file)
days = int((time() - crt_date) / 86400)
if days > 0:
days_s = '%d days ago' % days
else:
days_s = '%d days left' % (-days)
if expired(crt_date):
m = 'EXPIRED %s' % days_s.upper()
star = '*'
else:
if verbose:
m = '\n%21s Expiration: %s (%s)' % \
(' ',
strftime('%d %b %Y', localtime(crt_date)),
days_s)
else:
m = 'exp. %s' % strftime('%d/%m/%Y',
localtime(crt_date))
star = ' '
else:
m = 'NOT FOUND'
star = '*'
print '%3d. %s%-15s %s [%s] - %s' % (i, star, sec, cfg.get(sec, 'cn'), cfg.get(sec, 'alt_names').replace('\n', ' '), m)
if star != '*' and verbose:
# do fingerprints
for hname, hash in [ ('MD5', 'md5'), ('SHA1', 'sha1') ]:
for cypher, file in [ ( 'RSA', crt_file), ( 'DSA', dsa_crt_file ) ]:
fp = fingerprint(hash, file)
print '%21s %s %s fingerprint: %s' % ('', cypher, hname, fp)
if verbose:
print
print '''
(* = certificate does not exist, create with 'cam gen <name>')
'''
import os, logging
from utils import *
from templates import *
from cfg import *
def newca():
conf_file = os.path.join(ca_base, 'conf/ca.conf')
ca_file = os.path.join(ca_base, 'public/ca.pem')
ca_dsa_file = os.path.join(ca_base, 'public/ca-dsa.tmp')
ca_key_file = os.path.join(ca_base, 'private/ca.key')
ca_dsa_key_file = os.path.join(ca_base, 'private/ca-dsa.key')
ca_csr_file = os.path.join(ca_base, 'newcerts/ca.csr')
ca_dsa_csr_file = os.path.join(ca_base, 'newcerts/ca-dsa.csr')
dsa_parms_file = os.path.join(ca_base, 'private/ca.dsap')
serial_file = os.path.join(ca_base, 'serial')
index_file = os.path.join(ca_base, 'index')
if not os.path.exists(serial_file):
open(serial_file, 'w').write('01')
if not os.path.exists(index_file):
open(index_file, 'w').close()
template(conf_file,
openssl_conf_template,
dict(
ca_dir = ca_base,
default_days = ca['default_days'],
country = ca['country'],
org = ca['org'],
ou = ca.get('ou', ''),
cn = ca['name'],
email = ca['email']))
if not os.path.exists(dsa_parms_file):
openssl('dsaparam', '-out', dsa_parms_file, '1024')
logging.info('generated CA DSA parameters')
if not os.path.exists(ca_file):
openssl('req', '-new', '-keyout', ca_key_file,
'-config', conf_file, '-batch',
'-out', ca_csr_file)
openssl('req', '-new', '-newkey', 'dsa:' + dsa_parms_file,
'-config', conf_file, '-batch',
'-keyout', ca_dsa_key_file,
'-out', ca_dsa_csr_file)
openssl('ca',
'-config', conf_file, '-batch',
'-keyfile', ca_key_file,
'-extensions', 'v3_ca', '-selfsign',
'-out', ca_file,
'-infiles', ca_csr_file)
openssl('ca',
'-config', conf_file, '-batch',
'-keyfile', ca_dsa_key_file,
'-extensions', 'v3_ca', '-selfsign',
'-out', ca_dsa_file,
'-infiles', ca_dsa_csr_file)
open(ca_file, 'a').write(open(ca_dsa_file, 'r').read())
os.remove(ca_dsa_file)
logging.info('created CA certificates')
__all__ = [ 'getpath' ]
import os
from cfg import *
path_exts = dict(
conf = 'conf/%s.conf',
rsa_key = 'private/%s.key',
dsa_key = 'private/%s-dsa.key',
dsa_parms = 'private/%s.dsap',
rsa_csr = 'newcerts/%s.csr',
dsa_csr = 'newcerts/%s-dsa.csr',
rsa_crt = 'private/certs/%s.pem',
dsa_crt = 'private/certs/%s-dsa.pem',
ext = 'ext/%s.ext',
public_crt = 'public/certs/%s.pem',
singlefile = 'private/single-file/%s.pem',
)
def getpath(what, tag):
return os.path.join(ca_base, path_exts[what] % tag)
openssl_conf_template = '''
RANDFILE = %(ca_dir)s/.random
[ ca ]
default_ca = CA_default
[ CA_default ]
dir = %(ca_dir)s
certs = $dir/public/certs
crl_dir = $dir/public/crl
crl = $dir/public/crl.pem
database = $dir/index
serial = $dir/serial
new_certs_dir = $dir/newcerts
certificate = $dir/public/ca.pem
private_key = $dir/private/ca.key
x509_extensions = certificate_extensions
email_in_dn = no
default_days = %(default_days)s
default_crl_days = 31
default_md = sha1
preserve = yes
policy = policy_match
[ policy_match ]
countryName = supplied
organizationName = supplied
organizationalUnitName = optional
commonName = supplied
emailAddress = supplied
[ policy_anything ]
countryName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
[ req ]
default_bits = 4096
default_md = sha1
distinguished_name = req_distinguished_name
attributes = req_attributes
x509_extensions = v3_ca
string_mask = nombstr
[ req_distinguished_name ]
countryName = Country Name
countryName_default = "%(country)s"
countryName_min = 2
countryName_max = 2
0.organizationName = Organization Name
0.organizationName_default = "%(org)s"
organizationalUnitName = Organizational Unit Name
organizationalUnitName_default = "%(ou)s"
commonName = Common Name
commonName_max = 64
commonName_default = "%(cn)s"
emailAddress = Email Address
emailAddress_max = 60
emailAddress_default = "%(email)s"
SET-ex3 = SET extension number 3
[ req_attributes ]
[ certificate_extensions ]
[ v3_ca ]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid:always,issuer:always
basicConstraints = critical, CA:true
keyUsage = cRLSign, keyCertSign
nsCertType = sslCA, emailCA, objCA
nsComment = "%(cn)s"
subjectAltName = email:copy
issuerAltName = issuer:copy
'''
ext_template = '''
basicConstraints = CA:false
nsCertType = client, server
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
nsComment = "%(ca_name)s"
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid, issuer:always
subjectAltName = @subject_alt_name
issuerAltName = issuer:copy
nsCaRevocationUrl = %(ca_base_url)s/crl.pem
nsRevocationUrl = %(ca_base_url)s/crl.pem
crlDistributionPoints = @cdp_section
[ subject_alt_name ]
%(alt_names)s
email = copy
[ cdp_section ]
URI.1 = %(ca_base_url)s/crl.pem
'''
__all__ = [
'getcertdate', 'fingerprint', 'expired',
'cfg2dict', 'mkdir', 'template',
'dictmerge', 'd2get',
'openssl'
]
import commands
import os, re
import time
def getcertdate(crt_file):
if os.path.exists(crt_file):
o = commands.getoutput('openssl x509 -in %s -noout -dates' % crt_file)
m = re.search(r'notAfter=(.*)', o)
if m:
return time.mktime(time.strptime(m.group(1),
'%b %d %H:%M:%S %Y %Z'))
return 0
def expired(crtdate):
if crtdate < time.time():
return 1
else:
return 0
def fingerprint(hash, crt_file):
if os.path.exists(crt_file):
o = commands.getoutput('openssl x509 -in %s -noout -fingerprint -%s' % (crt_file, hash))
m = re.search(r'=(.*)$', o)
if m:
return m.group(1)
return 'NOT FOUND'
def cfg2dict(cfg, section):
d = dict()
for k, v in cfg.items(section):
d[k] = v
return d
def mkdir(p):
try:
os.mkdir(p, 0755)
print 'created directory \'%s\'' % p
except OSError:
pass
def template(file, t, args):
f = open(file, 'w')
f.write(t % args)
f.close()
def dictmerge(d1, d2):
out = dict()
for d in [ d2, d1 ]:
for k, v in d.items():
out[k] = v
return out
def d2get(d1, d2, k, default=None):
return d1.get(k, d2.get(k, default))
def openssl(*args):
cmd = "openssl " + ' '.join(["'%s'" % x for x in args])
print 'executing "%s"' % cmd
return os.system(cmd)
File added
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution. </