import os import shutil from datetime import datetime from djrandom.test import DbTestCase, SolrTestCase from djrandom.database import Session, indexer from djrandom.model.mp3 import MP3, PlayLog, SearchLog, DUPLICATE_DIR from djrandom.model.playlist import Playlist def create_mp3(sha1='1234', **kw): mp3_data = {'title': u'title', 'artist': u'artist', 'album': u'album', 'genre': u'genre', 'sha1': sha1, 'size': 2601, 'uploaded_at': datetime(2011, 10, 10, 9, 18, 0)} mp3_data.update(kw) return MP3(path='/storage/' + sha1, **mp3_data), mp3_data class MP3Test(DbTestCase): def test_mp3_std(self): # Simple tests building and serializing an MP3 object. mp3, mp3_data = create_mp3() Session.add(mp3) Session.commit() mp3b = MP3.query.get('1234') self.assertTrue(mp3 is not None) self.assertEquals(MP3.INCOMING, mp3b.state) for key, value in mp3_data.iteritems(): actual_value = getattr(mp3b, key) self.assertEquals(value, actual_value) mp3b_dict = mp3b.to_dict() self.assertEquals(mp3_data, mp3b_dict) def test_mp3_mark_as_duplicate(self): self.mox.StubOutWithMock(os.path, 'isdir') self.mox.StubOutWithMock(os, 'makedirs') self.mox.StubOutWithMock(shutil, 'move') os.path.isdir(DUPLICATE_DIR).AndReturn(False) os.makedirs(DUPLICATE_DIR) shutil.move('/storage/1234', os.path.join(DUPLICATE_DIR, '1234')) self.mox.ReplayAll() mp3, _ = create_mp3() mp3.mark_as_duplicate('2345') self.assertEquals('2345', mp3.duplicate_of) self.assertEquals(MP3.DUPLICATE, mp3.state) def test_mp3_fingerprint(self): fp = 'a fingerprint' mp3, _ = create_mp3() mp3.set_fingerprint(fp) Session.add(mp3) Session.commit() mp3b = MP3.query.get('1234') self.assertEquals(True, mp3b.has_fingerprint) self.assertEquals(fp, mp3b.get_fingerprint()) def test_mp3_get_with_no_fingerprint(self): mp3_1, _ = create_mp3('1001') mp3_2, _ = create_mp3('1002') mp3_3, _ = create_mp3('1003') mp3_1.state = MP3.READY mp3_2.state = MP3.BAD_METADATA for x in (mp3_1, mp3_2, mp3_3): Session.add(x) Session.commit() results = MP3.get_with_no_fingerprint() result_ids = set(x.sha1 for x in results) self.assertEquals(set(['1001', '1002']), result_ids) def test_mp3_get_many(self): for i in range(1, 4): mp3, _ = create_mp3(unicode(i)) Session.add(mp3) Session.commit() hashes = [u'3', u'1', u'2'] results = MP3.get_many(hashes) result_hashes = [x.sha1 for x in results] self.assertEquals(hashes, result_hashes) def test_mp3_get_with_bad_metadata(self): pass def test_mp3_last_uploaded(self): pass def test_mp3_get_songs_for_album(self): mp3_1, _ = create_mp3('1001', album=u'other album', state=MP3.READY) mp3_2, _ = create_mp3('1002', state=MP3.READY) Session.add(mp3_1) Session.add(mp3_2) Session.commit() results = MP3.get_songs_for_album(u'artist', u'album') self.assertEquals(['1002'], [x.sha1 for x in results]) #def test_mp3_get_random_songs(self): # mp3_1, _ = create_mp3('1001', state=MP3.READY) # mp3_2, _ = create_mp3('1002', state=MP3.READY) # mp3_3, _ = create_mp3('1003', state=MP3.READY) # for x in (mp3_1, mp3_2, mp3_3): # Session.add(x) # Session.commit() # # results = MP3.get_random_songs(1) # self.assertEquals(1, len(results)) # self.assertTrue(results[0].sha1 in (u'1001', u'1002', u'1003')) # # results = MP3.get_random_songs(10) # self.assertEquals(10, len(results)) class PlayLogTest(DbTestCase): def setUp(self): DbTestCase.setUp(self) last = None for sha in ('1', '2', '3', '4', '1'): p = PlayLog(sha1=sha, userid='user', prev=last, stamp=datetime.now()) last = sha Session.add(p) Session.commit() def test_playlog_most_played(self): result = PlayLog.most_played(1).all() self.assertEquals(1, len(result)) self.assertEquals(u'1', result[0][0]) self.assertEquals(2, result[0][1]) def test_playlog_generate_tuples(self): for i in range(1, 5): mp3, _ = create_mp3(unicode(i)) Session.add(mp3) Session.commit() result = list(PlayLog.generate_tuples(n=2)) result.sort() expected = [(u'1', [None]), (u'1', [u'4']), (u'2', [u'1']), (u'3', [u'2']), (u'4', [u'3'])] self.assertEquals(expected, result) def test_playlog_generate_tuples_with_duplicates(self): for i in range(1, 4): mp3, _ = create_mp3(unicode(i)) Session.add(mp3) mp3, _ = create_mp3(u'4', state=MP3.DUPLICATE, duplicate_of=u'2') Session.add(mp3) Session.commit() result = list(PlayLog.generate_tuples(n=2)) result.sort() expected = [(u'1', [None]), (u'1', [u'2']), (u'2', [u'1']), (u'2', [u'3']), (u'3', [u'2'])] self.assertEquals(expected, result) class PlayListTest(DbTestCase): def setUp(self): DbTestCase.setUp(self) pl1 = Playlist(modified_at=datetime.now(), userid='userid', contents='1,2,3,4') Session.add(pl1) pl2 = Playlist(modified_at=datetime.now(), userid='userid', contents='2,3,4,5', title=u'title') Session.add(pl2) Session.commit() def test_playlist_std(self): pldata = {'modified_at': datetime.now(), 'userid': 'userid', 'play_count': 3, 'contents': '1,2,3,4', 'title': u'title'} pl = Playlist(**pldata) Session.add(pl) Session.commit() self.assertTrue(pl.uuid is not None) for key, value in pldata.items(): self.assertEquals(value, getattr(pl, key)) pl_ser = pl.to_dict() self.assertEquals(u'title', pl_ser['title']) self.assertEquals(['1', '2', '3', '4'], pl_ser['songs']) def test_playlist_get_all_by_user(self): results = Playlist.get_all_by_user('userid').all() self.assertEquals(2, len(results)) results = Playlist.get_all_by_user('userid', public=True).all() self.assertEquals(1, len(results)) def test_playlist_get_by_title(self): results = Playlist.get_by_title('userid', u'title') self.assertEquals(u'title', results.title) class IndexerTest(SolrTestCase): def test_indexer_setup(self): self.mox.ReplayAll() self.assertTrue(indexer._get_solr()) def test_indexer_commit(self): self.solr.commit() self.mox.ReplayAll() indexer.commit() def test_indexer_delete_all(self): self.solr.delete_query('id:*') self.mox.ReplayAll() indexer.delete_all() def test_indexer_del_mp3(self): self.solr.delete('1234') self.mox.ReplayAll() mp3 = MP3(sha1='1234') indexer.del_mp3(mp3) def test_indexer_add_mp3(self): self.solr.add({'id': '1', 'artist': u'artist', 'album': u'album', 'title': u'title', 'genre': u'genre'}) self.solr.delete('2') self.mox.ReplayAll() good_mp3 = MP3(sha1='1', artist=u'artist', album=u'album', title=u'title', genre=u'genre', state=MP3.READY) indexer.add_mp3(good_mp3) bad_mp3 = MP3(sha1='2', state=MP3.ERROR) indexer.add_mp3(bad_mp3)