Commit 14bd1e56 authored by ale
Browse files

Run probes asynchronously

Modify the executor to schedule each probe independently, spreading
them all over the probing interval. This reduces the risk of being
throttled by server-side rate-limiters and allows us to handle a
larger number of probes.
parent 30ea193c
Pipeline #5866 passed with stages
in 2 minutes and 6 seconds
import logging
import os
import queue
import random
import threading
import time
try:
from prometheus_client import Gauge
probe_last_run_ts = Gauge(
'probe_last_run_seconds', 'Timestamp of last probe run')
has_prometheus = True
except ImportError:
has_prometheus = False
def _worker(q, results):
    """Consume callables from a queue until a None sentinel is received.

    Every item taken from the queue is invoked as fn(results). A failing
    callable is logged and does not terminate the worker, and task_done()
    is always called, so a queue.join() by the producer cannot block
    forever because of a callable that raised.

    Args:
        q: queue.Queue yielding callables, or None to request shutdown.
        results: object passed as the only argument to each callable.
    """
    while True:
        fn = q.get()
        if fn is None:
            # Shutdown sentinel: balance the get() before leaving.
            q.task_done()
            break
        try:
            fn(results)
        except Exception:
            # Keep the worker alive; one bad callable must not stall the pool.
            logging.exception('unhandled exception in %s', fn)
        finally:
            q.task_done()
class Runner(object):
    """Execute callables on a fixed-size pool of worker threads.

    Callables submitted with run() are queued and picked up by the
    workers, each of which calls them with the shared results object.
    The instance is also a context manager: leaving the 'with' block
    calls close(), which waits for all queued work and stops the workers.
    """

    def __init__(self, results, num_workers=5):
        self.queue = queue.Queue()
        self.threads = []
        for index in range(num_workers):
            worker = threading.Thread(
                name='Worker %d' % index,
                target=_worker,
                args=(self.queue, results))
            self.threads.append(worker)
        # Start the workers only once the whole pool has been built.
        for worker in self.threads:
            worker.start()

    def run(self, fn):
        """Queue fn for execution by one of the workers."""
        self.queue.put(fn)

    def close(self):
        """Wait for queued work to finish, then shut the workers down."""
        self.queue.join()
        # One None sentinel per worker tells each of them to exit.
        for _ in self.threads:
            self.queue.put(None)
        for worker in self.threads:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()
log = logging.getLogger('driver')
class _ResultLog(object):
......@@ -128,41 +79,59 @@ class Results(object):
return result
class _PeriodicWorker(threading.Thread):
    """Thread that invokes a single callable at a fixed interval.

    The first invocation is delayed by a random amount of time, up to
    one interval, so that workers sharing the same interval do not all
    fire at once. Call stop() and then join() to terminate the thread.

    Args:
        fn: callable invoked as fn(results) on every cycle.
        results: object passed to fn on every invocation.
        interval: seconds between invocations (int or float, positive).

    Raises:
        ValueError: if interval is not positive.
    """

    def __init__(self, fn, results, interval):
        if interval <= 0:
            # A non-positive interval can never advance the schedule.
            raise ValueError(
                'interval must be positive, got %r' % (interval,))
        super(_PeriodicWorker, self).__init__()
        self._fn = fn
        self._results = results
        self._interval = interval
        self._stop_event = threading.Event()

    def stop(self):
        """Ask the thread to exit; it wakes up from any pending sleep."""
        self._stop_event.set()

    def _sleep(self, interval):
        """Wait up to interval seconds; return True if stop() was called."""
        log.info('%s: waiting for %g seconds', self._fn, interval)
        return self._stop_event.wait(timeout=interval)

    def _next_interval(self, ts):
        """Advance ts to the first scheduled slot that is not in the past.

        Returns:
            A (next_ts, delay) tuple: the next slot and the number of
            seconds from now until that slot.
        """
        now = time.time()
        if ts < now:
            # Skip all missed slots in one step: ceil((now - ts) / interval)
            # computed with floor division on the negated difference.
            missed = -((ts - now) // self._interval)
            ts += missed * self._interval
            if ts < now:
                # Floating-point rounding left us one slot short.
                ts += self._interval
        return ts, ts - now

    def run(self):
        # Sleep a random amount of time before the first run to offset
        # all workers uniformly. uniform() accepts fractional intervals,
        # unlike randint().
        if self._sleep(random.uniform(0, self._interval)):
            log.info('%s: worker exiting', self._fn)
            return
        # Run periodically until stopped.
        ts = time.time()
        while True:
            log.info('%s: executing', self._fn)
            try:
                self._fn(self._results)
            except Exception:
                # A failing run must not end the periodic schedule.
                log.exception('%s: unhandled exception', self._fn)
            ts, interval = self._next_interval(ts)
            if self._sleep(interval):
                log.info('%s: worker exiting', self._fn)
                return
class PeriodicExecutor(object):
    """Run every probe periodically, each on its own worker thread.

    One _PeriodicWorker is created and started per probe, so probes are
    scheduled independently of each other instead of being run together
    as a single batch.

    Args:
        results: object passed to every probe invocation.
        probes: iterable of callables, each invoked as probe(results).
        interval_secs: seconds between two runs of the same probe.
        period_secs: earlier name of interval_secs; when given it takes
            precedence, so callers using the old keyword keep working.
    """

    def __init__(self, results, probes, interval_secs=600, period_secs=None):
        if period_secs is not None:
            interval_secs = period_secs
        self.results = results
        self.threads = [
            _PeriodicWorker(probe, results, interval_secs)
            for probe in probes]
        for t in self.threads:
            t.start()

    def stop(self):
        """Signal all workers to stop, then wait for them to exit."""
        # Signal everyone first so the workers shut down concurrently.
        for t in self.threads:
            t.stop()
        for t in self.threads:
            t.join()
......@@ -68,11 +68,11 @@ def wait_for_unique_email(ctx, conn, message_id, timeout=300):
while time.time() < deadline:
msg = _check_for_msg()
if msg:
ctx.log('message found and removed successfully')
ctx.log('message %s found and removed successfully', message_id)
return msg
time.sleep(1)
ctx.log('timeout waiting for message to appear')
ctx.log('timeout waiting for message %s to appear', message_id)
raise Timeout()
......
......@@ -4,6 +4,8 @@ import time
try:
from prometheus_client import Gauge
probe_success = Gauge('probe_success', 'Probe success', ['probe'])
probe_last_run_ts = Gauge(
'probe_last_run_seconds', 'Timestamp of last probe run', ['probe'])
has_prometheus = True
except ImportError:
has_prometheus = False
......@@ -36,6 +38,9 @@ class Probe(object):
ctx = Context(ctx.items())
self.ctx = ctx
# Use the probe's name as its string representation.
# NOTE(review): presumably so that log lines formatting a probe with '%s'
# identify it by name - confirm against the callers.
def __str__(self):
return self.name
def __call__(self, results):
start_timestamp = time.time()
log_buffer = io.StringIO()
......@@ -58,6 +63,7 @@ class Probe(object):
if has_prometheus:
probe_success.labels(self.name).set(
1 if result['status'] == 'ok' else 0)
probe_last_run_ts.labels(self.name).set(end_timestamp)
results.add(self.name, int(end_timestamp), result)
......
......@@ -8,3 +8,4 @@ def probe_imap(ctx):
ctx['credentials']['username'],
ctx['credentials']['password'])
conn.select('INBOX')
ctx.log('login successful, INBOX opened')
......@@ -11,7 +11,8 @@ class AppTest(unittest.TestCase):
})
self.c = self.app.test_client()
self.app.executor._run()
for p in TEST_PROBES:
p(self.app.results)
def tearDown(self):
self.stop_fn()
......
......@@ -10,9 +10,8 @@ class DriverTest(unittest.TestCase):
pass
def test_run_probes(self):
with Runner(self.results) as runner:
for p in TEST_PROBES:
runner.run(p)
for p in TEST_PROBES:
p(self.results)
results = self.results.get_results()
self.assertEquals(2, len(results))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment