Skip to content
Snippets Groups Projects
Commit 9f56b8f1 authored by ale's avatar ale
Browse files

add initial support for upload bandwidth throttling

parent ebbe181a
No related branches found
No related tags found
No related merge requests found
import socket
import httplib
import threading
import time
import urllib2
class TokenBucket(object):
"""An implementation of the token bucket algorithm.
Source: http://code.activestate.com/recipes/511490/ (thread-safe).
"""
def __init__(self, tokens, fill_rate):
"""Cosntructor.
tokens is the total tokens in the bucket. fill_rate is the
rate in tokens/second that the bucket will be refilled.
"""
self.capacity = float(tokens)
self._tokens = float(tokens)
self.fill_rate = float(fill_rate)
self.timestamp = time.time()
self.lock = threading.RLock()
def consume(self, tokens):
"""Consume tokens from the bucket.
Returns 0 if there were sufficient tokens, otherwise the
expected time until enough tokens become available.
"""
with self.lock:
tokens = max(tokens, self.tokens)
expected_time = (tokens - self.tokens) / self.fill_rate
if expected_time <= 0:
self._tokens -= tokens
return max(0, expected_time)
@property
def tokens(self):
with self.lock:
if self._tokens < self.capacity:
now = time.time()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
value = self._tokens
return value
# We maintain a single process-wide TokenBucket for rate-limiting
# HTTP uploads. If set_rate_limit() is not called, no bandwidth
# limit will be applied.
_bucket = None
def set_rate_limit(kbps):
global _bucket
_bucket = TokenBucket(2 * kbps, kbps)
class ThrottledConnection(httplib.HTTPConnection):
def send(self, str):
if self.sock is None:
if self.auto_open:
self.connect()
else:
raise httplib.NotConnected()
if self.debuglevel > 0:
print "send:", repr(str)
try:
tot = len(str)
for off in xrange(0, tot, BLOCK_SIZE):
n = min(BLOCK_SIZE, tot - off)
chunk = str[off:off + n]
if _bucket:
wait_time = _bucket.consume(n)
while wait_time > 0:
time.sleep(wait_time)
wait_time = _bucket.consume(n)
self.sock.sendall(chunk)
except socket.error, v:
if v[0] == 32: # Broken pipe
self.close()
raise
class ThrottledHTTPHandler(urllib2.HTTPHandler):
def http_open(self, req):
return self.do_open(ThrottledHTTPConnection, req)
...@@ -10,6 +10,7 @@ import urllib2 ...@@ -10,6 +10,7 @@ import urllib2
import Queue import Queue
from djrandom_client import utils from djrandom_client import utils
from djrandom_client import stats from djrandom_client import stats
from djrandom_client import throttle
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -38,11 +39,12 @@ class Uploader(object): ...@@ -38,11 +39,12 @@ class Uploader(object):
self.server_url = server_url self.server_url = server_url
self.queue = Queue.Queue(1000) self.queue = Queue.Queue(1000)
self.db = FileDatabase(db) self.db = FileDatabase(db)
self.opener = urllib2.build_opener(throttle.ThrottledHTTPHandler)
def _get(self, url): def _get(self, url):
req = urllib2.Request(self.server_url + url, req = urllib2.Request(self.server_url + url,
headers={'X-Key': self.api_key}) headers={'X-Key': self.api_key})
result = json.loads(urllib2.urlopen(req).read()) result = json.loads(self.opener.open(req).read())
return result['status'] return result['status']
def _put(self, url, path): def _put(self, url, path):
...@@ -57,8 +59,7 @@ class Uploader(object): ...@@ -57,8 +59,7 @@ class Uploader(object):
headers={'X-Key': self.api_key, headers={'X-Key': self.api_key,
'Content-Type': 'audio/mpeg', 'Content-Type': 'audio/mpeg',
'Content-Length': str(filesize)}) 'Content-Length': str(filesize)})
req.get_method = lambda: 'POST' result = json.loads(self.opener.open(req).read())
result = json.loads(urllib2.urlopen(req).read())
return result['status'] return result['status']
finally: finally:
filemap.close() filemap.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment