import yaml
import shutil
import tempfile
import unittest
from StringIO import StringIO
from zonetool.zone import *


class TestMergeZones(unittest.TestCase):
    """Checks how ZoneParser._resolve_references treats EXTENDS and REPLACES."""

    def test_merge_zones(self):
        base = {'_': ['1.2.3.4']}
        extending = {'_': ['TXT spf ...'], 'EXTENDS': ['a']}
        replacing = {'_': ['2.3.4.5'], 'REPLACES': ['a']}

        parser = ZoneParser()
        parser.zones.update({'a': base, 'b': extending, 'c': replacing})

        # A zone that EXTENDS 'a' must end up with a's records in
        # addition to its own (order is not checked).
        merged = parser._resolve_references('@b', extending)
        self.assertEqual({'1.2.3.4', 'TXT spf ...'}, set(merged['_']))

        # A zone that REPLACES 'a' must keep only its own records.
        replaced = parser._resolve_references('@c', replacing)
        self.assertEqual(['2.3.4.5'], replaced['_'])
        

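# Main fixture: one real zone plus two partial definitions of the
# "@default" template it EXTENDS.  The zone sets DNSSEC: true and uses
# "$FRONTENDS" as the whole value of 'www'; test_zone_output shows the
# rendered result (NS records and the 'onion' TXT record coming from
# the two "@default" documents, A records from TEST_CONFIG).
#
# The literals below are the YAML documents only: the page-rendering
# residue (author/commit labels and line-number runs) that had ended up
# between and inside the strings is not part of the fixture.
TEST_DATA = [
    '''
autistici.org:
  DNSSEC: true
  EXTENDS:
    - "@default"
  _:
    - 82.94.249.234
    - 82.221.99.153
  www: "$FRONTENDS"
''',
    '''
"@default":
  _:
    - NS ns1.autistici.org.
    - NS ns2.autistici.org.
''',
    '''
"@default":
  onion:
    - TXT "blahblah.onion."
''',
]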

# Three documents that all define the same 'www' label of the same
# zone: IPv4 addresses, a TXT record and IPv6 addresses.  test_merge
# expects the rendered 'www' entry to be the union of all five records.
TEST_DATA_2 = [
    '''
autistici.org:
  www:
    - 82.94.249.234
    - 82.221.99.153
''',
    '''
autistici.org:
  www:
    - TXT "web"
''',
    '''
autistici.org:
  www:
    - 2a02:f48:2000:201::19
    - 2002:b2ff:9023::1
''',
]

# Here $FRONTENDS appears inside record strings (NS, MX) instead of
# being the whole value.  test_expand_list_keywords expects two NS
# records in the rendered apex, matching the two FRONTENDS entries in
# TEST_CONFIG.
TEST_DATA_3 = [
    '''
autistici.org:
  _:
    - NS $FRONTENDS
    - MX 10 $FRONTENDS
    - 82.94.249.234
    - 82.221.99.153
''',
]

# Two-level EXTENDS chain: autistici.org -> "@default" -> "@base".
# test_recursive_extend checks that the MX records declared only in
# "@base" (with $FRONTENDS expanded) reach the rendered autistici.org
# apex.
TEST_DATA_4 = [
    '''
autistici.org:
  EXTENDS:
    - "@default"
  _:
    - 1.1.1.3
  www: CNAME "www.l.autistici.org"
''',
    '''
"@default":
  EXTENDS:
    - "@base"
  _:
    - 1.1.1.2
    - NS ns2.autistici.org.
''',
    '''
"@base":
  _:
    - 1.1.1.1
    - MX 10 $FRONTENDS
    - NS ns1.autistici.org.
''',
]

# A record whose only content references $EMPTY, which TEST_CONFIG
# defines as an empty list.  test_expand_empty_variable expects the
# 'banana' label to be missing from the rendered zone rather than
# present with an empty or unexpanded value.
TEST_DATA_EMPTY_VAR = [
    '''
autistici.org:
  banana:
    - MX 10 $EMPTY
''',
]

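# Variables available to the zone fixtures as $NAME.  This dict and its
# lists are module-level state shared by every test case in the file,
# so tests should treat them as read-only.
#
# Only the two entries below belong to the literal; the line-number
# residue that the page rendering had left between them is dropped.
TEST_CONFIG = {
    'FRONTENDS': ['82.94.249.234', '82.221.99.153'],
    'EMPTY': [],
}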


def _loadyaml(strs):
    """Yield one YAML document stream per string in *strs*.

    Each yielded item is the iterator returned by yaml.safe_load_all(),
    so a string may hold several '---'-separated documents and nothing
    is parsed until the consumer iterates.  The safe loader builds only
    plain YAML types (mappings, lists, scalars); it does not construct
    arbitrary Python objects from tags in the input.
    """
    for text in strs:
        stream = StringIO(text)
        yield yaml.safe_load_all(stream)


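class ZoneParserTest(unittest.TestCase):
    """Checks ZoneParser.load() / render() against the YAML fixtures.

    As used below, each item produced by render() is indexable:
    [0] is the zone name, [1] a mapping from label to records and
    [2] the zone file text.

    Assertions use assertEqual / assertIsInstance / assertNotIn, the
    same non-deprecated spelling TestMergeZones already uses
    (assertEquals is a deprecated alias).
    """

    def setUp(self):
        self.zp = ZoneParser(TEST_CONFIG)

    def test_template_expansion(self):
        """A value of "$FRONTENDS" becomes the configured address list."""
        self.zp.load(_loadyaml(TEST_DATA))
        result = list(self.zp.render())

        self.assertTrue(result)
        self.assertEqual('autistici.org', result[0][0])

        # Verify that $FRONTENDS has been replaced correctly.
        self.assertIsInstance(result[0][1]['www'], list)
        self.assertEqual(
            sorted(result[0][1]['www']),
            sorted(TEST_CONFIG['FRONTENDS']))

    def test_zone_output(self):
        """The rendered zone text matches the expected zone file."""
        self.zp.load(_loadyaml(TEST_DATA))
        result = list(self.zp.render())
        zone_data = result[0][2]
        # Compared through zonecmp() rather than by string equality.
        # NOTE(review): the leading comment line suggests zonecmp skips
        # comments; how it treats the serial is not visible here.
        expected_data = '''; Comments should be ignored
$TTL 3600
@		IN	SOA	ns1.autistici.org.	hostmaster.autistici.org. (
				1521885904 ; Serial
				43200
				3600
				2419200
				3600 )
		IN	A	82.221.99.153
		IN	A	82.94.249.234
		IN	NS	ns1.autistici.org.
		IN	NS	ns2.autistici.org.
onion		IN	TXT	"blahblah.onion."
www		IN	A	82.221.99.153
www		IN	A	82.94.249.234
'''
        self.assertTrue(
            zonecmp(zone_data, expected_data),
            'Bad zone data: got: %s, expected: %s' % (
                zone_data, expected_data))

    def test_merge(self):
        """Records for one label spread over several documents are merged."""
        self.zp.load(_loadyaml(TEST_DATA_2))
        result = list(self.zp.render())

        self.assertTrue(result)
        self.assertEqual('autistici.org', result[0][0])

        # Verify that all 'www' entries have been merged properly.
        www = set(result[0][1]['www'])
        expected = set(['82.94.249.234', '82.221.99.153', 'TXT "web"',
                        '2a02:f48:2000:201::19', '2002:b2ff:9023::1'])
        self.assertEqual(expected, www)

    def test_expand_list_keywords(self):
        """A list variable inside a record yields one record per element."""
        self.zp.load(_loadyaml(TEST_DATA_3))
        result = list(self.zp.render())
        self.assertTrue(result)
        self.assertEqual('autistici.org', result[0][0])

        # Count the number of NS records.  A generator sum is used
        # instead of len(filter(...)), which only works where filter()
        # returns a list.
        num_ns = sum(
            1 for record in result[0][1]['_'] if record.startswith('NS '))
        self.assertEqual(num_ns, 2)

    def test_expand_empty_variable(self):
        """A record that expands an empty list is dropped from the zone."""
        self.zp.load(_loadyaml(TEST_DATA_EMPTY_VAR))
        result = list(self.zp.render())
        self.assertTrue(result)
        self.assertEqual('autistici.org', result[0][0])

        # The record that expanded an empty list should not be
        # present at all in the zone.
        self.assertNotIn('banana', result[0][1],
                         'Bad zone with empty record:\n%s' % result[0][1])

    def test_recursive_extend(self):
        """Records from a template two EXTENDS levels away are inherited."""
        self.zp.load(_loadyaml(TEST_DATA_4))
        result = list(self.zp.render())
        self.assertTrue(result)
        for frontend in TEST_CONFIG['FRONTENDS']:
            self.assertIn('MX 10 %s' % frontend, result[0][1]['_'])
        self.assertEqual('autistici.org', result[0][0])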


class ZoneWriterTestBase(unittest.TestCase):
    """Shared fixture: a ZoneParser and a ZoneWriter rooted in a temp dir.

    Layout created under self.tmpdir: a 'zones' directory for zone
    files, and the path 'named.conf' handed to ZoneWriter.
    """

    def setUp(self):
        self.zp = ZoneParser(TEST_CONFIG)
        # mkdtemp() creates a fresh directory accessible only to the
        # creating user; everything the tests write lives below it.
        self.tmpdir = tempfile.mkdtemp()
        # NOTE(review): `os` is not imported by name in this module's
        # import block; presumably it arrives through the star import
        # from zonetool.zone -- confirm, or import it explicitly.
        zone_dir = os.path.join(self.tmpdir, 'zones')
        named_conf_path = os.path.join(self.tmpdir, 'named.conf')
        os.mkdir(zone_dir)
        self.zw = ZoneWriter(zone_dir, named_conf_path)

    def tearDown(self):
        # Remove the whole tree, including anything subclasses created
        # under tmpdir.
        shutil.rmtree(self.tmpdir)


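class ZoneWriterTest(ZoneWriterTestBase):
    """Checks ZoneWriter.write(): change tracking and named.conf output.

    As used below, write() returns a (changed, removed) pair of sets of
    zone names.
    """

    def test_write(self):
        """write() reports only the zones that changed or disappeared."""
        self.zp.load(_loadyaml(TEST_DATA))
        changed, removed = self.zw.write(self.zp.render())
        self.assertEqual(set(['autistici.org']), changed)
        self.assertEqual(set(), removed)
        self.assertTrue(
            os.path.exists(os.path.join(self.tmpdir, 'named.conf')))

        # Writing a second time, should see no changes.
        changed, removed = self.zw.write(self.zp.render())
        self.assertEqual(set(), changed)
        self.assertEqual(set(), removed)

        # Change the global config, render again.  dict() is only a
        # shallow copy, so the FRONTENDS list is rebuilt instead of
        # edited in place: assigning to cfg['FRONTENDS'][0] would also
        # rewrite the module-level TEST_CONFIG that every other test
        # reads.
        cfg = dict(TEST_CONFIG)
        cfg['FRONTENDS'] = ['178.255.144.35'] + TEST_CONFIG['FRONTENDS'][1:]
        zp2 = ZoneParser(cfg)
        zp2.load(_loadyaml(TEST_DATA))
        changed, removed = self.zw.write(zp2.render())
        self.assertEqual(set(['autistici.org']), changed)
        self.assertEqual(set(), removed)

        # Replace a domain with another, triggering a change and a
        # removal.
        zp3 = ZoneParser(TEST_CONFIG)
        data = [TEST_DATA[0].replace('autistici.org', 'inventati.org')]
        zp3.load(_loadyaml(data))
        changed, removed = self.zw.write(zp3.render())
        self.assertEqual(set(['inventati.org']), changed)
        self.assertEqual(set(['autistici.org']), removed)

    def test_config(self):
        """named.conf gets one master zone stanza pointing at the zone file."""
        self.zp.load(_loadyaml(TEST_DATA))
        self.zw.write(self.zp.render())

        with open(os.path.join(self.tmpdir, 'named.conf')) as fd:
            named_conf = fd.read()
        self.assertEqual(named_conf, '''
zone "autistici.org" {
  type master;
  file "%s/zones/autistici.org";
  allow-query { any; };
};
''' % (self.tmpdir,))

    def test_config_with_update_policies(self):
        """Each update policy is emitted as its own update-policy clause."""
        self.zp.load(_loadyaml(TEST_DATA))
        self.zw.update_policies = [
            'grant testkey zonesub',
            'grant acme zonesub TXT',
        ]
        self.zw.write(self.zp.render())

        with open(os.path.join(self.tmpdir, 'named.conf')) as fd:
            named_conf = fd.read()
        # NOTE(review): the expected text shows the policy strings
        # placed verbatim inside "update-policy { ...; };".  Whether
        # ZoneWriter validates them is not visible from this test, so
        # update_policies should be treated as trusted configuration.
        self.assertEqual(named_conf, '''
zone "autistici.org" {
  type master;
  file "%s/zones/autistici.org";
  allow-query { any; };
  update-policy { grant testkey zonesub; };
  update-policy { grant acme zonesub TXT; };
};
''' % (self.tmpdir,))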


class DNSSECTest(ZoneWriterTestBase):

    def setUp(self):
        """Extend the base fixture with a DNSSECSigner working under tmpdir."""
        ZoneWriterTestBase.setUp(self)

        # Key and DS directories sit inside the per-test temp dir, so
        # the base class tearDown() removes the generated material too.
        key_path, ds_path = [
            os.path.join(self.tmpdir, name) for name in ('keys', 'ds')]
        os.mkdir(key_path)
        os.mkdir(ds_path)
        # '123456' and False are fixed test values; what these two
        # arguments mean is defined by DNSSECSigner and not visible
        # here -- confirm against its signature.
        self.signer = DNSSECSigner(key_path, ds_path, '123456', False)
        # NOTE(review): presumably the entropy source used for key
        # generation, pointed at the non-blocking device for tests.
        self.signer.random_dev = '/dev/urandom'

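    def test_sign_zone(self):
        """Keys exist after the first write; an unchanged zone is not re-signed."""
        self.zp.load(_loadyaml(TEST_DATA))
        self.zw.write(self.zp.render(), postproc=self.signer)

        self.assertTrue(self.signer.has_keys('autistici.org'))

        # Test signing again, but add a tripwire to the signing
        # function to verify it isn't called.
        tripped = []

        def oh_no(*args):
            tripped.append(args)
            raise Exception('called sign_zone_file')
        self.signer.sign_zone_file = oh_no
        self.zw.write(self.zp.render(), postproc=self.signer)
        # The raise alone only fails the test if nothing between
        # write() and sign_zone_file catches it; recording the call
        # makes an unexpected re-sign visible either way.
        self.assertEqual([], tripped)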