Commit 59f465c3 authored by ale's avatar ale
Browse files

Initial commit

Pipeline #5243 passed with stages
in 2 minutes and 1 second
# Reconstructed GitLab CI pipeline: build a Docker image, then (on master)
# re-tag it as the release image.
# NOTE(review): the structural keys (stages:, services:, script:, only:) were
# stripped by the page capture and have been reconstructed; $IMAGE_TAG /
# $RELEASE_TAG and the registry URL for "docker login" are defined outside
# this snippet -- confirm against the original .gitlab-ci.yml.
image: docker:latest

stages:
  - docker_build
  - release

docker_build:
  stage: docker_build
  services:
    - docker:dind
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN
    - docker build --build-arg ci_token=$CI_JOB_TOKEN --pull -t $IMAGE_TAG .
    - docker push $IMAGE_TAG

release:
  stage: release
  script:
    - docker login -u gitlab-ci-token -p $CI_JOB_TOKEN
    - docker pull $IMAGE_TAG
    - docker tag $IMAGE_TAG $RELEASE_TAG
    - docker push $RELEASE_TAG
  only:
    - master
# Build image for the A/I diagnostics server: Debian + Python 3, install the
# package from source with setuptools.
FROM debian:buster
RUN apt-get -q update && \
    env DEBIAN_FRONTEND=noninteractive apt-get -qy install --no-install-recommends \
        python3 python3-pip python3-setuptools python3-wheel
ADD . /tmp/src
WORKDIR /tmp/src
# NOTE(review): the capture truncated this line to "RUN python3 install";
# " install" is the setuptools invocation matching this layout.
RUN python3 install
# NOTE(review): assumes installs the entry point into /usr/bin --
# on Debian a plain " install" may place it in /usr/local/bin; confirm.
ENTRYPOINT ["/usr/bin/ai-diagnostics-server"]
Application-level probes
This package contains a series of probes for application-level
functionality of the A/I services (email, web, etc.), and a driver to
run them periodically and export their results to our monitoring system.
The configuration for the probes takes the form of Python code, stored
in files in the *--config* directory (/etc/ai/diagnostics by default),
with a `.py` extension. These Python modules should define a top-level
`PROBES` variable containing the desired probes.
Probes should be instances of Probe objects, that compose the
individual probe functions defined in the [probes/](./probes/)
directory with different sets of parameters, stored in the *probe
context*. This is facilitated by the *parameterize* function, that
creates a list of contexts by parameterizing a single attribute, for example:
test_credentials= [
Credentials(username='user1', password='pass1'),
Credentials(username='user2', password='pass2')]
PROBES = parameterize(
'url': ''
The above snippet will create two instances of the *probe_webmail*
probe, each pointing at the same URL but performing the login
with different test users.
Probes are named by concatenating the base name (here "webmail") with
the values passed to parameterize(), so in this case we would have one
probe named after each test user (e.g. "webmail_user1" and "webmail_user2").
The Credentials object is just a subclass of *dict* that, when
stringified, only returns the username, so we can avoid having the
password in the probe names above.
The application runs a simple HTTP server with a */metrics* endpoint
for Prometheus scraping.
This module implements a set of requests TransportAdapter, PoolManager,
ConnectionPool and HTTPSConnection with one goal only:
* to use a specific IP address when connecting via SSL to a web service without
running into SNI trouble.
The usual technique to force an IP address on an HTTP connection with Requests
is (assuming I want to reach on IP
requests.get("", headers={'Host': ''})
this is useful if I want to specifically test how is responding; for
instance, if is DNS round-robined to several IP
addresses and I want to hit one of them specifically.
This also works for https requests if using Python <2.7.9 because older
versions don't do SNI and thus don't pass the requested hostname as part of the
SSL handshake.
However, Python >=2.7.9 and >=3.4.x conveniently added SNI support, breaking
this way of connecting to the IP, because the IP address embedded in the URL
*is* passed as part of the SSL handshake, causing errors (mainly, the server
returns a 400 Bad Request because the SNI host doesn't match the one in
the HTTP headers).
The "easiest" way to achieve this is to force the IP address at the lowest
possible level, namely when we do socket.create_connection. The rest of the
"stack" is given the actual hostname. So the sequence is:
1- Open a socket to the desired IP address.
2- SSL wrap this socket using the hostname.
3- Do the rest of the HTTPS traffic, headers and all over this socket.
Unfortunately Requests hides the socket.create_connection call in the deep
recesses of urllib3, so the specified chain of classes is needed to propagate
the given dest_ip value all the way down the stack.
Because this applies to a very limited set of circumstances, the overridden
code is very simplistic and eschews many of the nice checks Requests does for
- It ONLY handles HTTPS.
- It does NO certificate verification (which would be pointless)
- Only tested with Requests 2.2.1 and 2.9.1.
- Does NOT work with the ancient urllib3 (1.7.1) shipped with Ubuntu 14.04.
Should not be an issue because Ubuntu 14.04 has an older Python which doesn't do SNI.
How to use it
It's like any other transport adapter. Just pass the IP address that
connections to the given URL prefix should use.
session = requests.Session()
session.mount("", ForcedIPHTTPSAdapter(dest_ip=""))
response = session.get(
    "", headers={'Host': ''}, verify=False)
# Note this module will ImportError if there's no sane requests/urllib
# combination available so the adapter won't work, and it's up to the caller to
# decide what to do. The caller can, for instance, check the Python version and
# if it's <2.7.9 decide to use the old "http://$IP/ technique. If Python is
# >2.7.9 and the adapter doesn't work, unfortunately, there's nothing that can
# be done :(
from distutils.version import StrictVersion
from socket import error as SocketError, timeout as SocketTimeout
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import (
PoolManager, HTTPSConnectionPool,
from requests.packages.urllib3.exceptions import ConnectTimeoutError
# For requests 2.9.x
from requests.packages.urllib3.util import connection
from requests.packages.urllib3.exceptions import NewConnectionError
except ImportError:
# For requests <= 2.2.x
import socket as connection
from socket import error as NewConnectionError
# Requests older than 2.4.0's VerifiedHHTPSConnection is broken and doesn't
# properly use _new_conn. On these versions, use UnverifiedHTTPSConnection
# instead.
if StrictVersion(requests.__version__) < StrictVersion('2.4.0'):
from requests.packages.urllib3.connection import (
UnverifiedHTTPSConnection as HTTPSConnection
from requests.packages.urllib3.connection import HTTPSConnection
class LoggingHTTPAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
self._ctx = kwargs.pop('ctx', None)
super(LoggingHTTPAdapter, self).__init__(*args, **kwargs)
def send(self, request, *args, **kwargs):
response = super(LoggingHTTPAdapter, self).send(request, *args, **kwargs)
if self._ctx:
'%s %s - %d %s',
request.method, request.url,
if (response.status_code > 300 and response.status_code < 400)
else len(response.content)),
return response
class ForcedIPHTTPSAdapter(LoggingHTTPAdapter):
    """Transport adapter that pins HTTPS connections to a fixed IP.

    The 'dest_ip' keyword argument is threaded down through a custom pool
    manager so the TCP connection goes to that address while SNI and the
    HTTP headers keep using the real hostname.
    """

    def __init__(self, *args, **kwargs):
        self._dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSAdapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, *args, **pool_kwargs):
        # Propagate the forced IP into the pool manager's kwargs.
        pool_kwargs['dest_ip'] = self._dest_ip
        self.poolmanager = ForcedIPHTTPSPoolManager(*args, **pool_kwargs)
class ForcedIPHTTPSPoolManager(PoolManager):
    """PoolManager that creates pools pinned to a specific destination IP."""

    def __init__(self, *args, **kwargs):
        self._dest_ip = kwargs.pop('dest_ip', None)
        super(ForcedIPHTTPSPoolManager, self).__init__(*args, **kwargs)

    def _new_pool(self, scheme, host, port, request_context=None):
        # This manager is only ever mounted for https:// prefixes.
        assert scheme == 'https'
        pool_kw = dict(self.connection_pool_kw)
        pool_kw['dest_ip'] = self._dest_ip
        return ForcedIPHTTPSConnectionPool(host, port, **pool_kw)
class ForcedIPHTTPSConnectionPool(HTTPSConnectionPool):
def __init__(self, *args, **kwargs):
self._dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSConnectionPool, self).__init__(*args, **kwargs)
def _new_conn(self):
self.num_connections += 1
actual_host =
actual_port = self.port
if self.proxy is not None:
actual_host =
actual_port = self.proxy.port
self.conn_kw = getattr(self, 'conn_kw', {})
self.conn_kw['dest_ip'] = self._dest_ip
conn = ForcedIPHTTPSConnection(
host=actual_host, port=actual_port,
strict=self.strict, **self.conn_kw)
pc = self._prepare_conn(conn)
return pc
def __str__(self):
return '%s(host=%r, port=%r, dest_ip=%s)' % (
type(self).__name__,, self.port, self._dest_ip)
class ForcedIPHTTPSConnection(HTTPSConnection, object):
def __init__(self, *args, **kwargs):
self._dest_ip = kwargs.pop('dest_ip', None)
super(ForcedIPHTTPSConnection, self).__init__(*args, **kwargs)
def _new_conn(self):
extra_kw = {}
if self.source_address:
extra_kw['source_address'] = self.source_address
if getattr(self, 'socket_options', None):
extra_kw['socket_options'] = self.socket_options
dest_host = self._dest_ip if self._dest_ip else
conn = connection.create_connection(
(dest_host, self.port), self.timeout, **extra_kw)
except SocketTimeout as e:
raise ConnectTimeoutError(
self, "Connection to %s timed out. (connect timeout=%s)" %
(, self.timeout))
except SocketError as e:
raise NewConnectionError(
self, "Failed to establish a new connection: %s" % e)
return conn
import logging
import signal
import time
from flask import Flask, render_template, abort, request
from cheroot.wsgi import Server, PathInfoDispatcher
from prometheus_client import make_wsgi_app
has_prometheus = True
except ImportError:
has_prometheus = False
from ai_diagnostics.driver import Results, PeriodicExecutor
from ai_diagnostics.config import load_config
# Flask application serving probe results (and, optionally, /metrics).
app = Flask(__name__)
# NOTE(review): mutable default argument, and 'config' is never applied to
# app.config in the visible lines -- an app.config.update(config) call was
# probably lost in this capture. Confirm against the original source.
def create_app(config={}):
# Load probe definitions and start the periodic executor on the app object.
probes = load_config(app.config['PROBES_CONFIG_DIR'])
probe_interval = app.config.get('PROBE_INTERVAL_SECS', 900)
# Keep results for a few probe intervals so stale entries age out.
app.results = Results(
max_age=app.config.get('RESULTS_MAX_AGE', probe_interval * 3))
app.executor = PeriodicExecutor(
app.results, probes, probe_interval)
# NOTE(review): _stop's body is missing here; presumably it stops the
# executor -- confirm against the original source.
def _stop():
return app, _stop
# NOTE(review): the route decorators and the render_template() arguments for
# both views below were truncated by the capture.
def home():
return render_template(
# Detail view for a single probe, looked up by name in the result cache.
def probe_detail(probe_name):
result = app.results.get_result(probe_name)
# Unknown probe name -- the branch body is missing here; presumably it
# calls abort(404). Confirm against the original source.
if not result:
return render_template(
def timefmt(t):
    """Format UNIX timestamp *t* as 'DD/MM/YYYY HH:MM' in UTC."""
    return time.strftime('%d/%m/%Y %H:%M', time.gmtime(t))
# Command-line entry point: parse flags, build the app, serve it with
# cheroot's WSGI server until interrupted.
def main():
from argparse import ArgumentParser
parser = ArgumentParser()
# NOTE(review): --config has no default=, so '(default %(default)s)' will
# render as 'None' -- confirm whether a default path was lost in capture.
parser.add_argument('--config', metavar='PATH',
help='directory with probe configurations (default %(default)s)')
parser.add_argument('--addr', metavar='IP', default='',
help='address to listen on (default %(default)s)')
parser.add_argument('--port', metavar='N', type=int, default=8419,
help='TCP port to listen on (default %(default)s)')
parser.add_argument('--interval', metavar='SECONDS', type=int, default=900,
help='interval between probe runs (default %(default)s secs)')
args = parser.parse_args()
# Create the application and start the CherryPY WSGI server.
app, cleanup = create_app({
'PROBES_CONFIG_DIR': args.config,
'PROBE_INTERVAL_SECS': args.interval,
# When Prometheus is available, mount the /metrics scrape endpoint next to
# the main application.
if has_prometheus:
app = PathInfoDispatcher({
'/metrics': make_wsgi_app(),
'/': app,
server = Server(
(args.addr, args.port),
# Nicely shut down on SIGTERM.
# NOTE(review): the handler body (presumably server.stop() and/or
# cleanup()) is missing, as are several closing braces/parens above and the
# server.start() call -- this block was heavily truncated by the capture.
def _sig_shutdown(signo, frame):
signal.signal(signal.SIGTERM, _sig_shutdown)
try:'server starting on http://%s:%d/', args.addr, args.port)
except KeyboardInterrupt:
server.stop()'server shutdown')
# NOTE(review): the main() call after this guard is missing in the capture.
if __name__ == '__main__':
import contextlib
import mechanicalsoup
import requests
from urllib.parse import urlparse
from ai_diagnostics.adapters import ForcedIPHTTPSAdapter, LoggingHTTPAdapter
# NOTE(review): this function yields, so it is presumably decorated with
# @contextlib.contextmanager (contextlib is imported above); the decorator
# and the try: matching the except below were lost in the capture.
def make_browser(ctx):
# Build a mechanicalsoup browser whose HTTPS traffic honors the probe
# context's DNS overrides and logs through ctx.
session = requests.Session()
# Install DNS overrides, or just a simple logging adapter.
dns_map = ctx.get('dns', {})
if dns_map:
for name, ip in dns_map.items():
# Pin HTTPS connections for this hostname to the given IP.
adapter = ForcedIPHTTPSAdapter(ctx=ctx, dest_ip=ip)
session.mount('https://' + name, adapter)
session.mount('https://', LoggingHTTPAdapter(ctx=ctx))
# NOTE(review): the StatefulBrowser() call is missing its closing paren and
# probably a session=session argument -- confirm against the original.
browser = mechanicalsoup.StatefulBrowser(
soup_config={'features': 'lxml'},
# Print a dump of browser state on errors.
yield browser
except Exception as e:
ctx.log('ERROR: %s', e)
# NOTE(review): the argument list of this log call (browser URL and page
# contents) is truncated in the capture.
ctx.log('Browser URL: %s\n\nPage contents:\n\n%s',
class LoginError(Exception):
def _login_error(page):
errp = page.find('p', class_='error')
if not errp:
return None
return errp.string.strip()
def _is_login_page(browser):
url = browser.get_url().split('?')[0]
return url.endswith('/sso/login')
def _is_2fa_form(page):
return (page.find('input', id='u2fResponseField') is not None)
def open_url(ctx, browser, url):
"""Open a URL, transparently handling authentication."""
# NOTE(review): the open() call below has no target and the 'url' parameter
# is otherwise unused -- the argument (presumably 'url') was stripped from
# this capture.
resp =, timeout=60)
ctx.log('requesting %s', browser.get_url())
# Anything other than the SSO login page means we are already authenticated.
if not _is_login_page(browser):
return resp
if 'credentials' not in ctx:
raise LoginError('Authentication required but no credentials provided')
# NOTE(review): mechanicalsoup normally requires browser.select_form(...)
# before assigning form fields; that line appears to be missing here.
browser['username'] = ctx['credentials']['username']
browser['password'] = ctx['credentials']['password']
ctx.log('login page detected, logging in as %s...', ctx['credentials']['username'])
resp = browser.submit_selected()
page = browser.get_current_page()
# Surface form-level failures (bad password, 2FA challenge) as LoginError.
err_msg = _login_error(page)
if err_msg:
raise LoginError(err_msg)
elif _is_2fa_form(page):
raise LoginError('2FA required')
ctx.log('login successful')
return resp
# Ad-hoc manual test driver.
# SECURITY NOTE(review): real-looking credentials are hard-coded below (one
# set merely commented out); they should be rotated and moved out of source.
if __name__ == '__main__':
userinfo = {
#'username': '',
#'password': '1skIPK9m0/N!j5cW',
'username': '',
'password': 'pm7q4fyncnw6nb7bZ',
# NOTE(review): the closing brace of userinfo is missing in this capture,
# the URLs below were stripped, and these open_url() calls don't match the
# signature above (open_url(ctx, browser, url)) -- confirm against the
# original source.
browser = make_browser({})
open_url(browser, '', userinfo)
open_url(browser, '', userinfo)
import glob
import logging
import os
from ai_diagnostics.probe import Probe, parameterize
from ai_diagnostics.probes.pannello import probe_pannello
from ai_diagnostics.probes.webmail import probe_webmail
from ai_diagnostics.probes.imap import probe_imap
from ai_diagnostics.probes.smtp import *
class Credentials(dict):
    """Login credentials; stringifies to just the username.

    Probe names are built from str() of their parameters, so this keeps
    passwords out of probe names.
    """

    def __str__(self):
        return self['username']
class ExternalMailAccount(dict):
    """External mail account; stringifies to the nested username only."""

    def __str__(self):
        return self['credentials']['username']
def _execfile(path):
locals = {}
with open(path, 'rb') as file:
exec(compile(, path, 'exec'), globals(), locals)
return locals
def load_config(directory):
probes = []
for path in glob.glob(os.path.join(directory, '*.py')):'loading configuration file %s', path)
data = _execfile(path)
probes.extend(data.get('PROBES', []))
except Exception as e:
logging.error('configuration error in %s: %s', path, e)
continue'%d probes defined.', len(probes))
return probes
import logging
import os
import queue
import random
import threading
import time
from prometheus_client import Gauge
probe_last_run_ts = Gauge(
'probe_last_run_seconds', 'Timestamp of last probe run')
has_prometheus = True
except ImportError:
has_prometheus = False
# Worker loop: pull callables off the queue and run them until a None
# sentinel arrives.
# NOTE(review): the loop body is truncated in this capture -- the code that
# handles the None sentinel (presumably returning) and runs fn is missing.
def _worker(q, results):
while True:
fn = q.get()
if fn is None:
class Runner(object):
"""Run callables in parallel with limited concurrency."""
def __init__(self, results, num_workers=5):
# Work queue shared with the worker threads.
self.queue = queue.Queue()
# NOTE(review): the Thread() call is missing its target (presumably
# target=_worker) and the thread start loop body (t.start()) below was
# truncated by the capture.
self.threads = [
threading.Thread(name='Worker %d' % i,
args=(self.queue, results))
for i in range(num_workers)]
for t in self.threads:
# NOTE(review): run() and close() bodies are truncated here; presumably
# run() enqueues fn, and close() enqueues one None sentinel per thread and
# then joins them -- confirm against the original source.
def run(self, fn):
def close(self):
for t in self.threads:
for t in self.threads:
class Results(object):
"""Result cache that eventually forgets old results.