Skip to content
Snippets Groups Projects
Commit 82fbfd14 authored by ale's avatar ale
Browse files

Split out TLS auth to its own module

parent ee7621bf
No related branches found
No related tags found
No related merge requests found
import functools
import os
import re
import ssl
import subprocess
from functools import wraps
from flask import Flask, request, abort
import werkzeug.serving
from .sso_api import sso_api_auth_required
from .tls_auth import tls_auth
### TLS authentication.
class PeerCertWSGIRequestHandler(werkzeug.serving.WSGIRequestHandler):
"""
We subclass this class so that we can gain access to the connection
property. self.connection is the underlying client socket. When a TLS
connection is established, the underlying socket is an instance of
SSLSocket, which in turn exposes the getpeercert() method.
The output from that method is what we want to make available elsewhere
in the application.
"""
def make_environ(self):
"""
The superclass method develops the environ hash that eventually
forms part of the Flask request object.
We allow the superclass method to run first, then we insert the
peer certificate into the hash. That exposes it to us later in
the request variable that Flask provides
"""
environ = super(PeerCertWSGIRequestHandler, self).make_environ()
environ['peercert'] = self.connection.getpeercert()
return environ
def _get_subject_cn(peercert):
"""Extract subject CN from the parsed peercert data."""
parsed_subject = peercert['subject']
if len(parsed_subject) != 1:
raise Exception('multiple subjects')
for attr, value in parsed_subject[0]:
if attr == 'commonName':
return value
def _regexp_match(rx, s):
"""Returns True if the anchored rx matches s."""
res = re.match('^%s$' % rx, s)
return res is not None
def tls_auth(fn):
@functools.wraps(fn)
def _tls_auth_wrapper(*args, **kwargs):
cn = _get_subject_cn(request.environ['peercert'])
for acl_path, acl_cn_rx in app.config.get('TLS_AUTH_ACLS', []):
if request.path.startswith(acl_path) and _regexp_match(acl_cn_rx, cn):
return fn(*args, **kwargs)
abort(403)
return _tls_auth_wrapper
def to_json(fn):
@functools.wraps(fn)
@wraps(fn)
def _json_wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
resp = make_response(json.dumps(result))
......@@ -68,25 +20,6 @@ def to_json(fn):
return _json_wrapper
### SSL server with the right parameters.
def serve_ssl(app, host='localhost', port=3000, **kwargs):
# Create a validating SSLContext.
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_ctx.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384')
ssl_ctx.load_cert_chain(certfile='server.crt',
keyfile='server.key')
ssl_ctx.load_verify_locations(cafile='ca.crt')
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
app.run(
host, port,
ssl_context=ssl_ctx,
request_handler=PeerCertWSGIRequestHandler,
**kwargs
)
### Test application.
def _random_password():
......@@ -176,5 +109,37 @@ def pwreset():
return {}
### SSL server with the right parameters.
def serve_ssl(app, host='localhost', port=3000, **kwargs):
# Create a validating SSLContext.
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_ctx.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384')
ssl_ctx.load_cert_chain(certfile='server.crt',
keyfile='server.key')
ssl_ctx.load_verify_locations(cafile='ca.crt')
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
app.run(
host, port,
ssl_context=ssl_ctx,
request_handler=PeerCertWSGIRequestHandler,
**kwargs
)
def main():
parser = optparse.OptionParser()
parser.add_option('--config', default='/etc/mailman/mailman_api.conf')
parser.add_option('--port', type='int', default=6460)
parser.add_option('--addr', default='0.0.0.0')
opts, args = parser.parse_args()
if args:
parser.error('Too many arguments')
app.config.from_pyfile(opts.config)
serve_ssl(app)
if __name__ == '__main__':
serve_ssl(app, debug=True)
main()
from functools import wraps
import os
import sso
import urllib
import urlparse
from flask import request, session, redirect, make_response, render_template, abort, g
from .app import app
from functools import wraps
from flask import current_app, request, make_response, abort, g
def init_sso(app):
......@@ -50,22 +46,22 @@ def sso_api_auth_required(func):
"""
@wraps(func)
def _auth_wrapper(*args, **kwargs):
if app.config.get('FAKE_SSO_USER'):
g.current_user = app.config['FAKE_SSO_USER']
if current_app.config.get('FAKE_SSO_USER'):
g.current_user = current_app.config['FAKE_SSO_USER']
g.sso_ticket = 'sso_ticket'
g.raw_sso_ticket = 'sso_ticket'
return func(*args, **kwargs)
sso_ticket = request.cookies.get(app.sso_cookie_name)
sso_ticket = request.cookies.get(current_app.sso_cookie_name)
if not sso_ticket:
abort(401)
try:
ticket = app.sso_validator.verify(sso_ticket.encode())
ticket = current_app.sso_validator.verify(sso_ticket.encode())
g.current_user = ticket.user()
g.sso_ticket = ticket
g.raw_sso_ticket = sso_ticket.encode()
except sso.Error as e:
app.logger.error('authentication failed: %s', e)
current_app.logger.error('authentication failed: %s', e)
abort(403)
return func(*args, **kwargs)
return _auth_wrapper
......
import re
import werkzeug.serving
from functools import wraps
from flask import current_app, request, abort
class PeerCertWSGIRequestHandler(werkzeug.serving.WSGIRequestHandler):
"""
We subclass this class so that we can gain access to the connection
property. self.connection is the underlying client socket. When a TLS
connection is established, the underlying socket is an instance of
SSLSocket, which in turn exposes the getpeercert() method.
The output from that method is what we want to make available elsewhere
in the application.
"""
def make_environ(self):
"""
The superclass method develops the environ hash that eventually
forms part of the Flask request object.
We allow the superclass method to run first, then we insert the
peer certificate into the hash. That exposes it to us later in
the request variable that Flask provides
"""
environ = super(PeerCertWSGIRequestHandler, self).make_environ()
environ['peercert'] = self.connection.getpeercert()
return environ
def _get_subject_cn(peercert):
"""Extract subject CN from the parsed peercert data."""
parsed_subject = peercert['subject']
if len(parsed_subject) != 1:
raise Exception('multiple subjects')
for attr, value in parsed_subject[0]:
if attr == 'commonName':
return value
def _regexp_match(rx, s):
"""Returns True if the anchored rx matches s."""
res = re.match('^%s$' % rx, s)
return res is not None
def tls_auth(fn):
"""Enable TLS client authentication for this endpoint."""
@wraps(fn)
def _tls_auth_wrapper(*args, **kwargs):
cn = _get_subject_cn(request.environ['peercert'])
for acl_path, acl_cn_rx in current_app.config.get('TLS_AUTH_ACLS', []):
if request.path.startswith(acl_path) and _regexp_match(acl_cn_rx, cn):
return fn(*args, **kwargs)
abort(403)
return _tls_auth_wrapper
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment