Commit 67fc4602 authored by ale's avatar ale

add some code ported from django-saml2-idp

parent 278c74d8
This diff is collapsed.
# Portions borrowed from:
# http://stackoverflow.com/questions/1089662/python-inflate-and-deflate-implementations
import zlib
import base64
def decode_base64_and_inflate( b64string ):
decoded_data = base64.b64decode( b64string )
return zlib.decompress( decoded_data , -15)
def deflate_and_base64_encode( string_val ):
zlibbed_str = zlib.compress( string_val )
compressed_string = zlibbed_str[2:-4]
return base64.b64encode( compressed_string )
def nice64(src):
""" Returns src base64-encoded and formatted nicely for our XML. """
return src.encode('base64').replace('\n', '')
import base
import exceptions
import xml_render
class Processor(base.Processor):
"""
Demo Response Handler Processor for testing against django-saml2-sp.
"""
def _format_assertion(self):
# NOTE: This uses the SalesForce assertion for the demo.
self._assertion_xml = xml_render.get_assertion_salesforce_xml(self._assertion_params, signed=True)
class AttributeProcessor(base.Processor):
"""
Demo Response Handler Processor for testing against django-saml2-sp;
Adds SAML attributes to the assertion.
"""
def _format_assertion(self):
# NOTE: This uses the SalesForce assertion for the demo.
self._assertion_params['ATTRIBUTES'] = {
'foo': 'bar',
}
self._assertion_xml = xml_render.get_assertion_salesforce_xml(self._assertion_params, signed=True)
class CannotHandleAssertion(Exception):
"""
This processor does not handle this assertion.
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class UserNotAuthorized(Exception):
"""
User not authorized for SAML 2.0 authentication.
"""
def __init__(self, msg):
self.msg = msg
def __str__(self):
return repr(self.msg)
class ImproperlyConfigured(Exception):
pass
from __future__ import absolute_import
import os
from flask import Blueprint, current_app, request, session, abort, redirect, make_response, url_for
from . import exceptions
from . import xml_signing
from .logging import get_saml_logger
saml_app = Blueprint('saml', __name__,
template_folder='templates')
logger = get_saml_logger()
sso_cookie_name = 'SSO_SAML'
def _verifier():
if not hasattr(current_app, 'saml_sso_verifier'):
current_app.saml_sso_verifier = sso.Verifier(
current_app.config['PUBLIC_KEY'],
current_app.config['SAML_SSO_SERVICE'],
current_app.config['SSO_DOMAIN'],
[])
return current_app.saml_sso_verifier
def login_required(fn):
def _wrapper(*args, **kwargs):
# Try to fetch the cookie.
try:
tkt = _verifier().verify(request.cookies.get(sso_cookie_name))
return fn(*args, **kwargs)
except (TypeError, sso.Error) as e:
redir_url = 'https://%s?%s' % (
login_server, urllib.urlencode({
's': service,
'd': _make_absolute_url()}))
return redirect(redir_url)
return functools.wraps(_wrapper)
@saml_app.route('/sso_login')
def sso_login():
tkt_str = request.args['t']
next_url = request.args['d']
resp = redirect(next_url)
resp.set_cookie(sso_cookie_name, tkt_str)
return resp
@saml_app.route('/sso_logout')
def sso_logout():
resp = make_response('OK')
resp.set_cookie(sso_cookie_name, '', expires=0)
return resp
@saml_app.route('/login', methods=('GET', 'POST'))
def login_begin():
if request.method == 'POST':
source = request.form
else:
source = request.args
try:
session['SAMLRequest'] = source['SAMLRequest']
except KeyError:
abort(400)
session['RelayState'] = source.get('RelayState', '')
return redirect(url_for('saml.login_process'))
@saml_app.route('/init/{resource}/{target}/')
@login_required
def login_init(resource, target):
name, sp_config = metadata.get_config_for_resource(resource)
proc = registry.get_processor(name, sp_config)
try:
linkdict = dict(metadata.get_links(sp_config))
pattern = linkdict[resource]
except KeyError:
abort(400)
url = pattern % target
proc.init_deep_link(sp_config, url)
return _generate_response(proc)
@saml_app.route('/login/process')
@login_required
def login_process():
proc = registry.find_processor(request)
return _generate_response(proc)
@saml_app.route('/logout')
def logout():
pass
@saml_app.route('/metadata/xml/')
def descriptor():
idp_config = current_app.config['SAML2IDP_CONFIG']
tv = {
'entity_id': idp_config['issuer'],
'slo_url': current_app.config['SAML_SLO_URL'],
'sso_url': current_app.config['SAML_LOGIN_URL'],
'pubkey': xml_signing.load_certificate(idp_config),
}
resp = make_response(render_template('saml/idpssodescriptor.xml', tv))
resp.headers['Content-Type'] = 'application/xml'
return resp
def _generate_response(processor):
try:
tv = processor.generate_response()
except exceptions.UserNotAuthorized:
return render_template('saml/invalid_user.html')
return render_template('saml/login.html', tv)
import base
import codex
import exceptions
import xml_render
class Processor(base.Processor):
"""
SalesForce.com-specific SAML 2.0 AuthnRequest to Response Handler Processor.
"""
def _decode_request(self):
"""
Decodes request using both Base64 and Zipping.
"""
self._request_xml = codex.decode_base64_and_inflate(self._saml_request)
def _validate_request(self):
"""
Validates the _saml_request. Sub-classes should override this and
throw an Exception if the validation does not succeed.
"""
super(Processor, self)._validate_request()
if not '.google.com/a/' in self._request_params['ACS_URL']:
raise exceptions.CannotHandleAssertion('AssertionConsumerService is not a Google Apps URL.')
def _format_assertion(self):
self._assertion_xml = xml_render.get_assertion_googleapps_xml(self._assertion_params, signed=True)
# -*- coding: utf-8 -*-
import structlog
def get_saml_logger():
"""
Get a logger named `saml2idp` after the main package.
"""
return structlog.get_logger('saml2idp')
"""
Query metadata from settings.
"""
from __future__ import absolute_import
from .exceptions import ImproperlyConfigured
from flask import current_app
def get_config_for_acs(acs_url):
"""
Return SP configuration instance that handles acs_url.
"""
for friendlyname, config in current_app.config['SAML2IDP_REMOTES'].items():
if config['acs_url'] == acs_url:
return config
msg = 'SAML2IDP_REMOTES is not configured to handle the AssertionConsumerService at "%s"'
raise ImproperlyConfigured(msg % resource_name)
def get_config_for_resource(resource_name):
"""
Return the SP configuration that handles a deep-link resource_name.
"""
for friendlyname, config in current_app.config['SAML2IDP_REMOTES'].items():
links = get_links(config)
for name, pattern in links:
if name == resource_name:
return friendlyname, config
msg = 'SAML2IDP_REMOTES is not configured to handle a link resource "%s"'
raise ImproperlyConfigured(msg % resource_name)
def get_deeplink_resources():
"""
Returns a list of resources that can be used for deep-linking.
"""
resources = []
for key, sp_config in current_app.config['SAML2IDP_REMOTES'].items():
links = get_links(sp_config)
for resource, patterns in links:
if '/' not in resource:
# It's a simple deeplink, which is handled by 'login_init' URL.
continue
resources.append(resource)
return resources
def get_links(sp_config):
"""
Returns a list of (resource, pattern) tuples for the 'links' for an sp.
"""
links = sp_config.get('links', [])
if type(links) is dict:
links = links.items()
return links
# -*- coding: utf-8 -*-
from __future__ import absolute_import
"""
Registers and loads Processor classes from settings.
"""
import warnings
from importlib import import_module
from . import base
from . import exceptions
from . import xml_render
from .logging import get_saml_logger
logger = get_saml_logger()
def SSOProcessor(base.Processor):
def _validate_request(self):
super(SSOProcessor, self)._validate_request()
url = self._request_params['ACS_URL']
if '.autistici.org' not in url:
raise exceptions.CannotHandleAssertion('ACS is not a supported URL')
def _format_assertion(self):
self._assertion_xml = xml_render.get_assertion_salesforce_xml(self._assertion_params, signed=True)
def get_processor(name, config):
return SSOProcessor(name, config)
def old_get_processor(name, config):
"""
Get an instance of the processor with config.
"""
dottedpath = config['processor']
try:
dot = dottedpath.rindex('.')
except ValueError:
raise exceptions.ImproperlyConfigured('%s isn\'t a processors module' % dottedpath)
sp_module, sp_classname = dottedpath[:dot], dottedpath[dot+1:]
try:
mod = import_module(sp_module)
except ImportError as exc:
raise exceptions.ImproperlyConfigured(
'Error importing processors {0}: "{1}"'.format(sp_module, exc))
try:
sp_class = getattr(mod, sp_classname)
except AttributeError:
raise exceptions.ImproperlyConfigured(
'processors module "{0}" does not define a "{1}" class'.format(
sp_module, sp_classname))
try:
instance = sp_class(name=name, config=config)
except TypeError:
warnings.warn(
"the new version of the Processor class expects a 'name' argument "
"to be passed in. The use of old processors is deprecated and will "
"be removed in the future.", DeprecationWarning)
instance = sp_class(config=config)
instance.name = name
return instance
def find_processor(request):
"""
Returns the Processor instance that is willing to handle this request.
"""
for name, sp_config in current_app.config['SAML2IDP_REMOTES'].items():
proc = get_processor(name, sp_config)
try:
if proc.can_handle(request):
return proc
except exceptions.CannotHandleAssertion as exc:
# Log these, but keep looking.
logger.debug('%s %s' % (proc, exc))
raise exceptions.CannotHandleAssertion(
'None of the processors in SAML2IDP_REMOTES could handle this request.')
import base
import exceptions
import xml_render
class Processor(base.Processor):
"""
SalesForce.com-specific SAML 2.0 AuthnRequest to Response Handler Processor.
"""
def _validate_request(self):
"""
Validates the _saml_request. Sub-classes should override this and
throw an Exception if the validation does not succeed.
"""
super(Processor, self)._validate_request()
if not '.salesforce.com' in self._request_params['ACS_URL']:
raise exceptions.CannotHandleAssertion('AssertionConsumerService is not a SalesForce URL.')
def _determine_audience(self):
self._audience = 'https://saml.salesforce.com'
def _format_assertion(self):
self._assertion_xml = xml_render.get_assertion_salesforce_xml(self._assertion_params, signed=True)
import base
import xml_render
import zlib
import base64
class Processor(base.Processor):
"""
Shib-specific Processor
"""
def _format_assertion(self):
self._assertion_xml = xml_render.get_assertion_salesforce_xml(self._assertion_params, signed=True)
def _decode_request(self):
"""
Decodes _request_xml from _saml_request.
"""
self._request_xml = zlib.decompress(base64.b64decode(self._saml_request), -15)
def _determine_audience(self):
"""
Determines the _audience.
"""
self._audience = "https://sp.testshib.org/shibboleth-sp"
<?xml version="1.0"?>
<md:EntityDescriptor xmlns:md="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ds="http://www.w3.org/2000/09/xmldsig#" entityID="{{ entity_id }}">
<md:IDPSSODescriptor protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
<md:KeyDescriptor use="signing">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>{{ cert_public_key }}</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:KeyDescriptor use="encryption">
<ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
<ds:X509Data>
<ds:X509Certificate>{{ cert_public_key }}</ds:X509Certificate>
</ds:X509Data>
</ds:KeyInfo>
</md:KeyDescriptor>
<md:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="{{ slo_url }}"/>
<md:NameIDFormat>urn:oasis:names:tc:SAML:2.0:nameid-format:email</md:NameIDFormat>
<md:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="{{ sso_url }}"/>
</md:IDPSSODescriptor>
</md:EntityDescriptor>
{% extends "saml/base.html" %}
{% block content %}
You have logged in, but your user account is not enabled for SAML 2.0.
{% endblock %}
{% extends "saml/base.html" %}
{% block content %}
You have successfully logged out of the Identity Provider.
{% endblock %}
{% extends "saml/base.html" %}
{% block content %}
<div id="logged_in_message">
You are now signed in at the Identity Provider.
<br />
<span id="logged_in_message_continue">
Click Continue to return to the Service Provider.
</span>
<span id="logged_in_message_automatic" style="display:none">
You will now be automatically redirected to the Service Provider.
</span>
</div>
<form id="logged_in_post_form" method="post" action="{{ acs_url }}">
<input type="hidden" name="RelayState" value="{{ relay_state }}" />
<input type="hidden" name="SAMLResponse" value="{{ saml_response }}" />
<input id="logged_in_button_continue" type="submit" value="Continue" />
</form>
{% if autosubmit %}
<script language="javascript">
<!--
/* Hide the submit button and message and automatically submit the form. */
var msg = document.getElementById('logged_in_message_continue'),
aut = document.getElementById('logged_in_message_automatic'),
btn = document.getElementById('logged_in_button_continue'),
frm = document.getElementById('logged_in_post_form');
msg.style.display = 'none';
aut.style.display = 'block';
btn.style.display = 'none';
frm.submit();
//-->
</script>
{% endif %}
{% endblock %}
from django.conf.urls import patterns, url
from views import descriptor, login_begin, login_init, login_process, logout
from metadata import get_deeplink_resources
def deeplink_url_patterns(
prefix='',
url_base_pattern=r'^init/%s/$',
login_init_func=login_init,
):
"""
Returns new deeplink URLs based on 'links' from settings.SAML2IDP_REMOTES.
Parameters:
- url_base_pattern - Specify this if you need non-standard deeplink URLs.
NOTE: This will probably closely match the 'login_init' URL.
"""
resources = get_deeplink_resources()
new_patterns = []
for resource in resources:
new_patterns += patterns(prefix,
url( url_base_pattern % resource,
login_init_func,
{
'resource': resource,
},
)
)
return new_patterns
urlpatterns = patterns('',
url(r'^login/$', login_begin, name="saml_login_begin"),
url(r'^login/process/$', login_process, name='saml_login_process'),
url(r'^logout/$', logout, name="saml_logout"),
url(r'^metadata/xml/$', descriptor, name='metadata_xml'),
# For "simple" deeplinks:
url(r'^init/(?P<resource>\w+)/(?P<target>\w+)/$',
login_init,
name="login_init"),
)
# Issue 13 - Add new automagically-created URLs for deeplinks:
urlpatterns += deeplink_url_patterns()
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from django.contrib import auth
from django.core.validators import URLValidator
from django.contrib.auth.decorators import login_required
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.core.urlresolvers import reverse
from django.utils.datastructures import MultiValueDictKeyError
from django.shortcuts import render_to_response, redirect
from django.http import HttpResponseBadRequest, HttpResponseRedirect
from django.template import RequestContext
from django.views.decorators.csrf import csrf_exempt
from . import exceptions
from . import metadata
from . import registry
from . import xml_signing
from .logging import get_saml_logger
logger = get_saml_logger()
# The 'schemes' argument for the URLValidator was introduced in Django 1.6. This
# ensure that URL validation works in 1.5 as well.
try:
URL_VALIDATOR = URLValidator(schemes=('http', 'https'))
except TypeError:
URL_VALIDATOR = URLValidator()
BASE_TEMPLATE_DIR = 'saml2idp'
def _get_template_names(filename, processor=None):
"""
Create a list of template names to use based on the processor name. This
makes it possible to have processor-specific templates.
"""
specific_templates = []
if processor and processor.name:
specific_templates = [
os.path.join(BASE_TEMPLATE_DIR, processor.name, filename)]
return specific_templates + [os.path.join(BASE_TEMPLATE_DIR, filename)]
def _generate_response(request, processor):
"""
Generate a SAML response using processor and return it in the proper Django
response.
"""
try:
tv = processor.generate_response()
except exceptions.UserNotAuthorized:
template_names = _get_template_names('invalid_user.html', processor)
return render_to_response(template_names,
context_instance=RequestContext(request))
template_names = _get_template_names('login.html', processor)
return render_to_response(template_names,
tv,
context_instance=RequestContext(request))
def xml_response(request, template, tv):
return render_to_response(template, tv, content_type="application/xml")
@csrf_exempt
def login_begin(request, *args, **kwargs):
"""
Receives a SAML 2.0 AuthnRequest from a Service Provider and
stores it in the session prior to enforcing login.
"""
if request.method == 'POST':
source = request.POST
else:
source = request.GET
# Store these values now, because Django's login cycle won't preserve them.
try:
request.session['SAMLRequest'] = source['SAMLRequest']
except (KeyError, MultiValueDictKeyError):
return HttpResponseBadRequest('the SAML request payload is missing')
request.session['RelayState'] = source.get('RelayState', '')
return redirect('saml_login_process')
@login_required
def login_init(request, resource, **kwargs):
"""
Initiates an IdP-initiated link to a simple SP resource/target URL.
"""
name, sp_config = metadata.get_config_for_resource(resource)
proc = registry.get_processor(name, sp_config)
try:
linkdict = dict(metadata.get_links(sp_config))
pattern = linkdict[resource]
except KeyError:
raise ImproperlyConfigured('Cannot find link resource in SAML2IDP_REMOTE setting: "%s"' % resource)
is_simple_link = ('/' not in resource)
if is_simple_link:
simple_target = kwargs['target']
url = pattern % simple_target
else:
url = pattern % kwargs
proc.init_deep_link(request, sp_config, url)
return _generate_response(request, proc)
@login_required
def login_process(request):
"""
Processor-based login continuation.
Presents a SAML 2.0 Assertion for POSTing back to the Service Provider.
"""
logger.debug("Request: %s" % request)
proc = registry.find_processor(request)
return _generate_response(request, proc)
@csrf_exempt
def logout(request):
"""
Allows a non-SAML 2.0 URL to log out the user and
returns a standard logged-out page. (SalesForce and others use this method,
though it's technically not SAML 2.0).
"""
auth.logout(request)
redirect_url = request.GET.get('redirect_to', '')
try:
URL_VALIDATOR(redirect_url)
except ValidationError:
pass
else:
return HttpResponseRedirect(redirect_url)
return render_to_response(_get_template_names('logged_out.html'),
{},
context_instance=RequestContext(request))
@login_required
@csrf_exempt
def slo_logout(request):
"""
Receives a SAML 2.0 LogoutRequest from a Service Provider,
logs out the user and returns a standard logged-out page.
"""
request.session['SAMLRequest'] = request.POST['SAMLRequest']
#TODO: Parse SAML LogoutRequest from POST data, similar to login_process().
#TODO: Add a URL dispatch for this view.
#TODO: Modify the base processor to handle logouts?
#TODO: Combine this with login_process(), since they are so very similar?
#TODO: Format a LogoutResponse and return it to the browser.
#XXX: For now, simply log out without validating the request.
auth.logout(request)
tv = {}
return render_to_response(_get_template_names('logged_out.html'),
tv,
context_instance=RequestContext(request))
def descriptor(request):
"""
Replies with the XML Metadata IDSSODescriptor.
"""
idp_config = current_app.config['SAML2IDP_CONFIG']
entity_id = idp_config['issuer']
slo_url = request.build_absolute_uri(reverse('saml_logout'))
sso_url = request.build_absolute_uri(reverse('saml_login_begin'))
pubkey = xml_signing.load_certificate(idp_config)
tv = {
'entity_id': entity_id,
'cert_public_key': pubkey,
'slo_url': slo_url,