diff --git a/authserv/app_common.py b/authserv/app_common.py index 9fcbfc6f20114a3d5188faa515c77651d06c18eb..c3681aa56f16cd2f95e7a695da2c39a086e37a0f 100644 --- a/authserv/app_common.py +++ b/authserv/app_common.py @@ -22,7 +22,8 @@ def check_ratelimit(request, username, source_ip): abort(503) -def do_auth(username, service, shard, password, otp_token, source_ip): +def do_auth(username, service, shard, password, otp_token, source_ip, + password_only=False): bl = AuthBlackList(current_app.config.get('BLACKLIST_COUNT', 5), current_app.config.get('BLACKLIST_PERIOD', 600), current_app.config.get('BLACKLIST_TIME', 6*3600)) @@ -40,7 +41,7 @@ def do_auth(username, service, shard, password, otp_token, source_ip): user = current_app.userdb.get_user(username, service, shard) if user: retval, errmsg = auth.authenticate( - user, service, password, otp_token, source_ip) + user, service, password, otp_token, source_ip, password_only) out_shard = user.get_shard() if shard and out_shard != shard: retval = protocol.ERR_AUTHENTICATION_FAILURE diff --git a/authserv/app_main.py b/authserv/app_main.py index 4e53de90c4f673113569380f78f4e4e6595083bf..17016878c225111a411010d173d8907365962aa8 100644 --- a/authserv/app_main.py +++ b/authserv/app_main.py @@ -51,3 +51,27 @@ def api_auth(): response.headers['Content-Type'] = 'text/plain' response.headers['Expires'] = '-1' return response + + +@app.route('/api/1/auth_pwonly', methods=('POST',)) +def api_auth_pwonly(): + username = request.form.get('username') + password = request.form.get('password') + + if not username: + abort(400) + + try: + auth_status, errmsg, unused_shard = do_auth( + username, None, None, password, None, None, + password_only=True) + except Exception, e: + app.logger.exception('Unexpected exception in auth_pwonly()') + abort(500) + + response = make_response(auth_status) + response.headers['Cache-Control'] = 'no-cache' + response.headers['Content-Type'] = 'text/plain' + response.headers['Expires'] = '-1' + return response + diff --git a/authserv/auth.py b/authserv/auth.py index 3228dacbbb2853db938d6491f891546cf0cc85ff..77e675491724fb893b120a3937c950e25784e66d 100644 --- a/authserv/auth.py +++ b/authserv/auth.py @@ -30,31 +30,33 @@ def _check_otp(totp_key, token): return protocol.ERR_AUTHENTICATION_FAILURE -def authenticate(user, service, password, otp_token, source_ip=None): +def authenticate(user, service, password, otp_token, source_ip=None, + password_only=False): if not password: return protocol.ERR_AUTHENTICATION_FAILURE, 'empty password' if isinstance(password, unicode): password = password.encode('utf-8') - # If the account has app-specific passwords, do not allow login - # with the standard password unless it comes from localhost (so - # that the old-crappy-SSO can successfully pass the password along - # from the user panel to the webmail). The localhost exception - # should be removed as soon as that system is replaced. - if user.app_specific_passwords_enabled(): - result = _check_app_specific_password( - user.get_app_specific_passwords(service), password) - if result == protocol.OK or source_ip != '127.0.0.1': - return result - - # If OTP is enabled for this account, require it along with the - # right password for a successful login. - if user.otp_enabled(): - if not otp_token: - return protocol.ERR_OTP_REQUIRED, 'otp required' - if _check_otp(user.get_totp_key(), otp_token) != protocol.OK: - return protocol.ERR_AUTHENTICATION_FAILURE, 'bad otp token' - return _check_main_password(user.get_password(), password) + if not password_only: + # If the account has app-specific passwords, do not allow login + # with the standard password unless it comes from localhost (so + # that the old-crappy-SSO can successfully pass the password along + # from the user panel to the webmail). The localhost exception + # should be removed as soon as that system is replaced. + if user.app_specific_passwords_enabled(): + result = _check_app_specific_password( + user.get_app_specific_passwords(service), password) + if result == protocol.OK or source_ip != '127.0.0.1': + return result + + # If OTP is enabled for this account, require it along with the + # right password for a successful login. + if user.otp_enabled(): + if not otp_token: + return protocol.ERR_OTP_REQUIRED, 'otp required' + if _check_otp(user.get_totp_key(), otp_token) != protocol.OK: + return protocol.ERR_AUTHENTICATION_FAILURE, 'bad otp token' + return _check_main_password(user.get_password(), password) return _check_main_password(user.get_password(), password) diff --git a/authserv/test/test_app_main.py b/authserv/test/test_app_main.py index 89c32057092d941782253af71844779e99cfb473..b54ad531f6b361281b3c3ccb6bc979bff2e5bf77 100644 --- a/authserv/test/test_app_main.py +++ b/authserv/test/test_app_main.py @@ -1,10 +1,12 @@ from authserv.test import * from authserv.ratelimit import * +from authserv.oath import totp from authserv import protocol from authserv import server from authserv import app_main URL = '/api/1/auth' +URL_PWONLY = '/api/1/auth_pwonly' class ServerTest(unittest.TestCase): @@ -15,6 +17,7 @@ class ServerTest(unittest.TestCase): return self.tick self.users = { 'user': FakeUser('user', 'pass', shard='a'), + 'otpuser': FakeUser('otpuser', 'pass', otp_key='1234'), } app = server.create_app(app_main.app, userdb=FakeUserDb(self.users), @@ -35,6 +38,16 @@ class ServerTest(unittest.TestCase): 'service': 'svc'}) self.assertEquals(protocol.OK, response.data) + def test_auth_otp_ok(self): + token = totp('1234', format='dec6', period=30) + response = self.app.post( + URL, data={ + 'username': 'otpuser', + 'password': 'pass', + 'service': 'svc', + 'otp': token}) + self.assertEquals(protocol.OK, response.data) + def test_auth_simple_fail(self): response = self.app.post( URL, data={ @@ -205,3 +218,25 @@ class ServerTest(unittest.TestCase): 'service': 'svc', 'source_ip': '127.0.0.1'}) self.assertEquals(200, response.status_code) self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_simple_ok(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'user', + 'password': 'pass'}) + self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_otp_ok(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'otpuser', + 'password': 'pass'}) + self.assertEquals(protocol.OK, response.data) + + def test_auth_pwonly_simple_fail(self): + response = self.app.post( + URL_PWONLY, data={ + 'username': 'user', + 'password': 'badpass'}) + self.assertEquals(protocol.ERR_AUTHENTICATION_FAILURE, + response.data)