import datetime
import logging
import os
import threading
import time
from lens2 import utils
import pyes
import pyes.utils
from pyes.exceptions import ElasticSearchException
import elasticsearch
import elasticsearch.helpers
from elasticsearch_dsl import Search
import pprint
log = logging.getLogger(__name__)
......@@ -24,6 +22,10 @@ def from_timestamp(t):
return datetime.datetime.utcfromtimestamp(t / 1000000.0)
class NoIndicesFound(Exception):
class LogStore(object):
"""Interface to the log database.
......@@ -36,8 +38,7 @@ class LogStore(object):
def __init__(self, server_list, timeout=60):
self.conn = pyes.ES(server_list, timeout=timeout,
self.conn = elasticsearch.Elasticsearch(server_list, timeout=timeout)
self._open_indices = {}
def _open_index(self, index_name):
......@@ -45,9 +46,9 @@ class LogStore(object):
self.conn.indices.open_index(index_name)'opened index "%s"', index_name)
except pyes.exceptions.IndexMissingException:'opening index %r', index_name)
except elasticsearch.NotFoundError:
self._open_indices[index_name] = True
......@@ -55,8 +56,7 @@ class LogStore(object):
def _index_from_timestamp(cls, t):
if not isinstance(t, datetime.datetime):
t = datetime.datetime.utcfromtimestamp(int(t/1000000))
t = from_timestamp(t)
return cls.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
......@@ -77,20 +77,23 @@ class LogStore(object):
return None
def clear(self):
""""Remove the entire index."""
""""Remove all indices."""
for index in self._valid_indices():
def get_status(self):
"""Return the index status."""
indices = self.open_indices.keys()
# XXX fix, does it work?
status = self.conn.indices.status(indices=indices)
return [status['indices'][x] for x in indices]
def _init_index(self, index_name):
"""Create the index and set up the schema."""'creating index "%s"', index_name)
self.conn.indices.create_index_if_missing(index_name)'creating index %r', index_name)
self.conn.indices.create(index=index_name, ignore=404)
# XXX Other than where documented, existing type and field mappings cannot be updated.
default_mappings = {
'log': {
'_source': {
......@@ -116,10 +119,7 @@ class LogStore(object):
'template': index_name,
'mappings': default_mappings,
pyes.utils.make_path('_template', 'tpl1'),
self.conn.indices.put_template(name='tpl1', body=template, create=True)
log_mapping = {
'timestamp': {
'index': 'not_analyzed',
......@@ -157,7 +157,7 @@ class LogStore(object):
'term_vector': 'with_positions_offsets',
self.conn.indices.put_mapping('log', {'properties': log_mapping}, index_name)
self.conn.indices.put_mapping(doc_type='log', body={'properties': log_mapping}, index=index_name)
def insert(self, line_or_lines, parser, filters=None):
"""Insert one or more logs.
......@@ -188,20 +188,24 @@ class LogStore(object):
doc['timestamp'] = to_timestamp(doc['timestamp'])
yield doc
except Exception, e:
log.exception('parse error: %s', str(e))
log.exception('parse error: %r', e)
stats['n_parse_errors'] += 1
for doc in to_doc(line_or_lines):
stats['n_records'] += 1
index_name = self._index_from_timestamp(doc['timestamp'])
self.conn.index(doc, index_name, 'log', bulk=True)
except Exception, e:
if not isinstance(e, ElasticSearchException):
raise e
stats['n_db_errors'] += 1
def to_bulk(stream):
for doc in to_doc(stream):
doc['_index'] = self._index_from_timestamp(doc['timestamp'])
doc['_type'] = 'log'
# XXX pass in also _id ?
yield doc
success, errors = elasticsearch.helpers.bulk(self.conn,
stats['n_db_errors'] += len(errors)
stats['n_records'] += success
elapsed = time.time() - start'inserted %d docs (errors: db=%d, parse=%d) in %g seconds, '
'%.5g docs/s',
......@@ -223,11 +227,11 @@ class LogStore(object):
index_date = self._datetime_from_index(index)
if index_date is None or index_date >= timelimit:
log.debug("deleting %s", index)
self.conn.indices.delete_index(index)'deleting %r', index)
timediff = datetime.datetime.utcnow() - timelimit
# XXX fix for es 2.0
if timediff.days == 0:
trange = pyes.utils.ESRange(
......@@ -236,47 +240,50 @@ class LogStore(object):
f = pyes.filters.NumericRangeFilter(trange)
q = pyes.query.FilterQuery(pyes.query.MatchAllQuery(), f)
index = self._index_from_timestamp(timelimit)
# XXX this fails with an ES error
self.conn.delete_by_query(index, ['log'], q)
def _make_search(self, query_str, time_range, **search_args):
query = pyes.query.QueryStringQuery(query_str)
def _make_search(self, query_str, time_range):
indices = list(self._valid_indices())
if time_range:
trange = pyes.utils.ESRange(
from_value=to_timestamp(time_range[0] or 0),
to_value=to_timestamp(time_range[1] or time.time()))
f = pyes.filters.NumericRangeFilter(trange)
query = pyes.query.FilteredQuery(query, f)
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
search = pyes.query.Search(query, track_scores=False,
sort={'timestamp': 'desc'},
return search
if not indices:
e = NoIndicesFound('no indices for search %r range=%s' %
(query_str, time_range))
raise e
s = Search(using=self.conn, index=','.join(indices))
s = s.params(search_type='query_then_fetch')
s = s.query("query_string", query=query_str)
if time_range:
s = s.filter('range',
**{'timestamp': {
'gte': to_timestamp(time_range[0] or 0),
'lte': to_timestamp(time_range[1] or time.time()),
return s.sort('-timestamp')
def _valid_indices(self):
for x in self.conn.indices.aliases():
# XXX check return value
for x in self.conn.indices.get_aliases():
if x.startswith(self.INDEX_PREFIX + '-'):
yield x
def search(self, query_str, time_range, size=100, start=0, facets=None):
log.debug('search: "%s", range=%s, start=%d, facets=%s',
query_str, time_range, start, facets)
search = self._make_search(query_str, time_range,
start=start, size=size)
indices = list(self._valid_indices())
if time_range:
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
if not indices:
log.warn('no indices found for search: "%s" range=%s' % (query_str, time_range))
search = self._make_search(query_str, time_range)[start:size]
if not search:
return [], 0, {}, 0
# XXX reenable
facets = None
if facets:
for f, fsize in facets:
if f == 'timestamp':
......@@ -288,16 +295,16 @@ class LogStore(object):
search.facet.add_term_facet(f, size=fsize)
result =, model=lambda x,y: y,
search_type='query_then_fetch', indices=indices)
docs = [x['_source'] for x in result]
result = search.execute()
docs = [x.to_dict() for x in result]
result_facets = result.facets if facets else {}
return docs,, result_facets, result.took
return docs,, result_facets, result.took
def scan(self, query_str, time_range, size=100):'scan: "%s"', query_str)
search = self._make_search(query_str, time_range, size=size)
results = self.conn.scan(search)
for batch in results:
for doc in batch['hits']['hits']:
yield doc['_source']'scan: %r', query_str)
search = self._make_search(query_str, time_range)[:size]
if not search:
yield None
for doc in search.scan():
yield doc
......@@ -121,7 +121,12 @@ Known commands:
if len(args) < 1:
parser.error('Please specify a command')
# XXX turn into a debug option
lens = logstore.LogStore(opts.es_url, timeout=3600)
cmd, args = args[0], args[1:]
......@@ -77,8 +77,9 @@ def batch(iterator, n=100):
def format_log(log):
"""Print 'log' in syslog-compatible format."""
tag = log['program']
if log.get('pid'):
if 'pid' in log:
tag = '%s[%s]' % (tag, log['pid'])
return '%s %s %s.%s %s: %s\n' % (
log['timestamp'].strftime('%Y-%m-%dT%H:%M:%S'), log['host'],
log.get('facility', '<none>'), log.get('severity', '<none>'), tag, log['msg'])
log.get('facility', '<none>'), log.get('severity', '<none>'),
tag, log['msg'])
