diff --git a/client/djrandom_client/throttle.py b/client/djrandom_client/throttle.py new file mode 100644 index 0000000000000000000000000000000000000000..7fd095f9df782c48e7eb3ee18e6784b2fb442eaa --- /dev/null +++ b/client/djrandom_client/throttle.py @@ -0,0 +1,94 @@ +import socket +import httplib +import threading +import time +import urllib2 + + +class TokenBucket(object): + """An implementation of the token bucket algorithm. + + Source: http://code.activestate.com/recipes/511490/ (thread-safe). + """ + + def __init__(self, tokens, fill_rate): + """Cosntructor. + + tokens is the total tokens in the bucket. fill_rate is the + rate in tokens/second that the bucket will be refilled. + """ + self.capacity = float(tokens) + self._tokens = float(tokens) + self.fill_rate = float(fill_rate) + self.timestamp = time.time() + self.lock = threading.RLock() + + def consume(self, tokens): + """Consume tokens from the bucket. + + Returns 0 if there were sufficient tokens, otherwise the + expected time until enough tokens become available. + """ + with self.lock: + tokens = max(tokens, self.tokens) + expected_time = (tokens - self.tokens) / self.fill_rate + if expected_time <= 0: + self._tokens -= tokens + return max(0, expected_time) + + @property + def tokens(self): + with self.lock: + if self._tokens < self.capacity: + now = time.time() + delta = self.fill_rate * (now - self.timestamp) + self._tokens = min(self.capacity, self._tokens + delta) + self.timestamp = now + value = self._tokens + return value + + +# We maintain a single process-wide TokenBucket for rate-limiting +# HTTP uploads. If set_rate_limit() is not called, no bandwidth +# limit will be applied. +_bucket = None + +def set_rate_limit(kbps): + global _bucket + _bucket = TokenBucket(2 * kbps, kbps) + + +class ThrottledConnection(httplib.HTTPConnection): + + def send(self, str): + if self.sock is None: + if self.auto_open: + self.connect() + else: + raise httplib.NotConnected() + + if self.debuglevel > 0: + print "send:", repr(str) + try: + tot = len(str) + for off in xrange(0, tot, BLOCK_SIZE): + n = min(BLOCK_SIZE, tot - off) + chunk = str[off:off + n] + if _bucket: + wait_time = _bucket.consume(n) + while wait_time > 0: + time.sleep(wait_time) + wait_time = _bucket.consume(n) + self.sock.sendall(chunk) + except socket.error, v: + if v[0] == 32: # Broken pipe + self.close() + raise + + +class ThrottledHTTPHandler(urllib2.HTTPHandler): + + def http_open(self, req): + return self.do_open(ThrottledHTTPConnection, req) + + diff --git a/client/djrandom_client/upload.py b/client/djrandom_client/upload.py index 27880d33c7ae48d778c4d54332fec204674c55b4..2c97be660ca7121b6e46514dcd3a9232a5fed14a 100644 --- a/client/djrandom_client/upload.py +++ b/client/djrandom_client/upload.py @@ -10,6 +10,7 @@ import urllib2 import Queue from djrandom_client import utils from djrandom_client import stats +from djrandom_client import throttle log = logging.getLogger(__name__) @@ -38,11 +39,12 @@ class Uploader(object): self.server_url = server_url self.queue = Queue.Queue(1000) self.db = FileDatabase(db) + self.opener = urllib2.build_opener(throttle.ThrottledHTTPHandler) def _get(self, url): req = urllib2.Request(self.server_url + url, headers={'X-Key': self.api_key}) - result = json.loads(urllib2.urlopen(req).read()) + result = json.loads(self.opener.open(req).read()) return result['status'] def _put(self, url, path): @@ -57,8 +59,7 @@ class Uploader(object): headers={'X-Key': self.api_key, 'Content-Type': 'audio/mpeg', 'Content-Length': str(filesize)}) - req.get_method = lambda: 'POST' - result = json.loads(urllib2.urlopen(req).read()) + result = json.loads(self.opener.open(req).read()) return result['status'] finally: filemap.close()