from __future__ import absolute_import import functools import os import sso import urllib from flask import request, session, abort, redirect, make_response, render_template, url_for, g, current_app from . import exceptions from . import registry from . import xml_signing from .app import saml_app sso_cookie_name = 'SSO_SAML' def init_app(app): app.register_blueprint(saml_app, url_prefix='/saml') saml_config = app.config['SAML'] # Stick some immutable config objects in the app, so that # other functions can find them. saml_app.saml_certificate = xml_signing.load_certificate(saml_config['CERTIFICATE_FILE']) saml_app.saml_private_key = xml_signing.load_private_key(saml_config['PRIVATE_KEY_FILE']) saml_app.config = saml_config saml_app.login_server = saml_config['SSO_LOGIN_SERVER'] saml_app.sso_service = saml_app.login_server + '/saml/' url_base = 'https://' + saml_app.sso_service saml_app.sso_url = url_base + 'login' saml_app.slo_url = url_base + 'logout' saml_app.issuer = 'https://' + saml_app.login_server with open(app.config['SSO_PUBLIC_KEY']) as fd: public_key = fd.read() saml_app.sso_verifier = sso.Verifier( public_key, saml_app.sso_service, app.config['SSO_DOMAIN'], []) class NoCookieError(Exception): pass def login_required(fn): @functools.wraps(fn) def _wrapper(*args, **kwargs): # Try to fetch the cookie. try: cookie = request.cookies.get(sso_cookie_name) if not cookie: raise NoCookieError('no cookie') current_app.logger.info('retrieved cookie: %s', cookie) g.sso_ticket = saml_app.sso_verifier.verify(str(cookie)) # Cheat by looking up the email using the LoginService # private to the main app. g.user_email = current_app.login_service.auth.get_user_email( g.sso_ticket.user()) return fn(*args, **kwargs) except (NoCookieError, TypeError, sso.Error) as e: current_app.logger.error('auth failed: %s', str(e)) redir_url = 'https://%s/?%s' % ( saml_app.login_server, urllib.urlencode({ 's': saml_app.sso_service, 'd': request.url})) return redirect(redir_url) return _wrapper @saml_app.route('/sso_login') def sso_login(): tkt_str = request.args['t'] next_url = request.args['d'] resp = redirect(next_url) resp.set_cookie(sso_cookie_name, tkt_str) current_app.logger.info('set sso cookie %s to %s', sso_cookie_name, tkt_str) return resp @saml_app.route('/sso_logout') def sso_logout(): resp = make_response('OK') resp.set_cookie(sso_cookie_name, '', expires=0) return resp @saml_app.route('/login', methods=('GET', 'POST')) def login_begin(): if request.method == 'POST': source = request.form else: source = request.args try: session['SAMLRequest'] = source['SAMLRequest'] except KeyError: abort(400) session['RelayState'] = source.get('RelayState', '') return redirect(url_for('saml.login_process')) @saml_app.route('/init/{resource}/{target}/') @login_required def login_init(resource, target): name, sp_config = metadata.get_config_for_resource(resource) proc = registry.get_processor(name, sp_config) try: linkdict = dict(metadata.get_links(sp_config)) pattern = linkdict[resource] except KeyError: abort(400) url = pattern % target proc.init_deep_link(sp_config, url) return _generate_response(proc) @saml_app.route('/login/process') @login_required def login_process(): proc = registry.find_processor() return _generate_response(proc) @saml_app.route('/logout') def logout(): pass @saml_app.route('/metadata/xml/') def descriptor(): tv = { 'entity_id': saml_app.issuer, 'slo_url': saml_app.slo_url, 'sso_url': saml_app.sso_url, 'pubkey': saml_app.saml_certificate, } resp = make_response(render_template('saml/idpssodescriptor.xml', **tv)) resp.headers['Content-Type'] = 'application/xml' return resp def _generate_response(processor): try: tv = processor.generate_response() except exceptions.UserNotAuthorized: return render_template('saml/invalid_user.html') return render_template('saml/login.html', **tv)