Commit 68d0b801 authored by ale's avatar ale

start adding tests for the SAML module

parent 4ae23e57
......@@ -203,6 +203,11 @@ def create_app(config_file=None, config={}):
except ImportError:
pass
# Optionally enable SAML handlers.
if 'SAML' in app.config:
from sso_server.saml.flask_views import init_app as saml_init_app
saml_init_app(app)
app.login_service = login_service.LoginService(app.config)
return app
......
from flask import Blueprint
saml_app = Blueprint('saml', __name__,
template_folder='templates')
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import base64
import logging
import time
import uuid
from flask import request, session, current_app, g
from flask import request, session, g
from BeautifulSoup import BeautifulStoneSoup
from . import codex
from . import exceptions
from . import xml_render
from .logging import get_saml_logger
from .app import saml_app
MINUTES = 60
HOURS = 60 * MINUTES
......@@ -45,7 +47,7 @@ class Processor(object):
def __init__(self, config, name=None):
self.name = name
self._config = config.copy()
self._logger = get_saml_logger()
self._logger = logging.getLogger('saml')
processor_path = self._config.get('processor', 'invalid')
......@@ -170,7 +172,7 @@ class Processor(object):
"""
Formats _response_params as _response_xml.
"""
sign_it = current_app.config['SAML2IDP_CONFIG']['signing']
sign_it = saml_app.config.get('SIGNING', True)
self._response_xml = xml_render.get_response_xml(self._response_params, signed=sign_it)
def _get_response_params(self):
......@@ -181,7 +183,7 @@ class Processor(object):
'acs_url': self._request_params['ACS_URL'],
'saml_response': self._saml_response,
'relay_state': self._relay_state,
'autosubmit': current_app.config['SAML2IDP_CONFIG']['autosubmit'],
'autosubmit': saml_app.config.get('AUTOSUBMIT', True),
}
def _parse_request(self):
......@@ -221,7 +223,7 @@ class Processor(object):
self._subject = None
self._subject_format = 'urn:oasis:names:tc:SAML:2.0:nameid-format:email'
self._system_params = {
'ISSUER': current_app.config['SAML2IDP_CONFIG']['issuer'],
'ISSUER': saml_app.issuer,
}
def _validate_request(self):
......
from __future__ import absolute_import
import functools
import os
import sso
import urllib
from flask import Blueprint, current_app, request, session, abort, redirect, make_response, url_for, g
from flask import request, session, abort, redirect, make_response, render_template, url_for, g
from . import exceptions
from . import xml_signing
from .logging import get_saml_logger
from .app import saml_app
saml_app = Blueprint('saml', __name__,
template_folder='templates')
logger = get_saml_logger()
sso_cookie_name = 'SSO_SAML'
def init_app(app):
app.register_blueprint(saml_app, url_prefix='/saml/')
# Stick a 'saml_config' object in the main app, so that
# xml_signing can find it via current_app.
app.saml_certificate = xml_signing.load_certificate(saml_config['CERTIFICATE_FILE'])
app.saml_private_key = xml_signing.load_private_key(saml_config['PRIVATE_KEY_FILE'])
app.register_blueprint(saml_app, url_prefix='/saml')
saml_config = app.config['SAML']
# Stick some immutable config objects in the app, so that
# other functions can find them.
saml_app.saml_certificate = xml_signing.load_certificate(saml_config['CERTIFICATE_FILE'])
saml_app.saml_private_key = xml_signing.load_private_key(saml_config['PRIVATE_KEY_FILE'])
saml_app.config = saml_config
saml_app.login_server = app.config['SSO_LOGIN_SERVER']
saml_app.login_server = saml_config['SSO_LOGIN_SERVER']
saml_app.sso_service = saml_app.login_server + 'saml/'
url_base = 'https://' + saml_app.sso_service
saml_app.sso_url = url_base + 'login'
saml_app.slo_url = url_base + 'logout'
saml_app.issuer = 'https://' + saml_app.login_server
with open(app.config['SSO_PUBLIC_KEY']) as fd:
public_key = fd.read()
saml_app.sso_verifier = sso.Verifier(
......@@ -41,6 +41,7 @@ def init_app(app):
def login_required(fn):
@functools.wraps(fn)
def _wrapper(*args, **kwargs):
# Try to fetch the cookie.
try:
......@@ -52,7 +53,7 @@ def login_required(fn):
's': saml_app.sso_service,
'd': request.url}))
return redirect(redir_url)
return functools.wraps(_wrapper)
return _wrapper
@saml_app.route('/sso_login')
......@@ -115,14 +116,13 @@ def logout():
@saml_app.route('/metadata/xml/')
def descriptor():
idp_config = current_app.config['SAML2IDP_CONFIG']
tv = {
'entity_id': idp_config['issuer'],
'entity_id': saml_app.issuer,
'slo_url': saml_app.slo_url,
'sso_url': saml_app.sso_url,
'pubkey': current_app.saml_certificate,
'pubkey': saml_app.saml_certificate,
}
resp = make_response(render_template('saml/idpssodescriptor.xml', tv))
resp = make_response(render_template('saml/idpssodescriptor.xml', **tv))
resp.headers['Content-Type'] = 'application/xml'
return resp
......
# -*- coding: utf-8 -*-
import structlog
def get_saml_logger():
"""
Get a logger named `saml2idp` after the main package.
"""
return structlog.get_logger('saml2idp')
......@@ -3,14 +3,14 @@ Query metadata from settings.
"""
from __future__ import absolute_import
from .exceptions import ImproperlyConfigured
from flask import current_app
from .app import saml_app
def get_config_for_acs(acs_url):
"""
Return SP configuration instance that handles acs_url.
"""
for friendlyname, config in current_app.config['SAML2IDP_REMOTES'].items():
for friendlyname, config in saml_app.config['SAML2IDP_REMOTES'].items():
if config['acs_url'] == acs_url:
return config
msg = 'SAML2IDP_REMOTES is not configured to handle the AssertionConsumerService at "%s"'
......@@ -21,7 +21,7 @@ def get_config_for_resource(resource_name):
"""
Return the SP configuration that handles a deep-link resource_name.
"""
for friendlyname, config in current_app.config['SAML2IDP_REMOTES'].items():
for friendlyname, config in saml_app.config['SAML2IDP_REMOTES'].items():
links = get_links(config)
for name, pattern in links:
if name == resource_name:
......@@ -35,7 +35,7 @@ def get_deeplink_resources():
Returns a list of resources that can be used for deep-linking.
"""
resources = []
for key, sp_config in current_app.config['SAML2IDP_REMOTES'].items():
for key, sp_config in saml_app.config['SAML2IDP_REMOTES'].items():
links = get_links(sp_config)
for resource, patterns in links:
if '/' not in resource:
......
......@@ -10,9 +10,7 @@ from importlib import import_module
from . import base
from . import exceptions
from . import xml_render
from .logging import get_saml_logger
logger = get_saml_logger()
from .app import saml_app
def SSOProcessor(base.Processor):
......@@ -70,7 +68,7 @@ def find_processor(request):
"""
Returns the Processor instance that is willing to handle this request.
"""
for name, sp_config in current_app.config['SAML2IDP_REMOTES'].items():
for name, sp_config in saml_app.config['SAML2IDP_REMOTES'].items():
proc = get_processor(name, sp_config)
try:
if proc.can_handle(request):
......
......@@ -3,6 +3,7 @@
Functions for creating XML output.
"""
from __future__ import absolute_import
import logging
import string
from .xml_signing import get_signature_xml
......@@ -13,9 +14,7 @@ from .xml_templates import (ATTRIBUTE,
RESPONSE,
SUBJECT)
from .logging import get_saml_logger
logger = get_saml_logger()
logger = logging.getLogger('saml')
def _get_attribute_statement(params):
......
......@@ -4,16 +4,15 @@ Signing code goes here.
"""
from __future__ import absolute_import
import hashlib
import logging
import string
import M2Crypto
from flask import current_app
from .app import saml_app
from .codex import nice64
from .xml_templates import SIGNED_INFO, SIGNATURE
from .logging import get_saml_logger
logger = get_saml_logger()
logger = logging.getLogger('saml')
def load_certificate(filename):
......@@ -42,9 +41,8 @@ def get_signature_xml(subject, reference_uri):
Returns XML Signature for subject.
"""
logger.debug('get_signature_xml - Begin.')
#config = current_app.saml_config
private_key = current_app.saml_private_key
certificate = current_app.saml_certificate
private_key = saml_app.saml_private_key
certificate = saml_app.saml_certificate
logger.debug('Subject: ' + subject)
......
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAqeMKrE1O4vnsG0oLN8nHTOn1ob5gqqJ16ioJetTdfIBjkqG2
wSG6ndAWsZ3OW+FuZWUFAWlKNjuk3fKeNImP9/DNnBIQMYgdtZL80rYAW08pc5ux
/7ULtM8aHZLg0+a9UqbYrK3pViwhOovZmuRzHbAM8fpTsbxcn3Wvq8IINMZtsXmU
8jUAf8XLE+a9+Kj4N/AGKs7HJ0Uv3IXJAtfJ2i2LjKz4iwjzEXfny/2Tk1zfoVSR
dO/IKPlVBhoVFNjIwDS4YY+kbUK4nM3uqnwUzpHUxHrG99YfqlDgaBopQIpCtxY/
M85PNBwpq+rpuimhHG4UXj9s6e1M6mUzZ3GulwIDAQABAoIBADYnfLB+GEZ8Oopc
aQUImjJnZfiAMlpSXVoeulTgZBYa6yLDz/jyooVJ3uIRlsVtL/68XAMeKBzvmFdz
zoCa1TzJExHMJENVy3qk91jH0YdJzcHfgLctZCTn28BPoMGLjRA6xVkOOxJQEOuT
xvL8HcH567Z7AJDopXHjVvNeb7U6goyCnlAPTqy+fFPFW2a/eDaro9hTuUntXVq5
O4fnbvWFzWuXZoOQzflyTYcvAaB/r27BUjCHZIg8ApVDi26UdAACQbOG85mCw2Nn
dRa4f44z5WPWkjfczqjP9kK/YeU/alfEu4X15C1jeT+MQfYPdk0jlyhduzU0jRUW
4KgnOwECgYEA2XF6VgFKXGGX/xH+HXgUT74pNHM9923SzQW6F2rEzY8X1FgRxblv
JQkZ0ew0S6k6rATyYvTAjVO/vqtanWypDtOZNJLD2CxwdNgenLvnxCy/bIHusmOX
gw6Ip/YYtcTyfE/PG+L51z+Fx1dEnOJSoOFNX5n/kub8NBdpNwduLBcCgYEAyALP
dQW0Qr8PjY65Zcqroo4lC/kt5rSAG3IYwatyw90hwxlw7tI/Eajt6XJ5KoLHYzAl
cFf/CKUPTPxm9bzZJOJr01bk56jwmEU4Q/8zcW19z2GcnEaEQe7YTATG9FnqM3NC
7mTlpaji0TezA3+hOb6Hk1NkBMRKJNNHZotFoYECgYBY1tw8t1pCVD1doWRJdf4W
doMvQsZWiTBPQ7+8QYsW0RfTUggw2Ipof0dlVTQolYO7NGozs/vXwhfdNvVWzEZm
IF2/9Ra/A6kOlB4tuzf+Fqo/EvcLO1Vz3kRHG20NFuJbl9/ubNXW88UcpLMLxgws
XuDV5n1c77KrNcVvDkIdEQKBgBFZlQ9sqD9l/X5lFBYNZ6gZNoN8YQvzOm2nDxi1
u6Xb59LZwR8mY8wvYDvw7yvN3mfvREAjkf74+oPOHUxRznBRp9LUZdTmpcA/uOrF
F3xOaCx2JjS0NULW6jJl87A8oIFIDVVKsVUWel51u3ShPKYarf4MrM9xC5F4Nmhh
ym+BAoGBANhn2NMY4Mgn4K7l748pRS6jfzlWC6IBNv/4IjSjyx7SneItVyXTulw+
IJFzQEhuX366Ho6c/vL+7gcKzP1+jWiF1wHfjesYYdMVxs3MFGx9l21WusYTI/1T
5haz07zx0EKUPdhp1X7bDYAoiENIBFC5l/xJT3oGPCazxgKjv/WO
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIDPDCCAiQCCQCgQIP3KGiRozANBgkqhkiG9w0BAQsFADBgMQswCQYDVQQGEwJB
VTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
cyBQdHkgTHRkMRkwFwYDVQQDDBBzYW1sLmV4YW1wbGUuY29tMB4XDTE2MTAxMTA4
NDMwMFoXDTI2MTAwOTA4NDMwMFowYDELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNv
bWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEZMBcG
A1UEAwwQc2FtbC5leGFtcGxlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
AQoCggEBAKnjCqxNTuL57BtKCzfJx0zp9aG+YKqideoqCXrU3XyAY5KhtsEhup3Q
FrGdzlvhbmVlBQFpSjY7pN3ynjSJj/fwzZwSEDGIHbWS/NK2AFtPKXObsf+1C7TP
Gh2S4NPmvVKm2Kyt6VYsITqL2Zrkcx2wDPH6U7G8XJ91r6vCCDTGbbF5lPI1AH/F
yxPmvfio+DfwBirOxydFL9yFyQLXydoti4ys+IsI8xF358v9k5Nc36FUkXTvyCj5
VQYaFRTYyMA0uGGPpG1CuJzN7qp8FM6R1MR6xvfWH6pQ4GgaKUCKQrcWPzPOTzQc
Kavq6bopoRxuFF4/bOntTOplM2dxrpcCAwEAATANBgkqhkiG9w0BAQsFAAOCAQEA
VypcH+Ce/XopAASCR6kpfTSJD+kgMm25OeSenCT724HaBLuDmcbL6s1NCHm1I0Ln
9vsgemvjTm7roMRFHTWn7lhzuWEoxExhLfJvX2xhX/wSITJIRIywQ+5hgV/BYtcI
gFyKDfuyrBcl7tP5ovGwW3bllCEvh1f183R80glYQtJD52mH+T7bU1F+zagoIdmT
wLUleIwGZ+SmxMJWRKgarTYvID1S2cwswd6KKQfkZC+7iNRXsQjuEANl4cMEabd1
gA8qPdDnrOHjQLjzgRrsbF1QST4D6k/4NpoG5vha+VdIEqqCYeGVWixDQ6MxKgto
qoVfoEzCxlakeMsKA8ualg==
-----END CERTIFICATE-----
import logging
import os
import shutil
import sso
import tempfile
import unittest
from sso_server import application
logging.basicConfig()
class SAMLTest(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
public_key, secret_key = sso.generate_keys()
self.public_key_file = os.path.join(self.tmpdir, 'public.key')
with open(self.public_key_file, 'w') as fd:
fd.write(public_key)
self.secret_key_file = os.path.join(self.tmpdir, 'secret.key')
with open(self.secret_key_file, 'w') as fd:
fd.write(secret_key)
self.domain = 'testdomain'
self.saml_cert = os.path.join(os.path.dirname(__file__), 'saml.pem')
self.saml_key = os.path.join(os.path.dirname(__file__), 'saml.key')
self.app = self._make_app()
def _make_app(self, **config):
config_ = {
'SSO_SECRET_KEY': self.secret_key_file,
'SSO_PUBLIC_KEY': self.public_key_file,
'SSO_DOMAIN': self.domain,
'SECRET_KEY': 'barbablu',
'ALLOWED_SERVICES': [],
'SAML': {
'SSO_LOGIN_SERVER': 'https://localhost:1234/',
'CERTIFICATE_FILE': self.saml_cert,
'PRIVATE_KEY_FILE': self.saml_key,
'SAML2IDP_REMOTES': {
# TODO
},
},
}
config_.update(config)
return application.create_app(config=config_)
def tearDown(self):
shutil.rmtree(self.tmpdir)
def test_idp_metadata(self):
# Fetch the IDP XML metadata, to verify that the SAML
# blueprint is properly plugged into the main app.
with self.app.test_client() as c:
response = c.get('/saml/metadata/xml/')
self.assertEquals(200, response.status_code)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment