diff --git a/lens2/logstore.py b/lens2/logstore.py
index 1bfe620f2885e8ab198608c3bfac95e8c5accfda..e34a539fd3b110b9f8b97a182c43bbe4c7839500 100644
--- a/lens2/logstore.py
+++ b/lens2/logstore.py
@@ -1,15 +1,13 @@
 import datetime
 import logging
 import os
-import threading
 import time
 
 from lens2 import utils
-import pyes
-import pyes.utils
-from pyes.exceptions import ElasticSearchException
+import elasticsearch
+import elasticsearch.helpers
+from elasticsearch_dsl import Search
 
-import pprint
 
 log = logging.getLogger(__name__)
 
@@ -24,6 +22,10 @@ def from_timestamp(t):
     return datetime.datetime.utcfromtimestamp(t / 1000000.0)
 
 
+class NoIndicesFound(Exception):
+    pass
+
+
 class LogStore(object):
     """Interface to the log database.
 
@@ -36,8 +38,7 @@ class LogStore(object):
     BULK_SIZE = 400
 
     def __init__(self, server_list, timeout=60):
-        self.conn = pyes.ES(server_list, timeout=timeout,
-                            bulk_size=self.BULK_SIZE)
+        self.conn = elasticsearch.Elasticsearch(server_list, timeout=timeout)
         self._open_indices = {}
 
     def _open_index(self, index_name):
@@ -45,9 +46,9 @@ def _open_index(self, index_name):
             return
 
         try:
-            self.conn.indices.open_index(index_name)
-            log.info('opened index "%s"', index_name)
-        except pyes.exceptions.IndexMissingException:
+            log.info('opening index %r', index_name)
+            self.conn.indices.open(index_name)
+        except elasticsearch.NotFoundError:
             self._init_index(index_name)
 
         self._open_indices[index_name] = True
@@ -55,8 +56,7 @@
 
     @classmethod
     def _index_from_timestamp(cls, t):
-        if not isinstance(t, datetime.datetime):
-            t = datetime.datetime.utcfromtimestamp(int(t/1000000))
+        t = from_timestamp(t)
         return cls.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
 
     @classmethod
@@ -77,20 +77,23 @@
         return None
 
     def clear(self):
-        """"Remove the entire index."""
+        """Remove all indices."""
         for index in self._valid_indices():
-            self.conn.indices.delete_index(index)
+            self.conn.indices.delete(index)
 
     def get_status(self):
         """Return the index status."""
         indices = self.open_indices.keys()
+        # XXX fix, does it work?
         status = self.conn.indices.status(indices=indices)
         return [status['indices'][x] for x in indices]
 
     def _init_index(self, index_name):
         """Create the index and set up the schema."""
-        log.info('creating index "%s"', index_name)
-        self.conn.indices.create_index_if_missing(index_name)
+        log.info('creating index %r', index_name)
+        self.conn.indices.create(index=index_name, ignore=400)
+        # XXX Other than where documented, existing type and field mappings cannot be updated.
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#updating-field-mappings
         default_mappings = {
             'log': {
                 '_source': {
@@ -97,15 +100,12 @@
                     'enabled': True,
                 },
             },
         }
         template = {
             'template': index_name,
             'mappings': default_mappings,
         }
-        self.conn._send_request(
-            'PUT',
-            pyes.utils.make_path('_template', 'tpl1'),
-            template)
+        self.conn.indices.put_template(name='tpl1', body=template, create=True)
         log_mapping = {
             'timestamp': {
                 'index': 'not_analyzed',
@@ -154,7 +154,7 @@
                 'term_vector': 'with_positions_offsets',
             },
         }
-        self.conn.indices.put_mapping('log', {'properties': log_mapping}, index_name)
+        self.conn.indices.put_mapping(doc_type='log', body={'properties': log_mapping}, index=index_name)
 
     def insert(self, line_or_lines, parser, filters=None):
         """Insert one or more logs.
@@ -188,20 +188,24 @@ class LogStore(object):
                     doc['timestamp'] = to_timestamp(doc['timestamp'])
                     yield doc
                 except Exception, e:
-                    log.exception('parse error: %s', str(e))
+                    log.exception('parse error: %r', e)
                     stats['n_parse_errors'] += 1
 
-        for doc in to_doc(line_or_lines):
-            stats['n_records'] += 1
-            index_name = self._index_from_timestamp(doc['timestamp'])
-            self._open_index(index_name)
-            try:
-                self.conn.index(doc, index_name, 'log', bulk=True)
-            except Exception, e:
-                if not isinstance(e, ElasticSearchException):
-                    raise e
-                stats['n_db_errors'] += 1
-        self.conn.force_bulk()
+        def to_bulk(stream):
+            for doc in to_doc(stream):
+                doc['_index'] = self._index_from_timestamp(doc['timestamp'])
+                doc['_type'] = 'log'
+                # XXX pass in also _id ?
+                yield doc
+
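+        # helpers.bulk() consumes the action iterator in chunks of chunk_size;
+        # with raise_on_error/raise_on_exception disabled it returns a tuple of
+        # (number of docs indexed successfully, list of per-document errors).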
+        success, errors = elasticsearch.helpers.bulk(self.conn,
+                                                     to_bulk(line_or_lines),
+                                                     chunk_size=self.BULK_SIZE,
+                                                     raise_on_error=False,
+                                                     raise_on_exception=False)
+        stats['n_db_errors'] += len(errors)
+        stats['n_records'] += success
+
         elapsed = time.time() - start
         log.info('inserted %d docs (errors: db=%d, parse=%d) in %g seconds, '
                  '%.5g docs/s',
@@ -223,11 +227,11 @@
             index_date = self._datetime_from_index(index)
             if index_date is None or index_date >= timelimit:
                 continue
-
-            log.debug("deleting %s", index)
-            self.conn.indices.delete_index(index)
+            log.info('deleting %r', index)
+            self.conn.indices.delete(index)
 
         timediff = datetime.datetime.utcnow() - timelimit
+        # XXX fix for es 2.0
         if timediff.days == 0:
             trange = pyes.utils.ESRange(
                 'timestamp',
@@ -236,47 +240,50 @@
             f = pyes.filters.NumericRangeFilter(trange)
             q = pyes.query.FilterQuery(pyes.query.MatchAllQuery(), f)
             index = self._index_from_timestamp(timelimit)
-            # XXX this fails with an ES error
             self.conn.delete_by_query(index, ['log'], q)
             self.conn.optimize(index)
         else:
             _expire_indices(timelimit)
 
-    def _make_search(self, query_str, time_range, **search_args):
-        query = pyes.query.QueryStringQuery(query_str)
+    def _make_search(self, query_str, time_range):
+        indices = list(self._valid_indices())
         if time_range:
-            trange = pyes.utils.ESRange(
-                'timestamp',
-                from_value=to_timestamp(time_range[0] or 0),
-                to_value=to_timestamp(time_range[1] or time.time()))
-            f = pyes.filters.NumericRangeFilter(trange)
-            query = pyes.query.FilteredQuery(query, f)
+            wanted_indices = self._index_from_time_range(time_range)
+            indices = set(wanted_indices) & set(indices)
 
-        search = pyes.query.Search(query, track_scores=False,
-                                   sort={'timestamp': 'desc'},
-                                   **search_args)
-        return search
+        if not indices:
+            e = NoIndicesFound('no indices for search %r range=%s' %
+                               (query_str, time_range))
+            log.warning(e)
+            raise e
+
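+        # elasticsearch_dsl Search objects are immutable: params(), query(),
+        # filter() and sort() each return a modified copy, hence the
+        # reassignment of `s` at every step.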
+        s = Search(using=self.conn, index=','.join(indices))
+        s = s.params(search_type='query_then_fetch')
+        s = s.query("query_string", query=query_str)
+        if time_range:
+            s = s.filter('range',
+                         **{'timestamp': {
+                             'gte': to_timestamp(time_range[0] or 0),
+                             'lte': to_timestamp(time_range[1] or time.time()),
+                         }})
+        return s.sort('-timestamp')
 
     def _valid_indices(self):
-        for x in self.conn.indices.aliases():
+        # XXX check return value
+        for x in self.conn.indices.get_aliases():
             if x.startswith(self.INDEX_PREFIX + '-'):
                 yield x
 
+
     def search(self, query_str, time_range, size=100, start=0, facets=None):
         log.debug('search: "%s", range=%s, start=%d, facets=%s',
                   query_str, time_range, start, facets)
-        search = self._make_search(query_str, time_range,
-                                   start=start, size=size)
-
-        indices = list(self._valid_indices())
-        if time_range:
-            wanted_indices = self._index_from_time_range(time_range)
-            indices = set(wanted_indices) & set(indices)
-
-        if not indices:
-            log.warn('no indices found for search: "%s" range=%s' % (query_str, time_range))
-            return
+        try:
+            search = self._make_search(query_str, time_range)[start:start + size]
+        except NoIndicesFound:
+            return [], 0, {}, 0
+        # XXX reenable
+        facets = None
         if facets:
             for f, fsize in facets:
                 if f == 'timestamp':
@@ -288,16 +295,16 @@
                 else:
                     search.facet.add_term_facet(f, size=fsize)
 
-        result = self.conn.search(search, model=lambda x,y: y,
-                                  search_type='query_then_fetch', indices=indices)
-        docs = [x['_source'] for x in result]
+        result = search.execute()
+        docs = [x.to_dict() for x in result]
         result_facets = result.facets if facets else {}
-        return docs, result.total, result_facets, result.took
+        return docs, result.hits.total, result_facets, result.took
 
     def scan(self, query_str, time_range, size=100):
-        log.info('scan: "%s"', query_str)
-        search = self._make_search(query_str, time_range, size=size)
-        results = self.conn.scan(search)
-        for batch in results:
-            for doc in batch['hits']['hits']:
-                yield doc['_source']
+        log.info('scan: %r', query_str)
+        try:
+            search = self._make_search(query_str, time_range)[:size]
+        except NoIndicesFound:
+            return
+        for doc in search.scan():
+            yield doc
diff --git a/lens2/main.py b/lens2/main.py
index 341bf83096f0dd875f1c72702e730f854824e305..5f2904485f2ba30474d6d2381f276c903a38b334 100644
--- a/lens2/main.py
+++ b/lens2/main.py
@@ -121,7 +121,12 @@ Known commands:
     if len(args) < 1:
         parser.error('Please specify a command')
 
-    logging.basicConfig(level=logging.DEBUG)
+    # XXX turn into a debug option
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger("urllib3").setLevel(logging.ERROR)
+    logging.getLogger("elasticsearch").setLevel(logging.ERROR)
+    logging.getLogger("elasticsearch.trace").setLevel(logging.ERROR)
+
     lens = logstore.LogStore(opts.es_url, timeout=3600)
 
     cmd, args = args[0], args[1:]
diff --git a/lens2/utils.py b/lens2/utils.py
index 7d3c37a772d63741929b7b4c01d64c42379b1173..396bb37a17bbc298213e1228c342c3e1c8ae920d 100644
--- a/lens2/utils.py
+++ b/lens2/utils.py
@@ -77,8 +77,9 @@ def batch(iterator, n=100):
 def format_log(log):
     """Print 'log' in syslog-compatible format."""
     tag = log['program']
-    if log.get('pid'):
+    if 'pid' in log:
         tag = '%s[%s]' % (tag, log['pid'])
     return '%s %s %s.%s %s: %s\n' % (
         log['timestamp'].strftime('%Y-%m-%dT%H:%M:%S'), log['host'],
-        log.get('facility', '<none>'), log.get('severity', '<none>'), tag, log['msg'])
+        log.get('facility', '<none>'), log.get('severity', '<none>'),
+        tag, log['msg'])
diff --git a/requirements.txt b/requirements.txt
index 1e054c9cc7566522426aad5721f5109ca28335af..f3b30486c22c7c376d0e177d64d61fedb3277fe5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 elasticsearch>=2.0.0,<3.0.0
+elasticsearch-dsl>=2.0.0,<3.0.0
 Flask
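
Usage sketch (illustration only, not part of the patch): driving the migrated
API against a local ES 2.x node; `parser` stands in for whatever parser object
insert() already expects.

    from lens2.logstore import LogStore

    store = LogStore(['localhost:9200'])
    store.insert(open('/var/log/syslog'), parser)

    # search() returns (docs, total, facets, took) and maps NoIndicesFound to
    # an empty result; scan() streams hits through the scroll API.
    docs, total, facets, took = store.search('severity:error', None)
    for doc in store.scan('program:sshd', None):
        print doc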