Commit 7393336e authored by ale's avatar ale

test the DNSSEC functionality (and fix associated bugs)

parent dc26d3f9
import glob
import os
import re
import socket
import subprocess
import time
......@@ -161,7 +163,7 @@ class ZoneWriter(object):
def write_if_changed(self, data, dst):
if os.path.exists(dst):
with open(dst, 'r') as fd:
if not _zonecmp(fd.read(), data):
if _zonecmp(fd.read(), data):
return False
if not self.dry_run:
with open(dst, 'w') as fd:
......@@ -177,7 +179,7 @@ class ZoneWriter(object):
outfile = zone_name
zone_changed = False
dst = os.path.join(self.output_dir, outfile)
if write_if_changed(zone_data, dst):
if self.write_if_changed(zone_data, dst):
changed.add(zone_name)
zone_changed = True
# Zone file post-processing.
......@@ -196,12 +198,38 @@ class ZoneWriter(object):
class DNSSECSigner(object):
random_dev = '/dev/random'
def __init__(self, key_dir, ds_dir, nsec3_salt, refresh):
self.key_dir = key_dir
self.ds_dir = ds_dir
self.nsec3_salt = nsec3_salt
self.refresh = refresh
def has_keys(self, zone_name):
# One day we'll figure out what the magic numbers are...
return glob.glob(os.path.join(self.key_dir, 'K%s.+007+*' % zone_name))
def create_keys(self, zone_name):
old_umask = os.umask(077)
try:
ksk_file = subprocess.check_output([
'dnssec-keygen',
'-r', self.random_dev,
'-3', '-q', '-f', 'KSK',
'-K', self.key_dir,
zone_name])
zsk_file = subprocess.check_output([
'dnssec-keygen',
'-r', self.random_dev,
'-3', '-q',
'-K', self.key_dir,
zone_name])
print 'created DNSSEC keys for "%s" (%s, %s)' % (
zone_name, ksk_file, zsk_file)
finally:
os.umask(old_umask)
def sign_zone_file(self, zone_name, infile, outfile):
"""Sign a zone file with DNSSEC records.
......@@ -227,5 +255,9 @@ class DNSSECSigner(object):
return filename
outfile = filename + '.signed'
if zone_changed or self.refresh:
self.sign_zone_file(zone_name, filename, outfile)
if not self.has_keys(zone_name):
self.create_keys(zone_name)
self.sign_zone_file(zone_name,
os.path.join(output_dir, filename),
os.path.join(output_dir, outfile))
return outfile
import yaml
import shutil
import tempfile
import unittest
from StringIO import StringIO
from zonetool.zone import *
......@@ -7,6 +9,7 @@ from zonetool.zone import *
TEST_DATA = [
'''
autistici.org:
DNSSEC: true
EXTENDS:
- "@default"
_:
......@@ -27,6 +30,10 @@ autistici.org:
''',
]
TEST_CONFIG = {
'FRONTENDS': ['82.94.249.234', '82.221.99.153'],
}
def _loadyaml(strs):
for s in strs:
......@@ -36,9 +43,7 @@ def _loadyaml(strs):
class ZoneParserTest(unittest.TestCase):
def setUp(self):
self.zp = ZoneParser({
'FRONTENDS': ['82.94.249.234', '82.221.99.153'],
})
self.zp = ZoneParser(TEST_CONFIG)
def test_template_expansion(self):
self.zp.load(_loadyaml(TEST_DATA))
......@@ -46,3 +51,72 @@ class ZoneParserTest(unittest.TestCase):
self.assertTrue(result)
self.assertEquals('autistici.org', result[0][0])
class ZoneWriterTestBase(unittest.TestCase):
def setUp(self):
self.zp = ZoneParser(TEST_CONFIG)
self.tmpdir = tempfile.mkdtemp()
self.zw = ZoneWriter(self.tmpdir)
def tearDown(self):
shutil.rmtree(self.tmpdir)
class ZoneWriterTest(ZoneWriterTestBase):
def test_write(self):
self.zp.load(_loadyaml(TEST_DATA))
changed, removed = self.zw.write(self.zp.render())
self.assertEquals(set(['autistici.org']), changed)
self.assertEquals(set(), removed)
# Writing a second time, should see no changes.
changed, removed = self.zw.write(self.zp.render())
self.assertEquals(set(), changed)
self.assertEquals(set(), removed)
# Change the global config, render again.
cfg = dict(TEST_CONFIG)
cfg['FRONTENDS'][0] = '178.255.144.35'
zp2 = ZoneParser(cfg)
zp2.load(_loadyaml(TEST_DATA))
changed, removed = self.zw.write(zp2.render())
self.assertEquals(set(['autistici.org']), changed)
self.assertEquals(set(), removed)
# Replace a domain with another, triggering a change and a
# removal.
zp3 = ZoneParser(TEST_CONFIG)
data = [TEST_DATA[0].replace('autistici.org', 'inventati.org')]
zp3.load(_loadyaml(data))
changed, removed = self.zw.write(zp3.render())
self.assertEquals(set(['inventati.org']), changed)
self.assertEquals(set(['autistici.org']), removed)
class DNSSECTest(ZoneWriterTestBase):
def setUp(self):
ZoneWriterTestBase.setUp(self)
key_dir = os.path.join(self.tmpdir, 'keys')
ds_dir = os.path.join(self.tmpdir, 'ds')
os.mkdir(key_dir)
os.mkdir(ds_dir)
self.signer = DNSSECSigner(key_dir, ds_dir, '123456', False)
self.signer.random_dev = '/dev/urandom'
def test_sign_zone(self):
self.zp.load(_loadyaml(TEST_DATA))
self.zw.write(self.zp.render(), postproc=self.signer)
self.assertTrue(self.signer.has_keys('autistici.org'))
# Test signing again, but add a tripwire to the signing
# function to verify it isn't called.
def oh_no(*args):
raise Exception('called sign_zone_file')
self.signer.sign_zone_file = oh_no
self.zw.write(self.zp.render(), postproc=self.signer)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment