From 6eaea794f4e4c0b13c97413f4e04a4267304b86a Mon Sep 17 00:00:00 2001 From: ale <ale@incal.net> Date: Sun, 31 May 2015 11:10:59 +0000 Subject: [PATCH] add a method to test passwords even if OTP is enabled This method is meant to be used for auxiliary authentication for privileged operations where the user is asked for his current password (like a password change). --- authserv/app_common.py | 5 ++-- authserv/app_main.py | 24 +++++++++++++++++++ authserv/auth.py | 42 ++++++++++++++++++---------------- authserv/test/test_app_main.py | 35 ++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 22 deletions(-) diff --git a/authserv/app_common.py b/authserv/app_common.py index 9fcbfc6..c3681aa 100644 --- a/authserv/app_common.py +++ b/authserv/app_common.py @@ -22,7 +22,8 @@ def check_ratelimit(request, username, source_ip): abort(503) -def do_auth(username, service, shard, password, otp_token, source_ip): +def do_auth(username, service, shard, password, otp_token, source_ip, + password_only=False): bl = AuthBlackList(current_app.config.get('BLACKLIST_COUNT', 5), current_app.config.get('BLACKLIST_PERIOD', 600), current_app.config.get('BLACKLIST_TIME', 6*3600)) @@ -40,7 +41,7 @@ def do_auth(username, service, shard, password, otp_token, source_ip): user = current_app.userdb.get_user(username, service, shard) if user: retval, errmsg = auth.authenticate( - user, service, password, otp_token, source_ip) + user, service, password, otp_token, source_ip, password_only) out_shard = user.get_shard() if shard and out_shard != shard: retval = protocol.ERR_AUTHENTICATION_FAILURE diff --git a/authserv/app_main.py b/authserv/app_main.py index 4e53de9..1701687 100644 --- a/authserv/app_main.py +++ b/authserv/app_main.py @@ -51,3 +51,27 @@ def api_auth(): response.headers['Content-Type'] = 'text/plain' response.headers['Expires'] = '-1' return response + + +@app.route('/api/1/auth_pwonly', methods=('POST',)) +def api_auth_pwonly(): + username = request.form.get('username') + password = request.form.get('password') + + if not username: + abort(400) + + try: + auth_status, errmsg, unused_shard = do_auth( + username, None, None, password, None, None, + password_only=True) + except Exception, e: + app.logger.exception('Unexpected exception in auth_pwonly()') + abort(500) + + response = make_response(auth_status) + response.headers['Cache-Control'] = 'no-cache' + response.headers['Content-Type'] = 'text/plain' + response.headers['Expires'] = '-1' + return response + diff --git a/authserv/auth.py b/authserv/auth.py index 3228dac..77e6754 100644 --- a/authserv/auth.py +++ b/authserv/auth.py @@ -30,31 +30,33 @@ def _check_otp(totp_key, token): return protocol.ERR_AUTHENTICATION_FAILURE -def authenticate(user, service, password, otp_token, source_ip=None): +def authenticate(user, service, password, otp_token, source_ip=None, + password_only=False): if not password: return protocol.ERR_AUTHENTICATION_FAILURE, 'empty password' if isinstance(password, unicode): password = password.encode('utf-8') - # If the account has app-specific passwords, do not allow login - # with the standard password unless it comes from localhost (so - # that the old-crappy-SSO can successfully pass the password along - # from the user panel to the webmail). The localhost exception - # should be removed as soon as that system is replaced. - if user.app_specific_passwords_enabled(): - result = _check_app_specific_password( - user.get_app_specific_passwords(service), password) - if result == protocol.OK or source_ip != '127.0.0.1': - return result - - # If OTP is enabled for this account, require it along with the - # right password for a successful login. - if user.otp_enabled(): - if not otp_token: - return protocol.ERR_OTP_REQUIRED, 'otp required' - if _check_otp(user.get_totp_key(), otp_token) != protocol.OK: - return protocol.ERR_AUTHENTICATION_FAILURE, 'bad otp token' - return _check_main_password(user.get_password(), password) + if not password_only: + # If the account has app-specific passwords, do not allow login + # with the standard password unless it comes from localhost (so + # that the old-crappy-SSO can successfully pass the password along + # from the user panel to the webmail). The localhost exception + # should be removed as soon as that system is replaced. + if user.app_specific_passwords_enabled(): + result = _check_app_specific_password( + user.get_app_specific_passwords(service), password) + if result == protocol.OK or source_ip != '127.0.0.1': + return result + + # If OTP is enabled for this account, require it along with the + # right password for a successful login. + if user.otp_enabled(): + if not otp_token: + return protocol.ERR_OTP_REQUIRED, 'otp required' + if _check_otp(user.get_totp_key(), otp_token) != protocol.OK: + return protocol.ERR_AUTHENTICATION_FAILURE, 'bad otp token' + return _check_main_password(user.get_password(), password) return _check_main_password(user.get_password(), password) diff --git a/authserv/test/test_app_main.py b/authserv/test/test_app_main.py index 89c3205..b54ad53 100644 --- a/authserv/test/test_app_main.py +++ b/authserv/test/test_app_main.py @@ -1,10 +1,12 @@ from authserv.test import * from authserv.ratelimit import * +from authserv.oath import totp from authserv import protocol from authserv import server from authserv import app_main URL = '/api/1/auth' +URL_PWONLY = '/api/1/auth_pwonly' class ServerTest(unittest.TestCase): @@ -15,6 +17,7 @@ class ServerTest(unittest.TestCase): return self.tick self.users = { 'user': FakeUser('user', 'pass', shard='a'), + 'otpuser': FakeUser('otpuser', 'pass', otp_key='1234'), } app = server.create_app(app_main.app, userdb=FakeUserDb(self.users), @@ -35,6 +38,16 @@ class ServerTest(unittest.TestCase): 'service': 'svc'}) self.assertEquals(protocol.OK, response.data) + def test_auth_otp_ok(self): + token = totp('1234', format='dec6', period=30) + response = self.app.post( + URL, data={ + 'username': 'otpuser', + 'password': 'pass', + 'service': 'svc', + 'otp': token}) + self.assertEquals(protocol.OK, response.data) + def test_auth_simple_fail(self): response = self.app.post( URL, data={ @@ -205,3 +218,25 @@ class ServerTest(unittest.TestCase): 'service': 'svc', 'source_ip': '127.0.0.1'}) self.assertEquals(200, response.status_code) self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_simple_ok(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'user', + 'password': 'pass'}) + self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_otp_ok(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'otpuser', + 'password': 'pass'}) + self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_simple_fail(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'user', + 'password': 'badpass'}) + self.assertEquals(protocol.ERR_AUTHENTICATION_FAILURE, + response.data) -- GitLab