From 80d0dcce92d3cfe214af06387024851ef03e18e7 Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Wed, 12 Oct 2011 19:45:31 +0100 Subject: [PATCH] add tests for the model and the basic daemons, including a few fixes for bugs found with those tests --- server/djrandom/model/markov.py | 6 +- server/djrandom/model/mp3.py | 4 +- server/djrandom/model/playlist.py | 2 +- server/djrandom/receiver/receiver.py | 6 +- server/djrandom/scanner/metadata.py | 9 +- server/djrandom/scanner/scanner.py | 6 +- server/djrandom/test/__init__.py | 57 +++++++ server/djrandom/test/test_markov.py | 71 +++++++++ server/djrandom/test/test_model.py | 220 ++++++++++++++++++++++++++ server/djrandom/test/test_receiver.py | 60 +++++++ server/djrandom/test/test_scanner.py | 141 +++++++++++++++++ server/djrandom/test/test_utils.py | 67 ++++++++ 12 files changed, 634 insertions(+), 15 deletions(-) create mode 100644 server/djrandom/test/__init__.py create mode 100644 server/djrandom/test/test_markov.py create mode 100644 server/djrandom/test/test_model.py create mode 100644 server/djrandom/test/test_receiver.py create mode 100644 server/djrandom/test/test_scanner.py create mode 100644 server/djrandom/test/test_utils.py diff --git a/server/djrandom/model/markov.py b/server/djrandom/model/markov.py index fa8dfec..45c253c 100644 --- a/server/djrandom/model/markov.py +++ b/server/djrandom/model/markov.py @@ -73,12 +73,12 @@ class MarkovModel(object): def suggest(self, prev): prev_n = tuple(self._to_i(x) for x in prev) if prev_n in self._map: - r = self.random.random() - for off, value in self._norm_map[prev_n]: + r = self._rnd.random() + for off, value in self._map[prev_n]: if off > r: return self._i2hash[value] # Can't find anything, get a random song instead. - return random.choice(self._i2hash) + return self._i2hash[self._rnd.randint(1, len(self._i2hash) - 1)] def generate_sequence(self, prev, n, count): if len(prev) < n: diff --git a/server/djrandom/model/mp3.py b/server/djrandom/model/mp3.py index 3f70280..5cc3b34 100644 --- a/server/djrandom/model/mp3.py +++ b/server/djrandom/model/mp3.py @@ -63,7 +63,7 @@ class MP3(Base): 'sha1': self.sha1, 'size': self.size, 'uploaded_at': self.uploaded_at} - if self.duplicate: + if self.duplicate_of: data['duplicate_of'] = self.duplicate_of if self.track_num: data['track_num'] = self.track_num @@ -162,7 +162,7 @@ class PlayLog(Base): t[0..n] -> target transition registered in the playlog. """ n -= 1 # account for the target - for plog in cls.query.all(): + for plog in cls.query: if plog.prev: hashes = plog.prev.split(',')[:n] if len(hashes) < n: diff --git a/server/djrandom/model/playlist.py b/server/djrandom/model/playlist.py index e4fabd9..8c1f043 100644 --- a/server/djrandom/model/playlist.py +++ b/server/djrandom/model/playlist.py @@ -40,4 +40,4 @@ class Playlist(Base): @classmethod def get_by_title(cls, userid, title): - return cls.query.filter_by(userid=userid, title=title) + return cls.query.filter_by(userid=userid, title=title).first() diff --git a/server/djrandom/receiver/receiver.py b/server/djrandom/receiver/receiver.py index 21238fb..bb478db 100644 --- a/server/djrandom/receiver/receiver.py +++ b/server/djrandom/receiver/receiver.py @@ -39,11 +39,12 @@ def _upload_mp3(incoming_fd, sha1): shutil.copyfileobj(incoming_fd, fd) if utils.sha1_of_file(mp3.path) != sha1: log.error('sha1 mismatch') - raise Error('SHA1 mismatch') + return False mp3.size = os.path.getsize(mp3.path) Session.add(mp3) Session.commit() log.info('successfully stored %s' % (sha1,)) + return True @app.route('/upload/<sha1>', methods=['POST']) @@ -52,7 +53,8 @@ def upload(sha1): log.error('attempted upload with wrong content-type (%s)' % ( request.content_type,)) abort(400) - _upload_mp3(request.stream, sha1) + if not _upload_mp3(request.stream, sha1): + abort(400) return jsonify(status=True) diff --git a/server/djrandom/scanner/metadata.py b/server/djrandom/scanner/metadata.py index 2ef6262..b09c237 100644 --- a/server/djrandom/scanner/metadata.py +++ b/server/djrandom/scanner/metadata.py @@ -9,10 +9,11 @@ _nonalpha_pattern = re.compile(r'\W+', re.UNICODE) _spaces_pattern = re.compile(r'\s+', re.UNICODE) def normalize_string(s): - s = s.replace('_', ' ') - s = _nonalpha_pattern.sub(' ', s) - s = _spaces_pattern.sub(' ', s) - s = s.lower().strip() + if s: + s = s.replace('_', ' ') + s = _nonalpha_pattern.sub(' ', s) + s = _spaces_pattern.sub(' ', s) + s = s.lower().strip() return s diff --git a/server/djrandom/scanner/scanner.py b/server/djrandom/scanner/scanner.py index 76761d1..39b2e14 100644 --- a/server/djrandom/scanner/scanner.py +++ b/server/djrandom/scanner/scanner.py @@ -20,7 +20,7 @@ class Scanner(object): def process(self, mp3): mp3_info = metadata.analyze_mp3(mp3.path) - if not mp3_info['artist'] or not mp3_info['title']: + if not mp3_info.get('artist') or not mp3_info.get('title'): raise BadMetadataError() for key, value in mp3_info.iteritems(): setattr(mp3, key, value) @@ -31,10 +31,10 @@ class Scanner(object): mp3 = MP3.query.filter_by(state=MP3.INCOMING ).limit(1).first() if not mp3: - if run_once: - break Session.remove() indexer.commit() + if run_once: + break time.sleep(60) continue log.info('processing %s' % mp3.sha1) diff --git a/server/djrandom/test/__init__.py b/server/djrandom/test/__init__.py new file mode 100644 index 0000000..3acf1bd --- /dev/null +++ b/server/djrandom/test/__init__.py @@ -0,0 +1,57 @@ +import mox +import os +import shutil +import solr +import tempfile +from djrandom.model import indexer +from djrandom import database + + +class DbTestCase(mox.MoxTestBase): + + def setUp(self): + mox.MoxTestBase.setUp(self) + self.tmpdir = tempfile.mkdtemp() + self.mox.StubOutWithMock(indexer.Indexer, 'add_mp3') + self.mox.StubOutWithMock(indexer.Indexer, 'del_mp3') + self.mox.StubOutWithMock(indexer.Indexer, 'commit') + database.init_db('sqlite://', 'http://solr/') + + def tearDown(self): + database.Session.remove() + mox.MoxTestBase.tearDown(self) + shutil.rmtree(self.tmpdir) + + +class SolrTestCase(mox.MoxTestBase): + """A more complex base test class, with temporary storage and SOLR.""" + + def setUp(self): + mox.MoxTestBase.setUp(self) + # Do not use StubOutWithMock, since we don't know if the test is + # actually going to initialize the indexer or not. + self.solr = self.mox.CreateMock(solr.Solr) + self.old_solr = solr.Solr + solr.Solr = lambda url, **kw: self.solr + database.init_db('sqlite://', 'http://solr/') + + def tearDown(self): + database.Session.remove() + # Reset database.indexer + database.indexer._solr = None + solr.Solr = self.old_solr + mox.MoxTestBase.tearDown(self) + + +class WsgiTestCase(DbTestCase): + + FLASK_APP = None + DATA = [] + + def setUp(self): + DbTestCase.setUp(self) + for item in self.DATA: + database.Session.add(item) + database.Session.commit() + self.FLASK_APP.config['TESTING'] = True + self.app = self.FLASK_APP.test_client() diff --git a/server/djrandom/test/test_markov.py b/server/djrandom/test/test_markov.py new file mode 100644 index 0000000..62a3c9e --- /dev/null +++ b/server/djrandom/test/test_markov.py @@ -0,0 +1,71 @@ +import os +from djrandom.test import DbTestCase +from djrandom.model import markov + + +class MarkovTest(DbTestCase): + + def setUp(self): + DbTestCase.setUp(self) + + # Build a simple n=2 model. + self.source = [ + ('1', [None]), + ('1', ['4']), + ('2', ['1']), + ('3', ['2']), + ('4', ['3']), + ('4', ['1']), + ] + self.markov = markov.MarkovModel() + self.markov.create(self.source) + self.markov.normalize() + + def test_markov_create_ok(self): + # Test some internals. + self.assertEquals(5, len(self.markov._hash2i)) + self.assertEquals(5, len(self.markov._i2hash)) + + def test_markov_suggest_single(self): + # There is just one possible answer. + self.assertEquals('3', self.markov.suggest(['2'])) + self.assertEquals('1', self.markov.suggest([None])) + + def test_markov_suggest_many(self): + # There is more than one possibility for '1', so run a bunch of + # queries and count the results. + counts = {'2': 0, '4': 0} + for i in range(1000): + result = self.markov.suggest(['1']) + self.assertTrue(result in ('2', '4'), + 'unexpected random result "%s"' % result) + counts[result] += 1 + ratio = float(counts['2']) / float(counts['4']) + if ratio < 1: + ratio = 1.0 / ratio + # Be tolerant. + self.assertTrue(ratio < 1.5, + 'unbalanced distribution: %s' % str(counts)) + + def test_markov_suggest_non_existing(self): + for i in xrange(100): + # '5' should not exist. + result = self.markov.suggest(['5']) + self.assertTrue(result in ('1', '2', '3', '4', '5'), + 'unexpected random result "%s"' % result) + + def test_markov_generate_sequence(self): + result = self.markov.generate_sequence(['2'], 1, 2) + self.assertEquals(['3', '4'], result) + + def test_markov_save_and_reload(self): + filename = os.path.join(self.tmpdir, 'markov.dat') + self.markov.save(filename) + self.assertTrue(os.path.exists(filename)) + + markov_b = markov.MarkovModel() + markov_b.load(filename) + + self.assertEquals(self.markov._hash2i, markov_b._hash2i) + self.assertEquals(self.markov._i2hash, markov_b._i2hash) + self.assertEquals(self.markov._map, markov_b._map) diff --git a/server/djrandom/test/test_model.py b/server/djrandom/test/test_model.py new file mode 100644 index 0000000..67ea9ed --- /dev/null +++ b/server/djrandom/test/test_model.py @@ -0,0 +1,220 @@ +import os +import shutil +from datetime import datetime +from djrandom.test import DbTestCase, SolrTestCase +from djrandom.database import Session, indexer +from djrandom.model.mp3 import MP3, PlayLog, SearchLog, DUPLICATE_DIR +from djrandom.model.playlist import Playlist + + +class MP3Test(DbTestCase): + + def _create_mp3(self, sha1='1234', **kw): + mp3_data = {'title': u'title', 'artist': u'artist', 'album': u'album', + 'genre': u'genre', 'sha1': sha1, 'size': 2601, + 'uploaded_at': datetime(2011, 10, 10, 9, 18, 0)} + mp3_data.update(kw) + return MP3(path='/storage/' + sha1, **mp3_data), mp3_data + + def test_mp3_std(self): + # Simple tests building and serializing an MP3 object. + mp3, mp3_data = self._create_mp3() + Session.add(mp3) + Session.commit() + + mp3b = MP3.query.get('1234') + self.assertTrue(mp3 is not None) + self.assertEquals(MP3.INCOMING, mp3b.state) + for key, value in mp3_data.iteritems(): + actual_value = getattr(mp3b, key) + self.assertEquals(value, actual_value) + + mp3b_dict = mp3b.to_dict() + self.assertEquals(mp3_data, mp3b_dict) + + def test_mp3_mark_as_duplicate(self): + self.mox.StubOutWithMock(os.path, 'isdir') + self.mox.StubOutWithMock(os, 'makedirs') + self.mox.StubOutWithMock(shutil, 'move') + + os.path.isdir(DUPLICATE_DIR).AndReturn(False) + os.makedirs(DUPLICATE_DIR) + shutil.move('/storage/1234', + os.path.join(DUPLICATE_DIR, '1234')) + + self.mox.ReplayAll() + mp3, _ = self._create_mp3() + mp3.mark_as_duplicate('2345') + + self.assertEquals('2345', mp3.duplicate_of) + self.assertEquals(MP3.DUPLICATE, mp3.state) + + def test_mp3_fingerprint(self): + fp = 'a fingerprint' + mp3, _ = self._create_mp3() + mp3.set_fingerprint(fp) + Session.add(mp3) + Session.commit() + + mp3b = MP3.query.get('1234') + self.assertEquals(True, mp3b.has_fingerprint) + self.assertEquals(fp, mp3b.get_fingerprint()) + + def test_mp3_get_with_no_fingerprint(self): + mp3_1, _ = self._create_mp3('1001') + mp3_2, _ = self._create_mp3('1002') + mp3_3, _ = self._create_mp3('1003') + mp3_1.state = MP3.READY + mp3_2.state = MP3.BAD_METADATA + for x in (mp3_1, mp3_2, mp3_3): + Session.add(x) + Session.commit() + + results = MP3.get_with_no_fingerprint() + result_ids = set(x.sha1 for x in results) + self.assertEquals(set(['1001', '1002']), result_ids) + + def test_mp3_get_with_bad_metadata(self): + pass + + def test_mp3_last_uploaded(self): + pass + + def test_mp3_get_songs_for_album(self): + mp3_1, _ = self._create_mp3('1001', album=u'other album', state=MP3.READY) + mp3_2, _ = self._create_mp3('1002', state=MP3.READY) + Session.add(mp3_1) + Session.add(mp3_2) + Session.commit() + + results = MP3.get_songs_for_album(u'artist', u'album') + self.assertEquals(['1002'], [x.sha1 for x in results]) + + #def test_mp3_get_random_songs(self): + # mp3_1, _ = self._create_mp3('1001', state=MP3.READY) + # mp3_2, _ = self._create_mp3('1002', state=MP3.READY) + # mp3_3, _ = self._create_mp3('1003', state=MP3.READY) + # for x in (mp3_1, mp3_2, mp3_3): + # Session.add(x) + # Session.commit() + # + # results = MP3.get_random_songs(1) + # self.assertEquals(1, len(results)) + # self.assertTrue(results[0].sha1 in (u'1001', u'1002', u'1003')) + # + # results = MP3.get_random_songs(10) + # self.assertEquals(10, len(results)) + + +class PlayLogTest(DbTestCase): + + def setUp(self): + DbTestCase.setUp(self) + last = None + for sha in ('1', '2', '3', '4', '1'): + p = PlayLog(sha1=sha, userid='user', prev=last, + stamp=datetime.now()) + last = sha + Session.add(p) + Session.commit() + + def test_playlog_most_played(self): + result = PlayLog.most_played(1).all() + self.assertEquals(1, len(result)) + self.assertEquals(u'1', result[0][0]) + self.assertEquals(2, result[0][1]) + + def test_playlog_generate_tuples(self): + result = list(PlayLog.generate_tuples(n=2)) + result.sort() + expected = [(u'1', [None]), + (u'1', [u'4']), + (u'2', [u'1']), + (u'3', [u'2']), + (u'4', [u'3'])] + self.assertEquals(expected, result) + + +class PlayListTest(DbTestCase): + + def setUp(self): + DbTestCase.setUp(self) + pl1 = Playlist(modified_at=datetime.now(), + userid='userid', + contents='1,2,3,4') + Session.add(pl1) + pl2 = Playlist(modified_at=datetime.now(), + userid='userid', + contents='2,3,4,5', + title=u'title') + Session.add(pl2) + Session.commit() + + def test_playlist_std(self): + pldata = {'modified_at': datetime.now(), + 'userid': 'userid', + 'play_count': 3, + 'contents': '1,2,3,4', + 'title': u'title'} + pl = Playlist(**pldata) + Session.add(pl) + Session.commit() + + self.assertTrue(pl.uuid is not None) + for key, value in pldata.items(): + self.assertEquals(value, getattr(pl, key)) + + pl_ser = pl.to_dict() + self.assertEquals(u'title', pl_ser['title']) + self.assertEquals(['1', '2', '3', '4'], pl_ser['songs']) + + def test_playlist_get_all_by_user(self): + results = Playlist.get_all_by_user('userid').all() + self.assertEquals(2, len(results)) + + results = Playlist.get_all_by_user('userid', public=True).all() + self.assertEquals(1, len(results)) + + def test_playlist_get_by_title(self): + results = Playlist.get_by_title('userid', u'title') + self.assertEquals(u'title', results.title) + + +class IndexerTest(SolrTestCase): + + def test_indexer_setup(self): + self.mox.ReplayAll() + self.assertTrue(indexer._get_solr()) + + def test_indexer_commit(self): + self.solr.commit() + self.mox.ReplayAll() + indexer.commit() + + def test_indexer_delete_all(self): + self.solr.delete_query('id:*') + self.mox.ReplayAll() + indexer.delete_all() + + def test_indexer_del_mp3(self): + self.solr.delete('1234') + self.mox.ReplayAll() + mp3 = MP3(sha1='1234') + indexer.del_mp3(mp3) + + def test_indexer_add_mp3(self): + self.solr.add({'id': '1', + 'artist': u'artist', + 'album': u'album', + 'title': u'title', + 'genre': u'genre'}) + self.solr.delete('2') + self.mox.ReplayAll() + + good_mp3 = MP3(sha1='1', artist=u'artist', album=u'album', + title=u'title', genre=u'genre', + state=MP3.READY) + indexer.add_mp3(good_mp3) + + bad_mp3 = MP3(sha1='2', state=MP3.ERROR) + indexer.add_mp3(bad_mp3) diff --git a/server/djrandom/test/test_receiver.py b/server/djrandom/test/test_receiver.py new file mode 100644 index 0000000..b5b3464 --- /dev/null +++ b/server/djrandom/test/test_receiver.py @@ -0,0 +1,60 @@ +import hashlib +import json +from djrandom.test import WsgiTestCase +from djrandom.receiver import receiver +from djrandom.model.mp3 import MP3 + + +class ReceiverTest(WsgiTestCase): + + FLASK_APP = receiver.app + + DATA = [MP3(sha1='1234')] + + def setUp(self): + WsgiTestCase.setUp(self) + receiver.storage_root = self.tmpdir + + def test_check_existing(self): + rv = self.app.get('/check/1234') + result = json.loads(rv.data) + self.assertEquals({'status': True}, result) + + def test_check_nonexisting(self): + rv = self.app.get('/check/2345') + result = json.loads(rv.data) + self.assertEquals({'status': False}, result) + + def test_upload_ok(self): + filedata = 'buzz buzz buzz' + filesha1 = hashlib.sha1(filedata).hexdigest() + rv = self.app.post('/upload/%s' % filesha1, + content_type='audio/mpeg', + data=filedata) + result = json.loads(rv.data) + self.assertEquals({'status': True}, result) + + # Check that the MP3 exists and it's in the right state + mp3 = MP3.query.get(filesha1) + self.assertTrue(mp3 is not None) + self.assertEquals(MP3.INCOMING, mp3.state) + + # Check that the file contents have been saved properly + with open(mp3.path, 'r') as fd: + actual_data = fd.read() + self.assertEquals(filedata, actual_data) + + def test_upload_sha1_mismatch(self): + filedata = 'buzz buzz buzz' + filesha1 = 'wrongsha1' + rv = self.app.post('/upload/%s' % filesha1, + content_type='audio/mpeg', + data=filedata) + self.assertEquals(400, rv.status_code) + + def test_upload_wrong_content_type(self): + filedata = 'buzz buzz buzz' + filesha1 = hashlib.sha1(filedata).hexdigest() + rv = self.app.post('/upload/%s' % filesha1, + data=filedata) + self.assertEquals(400, rv.status_code) diff --git a/server/djrandom/test/test_scanner.py b/server/djrandom/test/test_scanner.py new file mode 100644 index 0000000..90cccfe --- /dev/null +++ b/server/djrandom/test/test_scanner.py @@ -0,0 +1,141 @@ +import eyeD3 +import hashlib +import json +from djrandom.database import Session, indexer +from djrandom.test import DbTestCase +from djrandom.scanner import scanner +from djrandom.scanner import metadata +from djrandom.model.mp3 import MP3 + + +class FakeGenre(object): + + def __init__(self, name): + self.name = name + + def getName(self): + return self.name + + +class ScannerTest(DbTestCase): + + def setUp(self): + DbTestCase.setUp(self) + data = [ + {'sha1': '1', 'state': MP3.INCOMING, 'path': '/root/1'}, + {'sha1': '2', 'state': MP3.READY, 'path': '/root/2', + 'artist': u'artist', 'title': u'title'}, + {'sha1': '3', 'state': MP3.ERROR, 'path': '/root/3'}, + ] + for attrs in data: + mp3 = MP3(**attrs) + Session.add(mp3) + Session.commit() + + def test_scanner_process_good_metadata(self): + self.mox.StubOutWithMock(metadata, 'analyze_mp3') + metadata.analyze_mp3('/root/1').AndReturn({ + 'artist': u'artist', 'title': u'title'}) + self.mox.ReplayAll() + + mp3 = MP3.query.get('1') + sc = scanner.Scanner() + sc.process(mp3) + self.assertEquals(u'artist', mp3.artist) + self.assertEquals(u'title', mp3.title) + + def test_scanner_process_bad_metadata(self): + self.mox.StubOutWithMock(metadata, 'analyze_mp3') + metadata.analyze_mp3('/root/1').AndReturn({}) + self.mox.ReplayAll() + + mp3 = MP3.query.get('1') + sc = scanner.Scanner() + self.assertRaises(scanner.BadMetadataError, + sc.process, mp3) + + def test_scanner_run(self): + sc = scanner.Scanner() + mp3 = MP3.query.get('1') + + self.mox.StubOutWithMock(sc, 'process') + sc.process(mp3) + indexer.add_mp3(mp3) + indexer.commit() + + self.mox.ReplayAll() + + sc.scan_db(run_once=True) + + # Verify changes to the mp3 object. + mp3b = MP3.query.get('1') + self.assertEquals(MP3.READY, mp3b.state) + + def test_scanner_run_bad_metadata(self): + sc = scanner.Scanner() + mp3 = MP3.query.get('1') + + self.mox.StubOutWithMock(sc, 'process') + sc.process(mp3).AndRaise(scanner.BadMetadataError()) + indexer.add_mp3(mp3) + indexer.commit() + + self.mox.ReplayAll() + + sc.scan_db(run_once=True) + + # Verify changes to the mp3 object. + mp3b = MP3.query.get('1') + self.assertEquals(MP3.BAD_METADATA, mp3b.state) + + def test_scanner_run_error(self): + sc = scanner.Scanner() + mp3 = MP3.query.get('1') + + self.mox.StubOutWithMock(sc, 'process') + sc.process(mp3).AndRaise(Exception('something bad!')) + indexer.add_mp3(mp3) + indexer.commit() + + self.mox.ReplayAll() + + sc.scan_db(run_once=True) + + # Verify changes to the mp3 object. + mp3b = MP3.query.get('1') + self.assertEquals(MP3.ERROR, mp3b.state) + + def test_metadata_normalize_string(self): + testdata = [ + (None, None), + ('', ''), + (u'a', u'a'), + (u'perep\xe8', u'perep\xe8'), + (u'bla bla_BLA$$$', u'bla bla bla'), + ] + for src, expected in testdata: + result = metadata.normalize_string(src) + self.assertEquals(expected, result) + + def test_metadata_analyze_mp3(self): + tag = self.mox.CreateMock(eyeD3.Tag) + self.mox.StubOutWithMock(eyeD3, 'Tag', use_mock_anything=True) + eyeD3.Tag().AndReturn(tag) + tag.link('/path/to/mp3') + tag.getGenre().InAnyOrder().AndReturn(FakeGenre(u'genre')) + tag.getArtist().InAnyOrder().AndReturn(u'artist') + tag.getAlbum().InAnyOrder().AndReturn(u'album') + tag.getTitle().InAnyOrder().AndReturn(u'title') + tag.getTrackNum().InAnyOrder().AndReturn((1, 10)) + + self.mox.ReplayAll() + result = metadata.analyze_mp3('/path/to/mp3') + + self.assertEquals( + {'artist': u'artist', + 'album': u'album', + 'title': u'title', + 'genre': u'genre', + 'track_num': 1 + }, + result) diff --git a/server/djrandom/test/test_utils.py b/server/djrandom/test/test_utils.py new file mode 100644 index 0000000..6087506 --- /dev/null +++ b/server/djrandom/test/test_utils.py @@ -0,0 +1,67 @@ +import mox +import os +import shutil +import tempfile +import unittest +from djrandom import utils +from djrandom.test import DbTestCase + + +class UtilsTest(mox.MoxTestBase): + + def setUp(self): + mox.MoxTestBase.setUp(self) + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpdir) + mox.MoxTestBase.tearDown(self) + + def test_generate_path(self): + self.assertEquals('/base/a/b/abcd', + utils.generate_path('/base', 'abcd')) + + def test_random_token(self): + result = utils.random_token() + self.assertTrue(len(result) > 10) + result2 = utils.random_token() + self.assertNotEquals(result, result2) + + def test_sha1_of_file(self): + test_file = os.path.join(self.tmpdir, 'testfile') + with open(test_file, 'w') as fd: + fd.write('test\n') + sha1 = utils.sha1_of_file(test_file) + self.assertEquals('4e1243bd22c66e76c2ba9eddc1f91394e57f9f83', sha1) + + def test_read_config_defaults(self): + cfg_file = os.path.join(self.tmpdir, 'config') + with open(cfg_file, 'w') as fd: + fd.write(''' +# Test config file +var_a=a +var_b = b + + var_c = 42 +''') + + parser = self.mox.CreateMockAnything() + parser.set_default('var_a', 'a') + parser.set_default('var_b', 'b') + parser.set_default('var_c', '42') + self.mox.ReplayAll() + + utils.read_config_defaults(parser, cfg_file) + + def test_read_config_file_with_error(self): + cfg_file = os.path.join(self.tmpdir, 'config') + with open(cfg_file, 'w') as fd: + fd.write('this is not a config\n') + + self.assertRaises(utils.SyntaxError, + utils.read_config_defaults, + None, cfg_file) + + def test_read_config_file_missing(self): + utils.read_config_defaults( + None, os.path.join(self.tmpdir, 'nosuchfile')) -- GitLab