Commit 1c1ff77e authored by godog's avatar godog

shard logs into per-day ES indices

parent bf69ab0e
......@@ -31,32 +31,63 @@ class LogStore(object):
ElasticSearch for this).
"""
INDEX = 'logs'
INDEX_PREFIX = 'logs'
BULK_SIZE = 400
def __init__(self, server_list, timeout=60):
self.conn = pyes.ES(server_list, timeout=timeout,
bulk_size=self.BULK_SIZE)
self._open_indices = {}
def _open_index(self, index_name):
if index_name in self._open_indices:
return
try:
self.conn.indices.open_index(self.INDEX)
log.info('opened index "%s"', self.INDEX)
self.conn.indices.open_index(index_name)
log.info('opened index "%s"', index_name)
except pyes.exceptions.IndexMissingException:
self._init()
self._init_index(index_name)
self._open_indices[index_name] = True
return
def _index_from_timestamp(self, t):
if not isinstance(t, datetime.datetime):
t = datetime.datetime.utcfromtimestamp(int(t/1000000))
return self.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
def _index_from_time_range(self, time_range):
start = to_timestamp(time_range[0] or 0)
end = to_timestamp(time_range[1] or time.time())
res = []
while end >= start:
res.append(self._index_from_timestamp(end))
end -= 86400 * 1000000
return res
def _datetime_from_index(self, index):
try:
d = datetime.datetime.strptime(index, self.INDEX_PREFIX + '-' + '%Y.%m.%d')
return d
except ValueError:
return None
def clear(self):
""""Remove the entire index."""
self.conn.indices.delete_index(self.INDEX)
for index in self._valid_indices():
self.conn.indices.delete_index(index)
def get_status(self):
"""Return the index status."""
status = self.conn.indices.status(indexes=[self.INDEX])
return status['indices'][self.INDEX]
indices = self.open_indices.keys()
status = self.conn.indices.status(indices=indices)
return [status['indices'][x] for x in indices]
def _init(self):
def _init_index(self, index_name):
"""Create the index and set up the schema."""
log.info('creating index "%s"', self.INDEX)
self.conn.indices.create_index_if_missing(self.INDEX)
log.info('creating index "%s"', index_name)
self.conn.indices.create_index_if_missing(index_name)
default_mappings = {
'log': {
'_source': {
......@@ -79,7 +110,7 @@ class LogStore(object):
},
}
template = {
'template': self.INDEX,
'template': index_name,
'mappings': default_mappings,
}
self.conn._send_request(
......@@ -123,7 +154,7 @@ class LogStore(object):
'term_vector': 'with_positions_offsets',
},
}
self.conn.indices.put_mapping('log', {'properties': log_mapping}, self.INDEX)
self.conn.indices.put_mapping('log', {'properties': log_mapping}, index_name)
def insert(self, line_or_lines, parser, filters=None):
"""Insert one or more logs.
......@@ -159,8 +190,10 @@ class LogStore(object):
for doc in to_doc(line_or_lines):
stats['n_records'] += 1
index_name = self._index_from_timestamp(doc['timestamp'])
self._open_index(index_name)
try:
self.conn.index(doc, self.INDEX, 'log', bulk=True)
self.conn.index(doc, index_name, 'log', bulk=True)
except:
stats['n_db_errors'] += 1
self.conn.force_bulk()
......@@ -179,14 +212,30 @@ class LogStore(object):
Args:
timelimit: a timestamp indicating the cutoff absolute time
"""
trange = pyes.utils.ESRange(
'timestamp',
from_value=0,
to_value=to_timestamp(timelimit))
f = pyes.filters.NumericRangeFilter(trange)
q = pyes.query.FilteredQuery(pyes.query.MatchAllQuery(), f)
self.conn.deleteByQuery([self.INDEX], ['log'], q)
self.conn.optimize(self.INDEX)
def _expire_indices(timelimit):
"""Delete existing indices up until timelimit."""
for index in self._valid_indices():
index_date = self._datetime_from_index(index)
if index_date is None or index_date >= timelimit:
continue
log.debug("deleting %s", index)
self.conn.indices.delete_index(index)
timediff = datetime.datetime.utcnow() - timelimit
if timediff.days == 0:
trange = pyes.utils.ESRange(
'timestamp',
from_value=0,
to_value=to_timestamp(timelimit))
f = pyes.filters.NumericRangeFilter(trange)
q = pyes.query.FilterQuery(pyes.query.MatchAllQuery(), f)
index = self._index_from_timestamp(timelimit)
# XXX this fails with an ES error
self.conn.delete_by_query(index, ['log'], q)
self.conn.optimize(index)
else:
_expire_indices(timelimit)
def _make_search(self, query_str, time_range, **search_args):
query = pyes.query.StringQuery(query_str)
......@@ -203,11 +252,22 @@ class LogStore(object):
**search_args)
return search
def _valid_indices(self):
for x in self.conn.indices.aliases():
if x.startswith(self.INDEX_PREFIX + '-'):
yield x
def search(self, query_str, time_range, size=100, start=0, facets=None):
log.debug('search: "%s", range=%s, start=%d, facets=%s',
query_str, time_range, start, facets)
search = self._make_search(query_str, time_range,
start=start, size=size)
indices = list(self._valid_indices())
if time_range:
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
if facets:
for f, fsize in facets:
if f == 'timestamp':
......@@ -220,7 +280,7 @@ class LogStore(object):
search.facet.add_term_facet(f, size=fsize)
result = self.conn.search(search, model=lambda x,y: y,
search_type='query_then_fetch', indices=self.INDEX)
search_type='query_then_fetch', indices=indices)
docs = [x['_source'] for x in result]
result_facets = result.facets if facets else {}
return docs, result.total, result_facets, result.took
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment