Skip to content
Snippets Groups Projects
Commit 84dc50d6 authored by ale's avatar ale
Browse files

embed python-oath

parent eade8ade
No related branches found
No related tags found
No related merge requests found
import crypt
from oath import accept_totp
from authserv.oath import accept_totp
from authserv import protocol
......
oath-1.1 from https://github.com/bdauvergne/python-oath
from totp import *
from hotp import *
from ocra import *
VERSION = '1.0'
'''
Google Authenticator API
------------------------
Google Authenticator is based on HOTP and TOTP. It provides a simple way of
provisionning an OTP generator through a new URL scheme.
This module provides parsing and high-level API over the classic HOTP and TOTP
APIs provided by the oath.hotp and oath.totp modules.
'''
import re
import urlparse
import base64
import hashlib
from . import hotp
from . import totp
__ALL__ = ('GoogleAuthenticator',)
otpauth_re = re.compile(r'^otpauth://(?P<type>\w+)'
r'/(?P<labe>[^?]+)'
r'\?(?P<query>.*)$')
LABEL = 'label'
TYPE = 'type'
ALGORITHM = 'algorithm'
DIGITS = 'digits'
SECRET = 'secret'
COUNTER = 'counter'
PERIOD = 'period'
TOTP = 'totp'
HOTP = 'hotp'
DRIFT = 'drift'
def parse_otpauth(otpauth_uri):
m = re.match(otpauth_re, otpauth_uri)
if not m:
raise ValueError('Invalid otpauth URI', otpauth_uri)
d = m.groupdict()
query_parse = urlparse.parse_qs(d['query'])
if SECRET not in query_parse:
raise ValueError('Missing secret field in otpauth URI', otpauth_uri)
try:
d[SECRET] = base64.b32decode(query_parse[SECRET])
except TypeError:
raise ValueError('Invalid base32 encoding of the secret field in '
'otpauth URI', otpauth_uri)
if ALGORITHM in query_parse:
d[ALGORITHM] = query_parse[ALGORITHM].lower()
if d[ALGORITHM] not in ('sha1', 'sha256', 'sha512', 'md5'):
raise ValueError('Invalid value for algorithm field in otpauth '
'URI', otpauth_uri)
else:
d[ALGORITHM] = 'sha1'
try:
d[ALGORITHM] = getattr(hashlib, d[ALGORITHM])
except AttributeError:
raise ValueError('Unsupported algorithm %s in othauth URI' %
d[ALGORITHM], otpauth_uri)
for key in (DIGITS, PERIOD, COUNTER):
try:
if k in query_parse:
d[k] = int(query_parse[k])
except ValueError:
raise ValueError('Invalid value for field %s in otpauth URI, must '
'be a number' % key, otpauth_uri)
if COUNTER not in d:
d[COUNTER] = 0 # what else ?
if DIGITS in d:
if d[DIGITS] not in (6,8):
raise ValueError('Invalid value for field digits in othauth URI, it '
'must 6 or 8', otpauth_uri)
else:
d[DIGITS] = 6
if d[TYPE] == HOTP and COUNTER not in d:
raise ValueError('Missing field counter in otpauth URI, it is '
'mandatory with the hotp type', otpauth_uri)
if d[TYPE] == TOTP and PERIOD not in d:
d[PERIOD] = 30
return d
class GoogleAuthenticator(object):
def __init__(otpauth_uri, state=None):
self.otpauth_uri = otpauth_uri
self.parsed_otpauth_uri = parse_otpauth(otpauth_uri)
self.generator_state = state or {}
self.acceptor_state = state or {}
def generate(self):
format = 'dec%s' % self.parsed_otpauth_uri[DIGITS]
hash = self.parsed_otpauth_uri[ALGORITHM]
secret = self.parsed_otpauth_uri[SECRET]
state = self.generator_state
if self.parsed_otpauth_uri[TYPE] == HOTP:
if COUNTER not in state:
state[COUNTER] = self.parsed_otpauth_uri[COUNTER]
otp = hotp.hotp(secret, state[COUNTER], format=format,
hash=hash)
state[COUNTER] += 1
return otp
elif self.parsed_otpauth_uri[TYPE] == TOTP:
period = 'dec%s' % self.parsed_otpauth_uri[PERIOD]
return hotp.totp(self.secret, format=format, period=period,
hash=hash)
else:
raise NotImplemented(self.parsed_otpauth_uri[TYPE])
def accept(self, otp, hotp_drift=3, forward_drift=None,
hotp_backward_drift=0, totp_forward_drift=1,
totp_backward_drift=1, t=None):
format = 'dec%s' % self.parsed_otpauth_uri[DIGITS]
hash = self.parsed_otpauth_uri[ALGORITHM]
secret = self.parsed_otpauth_uri[SECRET]
state = self.acceptor_state
if self.parsed_otpauth_uri[TYPE] == HOTP:
if COUNTER not in state:
state[COUNTER] = self.parsed_otpauth_uri[COUNTER]
ok, state[COUNTER] = hotp.accept_hotp(otp, secret,
state[COUNTER], format=format, hash=hash,
drift=hotp_drift,
backward_drift=hotp_backward_drift)
return ok
elif self.parsed_otpauth_uri[TYPE] == TOTP:
period = 'dec%s' % self.parsed_otpauth_uri[PERIOD]
if DRIFT not in state:
state[DRIFT] = 0
ok, state[DRIFT] = totp.accept_totp(secret, otp, format=format,
period=period, forward_drift=totp_forward_drift,
backward_drift=totp_backward_drift, drift=state[DRIFT],
t=t)
return ok
else:
raise NotImplemented(self.parsed_otpauth_uri[TYPE])
import hashlib
import hmac
import binascii
'''
HOTP implementation
To compute an HOTP one-time-password:
>>> hotp(key, counter)
where is the hotp is a key given as an hexadecimal string and counter is an
integer. The counter value must be kept synchronized on the server and the
client side.
See also http://tools.ietf.org/html/rfc4226
'''
__ALL__ = ( 'hotp', 'accept_hotp' )
def truncated_value(h):
bytes = map(ord, h)
offset = bytes[-1] & 0xf
v = (bytes[offset] & 0x7f) << 24 | (bytes[offset+1] & 0xff) << 16 | \
(bytes[offset+2] & 0xff) << 8 | (bytes[offset+3] & 0xff)
return v
def dec(h,p):
v = str(truncated_value(h))
return v[len(v)-p:]
def int2beint64(i):
hex_counter = hex(long(i))[2:-1]
hex_counter = '0' * (16 - len(hex_counter)) + hex_counter
bin_counter = binascii.unhexlify(hex_counter)
return bin_counter
def __hotp(key, counter, hash=hashlib.sha1):
bin_counter = int2beint64(counter)
bin_key = binascii.unhexlify(key)
return hmac.new(bin_key, bin_counter, hash).digest()
def hotp(key,counter,format='dec6',hash=hashlib.sha1):
'''
Compute a HOTP value as prescribed by RFC4226
:param key:
the HOTP secret key given as an hexadecimal string
:param counter:
the OTP generation counter
:param format:
the output format, can be:
- hex, for a variable length hexadecimal format,
- hex-notrunc, for a 40 characters hexadecimal non-truncated format,
- dec4, for a 4 characters decimal format,
- dec6,
- dec7, or
- dec8
it defaults to dec6.
:param hash:
the hash module (usually from the hashlib package) to use,
it defaults to hashlib.sha1.
:returns:
a string representation of the OTP value (as instructed by the format parameter).
Examples:
>>> hotp('343434', 2, format='dec6')
'791903'
'''
bin_hotp = __hotp(key, counter, hash)
if format == 'dec4':
return dec(bin_hotp, 4)
elif format == 'dec6':
return dec(bin_hotp, 6)
elif format == 'dec7':
return dec(bin_hotp, 7)
elif format == 'dec8':
return dec(bin_hotp, 8)
elif format == 'hex':
return hex(truncated_value(bin_hotp))[2:]
elif format == 'hex-notrunc':
return binascii.hexlify(bin_hotp)
elif format == 'bin':
return bin_hotp
elif format == 'dec':
return str(truncated_value(bin_hotp))
else:
raise ValueError('unknown format')
def accept_hotp(key, response, counter, format='dec6', hash=hashlib.sha1,
drift=3, backward_drift=0):
'''
Validate a HOTP value inside a window of
[counter-backward_drift:counter+forward_drift]
:param key:
the shared secret
:type key:
hexadecimal string of even length
:param response:
the OTP to check
:type response:
ASCII string
:param counter:
value of the counter running inside an HOTP token, usually it is
just the count of HOTP value accepted so far for a given shared
secret; see the specifications of HOTP for more details;
:param format:
the output format, can be:
- hex40, for a 40 characters hexadecimal format,
- dec4, for a 4 characters decimal format,
- dec6,
- dec7, or
- dec8
it defaults to dec6.
:param hash:
the hash module (usually from the hashlib package) to use,
it defaults to hashlib.sha1.
:param drift:
how far we can look forward from the current value of the counter
:param backward_drift:
how far we can look backward from the current counter value to
match the response, default to zero as it is usually a bad idea to
look backward as the counter is only advanced when a valid value is
checked (and so the counter on the token side should have been
incremented too)
:returns:
a pair of a boolean and an integer:
- first is True if the response is validated and False otherwise,
- second is the new value for the counter; it can be more than
counter + 1 if the drift window was used; you must store it if
the response was validated.
>>> accept_hotp('343434', '122323', 2, format='dec6')
(False, 2)
>>> hotp('343434', 2, format='dec6')
'791903'
>>> accept_hotp('343434', '791903', 2, format='dec6')
(True, 3)
>>> hotp('343434', 3, format='dec6')
'907279'
>>> accept_hotp('343434', '907279', 2, format='dec6')
(True, 4)
'''
for i in range(-backward_drift, drift+1):
if hotp(key, counter+i, format=format, hash=hash) == str(response):
return True, counter+i+1
return False,counter
import hmac
import hashlib
import re
import hotp
import random
import string
'''
Implementation of OCRA
See also http://tools.ietf.org/html/draft-mraihi-mutual-oath-hotp-variants-14
'''
__ALL__ = ('str2ocrasuite', 'StateException', 'OCRAChallengeResponseServer',
'OCRAChallengeResponseClient', 'OCRAMutualChallengeResponseServer',
'OCRAMutualChallengeResponseClient')
def is_int(v):
try:
int(v)
return True
except ValueError:
return False
# Constants
PERIODS = { 'H': 3600, 'M': 60, 'S': 1 }
HOTP = 'HOTP'
OCRA_1 = 'OCRA-1'
class CryptoFunction(object):
'''Represents an OCRA CryptoFunction specification.
:attribute hash_algo:
an object implementing the digest interface as given by PEP 247 and
the hashlib package
:attribute truncation_length:
the length to truncate the decimal representation, can be None, in
this case no truncation is done.
'''
def __init__(self, hash_algo, truncation_length):
assert hash_algo
assert is_int(truncation_length) or truncation_length is None
self.hash_algo = hash_algo
self.truncation_length = truncation_length
def __call__(self, key, data_input):
'''Compute an HOTP digest using the given key and data input and
following the current crypto function description.
:param key:
a byte string containing the HMAC key
:param data_input:
the data input assembled as a byte-string as described by the
OCRA specification
:returns:
the computed digest
:rtype: str
'''
h = hmac.new(key, data_input, self.hash_algo).digest()
if self.truncation_length:
return hotp.dec(h, self.truncation_length)
else:
return str(hotp.truncated_value(h))
def __str__(self):
'''Return the standard representation for the given crypto function.
'''
return 'HOTP-%s-%s' % (self.hash_algo.__name__, self.truncation_length)
def str2hashalgo(description):
'''Convert the name of a hash algorithm as described in the OATH
specifications, to a python object handling the digest algorithm
interface, PEP-xxx.
:param description
the name of the hash algorithm, example
:rtype: a hash algorithm class constructor
'''
algo = getattr(hashlib, description.lower(), None)
if not callable(algo):
raise ValueError, ('Unknown hash algorithm', s[1])
return algo
def str2cryptofunction(crypto_function_description):
'''
Convert an OCRA crypto function description into a CryptoFunction
instance
:param crypto_function_description:
:returns:
the CryptoFunction object
:rtype: CryptoFunction
'''
s = crypto_function_description.split('-')
if len(s) != 3:
raise ValueError, 'CryptoFunction description must be triplet separated by -'
if s[0] != HOTP:
raise ValueError, ('Unknown CryptoFunction kind', s[0])
algo = str2hashalgo(s[1])
try:
truncation_length = int(s[2])
if truncation_length < 0 or truncation_length > 10:
raise ValueError
except ValueError:
raise ValueError, ('Invalid truncation length', s[2])
return CryptoFunction(algo, truncation_length)
class DataInput:
'''
OCRA data input description
By calling this instance of this class and giving the needed parameter
corrresponding to the data input description, it compute a binary string
to give to the HMAC algorithme implemented by a CryptoFunction object
'''
__slots__ = [ 'ocrasuite', 'C', 'Q', 'P', 'S', 'T' ]
def __init__(self, C=None, Q=None, P=None, S=None, T=None):
self.C = C
self.Q = Q
self.P = P
self.S = S
self.T = T
def __call__(self, C=None, Q=None, P=None, P_digest=None, S=None, T=None,
T_precomputed=None, Qsc=None):
datainput = ''
if self.C:
try:
C = int(C)
if C < 0 or C > 2**64:
raise Exception()
except:
raise ValueError, ('Invalid counter value', C)
datainput += hotp.int2beint64(int(C))
if self.Q:
max_length = self.Q[1]
if Qsc is not None:
# Mutual Challenge-Response
Q = Qsc
max_length *= 2
if Q is None or not isinstance(Q, str) or len(Q) > max_length:
raise ValueError, 'challenge'
if self.Q[0] == 'N' and not Q.isdigit():
raise ValueError, 'challenge'
if self.Q[0] == 'A' and not Q.isalnum():
raise ValueError, 'challenge'
if self.Q[0] == 'H':
try:
int(Q, 16)
except ValueError:
raise ValueError, 'challenge'
if self.Q[0] == 'N':
Q = hex(int(Q))[2:]
Q += '0' * (len(Q) % 2)
Q = Q.decode('hex')
if self.Q[0] == 'A':
pass
if self.Q[0] == 'H':
Q = Q.decode('hex')
datainput += Q
datainput += '\0' * (128-len(Q))
if self.P:
if P_digest:
if len(P) == self.P.digest_size:
datainput += P_digest
elif len(P) == 2*self.P.digest_size:
datainput += P_digest.decode('hex')
else:
raise ValueError, ('Pin/Password digest invalid', P_digest)
elif P is None:
raise ValueError, 'Pin/Password missing'
else:
datainput += self.P(P).digest()
if self.S:
if S is None or len(S) != self.S:
raise ValueError, 'session'
datainput += S
if self.T:
if is_int(T_precomputed):
datainput += hotp.int2beint64(int(T_precomputed))
elif is_int(T):
datainput += hotp.int2beint64(int(T / self.T))
else:
raise ValueError, 'timestamp'
return datainput
def __str__(self):
return self.ocrasuite
def str2datainput(datainput_description):
elements = datainput_description.split('-')
datainputs = {}
for element in elements:
letter = element[0]
if letter in datainputs:
raise ValueError, ('DataInput alreadu present %s', element, datainput_description)
if letter == 'C':
datainputs['C'] = 1
elif letter == 'Q':
if len(element) == 1:
datainputs['Q'] = ('N',8)
else:
second_letter = element[1]
try:
if second_letter not in 'ANH':
raise ValueError
length = int(element[2:])
if length < 4 or length > 64:
raise ValueError
except ValueError:
raise ValueError, ('Invalid challenge descriptor', element)
datainputs['Q'] = (second_letter, length)
elif letter == 'P':
algo = str2hashalgo(element[1:] or 'SHA1')
datainputs['P'] = algo
elif letter == 'S':
length = 64
if element[1:]:
try:
length = int(element[1:])
except ValueError:
raise ValueError, ('Invalid session data descriptor', element)
datainputs['S'] = length
elif letter == 'T':
complement = element[1:] or '1M'
try:
length = 0
if not re.match('^(\d+[HMS])+$', complement):
raise ValueError
parts = re.findall('\d+[HMS]', complement)
for part in parts:
period = part[-1]
quantity = int(part[:-1])
length += quantity * PERIODS[period]
datainputs['T'] = length
except ValueError:
raise ValueError, ('Invalid timestamp descriptor', element)
else:
raise ValueError, ('Invalid datainput descriptor', element)
return DataInput(**datainputs)
class OcraSuite(object):
def __init__(self, ocrasuite_description, crypto_function, data_input):
self.ocrasuite_description = ocrasuite_description
self.crypto_function = crypto_function
self.data_input = data_input
def __call__(self, key, **kwargs):
data_input = self.ocrasuite_description + '\0' \
+ self.data_input(**kwargs)
return self.crypto_function(key, data_input)
def accept(self, response, key, **kwargs):
return str(response) == self(key, **kwargs)
def __str__(self):
return '<OcraSuite crypto_function:%s data_input:%s>' % (self.crypto_function,
self.data_input)
def str2ocrasuite(ocrasuite_description):
elements = ocrasuite_description.split(':')
if len(elements) != 3:
raise ValueError, ('Bad OcraSuite description', ocrasuite_description)
if elements[0] != OCRA_1:
raise ValueError, ('Unsupported OCRA identifier', elements[0])
crypto_function = str2cryptofunction(elements[1])
data_input = str2datainput(elements[2])
return OcraSuite(ocrasuite_description, crypto_function, data_input)
class StateException(Exception):
pass
DEFAULT_LENGTH = 20
class OCRAChallengeResponse(object):
state = 1
def __init__(self, key, ocrasuite_description, remote_ocrasuite_description=None):
self.key = key
self.ocrasuite = str2ocrasuite(ocrasuite_description)
self.remote_ocrasuite = remote_ocrasuite_description is not None \
and str2ocrasuite(remote_ocrasuite_description)
if not self.ocrasuite.data_input.Q:
raise ValueError, ('Ocrasuite must have a Q descriptor',)
def compute_challenge(Q):
kind, length = Q
r = xrange(0, length)
if kind == 'N':
c = ''.join([random.choice(string.digits) for i in r])
elif kind == 'A':
alphabet = string.digits + string.letters
c = ''.join([random.choice(alphabet) for i in r])
elif kind == 'H':
c = ''.join([random.choice(string.hexdigits) for i in r])
else:
raise ValueError, ('Q kind is unknown:', kind)
return c
class OCRAChallengeResponseServer(OCRAChallengeResponse):
SERVER_STATE_COMPUTE_CHALLENGE = 1
SERVER_STATE_VERIFY_RESPONSE = 2
SERVER_STATE_FINISHED = 3
def compute_challenge(self):
if self.state != self.SERVER_STATE_COMPUTE_CHALLENGE:
raise StateException()
ocrasuite = self.remote_ocrasuite or self.ocrasuite
self.challenge = compute_challenge(ocrasuite.data_input.Q)
self.state = self.SERVER_STATE_VERIFY_RESPONSE
return self.challenge
def verify_response(self, response, **kwargs):
if self.state != self.SERVER_STATE_VERIFY_RESPONSE:
return StateException()
ocrasuite = self.remote_ocrasuite or self.ocrasuite
c = ocrasuite(self.key, Q=self.challenge, **kwargs) == response
if c:
self.state = self.SERVER_STATE_FINISHED
return c
class OCRAChallengeResponseClient(OCRAChallengeResponse):
def compute_response(self, challenge, **kwargs):
return self.ocrasuite(self.key, Q=self.challenge, **kwargs)
class OCRAMutualChallengeResponseClient(OCRAChallengeResponse):
CLIENT_STATE_COMPUTE_CLIENT_CHALLENGE = 1
CLIENT_STATE_VERIFY_SERVER_RESPONSE = 2
CLIENT_STATE_COMPUTE_CLIENT_RESPONSE = 3
CLIENT_STATE_FINISHED = 4
def compute_client_challenge(self, Qc=None):
if self.state != self.CLIENT_STATE_COMPUTE_CLIENT_CHALLENGE:
raise StateException()
ocrasuite = self.remote_ocrasuite or self.ocrasuite
self.client_challenge = Qc or compute_challenge(ocrasuite.data_input.Q)
self.state = self.CLIENT_STATE_VERIFY_SERVER_RESPONSE
return self.client_challenge
def verify_server_response(self, response, challenge, **kwargs):
if self.state != self.CLIENT_STATE_VERIFY_SERVER_RESPONSE:
return StateException()
self.server_challenge = challenge
q = self.client_challenge+self.server_challenge
ocrasuite = self.remote_ocrasuite or self.ocrasuite
c = ocrasuite(self.key, Qsc=q, **kwargs) == response
if c:
self.state = self.CLIENT_STATE_COMPUTE_CLIENT_RESPONSE
return c
def compute_client_response(self, **kwargs):
if self.state != self.CLIENT_STATE_COMPUTE_CLIENT_RESPONSE:
return StateException()
q = self.server_challenge+self.client_challenge
rc = self.ocrasuite(self.key, Qsc=q, **kwargs)
self.state = self.CLIENT_STATE_FINISHED
return rc
class OCRAMutualChallengeResponseServer(OCRAChallengeResponse):
SERVER_STATE_COMPUTE_SERVER_RESPONSE = 1
SERVER_STATE_VERIFY_CLIENT_RESPONSE = 2
SERVER_STATE_FINISHED = 3
def compute_server_response(self, challenge, Qs=None, **kwargs):
if self.state != self.SERVER_STATE_COMPUTE_SERVER_RESPONSE:
raise StateException()
self.client_challenge = challenge
self.server_challenge = Qs or compute_challenge(self.ocrasuite.data_input.Q)
q = self.client_challenge+self.server_challenge
# no need for pin with server mode
kwargs.pop('P', None)
kwargs.pop('P_digest', None)
rs = self.ocrasuite(self.key, Qsc=q, **kwargs)
self.state = self.SERVER_STATE_VERIFY_CLIENT_RESPONSE
return rs, self.server_challenge
def verify_client_response(self, response, **kwargs):
if self.state != self.SERVER_STATE_VERIFY_CLIENT_RESPONSE:
raise StateException()
q = self.server_challenge+self.client_challenge
ocrasuite = self.remote_ocrasuite or self.ocrasuite
c = ocrasuite(self.key, Qsc=q, **kwargs) == response
if c:
self.state = self.SERVER_STATE_FINISHED
return c
import time
import hashlib
import datetime
import calendar
'''
:mod:`totp` -- RFC6238 - OATH TOTP implementation
=================================================
.. module:: parrot
:platform: any
:synosis: implement a time indexed one-time password algorithm based on a HMAC crypto function as specified in RFC6238
.. moduleauthor:: Benjamin Dauvergne <benjamin.dauvergne@gmail.com>
'''
from hotp import hotp
__ALL__ = ('totp', 'accept_totp')
def totp(key, format='dec6', period=30, t=None, hash=hashlib.sha1):
'''
Compute a TOTP value as prescribed by OATH specifications.
:param key:
the TOTP key given as an hexadecimal string
:param format:
the output format, can be:
- hex40, for a 40 characters hexadecimal format,
- dec4, for a 4 characters decimal format,
- dec6,
- dec7, or
- dec8
it default to dec6.
:param period:
a positive integer giving the period between changes of the OTP
value, as seconds, it defaults to 30.
:param t:
a positive integer giving the current time as seconds since EPOCH
(1st January 1970 at 00:00 GMT), if None we use time.time(); it
defaults to None;
:param hash:
the hash module (usually from the hashlib package) to use,
it defaults to hashlib.sha1.
:returns:
a string representation of the OTP value (as instructed by the format parameter).
:type: str
'''
if t is None:
t = int(time.time())
else:
if isinstance(t, datetime.datetime):
t = calendar.timegm(t.utctimetuple())
else:
t = int(t)
T = int(t/period)
return hotp(key, T, format=format, hash=hash)
def accept_totp(key, response, format='dec6', period=30, t=None,
hash=hashlib.sha1, forward_drift=1, backward_drift=1, drift=0):
'''
Validate a TOTP value inside a window of
[drift-bacward_drift:drift+forward_drift] of time steps.
Where drift is the drift obtained during the last call to accept_totp.
:param response:
a string representing the OTP to check, its format should correspond
to the format parameter (it's not mandatory, it is part of the
checks),
:param key:
the TOTP key given as an hexadecimal string
:param format:
the output format, can be:
- hex40, for a 40 characters hexadecimal format,
- dec4, for a 4 characters decimal format,
- dec6,
- dec7, or
- dec8
it default to dec6.
:param period:
a positive integer giving the period between changes of the OTP
value, as seconds, it defaults to 30.
:param t:
a positive integer giving the current time as seconds since EPOCH
(1st January 1970 at 00:00 GMT), if None we use time.time(); it
defaults to None;
:param hash:
the hash module (usually from the hashlib package) to use,
it defaults to hashlib.sha1.
:param forward_drift:
how much we accept the client clock to advance, as a number of
periods, i.e. if the period is 30 seconds, a forward_drift of 2,
allows at most a clock a drift of 90 seconds;
Schema:
.___ Current time
|
0 v + 30s +60s +90s
[ current_period | period+1 | period+2 [
it defaults to 1.
:param backward_drift:
how much we accept the client clock to backstep; it defaults to 1.
:param drift:
an absolute drift of the local clock to the client clock; use it to
keep track of an augmenting drift with a client without augmenting
the size of the window given by forward_drift and backward_dript; it
defaults to 0, you should usually give as value the last value
returned by accept_totp for this client (read further).
:returns:
a pair (v,d) where v is a boolean giving the result, and d the
needed drift to validate the value. The drift value should be saved
relative to the current client. This saved value SHOULD be used in
later calls to accept_totp in order to accept a slowly accumulating
drift in the client token clock; on the server side you should use
reliable source of time like an NTP server.
:rtype: a two element tuple
'''
if t is None:
t = int(time.time())
for i in range(max(-divmod(t, period)[0],-backward_drift),forward_drift+1):
d = (drift+i) * period
if totp(key, format=format, period=period, hash=hash, t=t+d) == str(response):
return True, drift+i
return False, 0
from authserv.test import *
from authserv.auth import authenticate
from authserv import protocol
from oath import totp
from authserv.oath import totp
class AuthTest(unittest.TestCase):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment