Commit 0285ed35 authored by ale

fix enough functionality to support insert and basic search with ES

parent 0050bbb2
import datetime
import logging
import os
import pyes
import threading
import time

from lens2 import utils

log = logging.getLogger(__name__)
def _to_timestamp(t):
    """Convert a datetime or epoch seconds to integer microseconds."""
    if isinstance(t, datetime.datetime):
        t = time.mktime(t.utctimetuple())
    return int(t * 1000000)
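
# A quick sanity check of the conversion above (values illustrative): plain
# epoch seconds are scaled to microseconds, and a datetime is first collapsed
# to epoch seconds via its UTC time tuple (mktime interprets it in local
# time, so the second result holds on a UTC host):
#
#   >>> _to_timestamp(1)
#   1000000
#   >>> _to_timestamp(datetime.datetime(1970, 1, 1, 0, 0, 1))
#   1000000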
class LogStore(object):

    INDEX = 'logs'
    BULK_SIZE = 400

    def __init__(self, server_list):
        self.conn = pyes.ES(server_list, timeout=10,
                            autorefresh=True,
                            bulk_size=self.BULK_SIZE)
        self.commit_start = threading.Event()
        try:
            self.conn.open_index(self.INDEX)
            log.info('opened index "%s"', self.INDEX)
        except pyes.exceptions.IndexMissingException:
            self.init()
def clear(self):
""""Remove the entire index."""
self.conn.delete_index(self.INDEX)
    def init(self):
        """Create the index and set up the schema."""
        log.info('creating index "%s"', self.INDEX)
        self.conn.create_index_if_missing(self.INDEX)
        default_mappings = {
            'log': {
                '_source': {
                    'compress': True,
                },
                'date_detection': False,
                'dynamic_templates': [
                    {
                        'template1': {
@@ -29,89 +53,117 @@ class LogStore(object):
                            'mapping': {
                                'type': 'string',
                                'index': 'analyzed',
                                'store': 'no',
                            },
                        },
                    }],
            },
        }
        template = {
            'template': self.INDEX,
            'mappings': default_mappings,
        }
        self.conn._send_request(
            'PUT',
            self.conn._make_path(['_template', 'tpl1']),
            template)
        log_mapping = {
            'timestamp': {
                'type': 'long',
                'store': 'no',
                'include_in_all': 'no',
            },
            'host': {
                'index': 'analyzed',
                'store': 'no',
                'type': 'string',
            },
            'facility': {
                'index': 'analyzed',
                'store': 'no',
                'type': 'string',
            },
            'severity': {
                'index': 'analyzed',
                'store': 'no',
                'type': 'string',
            },
            'program': {
                'index': 'analyzed',
                'store': 'no',
                'type': 'string',
            },
            'msg': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
                'compress': 'yes',
                'term_vector': 'with_positions_offsets',
            },
        }
        self.conn.put_mapping('log', {'properties': log_mapping}, self.INDEX)
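
    # For reference, a document conforming to the mapping above would look
    # roughly like this (field values made up for illustration):
    #
    #   {'timestamp': 1262304000000000,   # integer microseconds, as a long
    #    'host': 'web1', 'facility': 'daemon', 'severity': 'info',
    #    'program': 'sshd', 'msg': 'Accepted publickey for ...'}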
    def insert(self, line_or_lines, parser, filters=None):
        if isinstance(line_or_lines, basestring):
            line_or_lines = [line_or_lines]
        start = time.time()
        self.commit_start.set()
        # Counters live in a dict so the nested generator below can update
        # them (Python 2 has no 'nonlocal').
        stats = {'n_records': 0, 'n_db_errors': 0, 'n_parse_errors': 0}
        def to_doc(stream):
            for line in stream:
                try:
                    doc = parser(line)
                    if filters:
                        for f in filters:
                            doc = f(doc)
                    doc['timestamp'] = _to_timestamp(doc['timestamp'])
                    yield doc
                except Exception, e:
                    log.exception('parse error: %s', str(e))
                    stats['n_parse_errors'] += 1
        for doc in to_doc(line_or_lines):
            stats['n_records'] += 1
            try:
                self.conn.index(doc, self.INDEX, 'log', bulk=True)
            except:
                stats['n_db_errors'] += 1
        self.conn.force_bulk()
        elapsed = time.time() - start
        log.info('inserted %d docs (errors: db=%d, parse=%d) in %g seconds, '
                 '%.5g docs/s',
                 stats['n_records'], stats['n_db_errors'],
                 stats['n_parse_errors'], elapsed,
                 stats['n_records'] / elapsed)
        return (stats['n_records'],
                stats['n_db_errors'] + stats['n_parse_errors'])
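
    # insert() takes a single line or any iterable of lines; each entry in
    # 'filters' is applied to the parsed document in order. A minimal sketch
    # (the filter and the file path are hypothetical):
    #
    #   def lower_host(doc):
    #       doc['host'] = doc['host'].lower()
    #       return doc
    #
    #   store.insert(open('/var/log/messages'), parser, [lower_host])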
    def expire(self, timelimit):
        trange = pyes.utils.ESRange(
            'timestamp',
            from_value=0,
            to_value=_to_timestamp(timelimit))
        f = pyes.filters.NumericRangeFilter(trange)
        q = pyes.query.FilteredQuery(pyes.query.MatchAllQuery(), f)
        self.conn.deleteByQuery(q)
        self.conn.optimize(self.INDEX)
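
    # expire() accepts anything _to_timestamp() understands, i.e. a datetime
    # or epoch seconds; e.g. (cutoff value illustrative):
    #
    #   store.expire(datetime.datetime(2010, 1, 1))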
    def search(self, query_str, time_range):
        log.info('search: "%s"', query_str)
        query = pyes.query.StringQuery(query_str)
        if time_range:
            trange = pyes.utils.ESRange(
                'timestamp',
                from_value=_to_timestamp(time_range[0] or 0),
                to_value=_to_timestamp(time_range[1] or time.time()))
            f = pyes.filters.NumericRangeFilter(trange)
            query = pyes.query.FilteredQuery(query, f)
        search = pyes.query.Search(query, track_scores=False,
                                   sort={'timestamp': 'asc'})
        results = self.conn.search(search)
        return (x['_source'] for x in results['hits']['hits'])
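
    # A minimal search sketch (values illustrative): time_range is an
    # optional (start, end) pair, where either end may be None/0 to leave it
    # open; results are generated straight from the stored _source documents.
    #
    #   for doc in store.search('program:sshd', (time.time() - 3600, None)):
    #       print doc['msg']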
@@ -24,6 +24,8 @@ def create_syslog_parser(opts):
        return syslog_parser.SyslogParser()
    elif opts.log_format == 'iso':
        return syslog_parser.SyslogIsoParser()
    elif opts.log_format == 'dumb':
        return syslog_parser.SyslogDumbParser()
    else:
        raise Exception('Unsupported log format "%s"' % opts.log_format)
@@ -36,7 +38,7 @@ def run_daemon(lens, opts, args):
    while True:
        try:
            inputf = io.read_fifo(opts.fifo)
            lens.insert(inputf, parser, filters)
        except Exception, e:
            log.error('FIFO reader died with exception: %s', str(e))
@@ -98,6 +100,7 @@ Known commands:
    if len(args) < 1:
        parser.error('Please specify a command')

    logging.basicConfig(level=logging.DEBUG)

    lens = logstore.LogStore(opts.es_url)
    cmd, args = args[0], args[1:]
@@ -114,7 +117,8 @@ Known commands:
    elif cmd == 'inject':
        if len(args) < 1:
            parser.error('Too few arguments')
        opts.foreground = True
        daemonize.daemonize(opts, run_inject, lens, opts, args)
    elif cmd == 'search':
        if len(args) < 1:
            parser.error('Too few arguments')
@@ -123,6 +127,9 @@ Known commands:
        if len(args) != 1:
            parser.error('Syntax: expire <TIMESPEC>')
        do_expire(lens, opts, args)
    elif cmd == 'wipe':
        lens.clear()
        print 'Database wiped.'
    else:
        parser.error('Unknown command')
@@ -24,7 +24,7 @@ class SyslogParserBase(object):
        host, pri, tag, msg = _msg.split(' ', 3)
        prog, pid = _parse_tag(tag)
        facility, severity = pri.split('.', 1)
        info = {'timestamp': stamp, 'host': host,
                'facility': facility, 'severity': severity,
                'program': prog, 'msg': msg}
        if pid:
@@ -63,5 +63,23 @@ class SyslogParser(SyslogParserBase):
    def parse_date(self, line):
        _stamp, msg = line[:15], line[16:]
        stamp_tuple = time.strptime(_stamp, '%b %d %H:%M:%S')
        # time.struct_time is immutable, so splice in the current year
        # (classic syslog timestamps omit it) by rebuilding the tuple;
        # the trailing -1 leaves the DST flag up to mktime.
        fixed_tuple = (self.year, stamp_tuple.tm_mon, stamp_tuple.tm_mday,
                       stamp_tuple.tm_hour, stamp_tuple.tm_min,
                       stamp_tuple.tm_sec, stamp_tuple.tm_wday,
                       stamp_tuple.tm_yday, -1)
        return time.mktime(fixed_tuple), msg
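
    # For example (input line made up; assumes the year attribute is set by
    # the base class):
    #
    #   >>> stamp, msg = parser.parse_date('Jan  5 03:04:05 web1 cron[1]: ping')
    #   >>> msg
    #   'web1 cron[1]: ping'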

class SyslogDumbParser(SyslogParser):

    def __call__(self, line):
        stamp, rest = self.parse_date(line)
        host, tag, msg = rest.split(' ', 2)
        prog, pid = _parse_tag(tag)
        info = {'timestamp': stamp, 'host': host,
                'facility': 'all', 'severity': 'info',
                'program': prog, 'msg': msg}
        if pid:
            info['pid'] = pid
        return info
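
# The "dumb" format carries no facility/severity field between the host and
# the tag, so those are hardwired to 'all' and 'info'. A made-up example:
# 'Jan  5 03:04:05 web1 cron[1]: ping' splits into host 'web1', the tag
# 'cron[1]:' (which _parse_tag breaks into program and pid) and msg 'ping'.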
@@ -76,6 +76,7 @@ def format_log(log):
    tag = log['program']
    if log.get('pid'):
        tag = '%s[%s]' % (tag, log['pid'])
    # Stored timestamps are integer microseconds; convert back for display.
    stamp = datetime.fromtimestamp(log['timestamp'] / 1000000.0)
    return '%s %s %s.%s %s: %s\n' % (
        stamp.isoformat(), log['host'],
        log['facility'], log['severity'], tag, log['msg'])