Skip to content
Snippets Groups Projects
Commit f726dd7d authored by ale's avatar ale
Browse files

show (internally) detailed auth error messages

parent 023dd3b7
No related branches found
No related tags found
No related merge requests found
......@@ -3,7 +3,7 @@ from authserv import auth
from authserv import protocol
from authserv.ratelimit import *
_blacklisted = (protocol.ERR_AUTHENTICATION_FAILURE, None)
_blacklisted = (protocol.ERR_AUTHENTICATION_FAILURE, 'blacklisted', None)
@blacklist_on_auth_failure(key_from_args(0), count=5, period=600, ttl=43200,
......@@ -13,6 +13,6 @@ _blacklisted = (protocol.ERR_AUTHENTICATION_FAILURE, None)
def do_auth(username, service, shard, password, otp_token, source_ip):
user = current_app.userdb.get_user(username, service, shard)
if not user:
return _blacklisted
return (auth.authenticate(user, service, password, otp_token),
user.get_shard())
return protocol.ERR_AUTHENTICATION_FAILURE, 'user does not exist', None
result, errmsg = auth.authenticate(user, service, password, otp_token)
return (result, errmsg, user.get_shard())
......@@ -34,14 +34,16 @@ def api_auth():
abort(400)
try:
result, _ = do_auth(username, service, shard, password, otp_token, source_ip)
result, errmsg, unused_shard = do_auth(
username, service, shard, password, otp_token, source_ip)
except Exception, e:
app.logger.exception('Unexpected exception in authenticate()')
abort(500)
app.logger.info(
'AUTH %s %s otp=%s %s',
username, service, otp_token and 'y' or 'n', result)
'AUTH %s %s %s otp=%s%s',
username, service, result, otp_token and 'y' or 'n',
(' err=%s' % errmsg) if errmsg else '')
response = make_response(result)
response.headers['Cache-Control'] = 'no-cache'
......
......@@ -31,15 +31,16 @@ def do_nginx_http_auth():
n_attempt = 1
try:
auth_status, shard = do_auth(
auth_status, errmsg, shard = do_auth(
username, service, None, password, None, source_ip)
except Exception, e:
app.logger.exception('Unexpected exception in authenticate()')
abort(500)
app.logger.info(
'NGINX_AUTH %s %s %s shard=%s',
username, service, auth_status, shard)
'NGINX_AUTH %s %s %s shard=%s%s',
username, service, auth_status, shard,
(' err=%s' % errmsg) if errmsg else '')
response = make_response('')
if auth_status == 'OK':
......
......@@ -6,16 +6,16 @@ from authserv import protocol
def _check_main_password(userpw, password):
if safe_str_cmp(crypt.crypt(password, userpw), userpw):
return protocol.OK
return protocol.OK, None
else:
return protocol.ERR_AUTHENTICATION_FAILURE
return protocol.ERR_AUTHENTICATION_FAILURE, 'bad password'
def _check_app_specific_password(asps, password):
for app_pw in asps:
if safe_str_cmp(crypt.crypt(password, app_pw), app_pw):
return protocol.OK
return protocol.ERR_AUTHENTICATION_FAILURE
return protocol.OK, None
return protocol.ERR_AUTHENTICATION_FAILURE, 'bad app-specific password'
def _check_otp(totp_key, token):
......@@ -32,23 +32,20 @@ def _check_otp(totp_key, token):
def authenticate(user, service, password, otp_token):
if not password:
return protocol.ERR_AUTHENTICATION_FAILURE
return protocol.ERR_AUTHENTICATION_FAILURE, 'empty password'
if isinstance(password, unicode):
password = password.encode('utf-8')
if user.app_specific_passwords_enabled():
if _check_app_specific_password(user.get_app_specific_passwords(service),
password) == protocol.OK:
return protocol.OK
else:
return protocol.ERR_AUTHENTICATION_FAILURE
return _check_app_specific_password(
user.get_app_specific_passwords(service), password)
if user.otp_enabled():
if not otp_token:
return protocol.ERR_OTP_REQUIRED
return protocol.ERR_OTP_REQUIRED, 'otp required'
if _check_otp(user.get_totp_key(), otp_token) != protocol.OK:
return protocol.ERR_AUTHENTICATION_FAILURE
return protocol.ERR_AUTHENTICATION_FAILURE, 'bad otp token'
return _check_main_password(user.get_password(), password)
return _check_main_password(user.get_password(), password)
import mock
from authserv.test import *
from authserv.ratelimit import *
from authserv import protocol
......@@ -24,16 +25,17 @@ class ServerTest(unittest.TestCase):
self.app = app.test_client()
def test_nginx_http_auth_ok(self):
response = self.app.get(
'/', headers={
'Auth-User': 'user',
'Auth-Pass': 'pass',
'Client-IP': '127.0.0.1',
'Auth-Protocol': 'imap',
'Auth-Login-Attempt': '1',
})
self.assertEquals(200, response.status_code)
self.assertEquals('OK', response.headers['Auth-Status'])
with mock.patch('socket.gethostbyname', return_value='127.0.0.1'):
response = self.app.get(
'/', headers={
'Auth-User': 'user',
'Auth-Pass': 'pass',
'Client-IP': '127.0.0.1',
'Auth-Protocol': 'imap',
'Auth-Login-Attempt': '1',
})
self.assertEquals(200, response.status_code)
self.assertEquals('OK', response.headers['Auth-Status'])
def test_nginx_http_auth_fail(self):
response = self.app.get(
......
......@@ -10,22 +10,22 @@ class AuthTest(unittest.TestCase):
u = FakeUser('user', 'pass')
self.assertEquals(
protocol.OK,
authenticate(u, 'svc', 'pass', None))
authenticate(u, 'svc', 'pass', None)[0])
def test_main_password_fail(self):
u = FakeUser('user', 'pass')
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'svc', 'badpass', None))
authenticate(u, 'svc', 'badpass', None)[0])
def test_require_otp(self):
u = FakeUser('user', 'pass', otp_key=1234)
self.assertEquals(
protocol.ERR_OTP_REQUIRED,
authenticate(u, 'svc', 'pass', None))
authenticate(u, 'svc', 'pass', None)[0])
self.assertEquals(
protocol.ERR_OTP_REQUIRED,
authenticate(u, 'svc', 'badpass', None))
authenticate(u, 'svc', 'badpass', None)[0])
def test_otp_ok(self):
secret = '089421'
......@@ -34,30 +34,30 @@ class AuthTest(unittest.TestCase):
u = FakeUser('user', 'pass', otp_key=secret)
self.assertEquals(
protocol.OK,
authenticate(u, 'svc', 'pass', str(token)))
authenticate(u, 'svc', 'pass', str(token))[0])
def test_otp_fail(self):
u = FakeUser('user', 'pass', otp_key='123456')
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'svc', 'pass', '123456'))
authenticate(u, 'svc', 'pass', '123456')[0])
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'svc', 'pass', 'malformed otp token!'))
authenticate(u, 'svc', 'pass', 'malformed otp token!')[0])
def test_app_specific_password_ok(self):
u = FakeUser('user', 'pass', asps=[
('svc', crypt.crypt('app-specific', 'zz'))])
self.assertEquals(
protocol.OK,
authenticate(u, 'svc', 'app-specific', None))
authenticate(u, 'svc', 'app-specific', None)[0])
def test_app_specific_password_fail(self):
u = FakeUser('user', 'pass', asps=[
('svc', crypt.crypt('app-specific', 'zz'))])
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'svc', 'badpass', None))
authenticate(u, 'svc', 'badpass', None)[0])
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'svc2', 'app-specific', None))
authenticate(u, 'svc2', 'app-specific', None)[0])
......@@ -63,14 +63,14 @@ class LdapAuthTest(LdapAuthTestBase):
self.assertTrue(u)
self.assertEquals(
protocol.OK,
authenticate(u, 'mail', 'password', None))
authenticate(u, 'mail', 'password', None)[0])
def test_auth_password_fail(self):
u = self.userdb.get_user('test@investici.org', 'mail', -1)
self.assertTrue(u)
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'mail', 'wrong password', None))
authenticate(u, 'mail', 'wrong password', None)[0])
class LdapOtpTest(LdapAuthTestBase):
......@@ -84,14 +84,14 @@ class LdapOtpTest(LdapAuthTestBase):
self.assertTrue(u)
self.assertEquals(
protocol.ERR_OTP_REQUIRED,
authenticate(u, 'account', 'password', None))
authenticate(u, 'account', 'password', None)[0])
def test_auth_bad_password_requires_otp(self):
u = self.userdb.get_user('test@investici.org', 'account', -1)
self.assertTrue(u)
self.assertEquals(
protocol.ERR_OTP_REQUIRED,
authenticate(u, 'account', 'wrong password', None))
authenticate(u, 'account', 'wrong password', None)[0])
def test_auth_otp_ok(self):
u = self.userdb.get_user('test@investici.org', 'account', -1)
......@@ -100,7 +100,7 @@ class LdapOtpTest(LdapAuthTestBase):
token = totp(secret, format='dec6', period=30)
self.assertEquals(
protocol.OK,
authenticate(u, 'account', 'password', str(token)))
authenticate(u, 'account', 'password', str(token))[0])
def test_auth_otp_ok_bad_password(self):
u = self.userdb.get_user('test@investici.org', 'account', -1)
......@@ -109,14 +109,14 @@ class LdapOtpTest(LdapAuthTestBase):
token = totp(secret, format='dec6', period=30)
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'account', 'wrong password', str(token)))
authenticate(u, 'account', 'wrong password', str(token))[0])
def test_auth_bad_otp(self):
u = self.userdb.get_user('test@investici.org', 'account', -1)
self.assertTrue(u)
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'account', 'password', '123456'))
authenticate(u, 'account', 'password', '123456')[0])
class LdapASPTest(LdapAuthTestBase):
......@@ -130,18 +130,18 @@ class LdapASPTest(LdapAuthTestBase):
self.assertTrue(u)
self.assertEquals(
protocol.OK,
authenticate(u, 'mail', 'veryspecificpassword', None))
authenticate(u, 'mail', 'veryspecificpassword', None)[0])
def test_plain_password_fails(self):
u = self.userdb.get_user('test@investici.org', 'mail', -1)
self.assertTrue(u)
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'mail', 'password', None))
authenticate(u, 'mail', 'password', None)[0])
def test_plain_password_and_otp_fails(self):
u = self.userdb.get_user('test@investici.org', 'mail', -1)
self.assertTrue(u)
self.assertEquals(
protocol.ERR_AUTHENTICATION_FAILURE,
authenticate(u, 'mail', 'password', '123456'))
authenticate(u, 'mail', 'password', '123456')[0])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment