Commit 6ae702bb authored by ale's avatar ale

first attempt at rough ES support

parent 752a70ec
import pyes
import logging
import os
import threading
import time
from lens2 import utils
class LogStore(object):
    """Log storage backed by an ElasticSearch cluster.

    Documents are indexed via pyes under a single index (INDEX) with the
    'log' document type.
    """

    INDEX = 'logs'

    # Documents per bulk-insert batch.
    # NOTE(review): this constant was referenced as a default argument but
    # never defined in the original (NameError at class creation); 400 is a
    # guess — tune against the ES cluster.
    INSERT_BATCH_SIZE = 400

    def __init__(self, server_list):
        """Connect to the ES servers given in 'server_list'."""
        self.conn = pyes.ES(server_list, timeout=10)
        self.commit_start = threading.Event()

    def init(self):
        """Create the index and set up the schema."""
        self.conn.create_index_if_missing(self.INDEX)
        default_mapping = {
            'log': {
                # Was the undefined name 'false' in the original; must be
                # the Python boolean False.
                'date_detection': False,
                'dynamic_templates': [
                    {
                        'template1': {
                            'match': 'attr_*',
                            'match_mapping_type': 'string',
                            'mapping': {
                                'type': 'string',
                                'index': 'analyzed',
                                'store': 'yes',
                            },
                        },
                    }],
            },
        }
        log_mapping = {
            '_timestamp': {
                'enabled': True,
            },
            'host': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            },
            'facility': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            },
            'severity': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            },
            'program': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            },
            'msg': {
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
                'term_vector': 'with_position_offsets',
                'compress': 'yes',
            },
        }
        self.conn.put_mapping(None, default_mapping, self.INDEX)
        self.conn.put_mapping('log', {'properties': log_mapping}, self.INDEX)

    def add(self, line_or_lines, parser, filters=None,
            commit_every=None, batch_size=INSERT_BATCH_SIZE):
        """Parse log lines and insert them into the store.

        Args:
          line_or_lines: a single log line, or an iterable of lines.
          parser: object with a parse(line) method returning a document dict.
          filters: optional list of callables applied to each parsed doc.
          commit_every: accepted for interface compatibility (currently
            unused by this backend).
          batch_size: number of documents per bulk insert.

        Returns:
          A (n_records, n_errors) tuple, where n_errors counts both parse
          and database failures.
        """
        if isinstance(line_or_lines, basestring):
            line_or_lines = [line_or_lines]
        start = time.time()
        self.commit_start.set()
        n_records, n_db_errors = 0, 0
        # Mutable cell so the nested generator can update the counter:
        # Python 2 has no 'nonlocal', and the original's bare
        # 'n_parse_errors += 1' inside to_doc() raised UnboundLocalError.
        parse_errors = [0]

        def to_doc(stream):
            # Lazily parse and filter each line, skipping (and counting)
            # lines that fail to parse or filter.
            for line in stream:
                try:
                    doc = parser.parse(line)
                    if filters:
                        for f in filters:
                            doc = f(doc)
                    yield doc
                except Exception:
                    parse_errors[0] += 1

        for docs in utils.batch(to_doc(line_or_lines), batch_size):
            n_records += len(docs)
            for doc in docs:
                try:
                    # The original passed the whole 'docs' batch here once
                    # per document, multiplying every insert; index each
                    # document exactly once.
                    self.conn.index(doc, self.INDEX, 'log', bulk=True)
                except Exception:
                    n_db_errors += 1
        self.conn.flush_bulk()

        elapsed = time.time() - start
        n_parse_errors = parse_errors[0]
        # Guard against a zero elapsed time on very fast (or empty) inserts.
        rate = n_records / elapsed if elapsed else 0.0
        # Module-level 'log' is not defined in this file view; fetch the
        # logger explicitly so this works regardless.
        logging.getLogger(__name__).info(
            'inserted %d docs (errors: db=%d, parse=%d) in %g seconds, '
            '%.5g docs/s',
            n_records, n_db_errors, n_parse_errors, elapsed, rate)
        return (n_records, n_db_errors + n_parse_errors)

    def expire(self, timelimit):
        """Delete all documents whose _timestamp is older than 'timelimit'."""
        trange = pyes.utils.ESRange('_timestamp',
                                    from_value=0,
                                    to_value=timelimit)
        f = pyes.filters.NumericRangeFilter(trange)
        q = pyes.query.FilteredQuery(pyes.query.MatchAllQuery(), f)
        self.conn.deleteByQuery(q)
import logging
import optparse
import sys
from lens2 import api
from lens2 import io
from lens2 import logstore
from lens2 import pattern
from lens2 import syslog_parser
from lens2 import utils
......@@ -11,7 +11,7 @@ from lens2 import daemonize
log = logging.getLogger(__name__)
DEFAULT_SOLR_URL = 'http://localhost:8080/solr'
DEFAULT_ES_URL = 'localhost:9200'
def run_http_server(lens, opts, args):
......@@ -19,8 +19,17 @@ def run_http_server(lens, opts, args):
server.serve_forever(opts.http_port)
def create_syslog_parser(opts):
    """Return the syslog parser instance selected by opts.log_format.

    Supported formats are 'standard' (default syslogd timestamps) and
    'iso' (ISO-8601 timestamps); anything else raises an Exception.
    """
    fmt = opts.log_format
    if fmt == 'standard':
        return syslog_parser.SyslogParser()
    if fmt == 'iso':
        return syslog_parser.SyslogIsoParser()
    raise Exception('Unsupported log format "%s"' % fmt)
def run_daemon(lens, opts, args):
parser = syslog_parser.SyslogParser()
parser = create_syslog_parser(opts)
filters = None
if opts.pattern_file:
filters = [pattern.PatternExtractor(opts.pattern_file)]
......@@ -29,16 +38,16 @@ def run_daemon(lens, opts, args):
inputf = io.read_fifo(opts.fifo)
lens.insert(inputf, parser, filters, commit_every=300)
except Exception, e:
log.error('FIFO reader died with exception: %s' % str(e))
log.error('FIFO reader died with exception: %s', str(e))
def run_inject(lens, opts, args):
    """Inject log files into the store.

    Each filename in 'args' is parsed with the configured syslog parser
    (plus optional pattern-extraction filters) and inserted via
    lens.insert().
    """
    # The original retained stale pre-diff lines: a duplicate
    # 'parser = syslog_parser.SyslogParser()' assignment and a duplicate
    # logging call using eager %-formatting; keep only the current versions.
    parser = create_syslog_parser(opts)
    filters = None
    if opts.pattern_file:
        filters = [pattern.PatternExtractor(opts.pattern_file)]
    for filename in args:
        logging.info('injecting %s', filename)
        lens.insert(io.read_file(filename), parser, filters)
......@@ -78,15 +87,18 @@ Known commands:
help='TCP port for the HTTP server (default 8081)')
parser.add_option('--range', dest='time_range', metavar='TIMESPEC',
help='Only search within the specified time range')
parser.add_option('--url', dest='solr_url', metavar='URL',
default=DEFAULT_SOLR_URL,
help='URL for the SOLR API (default %s)' % DEFAULT_SOLR_URL)
parser.add_option('--url', dest='es_url', metavar='URL',
default=DEFAULT_ES_URL,
help='URL for the ES servers (default %default)')
parser.add_option('--log-format', dest='log_format',
default='standard', metavar='FMT',
help='Log format ("standard" or "iso")')
daemonize.add_options(parser)
opts, args = parser.parse_args()
if len(args) < 1:
parser.error('Please specify a command')
lens = api.LensClient(opts.solr_url)
lens = logstore.LogStore(opts.es_url)
cmd, args = args[0], args[1:]
if cmd == 'daemon':
......
......@@ -50,4 +50,4 @@ class PatternExtractor(object):
if match:
for i, attr in enumerate(pattern.groups):
meta['attr_%s' % attr] = match.group(i + 1)
return meta
import os
import re
import time
from lens2 import utils
......@@ -9,11 +10,30 @@ def _parse_tag(tag):
if match:
return (match.group(1), match.group(2))
else:
return (tag.rstrip(":"), None)
return (tag.rstrip(':'), None)
class SyslogParserBase(object):
    """Base parser class for syslog data.

    Subclasses implement parse_date() to split the timestamp off a raw
    line; the base class turns the remainder into a document dict.
    """

    def parse_date(self, line):
        """Split 'line' into (timestamp, remainder). Subclass hook."""
        raise NotImplementedError()

    def __call__(self, line):
        """Parse a full syslog line into a document dict.

        The remainder after the timestamp is expected to be
        'host facility.severity tag msg...'; 'pid' is included only when
        the tag carries one.
        """
        stamp, _msg = self.parse_date(line)
        host, pri, tag, msg = _msg.split(' ', 3)
        prog, pid = _parse_tag(tag)
        facility, severity = pri.split('.', 1)
        info = {'_timestamp': stamp, 'host': host,
                'facility': facility, 'severity': severity,
                'program': prog, 'msg': msg}
        if pid:
            info['pid'] = pid
        return info

    # Callers (e.g. LogStore.add) invoke parser.parse(line); keep that
    # spelling as a backward-compatible alias of __call__.
    parse = __call__


class SyslogIsoParser(SyslogParserBase):
    """Syslog parser with ISO-8601 dates.

    The format supported by this class is not the default syslogd one
    (which lacks full timestamp and priority information); use it when
    syslogd is configured to emit ISO-8601 timestamps.
    """

    def parse_date(self, line):
        """Return (epoch timestamp, remainder) from an ISO-dated line."""
        _stamp, _msg = line.split(' ', 1)
        stamp = utils.parse_iso8601_date(_stamp)
        return stamp, _msg


class SyslogParser(SyslogParserBase):
    """Standard syslog format parser ('Mmm dd HH:MM:SS' timestamps)."""

    def __init__(self, tz=None):
        self.tz = tz  # ignored for now
        # Standard syslog timestamps carry no year; assume the current one.
        self.year = time.gmtime().tm_year

    def parse_date(self, line):
        """Return (epoch timestamp, remainder) from a standard-dated line."""
        # The timestamp is the first 15 characters; the message follows a
        # single separating space.
        _stamp, msg = line[:15], line[16:]
        stamp_tuple = time.strptime(_stamp, '%b %d %H:%M:%S')
        # struct_time is immutable -- the original assigned tm_year on it,
        # which raises at runtime; rebuild the tuple with the year patched.
        parts = list(stamp_tuple)
        parts[0] = self.year
        return time.mktime(tuple(parts)), msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment