Skip to content
Snippets Groups Projects
Commit ee7621bf authored by ale's avatar ale
Browse files

Initial commit

parents
No related branches found
No related tags found
No related merge requests found
import functools
import os
import re
import ssl
import subprocess
from flask import Flask, request, abort
import werkzeug.serving
from .sso_api import sso_api_auth_required
### TLS authentication.
class PeerCertWSGIRequestHandler(werkzeug.serving.WSGIRequestHandler):
"""
We subclass this class so that we can gain access to the connection
property. self.connection is the underlying client socket. When a TLS
connection is established, the underlying socket is an instance of
SSLSocket, which in turn exposes the getpeercert() method.
The output from that method is what we want to make available elsewhere
in the application.
"""
def make_environ(self):
"""
The superclass method develops the environ hash that eventually
forms part of the Flask request object.
We allow the superclass method to run first, then we insert the
peer certificate into the hash. That exposes it to us later in
the request variable that Flask provides
"""
environ = super(PeerCertWSGIRequestHandler, self).make_environ()
environ['peercert'] = self.connection.getpeercert()
return environ
def _get_subject_cn(peercert):
"""Extract subject CN from the parsed peercert data."""
parsed_subject = peercert['subject']
if len(parsed_subject) != 1:
raise Exception('multiple subjects')
for attr, value in parsed_subject[0]:
if attr == 'commonName':
return value
def _regexp_match(rx, s):
"""Returns True if the anchored rx matches s."""
res = re.match('^%s$' % rx, s)
return res is not None
def tls_auth(fn):
@functools.wraps(fn)
def _tls_auth_wrapper(*args, **kwargs):
cn = _get_subject_cn(request.environ['peercert'])
for acl_path, acl_cn_rx in app.config.get('TLS_AUTH_ACLS', []):
if request.path.startswith(acl_path) and _regexp_match(acl_cn_rx, cn):
return fn(*args, **kwargs)
abort(403)
return _tls_auth_wrapper
def to_json(fn):
@functools.wraps(fn)
def _json_wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
resp = make_response(json.dumps(result))
resp.headers['Content-Type'] = 'application/json'
return resp
return _json_wrapper
### SSL server with the right parameters.
def serve_ssl(app, host='localhost', port=3000, **kwargs):
# Create a validating SSLContext.
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_ctx.set_ciphers('ECDHE-ECDSA-AES256-GCM-SHA384')
ssl_ctx.load_cert_chain(certfile='server.crt',
keyfile='server.key')
ssl_ctx.load_verify_locations(cafile='ca.crt')
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
app.run(
host, port,
ssl_context=ssl_ctx,
request_handler=PeerCertWSGIRequestHandler,
**kwargs
)
### Test application.
def _random_password():
return os.urandom(12).encode('base64').rstrip('=\n')
app = Flask(__name__)
app.config['TLS_AUTH_ACLS'] = [
('/', 'client'),
]
@app.route('/')
@tls_auth
def index():
return '<h1>%s</h1>\n<pre>%s</pre>' % (
_get_subject_cn(request.environ['peercert']),
str(request.environ['peercert']),
)
@app.route('/api/create_list')
@tls_auth
@to_json
def create_list():
list_addr = request.json['list']
admin_addr = request.json['admin']
lang = request.json.get('lang', 'en')
list_name, list_domain = list_addr.split('@')
subprocess.check_call([
'/var/lib/mailman/bin/newlist',
'--automate',
'--language=' + lang,
'--urlhost=' + app.config['MAILMAN_URL_HOST'],
'--emailhost=' + list_domain,
list_name,
admin_addr,
_random_password(),
])
if os.path.exists('/etc/mailman/forced_params'):
subprocess.check_call([
'/var/lib/mailman/bin/config_list',
'-i', '/etc/mailman/forced_params',
list_name,
])
app.logger.info('created new list %s', list_addr)
return {}
@app.route('/api/get_list_attrs')
@tls_auth
@to_json
def get_list_attrs():
lists = request.json['lists']
out = []
for l in lists:
m = MailList.MailList(l.split('@')[0], 0)
attrs = {
'name': l,
'public': 'yes' if m.archive_private else 'no',
'description': unicode(m.description, 'utf-8'),
'owners': _unique(m.owner[:]),
}
out.append(attrs)
return out
@app.route('/api/pwreset')
@tls_auth
@sso_api_auth_required
@to_json
def pwreset():
list_addr = request.json['list']
list_name = list_addr.split('@')[0]
m = MailList.MailList(list_name, 0)
if not g.sso_is_admin and not (g.sso_user in m.owner):
abort(403)
subprocess.check_call([
'/var/lib/mailman/bin/changepw',
list_name,
])
return {}
if __name__ == '__main__':
serve_ssl(app, debug=True)
from functools import wraps
import os
import sso
import urllib
import urlparse
from flask import request, session, redirect, make_response, render_template, abort, g
from .app import app
def init_sso(app):
if 'SSO_LOGIN_SERVER' not in app.config:
raise Exception('Must configure SSO_LOGIN_SERVER')
if 'SSO_SERVICE' not in app.config:
raise Exception('Must configure SSO_SERVICE')
if 'SSO_DOMAIN' not in app.config:
raise Exception('Must configure SSO_DOMAIN')
pubkey_file = app.config.get(
'SSO_PUBLIC_KEY_FILE', '/etc/sso/public.key')
with open(pubkey_file) as f:
pubkey = f.read()
# Ensure the login server URL is /-terminated.
app.sso_login_server = app.config['SSO_LOGIN_SERVER'].rstrip('/') + '/'
app.sso_service = app.config['SSO_SERVICE']
app.sso_cookie_name = app.config.get('SSO_COOKIE_NAME', '_sso')
app.sso_validator = sso.Verifier(
pubkey, app.config['SSO_SERVICE'], app.config['SSO_DOMAIN'],
app.config.get('SSO_GROUPS'))
if app.config.get('SSO_DEBUG'):
app.logger.info(
'SSO verifier created (service=%s, domain=%s, groups=%s)',
app.config['SSO_SERVICE'], app.config['SSO_DOMAIN'],
app.config.get('SSO_GROUPS'))
def sso_api_auth_required(func):
"""Wrap an API (non-interactive) handler with SSO authentication.
This wrapper will redirect the user to the single sign-on login
server if the request lacks a valid SSO ticket.
Sets the 'g.current_user' variable to the name of the
authenticated user, and 'g.sso_ticket' to the SSO ticket itself.
Does not support interactive clients (browsers), so it will not
support nonces and won't send redirects to the login server.
"""
@wraps(func)
def _auth_wrapper(*args, **kwargs):
if app.config.get('FAKE_SSO_USER'):
g.current_user = app.config['FAKE_SSO_USER']
g.sso_ticket = 'sso_ticket'
g.raw_sso_ticket = 'sso_ticket'
return func(*args, **kwargs)
sso_ticket = request.cookies.get(app.sso_cookie_name)
if not sso_ticket:
abort(401)
try:
ticket = app.sso_validator.verify(sso_ticket.encode())
g.current_user = ticket.user()
g.sso_ticket = ticket
g.raw_sso_ticket = sso_ticket.encode()
except sso.Error as e:
app.logger.error('authentication failed: %s', e)
abort(403)
return func(*args, **kwargs)
return _auth_wrapper
git+https://git.autistici.org/ai/sso.git@nonce#egg=sso&subdirectory=src/python
Flask
backoff
requests
setup.py 0 → 100755
#!/usr/bin/python
from setuptools import setup, find_packages
setup(
name="mailman-api",
version="0.1",
description="Internal Mailman API server",
author="Autistici/Inventati",
author_email="info@autistici.org",
url="https://git.autistici.org/ai3/python-mailman-api",
install_requires=["Flask",
"backoff",
"requests",
"sso"],
packages=find_packages(),
entry_points={
"console_scripts": [
"python-mailman-api-server = mailman_api.mailman:main",
],
},
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment