Commit e97bef13 authored by ale

support scrolls

parent fe1d47b4
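
This change switches LogStore.search() to ElasticSearch's scroll API: the first call opens a scroll context on the server (kept alive for 30 minutes via 'scroll': '30m') and returns a page of hits together with a scroll_id; passing that scroll_id back in retrieves the next page. A minimal usage sketch, not part of the commit, assuming a LogStore reachable at localhost:9200:

    store = LogStore(['localhost:9200'])

    # The first call opens a server-side scroll context and returns the
    # first page of hits plus the scroll_id identifying the cursor.
    hits, scroll_id = store.search('error', time_range=None)
    while hits:
        for doc in hits:
            print doc['timestamp']
        # Feeding the scroll_id back fetches the next page.
        hits, scroll_id = store.search('error', time_range=None,
                                       scroll_id=scroll_id)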
@@ -11,13 +11,23 @@ import pprint
 log = logging.getLogger(__name__)
 
 
-def _to_timestamp(t):
+def to_timestamp(t):
     if isinstance(t, datetime.datetime):
         t = time.mktime(t.utctimetuple())
     return int(t * 1000000)
 
 
+def from_timestamp(t):
+    return datetime.datetime.utcfromtimestamp(t / 1000000.0)
+
+
 class LogStore(object):
+    """Interface to the log database.
+
+    The LogStore provides facilities to add and search/scan logs in
+    the underlying database (the current implementation uses
+    ElasticSearch for this).
+    """
 
     INDEX = 'logs'
     BULK_SIZE = 400
@@ -26,18 +36,18 @@ class LogStore(object):
         self.conn = pyes.ES(server_list, timeout=10,
                             autorefresh=True,
                             bulk_size=self.BULK_SIZE)
-
+        self.commit_start = threading.Event()
         try:
             self.conn.open_index(self.INDEX)
             log.info('opened index "%s"', self.INDEX)
         except pyes.exceptions.IndexMissingException:
-            self.init()
+            self._init()
 
     def clear(self):
         """Remove the entire index."""
         self.conn.delete_index(self.INDEX)
 
-    def init(self):
+    def _init(self):
         """Create the index and set up the schema."""
         log.info('creating index "%s"', self.INDEX)
         self.conn.create_index_if_missing(self.INDEX)
@@ -105,11 +115,22 @@ class LogStore(object):
         self.conn.put_mapping('log', {'properties': log_mapping}, self.INDEX)
 
     def insert(self, line_or_lines, parser, filters=None):
+        """Insert one or more logs.
+
+        Args:
+          line_or_lines: a string, or an iterable generating log lines
+          parser: parser function, must return a dictionary when called
+            with an input line
+          filters: a list of filter functions, used to manipulate the
+            parsed dictionaries
+
+        Returns:
+          A (total_records, num_errors) tuple of integers.
+        """
         if isinstance(line_or_lines, basestring):
             line_or_lines = [line_or_lines]
-        start = time.time()
+        self.commit_start.set()
+        start = time.time()
         stats = {'n_records': 0, 'n_db_errors': 0, 'n_parse_errors': 0}
 
         def to_doc(stream):
@@ -119,7 +140,7 @@ class LogStore(object):
                     if filters:
                         for f in filters:
                             doc = f(doc)
-                    doc['timestamp'] = _to_timestamp(doc['timestamp'])
+                    doc['timestamp'] = to_timestamp(doc['timestamp'])
                     yield doc
                 except Exception, e:
                     log.exception('parse error: %s', str(e))
@@ -142,41 +163,50 @@ class LogStore(object):
                 stats['n_db_errors'] + stats['n_parse_errors'])
 
     def expire(self, timelimit):
+        """Remove all logs older than a certain time.
+
+        Args:
+          timelimit: a timestamp indicating the cutoff absolute time
+        """
         trange = pyes.utils.ESRange(
             'timestamp',
             from_value=0,
-            to_value=_to_timestamp(timelimit))
+            to_value=to_timestamp(timelimit))
         f = pyes.filters.NumericRangeFilter(trange)
         q = pyes.query.FilteredQuery(pyes.query.MatchAllQuery(), f)
         self.conn.deleteByQuery(q)
         self.conn.optimize(self.INDEX)
 
-    def _make_search(self, query_str, time_range):
+    def _make_search(self, query_str, time_range, **search_args):
         query = pyes.query.StringQuery(query_str)
         if time_range:
             trange = pyes.utils.ESRange(
                 'timestamp',
-                from_value=_to_timestamp(time_range[0] or 0),
-                to_value=_to_timestamp(time_range[1] or time.time()))
+                from_value=to_timestamp(time_range[0] or 0),
+                to_value=to_timestamp(time_range[1] or time.time()))
             f = pyes.filters.NumericRangeFilter(trange)
             query = pyes.query.FilteredQuery(query, f)
-        search = pyes.query.Search(query, track_scores=False, size=100,
-                                   sort={'timestamp': 'asc'})
+        search = pyes.query.Search(query, track_scores=False,
+                                   sort={'timestamp': 'desc'},
+                                   **search_args)
         return search
 
-    def search(self, query_str, time_range):
+    def search(self, query_str, time_range, size=100, scroll_id=None):
         log.info('search: "%s"', query_str)
-        search = self._make_search(query_str, time_range)
+        search_args = {'size': size,
+                       'scroll': '30m'}
+        if scroll_id:
+            search_args['scroll_id'] = scroll_id
+        search = self._make_search(query_str, time_range, **search_args)
         results = self.conn.search(search)
-        return [x['_source'] for x in results['hits']['hits']]
+        scroll_id = results['scroll_id']
+        return [x['_source'] for x in results['hits']['hits']], scroll_id
 
-    def scan(self, query_str, time_range):
+    def scan(self, query_str, time_range, size=100):
         log.info('scan: "%s"', query_str)
-        search = self._make_search(query_str, time_range)
+        search = self._make_search(query_str, time_range, size=size)
         results = self.conn.scan(search)
         for batch in results:
             for doc in batch['hits']['hits']:
                 yield doc['_source']
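
scan(), by contrast, remains a generator: it drives the scrolling internally through conn.scan() and streams every matching document, so callers that want the full result set never handle a scroll_id themselves. A hypothetical sketch (store, handle, and the query string are placeholders, not from the commit):

    import time

    # Stream all matching documents from the last hour; scan() yields
    # each document's _source dict one at a time.
    now = time.time()
    for doc in store.scan('status:500', time_range=(now - 3600, now),
                          size=500):
        handle(doc)  # placeholder for per-document processing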