Commit a4dae1a9 authored by ale

Fix get_errors() return type, and add tests

parent 0a605728
Pipeline #5806 passed with stages in 2 minutes and 7 seconds
@@ -20,7 +20,10 @@ app = Flask(__name__)
 def create_app(config={}):
     app.config.update(config)
-    probes = load_config(app.config['PROBES_CONFIG_DIR'])
+    if 'PROBES' in app.config:
+        probes = app.config['PROBES']
+    else:
+        probes = load_config(app.config['PROBES_CONFIG_DIR'])
     probe_interval = app.config.get('PROBE_INTERVAL_SECS', 900)
     app.results = Results(
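For context, a minimal sketch (illustrative only, not part of the commit) of how the new PROBES override can be used to inject probes directly instead of reading them from PROBES_CONFIG_DIR. It assumes create_app() returns an (app, stop_fn) pair and that Probe is available from the star imports, as in the new test module below:

from ai_diagnostics.driver import *
from ai_diagnostics.probe import *
from ai_diagnostics.app import create_app

def my_probe(ctx):
    # hypothetical probe callable; ctx.log() is what the bundled test probes use
    ctx.log('hello')

app, stop_fn = create_app({
    'PROBES': [Probe(my_probe, 'my-probe', {})],  # skips load_config() entirely
})
# ... exercise the app, e.g. with app.test_client() ...
stop_fn()  # stop background execution, as the tests do in tearDown()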
@@ -23,7 +23,11 @@ def _worker(q, results):
 class Runner(object):
-    """Run callables in parallel with limited concurrency."""
+    """Run callables in parallel with limited concurrency.
+
+    Can be used as a context manager, to ensure close() is
+    called.
+    """
 
     def __init__(self, results, num_workers=5):
         self.queue = queue.Queue()
@@ -45,6 +49,12 @@ class Runner(object):
         for t in self.threads:
             t.join()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, tb):
+        self.close()
 
 class _ResultLog(object):
     """Keep around a limited number of probe results."""
@@ -67,7 +77,8 @@ class _ResultLog(object):
         return self.results[(name, timestamp)]
 
     def get_all(self):
-        return self.log[::-1]
+        return [(x[0], x[1], self.results[x])
+                for x in self.log[::-1]]
 
 class Results(object):
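Each entry in self.log is a (name, timestamp) key into self.results, so get_all() now yields full (name, timestamp, result) tuples, newest first; per the commit message, get_errors() returns the same shape, which is what the new driver test relies on. A tiny sketch (illustrative only) of consuming it, assuming results is a Results instance:

for name, timestamp, result in results.get_errors():
    # the new driver test checks result['status'] == 'error' for failed probes
    print(name, timestamp, result['status'])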
@@ -149,10 +160,9 @@ class PeriodicExecutor(object):
     def _run(self):
         logging.info('starting new scheduled run...')
-        runner = Runner(self.results)
-        for probe in self.probes:
-            runner.run(probe)
-        runner.close()
+        with Runner(self.results) as runner:
+            for probe in self.probes:
+                runner.run(probe)
         logging.info('scheduled run ended')
         if has_prometheus:
             probe_last_run_ts.set(time.time())
import unittest

from ai_diagnostics.driver import *
from ai_diagnostics.probe import *


def probe_ok(ctx):
    ctx.log('success')


def probe_err(ctx):
    raise Exception('failure')


# Probes shared by the test suite: one that succeeds and one that always fails.
TEST_PROBES = [
    Probe(probe_ok, 'ok', {}),
    Probe(probe_err, 'err', {}),
]
from ai_diagnostics.test import *
from ai_diagnostics.app import create_app


class AppTest(unittest.TestCase):

    def setUp(self):
        self.app, self.stop_fn = create_app({
            'DEBUG': True,
            'PROBES': TEST_PROBES,
        })
        self.c = self.app.test_client()
        # Run one probe pass synchronously so results are populated.
        self.app.executor._run()

    def tearDown(self):
        self.stop_fn()

    def test_index(self):
        resp = self.c.get('/')
        self.assertEquals(200, resp.status_code)
        self.assertTrue(b'ok' in resp.data)
        self.assertTrue(b'err' in resp.data)
from ai_diagnostics.test import *


class DriverTest(unittest.TestCase):

    def setUp(self):
        self.results = Results()

    def tearDown(self):
        pass

    def test_run_probes(self):
        with Runner(self.results) as runner:
            for p in TEST_PROBES:
                runner.run(p)
        results = self.results.get_results()
        self.assertEquals(2, len(results))
        errs = self.results.get_errors()
        self.assertEquals(1, len(errs))
        err_name, err_ts, err_res = errs[0]
        self.assertEquals('err', err_name)
        self.assertEquals('error', err_res['status'])
[tox]
envlist = py3

[testenv]
deps=
    nose
commands=
    nosetests -vv []
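With this configuration, running the tox command from the repository root should create a py3 environment, install nose, and execute nosetests -vv; the trailing [] is tox's placeholder for any extra positional arguments given on the command line.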