Commit 848b6798 authored by ale's avatar ale
Browse files

apparently the context manager syntax for Pipeline objects is not supported in...

apparently the context manager syntax for Pipeline objects is not supported in the Redis Python library of Debian stable
parent 6e184a7b
...@@ -12,72 +12,79 @@ class Accounting(object): ...@@ -12,72 +12,79 @@ class Accounting(object):
def add_connection(self, cn, conn_info):
    """Queue a finished connection record for `cn` on the local DB.

    Serializes `conn_info` to JSON, appends it to the per-CN input queue
    ('aggr_in:<cn>') and marks the CN as having pending data in the
    'in_cns' set, all in one pipeline round-trip.

    NOTE: plain pipeline calls are used instead of the context-manager
    syntax, which the redis library shipped with Debian stable does not
    support on Pipeline objects.
    """
    serialized = json.dumps(conn_info)
    pipe = self._local_db.pipeline()
    pipe.sadd('in_cns', cn)
    pipe.rpush('aggr_in:%s' % cn, serialized)
    pipe.execute()
def get_connections(self, cn, n=0):
    """Return the most recent `n` connection records for `cn` from the
    master DB; with the default n=0 the whole list is returned (LRANGE
    with stop index -1 covers the full list)."""
    list_key = 'connections:%s' % cn
    return self._aggr_db.lrange(list_key, 0, n - 1)
def aggregate(self, cn):
    """Move buffered connection records for `cn` from the local DB to the
    master DB and fold them into per-day aggregates.

    Steps:
      1. Atomically drain the local 'aggr_in:<cn>' list (WATCH-based
         optimistic locking, retried on WatchError).
      2. Copy each raw record onto the master 'connections:<cn>' list and
         compute per-day totals (conn_time / bytes_sent / bytes_recv).
      3. Merge the new totals into the existing 'aggr:<cn>' hash on the
         master, again under WATCH.

    Pipelines are driven explicitly (no `with` block): the redis library
    on Debian stable does not support the context-manager protocol.
    """
    conns = []
    local_pipe = self._local_db.pipeline()
    key = 'aggr_in:%s' % cn
    while True:
        try:
            local_pipe.watch(key)
            conns = list(local_pipe.lrange(key, 0, -1))
            local_pipe.delete(key)
            break
        except redis.WatchError:
            # Someone touched the queue while we were reading it: start over.
            del conns[:]
        finally:
            local_pipe.reset()

    # Compute daily aggregates, and copy the connection data to the master.
    aggr = {}
    pipe = self._aggr_db.pipeline()
    pipe.sadd('all_cns', cn)
    for data in conns:
        pipe.lpush('connections:%s' % cn, data)
        conn_info = json.loads(data)
        day = time.strftime('%Y%m%d', time.gmtime(conn_info['end_time']))
        aggr_day = aggr.setdefault(day, {'conn_time': 0,
                                         'bytes_sent': 0,
                                         'bytes_recv': 0})
        for attr in ('conn_time', 'bytes_recv', 'bytes_sent'):
            aggr_day[attr] += conn_info[attr]
    pipe.execute()

    if not aggr:
        # Nothing new to merge; also avoids issuing HMGET with an empty
        # field list, which redis rejects.
        return

    # Aggregate values on the master server.
    days = list(aggr.keys())
    aggr_key = 'aggr:%s' % cn
    pipe = self._aggr_db.pipeline()
    while True:
        try:
            pipe.watch(aggr_key)
            # HMGET yields None for days with no previous data; map those
            # to an empty dict instead of crashing in json.loads.
            old_values = [json.loads(x) if x is not None else {}
                          for x in pipe.hmget(aggr_key, days)]
            old_aggr = dict(zip(days, old_values))
            pipe.multi()
            for day, new_data in aggr.items():
                prev = old_aggr.get(day, {})
                # Build the merged value without mutating `aggr`, so a
                # WatchError retry does not double-count previous totals.
                merged = dict((attr, val + prev.get(attr, 0))
                              for attr, val in new_data.items())
                pipe.hset(aggr_key, day, json.dumps(merged))
            pipe.execute()
            break
        except redis.WatchError:
            continue
        finally:
            pipe.reset()
def aggregate_all(self):
    """Aggregate pending connection data for every CN with queued input.

    Atomically grabs and clears the local 'in_cns' set (maintained with
    SADD by add_connection), then runs aggregate() on each member.

    Fixes over the previous version:
      * 'in_cns' is a set, so it must be read with SMEMBERS — GET on a
        set key raises a WRONGTYPE error in redis.
      * When the key is absent we fall back to an empty set instead of
        iterating over None.
      * The DELETE is issued inside MULTI/EXEC so the WATCH actually
        protects the read-then-clear sequence (immediate-mode commands
        are not guarded by WATCH).
    """
    input_cns = set()
    local_pipe = self._local_db.pipeline()
    while True:
        try:
            local_pipe.watch('in_cns')
            input_cns = local_pipe.smembers('in_cns') or set()
            local_pipe.multi()
            local_pipe.delete('in_cns')
            local_pipe.execute()
            break
        except redis.WatchError:
            continue
        finally:
            local_pipe.reset()
    for cn in input_cns:
        self.aggregate(cn)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment