Skip to content
Snippets Groups Projects
Commit af8a777c authored by ale's avatar ale
Browse files

improve the LDAP authentication model

parent 1ea7ab05
No related branches found
No related tags found
No related merge requests found
import contextlib
import ldap
from ldap.ldapobject import ReconnectLDAPObj
from ldap.dn import escape_dn_chars
from ldap.filter import escape_filter_chars
from ldap.ldapobject import LDAPObject
from authserv import model
......@@ -24,35 +26,49 @@ class UserDb(model.UserDb):
@contextlib.contextmanager
def _conn(self):
c = ReconnectLDAPObj(self.ldap_uri)
c = LDAPObject(self.ldap_uri)
c.protocol_version = ldap.VERSION3
c.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_pw)
yield c
c.unbind_s()
def _query_user(self, username, service):
# Allow referencing a service to another, by specifying a
# string rather than a dictionary as the value. If you build
# infinite loops this way, it's your fault.
ldap_params = self.service_map.get(service)
while isinstance(ldap_params, basestring):
ldap_params = self.service_map.get(ldap_params)
if not ldap_params:
return None
if callable(ldap_params['dn']):
basedn = ldap_params['dn'](username)
else:
basedn = ldap_params['dn']
with self._conn() as c:
result = c.search_s(
basedn,
ldap_params.get('scope', ldap.SCOPE_SUBTREE),
ldap_params['filter'].replace('%s', username),
self.ldap_attrs)
if not result:
return None
if len(result) > 1:
raise Error('too many results from LDAP')
return User(username, result[0][0], result[0][1])
# LDAP queries can be built in two ways:
#
# - specify a fully-qualified 'dn' attribute, in which
# case we'll run a SCOPE_BASE query for that specific DN.
# In this case, 'filter' is optional.
#
# - specify 'base' and 'filter' together to identify a
# single object. This is a SCOPE_SUBTREE search and
# 'filter' is required.
#
if 'dn' in ldap_params:
base = ldap_params['dn'].replace('%s', escape_dn_chars(username))
filt = ldap_params.get('filter', '').replace('%s', escape_filter_chars(username))
scope = ldap.SCOPE_BASE
else:
base = ldap_params['base'].replace('%s', escape_dn_chars(username))
filt = ldap_params['filter'].replace('%s', escape_filter_chars(username))
scope = ldap.SCOPE_SUBTREE
result = c.search_s(base, scope, filt, self.ldap_attrs)
if not result:
return None
if len(result) > 1:
raise Error('too many results from LDAP')
return User(username, result[0][0], result[0][1])
def get_user(self, username, service):
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment