Commit 7fb2037b authored by godog's avatar godog

port to latest pyes 0.90 API

parent ecd2f4a7
import datetime import datetime
import logging import logging
import os import os
import pyes
import threading import threading
import time import time
from lens2 import utils from lens2 import utils
import pyes
import pyes.utils
import pprint import pprint
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -34,28 +36,27 @@ class LogStore(object): ...@@ -34,28 +36,27 @@ class LogStore(object):
def __init__(self, server_list, timeout=60): def __init__(self, server_list, timeout=60):
self.conn = pyes.ES(server_list, timeout=timeout, self.conn = pyes.ES(server_list, timeout=timeout,
autorefresh=True,
bulk_size=self.BULK_SIZE) bulk_size=self.BULK_SIZE)
try: try:
self.conn.open_index(self.INDEX) self.conn.indices.open_index(self.INDEX)
log.info('opened index "%s"', self.INDEX) log.info('opened index "%s"', self.INDEX)
except pyes.exceptions.IndexMissingException: except pyes.exceptions.IndexMissingException:
self._init() self._init()
def clear(self): def clear(self):
""""Remove the entire index.""" """"Remove the entire index."""
self.conn.delete_index(self.INDEX) self.conn.indices.delete_index(self.INDEX)
def get_status(self): def get_status(self):
"""Return the index status.""" """Return the index status."""
status = self.conn.status(indexes=[self.INDEX]) status = self.conn.indices.status(indexes=[self.INDEX])
return status['indices'][self.INDEX] return status['indices'][self.INDEX]
def _init(self): def _init(self):
"""Create the index and set up the schema.""" """Create the index and set up the schema."""
log.info('creating index "%s"', self.INDEX) log.info('creating index "%s"', self.INDEX)
self.conn.create_index_if_missing(self.INDEX) self.conn.indices.create_index_if_missing(self.INDEX)
default_mappings = { default_mappings = {
'log': { 'log': {
'_source': { '_source': {
...@@ -83,7 +84,7 @@ class LogStore(object): ...@@ -83,7 +84,7 @@ class LogStore(object):
} }
self.conn._send_request( self.conn._send_request(
'PUT', 'PUT',
self.conn._make_path(['_template', 'tpl1']), pyes.utils.make_path('_template', 'tpl1'),
template) template)
log_mapping = { log_mapping = {
'timestamp': { 'timestamp': {
...@@ -122,7 +123,7 @@ class LogStore(object): ...@@ -122,7 +123,7 @@ class LogStore(object):
'term_vector': 'with_positions_offsets', 'term_vector': 'with_positions_offsets',
}, },
} }
self.conn.put_mapping('log', {'properties': log_mapping}, self.INDEX) self.conn.indices.put_mapping('log', {'properties': log_mapping}, self.INDEX)
def insert(self, line_or_lines, parser, filters=None): def insert(self, line_or_lines, parser, filters=None):
"""Insert one or more logs. """Insert one or more logs.
...@@ -218,12 +219,11 @@ class LogStore(object): ...@@ -218,12 +219,11 @@ class LogStore(object):
else: else:
search.facet.add_term_facet(f, size=fsize) search.facet.add_term_facet(f, size=fsize)
response = self.conn.search(search, result = self.conn.search(search, model=lambda x,y: y,
search_type='query_then_fetch') search_type='query_then_fetch', indices=self.INDEX)
total = response['hits']['total'] docs = [x['_source'] for x in result]
docs = [x['_source'] for x in response['hits']['hits']] result_facets = result.facets if facets else {}
result_facets = response.get('facets') if facets else {} return docs, result.total, result_facets, result.took
return docs, total, result_facets, response['took']
def scan(self, query_str, time_range, size=100): def scan(self, query_str, time_range, size=100):
log.info('scan: "%s"', query_str) log.info('scan: "%s"', query_str)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment