Commit 25a5bba5 authored by godog's avatar godog

tests and fixes for logstore

parent 1c1ff77e
......@@ -7,6 +7,7 @@ from lens2 import utils
import pyes
import pyes.utils
from pyes.exceptions import ElasticSearchException
import pprint
......@@ -52,17 +53,19 @@ class LogStore(object):
self._open_indices[index_name] = True
return
def _index_from_timestamp(self, t):
@classmethod
def _index_from_timestamp(cls, t):
if not isinstance(t, datetime.datetime):
t = datetime.datetime.utcfromtimestamp(int(t/1000000))
return self.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
return cls.INDEX_PREFIX + '-' + t.strftime('%Y.%m.%d')
def _index_from_time_range(self, time_range):
@classmethod
def _index_from_time_range(cls, time_range):
start = to_timestamp(time_range[0] or 0)
end = to_timestamp(time_range[1] or time.time())
res = []
while end >= start:
res.append(self._index_from_timestamp(end))
res.append(cls._index_from_timestamp(end))
end -= 86400 * 1000000
return res
......@@ -194,7 +197,9 @@ class LogStore(object):
self._open_index(index_name)
try:
self.conn.index(doc, index_name, 'log', bulk=True)
except:
except Exception, e:
if not isinstance(e, ElasticSearchException):
raise e
stats['n_db_errors'] += 1
self.conn.force_bulk()
elapsed = time.time() - start
......@@ -268,6 +273,10 @@ class LogStore(object):
wanted_indices = self._index_from_time_range(time_range)
indices = set(wanted_indices) & set(indices)
if not indices:
log.warn('no indices found for search: "%s" range=%s' % (query_str, time_range))
return
if facets:
for f, fsize in facets:
if f == 'timestamp':
......
import os
import unittest
import logging
import datetime
import mox
import pyes
import pyes.managers
from pyes.exceptions import IndexMissingException
from lens2 import logstore, syslog_parser, utils
class FakeResultSet(object):
def __init__(self, count=5):
self._count = count
self.total = count
self.took = 0.1
self._result = []
for x in range(count):
self._result.append({'_source': 'fake %d' % x})
def __iter__(self):
return iter(self._result)
class LogStoreTest(unittest.TestCase):
def _index_name(self, when=None):
if when is None:
when = datetime.datetime.utcnow()
return when.strftime('logs-%Y.%m.%d')
def _gen_indices(self, days):
"""Generate index names for the last n days."""
now = datetime.datetime.utcnow()
for i in range(days):
yield self._index_name(now - datetime.timedelta(days=i))
def test_expire_indices(self):
"""Test expire 5d logs having 10d logs."""
days_available = 10
days_delete = 5
indices_available = list(self._gen_indices(days_available))
indices_delete = list(self._gen_indices(days_delete))
fake_indices = self.mox.CreateMock(pyes.managers.Indices)
fake_indices.aliases().AndReturn(dict([(x, True) for x in indices_available]))
for x in set(indices_available) - set(indices_delete):
fake_indices.delete_index(x).InAnyOrder()
self.mox.StubOutClassWithMocks(pyes, 'ES')
es = pyes.ES('server', timeout=5, bulk_size=logstore.LogStore.BULK_SIZE)
es.indices = fake_indices
self.mox.ReplayAll()
parser = syslog_parser.get_parser('dumb')
ls = logstore.LogStore('server', timeout=5)
since = utils.parse_timespec('%dd' % days_delete)
ls.expire(since)
self.mox.VerifyAll()
def test_search_indices(self):
"""Test searching 3d ago having 10d logs."""
days_available = 10
days_lookback = 3
indices_available = list(self._gen_indices(days_available))
indices_lookback = list(self._gen_indices(days_lookback+1))
fake_indices = self.mox.CreateMock(pyes.managers.Indices)
fake_indices.aliases().AndReturn(dict([(x, True) for x in indices_available]))
self.mox.StubOutClassWithMocks(pyes, 'ES')
es = pyes.ES('server', timeout=5, bulk_size=logstore.LogStore.BULK_SIZE)
self.mox.StubOutWithMock(es, 'search')
es.indices = fake_indices
es.search(mox.IgnoreArg(),
indices=mox.SameElementsAs(indices_lookback),
model=mox.IgnoreArg(),
search_type=mox.IgnoreArg()
).AndReturn(FakeResultSet())
self.mox.ReplayAll()
parser = syslog_parser.get_parser('dumb')
ls = logstore.LogStore('server', timeout=5)
time_range = utils.parse_time_range('%dd' % days_lookback)
ls.search('query', time_range, 0, 400)
self.mox.VerifyAll()
def test_create_index(self):
"""Test creation of a non-existing index."""
fake_indices = self.mox.CreateMock(pyes.managers.Indices)
fake_indices.open_index('logs-2014.02.23').AndRaise(IndexMissingException('foo'))
fake_indices.create_index_if_missing('logs-2014.02.23')
fake_indices.put_mapping('log', mox.IsA(dict), 'logs-2014.02.23')
self.mox.StubOutClassWithMocks(pyes, 'ES')
es = pyes.ES('server', timeout=5, bulk_size=logstore.LogStore.BULK_SIZE)
self.mox.StubOutWithMock(es, 'force_bulk')
self.mox.StubOutWithMock(es, 'index')
self.mox.StubOutWithMock(es, '_send_request')
es.indices = fake_indices
es._send_request('PUT', '/_template/tpl1', mox.IsA(dict))
es.index(mox.IsA(dict), 'logs-2014.02.23', 'log', bulk=True)
es.force_bulk()
self.mox.ReplayAll()
parser = syslog_parser.get_parser('dumb')
ls = logstore.LogStore('server', timeout=5)
ls.insert("Feb 23 18:49:50 void prog[666]: message", parser)
self.mox.VerifyAll()
def test_open_index(self):
"""Test open of an existing index."""
fake_indices = self.mox.CreateMock(pyes.managers.Indices)
fake_indices.open_index('logs-2014.02.23')
self.mox.StubOutClassWithMocks(pyes, 'ES')
es = pyes.ES('server', timeout=5, bulk_size=logstore.LogStore.BULK_SIZE)
self.mox.StubOutWithMock(es, 'force_bulk')
self.mox.StubOutWithMock(es, 'index')
es.indices = fake_indices
es.index(mox.IsA(dict), 'logs-2014.02.23', 'log', bulk=True)
es.force_bulk()
self.mox.ReplayAll()
parser = syslog_parser.get_parser('dumb')
ls = logstore.LogStore('server', timeout=5)
ls.insert("Feb 23 18:49:50 void prog[6666]: message", parser)
self.mox.VerifyAll()
def setUp(self):
self.mox = mox.Mox()
def tearDown(self):
self.mox.UnsetStubs()
if __name__ == '__main__':
#logging.basicConfig(level=logging.DEBUG)
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment