Commit 6e184a7b authored by ale's avatar ale
Browse files

add an accounting system based on Redis, and related command-line tool

parent e04e4f9f
import json
import optparse
import redis
import time
class Accounting(object):
def __init__(self, local_db, aggr_db=None):
self._local_db = local_db
self._aggr_db = aggr_db or local_db
def add_connection(self, cn, conn_info):
data = json.dumps(conn_info)
with self._local_db.pipeline() as local_pipe:
local_pipe.sadd('in_cns', cn)
local_pipe.rpush('aggr_in:%s' % cn, data)
def get_connections(self, cn, n=0):
return self._aggr_db.lrange('connections:%s' % cn, 0, (n - 1))
def aggregate(self, cn):
conns = []
with self._local_db.pipeline() as local_pipe:
while True:
try:
key = 'aggr_in:%s' % cn
local_pipe.watch(key)
for data in local_pipe.lrange(key, 0, -1):
conns.append(data)
local_pipe.delete(key)
break
except redis.WatchError:
del conns[:]
# Compute daily aggregates, and copy the connection data to the master.
aggr = {}
with self._aggr_db.pipeline() as pipe:
pipe.sadd('all_cns', cn)
for data in conns:
pipe.lpush('connections:%s' % cn, data)
conn_info = json.loads(data)
day = time.strftime('%Y%m%d', time.gmtime(conn_info['end_time']))
aggr_day = aggr.setdefault(day, {'conn_time': 0,
'bytes_sent': 0,
'bytes_recv': 0})
for attr in ('conn_time', 'bytes_recv', 'bytes_sent'):
aggr_day[attr] += conn_info[attr]
# Aggregate values on the master server.
days = aggr.keys()
aggr_key = 'aggr:%s' % cn
with self._aggr_db.pipeline() as pipe:
while True:
try:
pipe.watch(aggr_key)
pipe.multi()
old_values = [json.loads(x) for x in
pipe.hmget(aggr_key, days).execute()]
old_aggr = dict(zip(days, old_values))
pipe.multi()
for day, aggr_data in aggr.iteritems():
old_aggr_data = old_aggr.get(day, {})
for attr in aggr_data:
aggr_data[attr] += old_aggr_data.get(attr, 0)
pipe.hset(aggr_key, day, json.dumps(aggr_data))
break
except redis.WatchError:
continue
def aggregate_all(self):
with self._local_db.pipeline() as local_pipe:
while True:
try:
local_pipe.watch('in_cns')
input_cns = local_pipe.get('in_cns')
local_pipe.delete('in_cns')
break
except redis.WatchError:
continue
for cn in input_cns:
self.aggregate(cn)
def get_aggregate_counts(self, cn, when=None):
if not when:
when = time.time()
day = time.strftime('%Y%m%d', time.gmtime(when))
data = self._aggr_db.hget('aggr:%s' % cn, day)
if data:
return json.loads(data)
else:
return {'bytes_sent': 0, 'bytes_recv': 0, 'conn_time': 0}
def main():
parser = optparse.OptionParser()
parser.add_option('--db', dest='local_db', default='localhost:6379',
help='endpoint of the local Redis database')
parser.add_option('--aggr-db', dest='aggr_db',
help='endpoint of the aggregate Redis database (optional)')
parser.add_option('--password', dest='password',
help='Redis password (optional)')
opts, args = parser.parse_args()
if not args:
parser.error('No command specified')
cmd, args = args[0], args[1:]
if cmd == 'help':
parser.show_help()
local_db = redis.Redis(opts.local_db, password=opts.password)
if opts.aggr_db:
aggr_db = redis.Redis(opts.aggr_db, password=opts.password)
else:
aggr_db = None
acct = Accounting(local_db, aggr_db)
if cmd == 'connect':
if len(args) < 2:
parser.error('Syntax: connect <CN> <ATTR=VALUE>...')
cn = args[0]
conn_info = str2kv(args[1:])
for mandatory in ('bytes_sent', 'bytes_recv', 'remote_ip', 'conn_time'):
if mandatory not in conn_info:
parser.error('Missing mandatory attribute "%s"' % mandatory)
for int_attr in ('bytes_sent', 'bytes_recv', 'conn_time'):
conn_info[int_attr] = int(conn_info[int_attr])
conn_info['end_time'] = int(time.time())
conn_info['start_time'] = conn_info['end_time'] - conn_info['conn_time']
acct.add_connection(cn, conn_info)
elif cmd == 'aggregate':
if len(args) != 1:
parser.error('Syntax: aggregate <CN | "all">')
cn = args[0]
if cn == 'all':
acct.aggregate_all()
else:
acct.aggregate(cn)
elif cmd == 'get-aggr':
if len(args) != 1:
parser.error('Syntax: get-aggr <CN>')
cn = args[0]
result = acct.get_aggregate_counts(cn)
for key in sorted(result):
print '%s %d' % (key, result[key])
elif cmd == 'list':
if len(args) != 1:
parser.error('Syntax: list <CN>')
cn = args[0]
for conn in acct.get_connections(cn):
print '%s %s %-6d %-20s %-20s' % (
time.strftime('%Y/%m/%d %H:%M:%S', time.gmtime(conn['start_time'])),
time.strftime('%Y/%m/%d %H:%M:%S', time.gmtime(conn['end_time'])),
conn['conn_time'],
'%d/%d' % (conn['bytes_recv'], conn['bytes_sent']),
conn['remote_ip'])
else:
parser.error('Unknown command "%s"' % cmd)
if __name__ == '__main__':
main()
......@@ -16,6 +16,7 @@ setup(
entry_points = {
"console_scripts": [
"autoca = autoca.ca_tool:main",
"vpnacct = autovpn.acct:main",
],
},
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment