Commit e7a088e5 authored by godog's avatar godog
Browse files

elasticsearch 2.0 support

parent 89b6dfcc
import datetime
import logging
import os
import threading
import time
from lens2 import utils
import pyes
import pyes.utils
from pyes.exceptions import ElasticSearchException
import elasticsearch
import elasticsearch.helpers
from elasticsearch_dsl import Search
import pprint
log = logging.getLogger(__name__)
......@@ -24,6 +22,10 @@ def from_timestamp(t):
return datetime.datetime.utcfromtimestamp(t / 1000000.0)
class NoIndicesFound(Exception):
pass
class LogStore(object):
"""Interface to the log database.
......@@ -36,8 +38,7 @@ class LogStore(object):
BULK_SIZE = 400
def __init__(self, server_list, timeout=60):
self.conn = pyes.ES(server_list, timeout=timeout,
bulk_size=self.BULK_SIZE)
self.conn = elasticsearch.Elasticsearch(server_list, timeout=timeout)
self._open_indices = {}
def _open_index(self, index_name):
......@@ -45,9 +46,9 @@ class LogStore(object):
return
try:
self.conn.indices.open_index(index_name)
log.info('opened index "%s"', index_name)
except pyes.exceptions.IndexMissingException:
log.info('opening index %r', index_name)
self.conn.indices.open(index_name)
except elasticsearch.NotFoundError:
self._init_index(index_name)
self._open_indices[index_name] = True
......@@ -55,8 +56,7 @@ class LogStore(object):
@classmethod
def _index_from_timestamp(cls, t):
if not isinstance(t, datetime.datetime):
t = datetime.datetime.utcfromtimestamp(int(t/1000000))
t = from_timestamp(t)
return cls.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
@classmethod
......@@ -77,20 +77,23 @@ class LogStore(object):
return None
def clear(self):
""""Remove the entire index."""
""""Remove all indices."""
for index in self._valid_indices():
self.conn.indices.delete_index(index)
self.conn.indices.delete(index)
def get_status(self):
"""Return the index status."""
indices = self.open_indices.keys()
# XXX fix, does it work?
status = self.conn.indices.status(indices=indices)
return [status['indices'][x] for x in indices]
def _init_index(self, index_name):
"""Create the index and set up the schema."""
log.info('creating index "%s"', index_name)
self.conn.indices.create_index_if_missing(index_name)
log.info('creating index %r', index_name)
self.conn.indices.create(index=index_name, ignore=404)
# XXX Other than where documented, existing type and field mappings cannot be updated.
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#updating-field-mappings
default_mappings = {
'log': {
'_source': {
......@@ -116,10 +119,7 @@ class LogStore(object):
'template': index_name,
'mappings': default_mappings,
}
self.conn._send_request(
'PUT',
pyes.utils.make_path('_template', 'tpl1'),
template)
self.conn.indices.put_template(name='tpl1', body=template, create=True)
log_mapping = {
'timestamp': {
'index': 'not_analyzed',
......@@ -157,7 +157,7 @@ class LogStore(object):
'term_vector': 'with_positions_offsets',
},
}
self.conn.indices.put_mapping('log', {'properties': log_mapping}, index_name)
self.conn.indices.put_mapping(doc_type='log', body={'properties': log_mapping}, index=index_name)
def insert(self, line_or_lines, parser, filters=None):
"""Insert one or more logs.
......@@ -188,20 +188,24 @@ class LogStore(object):
doc['timestamp'] = to_timestamp(doc['timestamp'])
yield doc
except Exception, e:
log.exception('parse error: %s', str(e))
log.exception('parse error: %r', e)
stats['n_parse_errors'] += 1
for doc in to_doc(line_or_lines):
stats['n_records'] += 1
index_name = self._index_from_timestamp(doc['timestamp'])
self._open_index(index_name)
try:
self.conn.index(doc, index_name, 'log', bulk=True)
except Exception, e:
if not isinstance(e, ElasticSearchException):
raise e
stats['n_db_errors'] += 1
self.conn.force_bulk()
def to_bulk(stream):
for doc in to_doc(stream):
doc['_index'] = self._index_from_timestamp(doc['timestamp'])
doc['_type'] = 'log'
# XXX pass in also _id ?
yield doc
success, errors = elasticsearch.helpers.bulk(self.conn,
to_bulk(line_or_lines),
chunk_size=self.BULK_SIZE,
raise_on_error=False,
raise_on_exception=False)
stats['n_db_errors'] += len(errors)
stats['n_records'] += success
elapsed = time.time() - start
log.info('inserted %d docs (errors: db=%d, parse=%d) in %g seconds, '
'%.5g docs/s',
......@@ -223,11 +227,11 @@ class LogStore(object):
index_date = self._datetime_from_index(index)
if index_date is None or index_date >= timelimit:
continue
log.debug("deleting %s", index)
self.conn.indices.delete_index(index)
log.info('deleting %r', index)
self.conn.indices.delete(index)
timediff = datetime.datetime.utcnow() - timelimit
# XXX fix for es 2.0
if timediff.days == 0:
trange = pyes.utils.ESRange(
'timestamp',
......@@ -236,47 +240,50 @@ class LogStore(object):
f = pyes.filters.NumericRangeFilter(trange)
q = pyes.query.FilterQuery(pyes.query.MatchAllQuery(), f)
index = self._index_from_timestamp(timelimit)
# XXX this fails with an ES error
self.conn.delete_by_query(index, ['log'], q)
self.conn.optimize(index)
else:
_expire_indices(timelimit)
def _make_search(self, query_str, time_range, **search_args):
query = pyes.query.QueryStringQuery(query_str)
def _make_search(self, query_str, time_range):
indices = list(self._valid_indices())
if time_range:
trange = pyes.utils.ESRange(
'timestamp',
from_value=to_timestamp(time_range[0] or 0),
to_value=to_timestamp(time_range[1] or time.time()))
f = pyes.filters.NumericRangeFilter(trange)
query = pyes.query.FilteredQuery(query, f)
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
search = pyes.query.Search(query, track_scores=False,
sort={'timestamp': 'desc'},
**search_args)
return search
if not indices:
e = NoIndicesFound('no indices for search %r range=%s' %
(query_str, time_range))
log.exception(e)
raise e
s = Search(using=self.conn, index=','.join(indices))
s = s.params(search_type='query_then_fetch')
s = s.query("query_string", query=query_str)
if time_range:
s = s.filter('range',
**{'timestamp': {
'gte': to_timestamp(time_range[0] or 0),
'lte': to_timestamp(time_range[1] or time.time()),
}})
return s.sort('-timestamp')
def _valid_indices(self):
for x in self.conn.indices.aliases():
# XXX check return value
for x in self.conn.indices.get_aliases():
if x.startswith(self.INDEX_PREFIX + '-'):
yield x
def search(self, query_str, time_range, size=100, start=0, facets=None):
log.debug('search: "%s", range=%s, start=%d, facets=%s',
query_str, time_range, start, facets)
search = self._make_search(query_str, time_range,
start=start, size=size)
indices = list(self._valid_indices())
if time_range:
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
if not indices:
log.warn('no indices found for search: "%s" range=%s' % (query_str, time_range))
return
search = self._make_search(query_str, time_range)[start:size]
if not search:
return [], 0, {}, 0
# XXX reenable
facets = None
if facets:
for f, fsize in facets:
if f == 'timestamp':
......@@ -288,16 +295,16 @@ class LogStore(object):
else:
search.facet.add_term_facet(f, size=fsize)
result = self.conn.search(search, model=lambda x,y: y,
search_type='query_then_fetch', indices=indices)
docs = [x['_source'] for x in result]
result = search.execute()
docs = [x.to_dict() for x in result]
result_facets = result.facets if facets else {}
return docs, result.total, result_facets, result.took
return docs, result.hits.total, result_facets, result.took
def scan(self, query_str, time_range, size=100):
log.info('scan: "%s"', query_str)
search = self._make_search(query_str, time_range, size=size)
results = self.conn.scan(search)
for batch in results:
for doc in batch['hits']['hits']:
yield doc['_source']
log.info('scan: %r', query_str)
search = self._make_search(query_str, time_range)[:size]
if not search:
yield None
else:
for doc in search.scan():
yield doc
......@@ -121,7 +121,12 @@ Known commands:
if len(args) < 1:
parser.error('Please specify a command')
logging.basicConfig(level=logging.DEBUG)
# XXX turn into a debug option
logging.basicConfig(level=logging.INFO)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)
logging.getLogger("elasticsearch.trace").setLevel(logging.ERROR)
lens = logstore.LogStore(opts.es_url, timeout=3600)
cmd, args = args[0], args[1:]
......
......@@ -77,8 +77,9 @@ def batch(iterator, n=100):
def format_log(log):
"""Print 'log' in syslog-compatible format."""
tag = log['program']
if log.get('pid'):
if 'pid' in log:
tag = '%s[%s]' % (tag, log['pid'])
return '%s %s %s.%s %s: %s\n' % (
log['timestamp'].strftime('%Y-%m-%dT%H:%M:%S'), log['host'],
log.get('facility', '<none>'), log.get('severity', '<none>'), tag, log['msg'])
log.get('facility', '<none>'), log.get('severity', '<none>'),
tag, log['msg'])
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment