Skip to content
Snippets Groups Projects
Commit 06fc1241 authored by ale's avatar ale
Browse files

add a bunch of tests for the client

parent a91f53b1
Branches
No related tags found
No related merge requests found
import mox
import unittest
import sys
import time
import Queue
from djrandom_client import client
from djrandom_client import daemonize
from djrandom_client import upload
from djrandom_client import throttle
from djrandom_client import utils
from djrandom_client import filescan
class EndTest(Exception):
pass
class CompareTuple(mox.Comparator):
def __init__(self, *ref):
self.ref = ref
def equals(self, rhs):
for comparator, value in zip(self.ref, rhs):
if comparator != value:
return False
return True
def __repr__(self):
return '(%s)' % ', '.join(str(x) for x in self.ref)
class FullScanTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
self.queue = self.mox.CreateMock(Queue.Queue)
self.mox.StubOutWithMock(filescan, 'recursive_scan')
def test_run_once(self):
filescan.recursive_scan('basedir', self.queue)
self.queue.put(None)
self.mox.ReplayAll()
fs = client.FullScan('basedir', self.queue, 3600, True)
fs.run()
def test_run_loop(self):
self.mox.StubOutWithMock(time, 'sleep')
filescan.recursive_scan('basedir', self.queue)
time.sleep(3600).AndRaise(EndTest)
self.mox.ReplayAll()
fs = client.FullScan('basedir', self.queue, 3600)
self.assertRaises(EndTest, fs.run)
def test_run_filescan_error(self):
self.mox.StubOutWithMock(time, 'sleep')
filescan.recursive_scan('basedir', self.queue).AndRaise(
Exception('ahi ahi'))
time.sleep(1800).AndRaise(EndTest)
self.mox.ReplayAll()
fs = client.FullScan('basedir', self.queue, 3600)
self.assertRaises(EndTest, fs.run)
class ClientOptionsTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
self.mox.StubOutWithMock(utils, 'read_config_defaults')
utils.read_config_defaults(mox.IgnoreArg(), mox.IsA(str))
self.mox.StubOutWithMock(utils, 'check_version')
self.mox.StubOutWithMock(daemonize, 'daemonize')
def _Run(self, args, expect_success=True):
try:
sys.argv = ['client'] + args
status = client.main()
success = not status
except SystemExit, e:
success = False
self.assertEquals(
expect_success, success,
'execution with args: "%s" failed (status %s, expected %s)' % (
' '.join(args), success, expect_success))
def test_client_needs_api_key(self):
self.mox.ReplayAll()
self._Run([], False)
def test_too_many_arguments(self):
self.mox.ReplayAll()
self._Run(['--api_key=KEY', 'arg'], False)
def test_run_default_options(self):
utils.check_version().AndReturn(False)
daemonize.daemonize(mox.IgnoreArg(),
client.run_client,
CompareTuple(mox.IsA(str),
mox.IsA(str),
'KEY',
None, None, True))
self.mox.ReplayAll()
self._Run(['--api_key=KEY'])
def test_run_with_options(self):
utils.check_version().AndReturn(False)
daemonize.daemonize(mox.IgnoreArg(),
client.run_client,
CompareTuple('http://server/receiver',
'/my/music',
'KEY',
True, 10, False))
self.mox.ReplayAll()
self._Run(['--api_key=KEY', '--server_url=http://server/receiver',
'--music_dir=/my/music', '--bwlimit=10',
'--once', '--no_realtime_watch'])
def test_music_path_expands_tilde(self):
utils.check_version().AndReturn(False)
def music_dir_does_not_start_with_tilde(music_dir):
return not music_dir.startswith('~')
daemonize.daemonize(mox.IgnoreArg(),
client.run_client,
CompareTuple('http://server/receiver',
mox.Func(music_dir_does_not_start_with_tilde),
'KEY',
True, 10, False))
self.mox.ReplayAll()
self._Run(['--api_key=KEY', '--server_url=http://server/receiver',
'--music_dir=~/music', '--bwlimit=10',
'--once', '--no_realtime_watch'])
class RunClientTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
def test_run_once(self):
self.mox.StubOutWithMock(throttle, 'set_rate_limit')
throttle.set_rate_limit(150)
uploader = self.mox.CreateMock(upload.Uploader)
self.mox.StubOutWithMock(upload, 'Uploader', use_mock_anything=True)
upload.Uploader('http://server/receiver', 'KEY').AndReturn(uploader)
uploader.setDaemon(True)
uploader.queue = 'queue!'
fs = self.mox.CreateMock(client.FullScan)
self.mox.StubOutWithMock(client, 'FullScan', use_mock_anything=True)
client.FullScan('/my/music', uploader.queue, mox.IsA(int), True
).AndReturn(fs)
fs.setDaemon(True)
uploader.start()
fs.start()
uploader.join()
self.mox.ReplayAll()
client.run_client(
'http://server/receiver',
'/my/music',
'KEY',
True,
150,
False)
if __name__ == '__main__':
unittest.main()
import os
import unittest
import shutil
import tempfile
from djrandom_client import filescan
DIR = 0
FILE = 1
class FakeQueue(object):
def __init__(self):
self.data = []
def put(self, obj):
self.data.append(obj)
class FilescanTest(unittest.TestCase):
def setUp(self):
self.dir = tempfile.mkdtemp()
self._createtree([
(DIR, 'dir1'),
(FILE, 'dir1/file1.mp3'),
(FILE, 'dir1/file2.txt'),
(DIR, 'dir2'),
(FILE, 'dir2/file3.mp3'),
])
def tearDown(self):
shutil.rmtree(self.dir)
def _createtree(self, treedata):
for dtype, dname in treedata:
path = os.path.join(self.dir, dname)
if dtype == DIR:
os.mkdir(path)
else:
with open(path, 'w') as fd:
fd.write('data\n')
def test_recursive_scan(self):
queue = FakeQueue()
n = filescan.recursive_scan(self.dir, queue)
self.assertEquals(2, n)
expected_files = [
os.path.join(self.dir, 'dir1/file1.mp3'),
os.path.join(self.dir, 'dir2/file3.mp3'),
]
self.assertEquals(expected_files, queue.data)
def test_directory_scan(self):
queue = FakeQueue()
n = filescan.directory_scan(self.dir, queue)
self.assertEquals(0, n)
self.assertEquals([], queue.data)
queue = FakeQueue()
n = filescan.directory_scan(self.dir + '/dir1', queue)
self.assertEquals(1, n)
self.assertEquals([
os.path.join(self.dir, 'dir1/file1.mp3'),
], queue.data)
if __name__ == '__main__':
unittest.main()
import sys
import time
import threading
import unittest
import urllib2
import BaseHTTPServer
from djrandom_client import throttle
class TokenBucketTest(unittest.TestCase):
def test_token_bucket(self):
tb = throttle.TokenBucket(1024, 512)
# A few tokens should be readily available.
self.assertEquals(0, tb.consume(1024))
# Another 2N tokens should take two seconds.
self.assertAlmostEquals(2, tb.consume(1024), 3)
# Now we expect tokens to be available at the
# normal rate.
self.assertAlmostEquals(1, tb.consume(512), 3)
self.assertAlmostEquals(1, tb.consume(512), 3)
self.assertAlmostEquals(1, tb.consume(512), 3)
self.assertAlmostEquals(1, tb.consume(512), 3)
class HttpSinkHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
clen = int(self.headers.get('Content-Length', 0))
data = self.rfile.read(clen)
print 'received %d bytes on %s' % (len(data), self.path)
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write('ok')
do_GET = do_POST
class HttpSink(threading.Thread):
def __init__(self, stop):
threading.Thread.__init__(self)
addr = ('127.0.0.1', 0)
self.httpd = BaseHTTPServer.HTTPServer(addr, HttpSinkHandler)
self.url = 'http://127.0.0.1:%d' % self.httpd.server_port
self.stop = stop
def run(self):
while not self.stop.is_set():
self.httpd.handle_request()
class ThrottledHttpTest(unittest.TestCase):
def setUp(self):
self.http_stop = threading.Event()
self.http_server = HttpSink(self.http_stop)
self.http_server.start()
def tearDown(self):
self.http_stop.set()
urllib2.urlopen(self.http_server.url + '/quit').read()
self.http_server.join()
def test_rate_limit(self):
throttle.set_rate_limit(10)
opener = urllib2.build_opener(throttle.ThrottledHTTPHandler)
testdata = "x" * (50 * 1024)
req = urllib2.Request(
self.http_server.url + '/speedtest',
data=testdata,
headers={'Content-Length': str(len(testdata))})
start = time.time()
result = opener.open(req).read()
end = time.time()
self.assertEquals('ok', result)
elapsed = end - start
print >>sys.stderr, 'elapsed: %g secs' % elapsed
self.assertTrue(elapsed > 4.5 and elapsed < 5.5,
'elapsed time out of range: %g' % elapsed)
def test_rate_limit_across_many_requests(self):
throttle.set_rate_limit(10)
opener = urllib2.build_opener(throttle.ThrottledHTTPHandler)
testdata = "x" * (5 * 1024)
n_reqs = 10
start = time.time()
for i in xrange(n_reqs):
req = urllib2.Request(
self.http_server.url + '/speedtest',
data=testdata,
headers={'Content-Length': str(len(testdata))})
result = opener.open(req).read()
end = time.time()
self.assertEquals('ok', result)
elapsed = end - start
print >>sys.stderr, 'elapsed: %g secs' % elapsed
self.assertTrue(elapsed > 4.5 and elapsed < 5.5,
'elapsed time out of range: %g' % elapsed)
if __name__ == '__main__':
unittest.main()
import json
import os
import mox
import unittest
import shutil
import tempfile
import urllib2
from djrandom_client import stats
from djrandom_client import throttle
from djrandom_client import upload
from djrandom_client import utils
class FileDatabaseTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
self.tmpdir = tempfile.mkdtemp()
self.dbpath = os.path.join(self.tmpdir, 'test.db')
def tearDown(self):
shutil.rmtree(self.tmpdir)
mox.MoxTestBase.tearDown(self)
def test_create_db(self):
db = upload.FileDatabase(self.dbpath)
db.close()
self.assertTrue(os.path.exists(self.dbpath))
def test_add_key(self):
db = upload.FileDatabase(self.dbpath)
db.add('/test/key')
self.assertTrue(db.has('/test/key'))
def test_add_key_twice(self):
db = upload.FileDatabase(self.dbpath)
db.add('/test/key')
db.add('/test/key')
self.assertTrue(db.has('/test/key'))
def test_no_such_key(self):
db = upload.FileDatabase(self.dbpath)
self.assertFalse(db.has('/test/key'))
class UploadTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
self.tmpdir = tempfile.mkdtemp()
self.dbpath = os.path.join(self.tmpdir, 'test.db')
self.statspath = os.path.join(self.tmpdir, 'stats')
self.opener = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(urllib2, 'build_opener')
urllib2.build_opener(throttle.ThrottledHTTPHandler
).AndReturn(self.opener)
self.test_file = os.path.join(self.tmpdir, 'testfile')
with open(self.test_file, 'w') as fd:
fd.write('data')
self.test_sha1 = utils.sha1_of_file(self.test_file)
def tearDown(self):
shutil.rmtree(self.tmpdir)
mox.MoxTestBase.tearDown(self)
def _create_uploader(self):
return upload.Uploader('http://server', 'api_key',
db_path=self.dbpath,
state_path=self.statspath)
def test_get(self):
resp = self.mox.CreateMockAnything()
self.opener.open(mox.IsA(urllib2.Request)).AndReturn(resp)
resp.read().AndReturn(json.dumps({'status': True}))
self.mox.ReplayAll()
up = self._create_uploader()
result = up._get('/url')
self.assertEqual(True, result)
def test_put(self):
def check_request(req):
self.assertTrue(isinstance(req, urllib2.Request))
self.assertEquals('data', str(req.get_data()[:]))
return True
resp = self.mox.CreateMockAnything()
self.opener.open(mox.Func(check_request)).AndReturn(resp)
resp.read().AndReturn(json.dumps({'status': True}))
self.mox.ReplayAll()
up = self._create_uploader()
result = up._put('/url', self.test_file)
self.assertEqual(True, result)
def test_upload_already_on_server(self):
self.mox.StubOutWithMock(upload.Uploader, '_get')
upload.Uploader._get('/check/%s' % self.test_sha1
).AndReturn(True)
self.mox.ReplayAll()
up = self._create_uploader()
up.upload(self.test_file)
self.assertEquals(0, up.stats._data.get('uploaded_files', 0))
def test_upload(self):
self.mox.StubOutWithMock(upload.Uploader, '_get')
upload.Uploader._get('/check/%s' % self.test_sha1
).AndReturn(False)
self.mox.StubOutWithMock(upload.Uploader, '_put')
upload.Uploader._put('/upload/%s' % self.test_sha1,
self.test_file
).AndReturn(True)
self.mox.ReplayAll()
up = self._create_uploader()
up.upload(self.test_file)
self.assertEquals(1, up.stats._data['uploaded_files'])
def test_run(self):
self.mox.StubOutWithMock(upload.Uploader, 'upload')
upload.Uploader.upload(self.test_file)
self.mox.ReplayAll()
up = self._create_uploader()
up.queue.put(self.test_file)
up.queue.put(None)
up.run()
db = upload.FileDatabase(self.dbpath)
self.assertTrue(db.has(self.test_file))
def test_run_seen_file(self):
db = upload.FileDatabase(self.dbpath)
db.add(self.test_file)
db.close()
self.mox.StubOutWithMock(upload.Uploader, 'upload')
self.mox.ReplayAll()
up = self._create_uploader()
up.queue.put(self.test_file)
up.queue.put(None)
up.run()
def test_run_with_upload_error(self):
self.mox.StubOutWithMock(upload.Uploader, 'upload')
upload.Uploader.upload(self.test_file).AndRaise(Exception('argh!'))
self.mox.ReplayAll()
up = self._create_uploader()
up.queue.put(self.test_file)
up.queue.put(None)
up.run()
self.assertEquals(1, up.stats._data['errors'])
def test_run_and_check_stats(self):
st = self.mox.CreateMock(stats.Stats)
self.mox.StubOutWithMock(stats, 'Stats', use_mock_anything=True)
stats.Stats(self.statspath).AndReturn(st)
st.set('uploading', None)
st.set('uploading', self.test_file)
self.mox.StubOutWithMock(upload.Uploader, 'upload')
upload.Uploader.upload(self.test_file)
st.set('uploading', None)
st._save()
self.mox.ReplayAll()
up = self._create_uploader()
up.stats = st
up.queue.put(self.test_file)
up.queue.put(None)
up.run()
if __name__ == '__main__':
unittest.main()
import mox
import unittest
import os
import shutil
import tempfile
import urllib2
from djrandom_client import utils
class UtilsTest(mox.MoxTestBase):
def setUp(self):
mox.MoxTestBase.setUp(self)
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir)
mox.MoxTestBase.tearDown(self)
def test_sha1_of_file(self):
test_file = os.path.join(self.tmpdir, 'testfile')
with open(test_file, 'w') as fd:
fd.write('test\n')
sha1 = utils.sha1_of_file(test_file)
self.assertEquals('4e1243bd22c66e76c2ba9eddc1f91394e57f9f83', sha1)
def test_read_config_defaults(self):
cfg_file = os.path.join(self.tmpdir, 'config')
with open(cfg_file, 'w') as fd:
fd.write('''
# Test config file
var_a=a
var_b = b
var_c = 42
''')
parser = self.mox.CreateMockAnything()
parser.set_default('var_a', 'a')
parser.set_default('var_b', 'b')
parser.set_default('var_c', '42')
self.mox.ReplayAll()
utils.read_config_defaults(parser, cfg_file)
def test_read_config_file_with_error(self):
cfg_file = os.path.join(self.tmpdir, 'config')
with open(cfg_file, 'w') as fd:
fd.write('this is not a config\n')
self.assertRaises(utils.SyntaxError,
utils.read_config_defaults,
None, cfg_file)
def test_read_config_file_missing(self):
utils.read_config_defaults(
None, os.path.join(self.tmpdir, 'nosuchfile'))
def test_check_version_ok(self):
resp = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(urllib2, 'urlopen')
urllib2.urlopen(mox.IsA(str)).AndReturn(resp)
resp.read().AndReturn("VERSION = '0.1'\n")
self.mox.ReplayAll()
self.assertFalse(utils.check_version())
def test_check_version_should_upgrade(self):
resp = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(urllib2, 'urlopen')
urllib2.urlopen(mox.IsA(str)).AndReturn(resp)
resp.read().AndReturn("VERSION = '9999'\n")
self.mox.ReplayAll()
self.assertTrue(utils.check_version())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment