diff options
author | Theo Chatzimichos <tampakrap@gmail.com> | 2013-08-21 15:00:10 -0700 |
---|---|---|
committer | Theo Chatzimichos <tampakrap@gmail.com> | 2013-08-21 15:00:10 -0700 |
commit | 891760c38dcb9241e22cea9b9e2a79e089d203a7 (patch) | |
tree | 89219895b92d8bc428ecaee2b71524c5bfaf7c9c | |
parent | Merge pull request #71 from tampakrap/tests (diff) | |
parent | Switch ciphers to output urlsafe base64. (diff) | |
download | identity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.tar.gz identity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.tar.bz2 identity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.zip |
Merge pull request #74 from mgorny/cipher-cleanup
Cipher cleanup
-rw-r--r-- | okupy/accounts/forms.py | 10 | ||||
-rw-r--r-- | okupy/accounts/models.py | 2 | ||||
-rw-r--r-- | okupy/accounts/urls.py | 2 | ||||
-rw-r--r-- | okupy/accounts/views.py | 31 | ||||
-rw-r--r-- | okupy/common/crypto.py | 71 | ||||
-rw-r--r-- | okupy/common/ldap_helpers.py | 2 | ||||
-rw-r--r-- | okupy/common/models.py | 36 | ||||
-rw-r--r-- | okupy/crypto/__init__.py | 0 | ||||
-rw-r--r-- | okupy/crypto/ciphers.py | 156 | ||||
-rw-r--r-- | okupy/crypto/codecs.py | 27 | ||||
-rw-r--r-- | okupy/crypto/models.py | 86 | ||||
-rw-r--r-- | okupy/otp/models.py | 47 | ||||
-rw-r--r-- | okupy/otp/totp/models.py | 10 | ||||
-rw-r--r-- | okupy/settings/__init__.py | 2 | ||||
-rw-r--r-- | okupy/tests/settings.py | 2 | ||||
-rw-r--r-- | okupy/tests/unit/test_cipher.py | 96 | ||||
-rw-r--r-- | okupy/tests/unit/test_login.py | 2 | ||||
-rw-r--r-- | okupy/tests/unit/test_secondary_password.py | 2 | ||||
-rw-r--r-- | okupy/tests/unit/test_signup.py | 2 |
19 files changed, 374 insertions, 212 deletions
diff --git a/okupy/accounts/forms.py b/okupy/accounts/forms.py index ae235f1..be6e1e8 100644 --- a/okupy/accounts/forms.py +++ b/okupy/accounts/forms.py @@ -3,6 +3,7 @@ from django import forms from .models import OpenID_Attributes +from ..crypto.ciphers import sessionrefcipher class LoginForm(forms.Form): @@ -22,10 +23,17 @@ class OpenIDLoginForm(LoginForm): class SSLCertLoginForm(forms.Form): - session_id = forms.CharField(max_length=200, widget=forms.HiddenInput()) + session = forms.CharField(max_length=200, widget=forms.HiddenInput()) next = forms.CharField(max_length=254, widget=forms.HiddenInput()) login_uri = forms.CharField(max_length=254, widget=forms.HiddenInput()) + def clean_session(self): + try: + return sessionrefcipher.decrypt( + self.cleaned_data['session']) + except ValueError: + raise forms.ValidationError('Invalid session id') + class OTPForm(forms.Form): otp_token = forms.CharField(max_length=10, label='OTP token:') diff --git a/okupy/accounts/models.py b/okupy/accounts/models.py index db5c0c7..3f41705 100644 --- a/okupy/accounts/models.py +++ b/okupy/accounts/models.py @@ -6,7 +6,7 @@ from ldapdb.models.fields import (CharField, IntegerField, ListField, FloatField, ACLField, DateField) import ldapdb.models -from ..common.models import EncryptedPKModel +from ..crypto.models import EncryptedPKModel class Queue(EncryptedPKModel): diff --git a/okupy/accounts/urls.py b/okupy/accounts/urls.py index 7b86357..56e56a6 100644 --- a/okupy/accounts/urls.py +++ b/okupy/accounts/urls.py @@ -12,7 +12,7 @@ accounts_urlpatterns = patterns('', url(r'^former-devlist/$', v.lists, {'acc_list': 'former-devlist'}, name="former_developers"), url(r'^foundation-members/$', v.lists, {'acc_list': 'foundation-members'}, name="foundation_members"), url(r'^signup/$', v.signup), - url(r'^activate/(?P<token>[a-zA-Z0-9]+)/$', v.activate), + url(r'^activate/(?P<token>[a-zA-Z0-9-_]+)/$', v.activate), url(r'^otp-setup/$', v.otp_setup), url(r'^otp-qrcode.png$', v.otp_qrcode), url(r'^endpoint/$', v.openid_endpoint), diff --git a/okupy/accounts/views.py b/okupy/accounts/views.py index d3b0793..103b267 100644 --- a/okupy/accounts/views.py +++ b/okupy/accounts/views.py @@ -4,7 +4,6 @@ from django.conf import settings from django.contrib import messages from django.contrib.auth import (login as _login, logout as _logout, authenticate) -from django.contrib.sessions.backends.cache import SessionStore from django.core.mail import send_mail from django.core.urlresolvers import reverse from django.db import IntegrityError @@ -35,19 +34,18 @@ from .openid_store import DjangoDBOpenIDStore from ..common.ldap_helpers import (get_bound_ldapuser, set_secondary_password, remove_secondary_password) -from ..common.crypto import cipher from ..common.decorators import strong_auth_required, anonymous_required from ..common.exceptions import OkupyError from ..common.log import log_extra_data +from ..crypto.ciphers import sessionrefcipher +from ..crypto.models import RevokedToken from ..otp import init_otp -from ..otp.models import RevokedToken from ..otp.sotp.models import SOTPDevice from ..otp.totp.models import TOTPDevice # the following two are for exceptions import openid.yadis.discover import openid.fetchers -import base64 import django_otp import io import ldap @@ -117,7 +115,7 @@ def login(request): raise OkupyError('OTP verification failed') # prevent replay attacks and race conditions - if not RevokedToken.add(request.user, token): + if not RevokedToken.add(token, request.user): raise OkupyError('OTP verification failed') dev = django_otp.match_token(request.user, token) if not dev: @@ -176,24 +174,13 @@ def login(request): ssl_auth_form = None ssl_auth_uri = None else: - if 'encrypted_id' not in request.session: - # .cache_key is a very good property since it ensures - # that the cache is actually created, and works from first - # request - session_id = request.session.cache_key - - # since it always starts with the backend module name - # and __init__() expects pure id, we can strip that - assert(session_id.startswith('django.contrib.sessions.cache')) - session_id = session_id[29:] - request.session['encrypted_id'] = base64.b64encode( - cipher.encrypt(session_id)) + encrypted_id = sessionrefcipher.encrypt(request.session) # TODO: it fails when: # 1. site is accessed via IP (auth.127.0.0.1), # 2. HTTP used on non-standard port (https://...:8000). ssl_auth_form = SSLCertLoginForm({ - 'session_id': request.session['encrypted_id'], + 'session': encrypted_id, 'next': request.build_absolute_uri(next), 'login_uri': request.build_absolute_uri(request.get_full_path()), }) @@ -221,10 +208,7 @@ def ssl_auth(request): if not ssl_auth_form.is_valid(): return HttpResponseBadRequest('400 Bad Request') - session_id = cipher.decrypt( - base64.b64decode(ssl_auth_form.cleaned_data['session_id']), - 32) - + session = ssl_auth_form.cleaned_data['session'] next_uri = ssl_auth_form.cleaned_data['login_uri'] user = authenticate(request=request) @@ -238,7 +222,6 @@ def ssl_auth(request): # so, django will always start a new session for us. we need to copy # the data to the original session and preferably flush the new one. - session = SessionStore(session_key=session_id) session.update(request.session) # always logout automatically from SSL-based auth @@ -393,7 +376,7 @@ def otp_setup(request): token = conf_form.cleaned_data['otp_token'] # prevent reusing the same token to login - if not RevokedToken.add(request.user, token): + if not RevokedToken.add(token, request.user): raise OkupyError() if not dev.verify_token(token, secret): raise OkupyError() diff --git a/okupy/common/crypto.py b/okupy/common/crypto.py deleted file mode 100644 index 2596680..0000000 --- a/okupy/common/crypto.py +++ /dev/null @@ -1,71 +0,0 @@ -# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python - -from django.conf import settings - -from Crypto.Cipher.Blowfish import BlowfishCipher -from Crypto.Hash.SHA384 import SHA384Hash - -import Crypto.Random - -import binascii -import struct - - -class OkupyCipher(object): - """ Symmetric cipher using django's SECRET_KEY. """ - - _hasher_algo = SHA384Hash - _cipher_algo = BlowfishCipher - _cipher_block_size = 8 - - def __init__(self): - hasher = self._hasher_algo() - hasher.update(settings.SECRET_KEY) - key_hash = hasher.digest() - self.cipher = self._cipher_algo(key_hash) - self.rng = Crypto.Random.new() - - def encrypt(self, data): - """ - Encrypt random-length data block padding it with random data - if necessary. - """ - - # ensure it's bytestring before we append random bits - data = bytes(data) - # minus is intentional. (-X % S) == S - (X % S) - padding = -len(data) % self._cipher_block_size - if padding: - data += self.rng.read(padding) - return self.cipher.encrypt(data) - - def decrypt(self, data, length): - """ - Decrypt the data block of given length. Removes padding if any. - """ - - return self.cipher.decrypt(data)[:length] - - -cipher = OkupyCipher() - - -class IDCipher(object): - """ - A cipher to create 'encrypted database IDs'. It is specifically fit - to encrypt an integer into constant-length hexstring. - """ - - def encrypt(self, id): - byte_id = struct.pack('!I', id) - byte_eid = cipher.encrypt(byte_id) - return binascii.b2a_hex(byte_eid) - - def decrypt(self, eid): - byte_eid = binascii.a2b_hex(eid) - byte_id = cipher.decrypt(byte_eid, 4) - id = struct.unpack('!I', byte_id)[0] - return id - - -idcipher = IDCipher() diff --git a/okupy/common/ldap_helpers.py b/okupy/common/ldap_helpers.py index 1bcfa69..4970e6a 100644 --- a/okupy/common/ldap_helpers.py +++ b/okupy/common/ldap_helpers.py @@ -4,7 +4,7 @@ from base64 import b64encode from Crypto import Random from passlib.hash import ldap_md5_crypt -from .crypto import cipher +from ..crypto.ciphers import cipher from ..accounts.models import LDAPUser diff --git a/okupy/common/models.py b/okupy/common/models.py deleted file mode 100644 index 4a7ee73..0000000 --- a/okupy/common/models.py +++ /dev/null @@ -1,36 +0,0 @@ -# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python - -from django.conf import settings -from django.db import models - -from .crypto import idcipher - - -# based on https://gist.github.com/treyhunner/735861 - -class EncryptedPKModelManager(models.Manager): - def get(self, *args, **kwargs): - eid = kwargs.pop('encrypted_id', None) - if eid is not None: - kwargs['id'] = idcipher.decrypt(eid) - return super(EncryptedPKModelManager, self).get(*args, **kwargs) - - -class EncryptedPKModel(models.Model): - """ - A model with built-in identifier encryption (for secure tokens). - """ - - objects = EncryptedPKModelManager() - - @property - def encrypted_id(self): - """ - The object identifier encrypted using IDCipher, as a hex-string. - """ - if self.id is None: - return None - return idcipher.encrypt(self.id) - - class Meta: - abstract = True diff --git a/okupy/crypto/__init__.py b/okupy/crypto/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/okupy/crypto/__init__.py diff --git a/okupy/crypto/ciphers.py b/okupy/crypto/ciphers.py new file mode 100644 index 0000000..667d8d1 --- /dev/null +++ b/okupy/crypto/ciphers.py @@ -0,0 +1,156 @@ +# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python + +from django.conf import settings +from django.contrib.sessions.backends.cache import SessionStore + +from Crypto.Cipher.AES import AESCipher +from Crypto.Hash.SHA256 import SHA256Hash + +import Crypto.Random + +import struct + +from .codecs import ub64encode, ub64decode + + +class OkupyCipher(object): + """ Symmetric cipher using django's SECRET_KEY. """ + + _hasher_algo = SHA256Hash + _cipher_algo = AESCipher + + def __init__(self): + hasher = self._hasher_algo() + hasher.update(settings.SECRET_KEY) + key_hash = hasher.digest() + self.cipher = self._cipher_algo(key_hash) + self.rng = Crypto.Random.new() + + @property + def block_size(self): + """ + Cipher's block size. + """ + return self.cipher.block_size + + def encrypt(self, data): + """ + Encrypt random-length data block padding it with random data + if necessary. + """ + + # ensure it's bytestring before we append random bits + data = bytes(data) + # minus is intentional. (-X % S) == S - (X % S) + padding = -len(data) % self.block_size + if padding: + data += self.rng.read(padding) + return self.cipher.encrypt(data) + + def decrypt(self, data, length): + """ + Decrypt the data block of given length. Removes padding if any. + """ + + if len(data) < length: + raise ValueError('Ciphertext too short for requested length') + return self.cipher.decrypt(data)[:length] + + +class IDCipher(object): + """ + A cipher to create 'encrypted database IDs'. It is specifically fit + to encrypt an integer into constant-length hexstring. + """ + + def encrypt(self, id): + byte_id = struct.pack('!I', id) + byte_eid = cipher.encrypt(byte_id) + return ub64encode(byte_eid) + + def decrypt(self, eid): + byte_eid = ub64decode(eid) + byte_id = cipher.decrypt(byte_eid, 4) + id = struct.unpack('!I', byte_id)[0] + return id + + +class SessionRefCipher(object): + """ + A cipher to provide encrypted identifiers to sessions. + + The encrypted session ID is stored in session for additional + security. Only previous encryption result may be used in decrypt(). + """ + + cache_key_prefix = 'django.contrib.sessions.cache' + session_id_length = 32 + random_prefix_bytes = 4 + ciphertext_length = session_id_length*3/4 + random_prefix_bytes + + def encrypt(self, session): + """ + Return an encrypted reference to the session. The encrypted + identifier will be stored in the session for verification + and caching. Therefore, further calls to this method will reuse + the previously cached identifier. + """ + + if 'encrypted_id' not in session: + # .cache_key is a very good property since it ensures + # that the cache is actually created, and works from first + # request + session_id = session.cache_key + + # since it always starts with the backend module name + # and __init__() expects pure id, we can strip that + assert(session_id.startswith(self.cache_key_prefix)) + session_id = session_id[len(self.cache_key_prefix):] + assert(len(session_id) == self.session_id_length) + + # now's another curious trick: session id consists + # of [a-z][0-9]. it's basically base36 but since decoding + # that is harsh, let's just treat it as base64. that's + # going to pack it into 3/4 original size, that is 24 bytes. + # then, with random prefix prepended we will fit into one + # block of ciphertext less. + session_id = ub64decode(session_id) + + data = (cipher.rng.read(self.random_prefix_bytes) + + session_id) + assert(len(data) == self.ciphertext_length) + session['encrypted_id'] = ub64encode( + cipher.encrypt(data)) + session.save() + return session['encrypted_id'] + + def decrypt(self, eid): + """ + Return the SessionStore to which the encrypted identifier is + pointing. Raises ValueError if the identifier is invalid. + """ + + try: + session_id = cipher.decrypt(ub64decode(eid), + self.ciphertext_length) + except (TypeError, ValueError): + pass + else: + session_id = session_id[self.random_prefix_bytes:] + session_id = ub64encode(session_id) + session = SessionStore(session_key=session_id) + if session.get('encrypted_id') == eid: + # circular import + from .models import RevokedToken + + # revoke to prevent replay attacks + if RevokedToken.add(eid): + del session['encrypted_id'] + session.save() + return session + raise ValueError('Invalid session id') + + +cipher = OkupyCipher() +idcipher = IDCipher() +sessionrefcipher = SessionRefCipher() diff --git a/okupy/crypto/codecs.py b/okupy/crypto/codecs.py new file mode 100644 index 0000000..7095e34 --- /dev/null +++ b/okupy/crypto/codecs.py @@ -0,0 +1,27 @@ +# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python + +import base64 + + +def ub32encode(text): + """ Encode text as unpadded base32. """ + return base64.b32encode(text).rstrip('=') + + +def ub32decode(text): + """ Decode text from unpadded base32. """ + # add missing padding if necessary + text += '=' * (-len(text) % 8) + return base64.b32decode(text, casefold=True) + + +def ub64encode(text): + """ Encode text as unpadded, url-safe base64. """ + return base64.urlsafe_b64encode(text).rstrip('=') + + +def ub64decode(text): + """ decode text from unpadded, url-safe base64. """ + # add missing padding if necessary + text += '=' * (-len(text) % 4) + return base64.urlsafe_b64decode(bytes(text)) diff --git a/okupy/crypto/models.py b/okupy/crypto/models.py new file mode 100644 index 0000000..b2eaa08 --- /dev/null +++ b/okupy/crypto/models.py @@ -0,0 +1,86 @@ +# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python + +from django.contrib.auth.models import User +from django.db import models, IntegrityError +from django.utils.timezone import now + +from .ciphers import idcipher + +from datetime import timedelta + + +# based on https://gist.github.com/treyhunner/735861 + +class EncryptedPKModelManager(models.Manager): + def get(self, *args, **kwargs): + eid = kwargs.pop('encrypted_id', None) + if eid is not None: + kwargs['id'] = idcipher.decrypt(eid) + return super(EncryptedPKModelManager, self).get(*args, **kwargs) + + +class EncryptedPKModel(models.Model): + """ + A model with built-in identifier encryption (for secure tokens). + """ + + objects = EncryptedPKModelManager() + + @property + def encrypted_id(self): + """ + The object identifier encrypted using IDCipher, as a hex-string. + """ + if self.id is None: + return None + return idcipher.encrypt(self.id) + + class Meta: + abstract = True + + +class RevokedToken(models.Model): + """ + A model that guarantees atomic token revocation. + + We can use a single table for various kinds of tokens as long + as they don't interfere (e.g. are of different length). + """ + + user = models.ForeignKey(User, db_index=False, null=True) + token = models.CharField(max_length=64) + ts = models.DateTimeField(auto_now_add=True) + + @classmethod + def cleanup(cls): + """ + Remove tokens old enough to be no longer valid. + """ + + # we use this just to enforce atomicity and prevent replay + # for SOTP, we can clean up old tokens quite fast + # (as soon as .delete() is effective) + # for TOTP, we should wait till the token drifts away + old = now() - timedelta(minutes=3) + cls.objects.filter(ts__lt=old).delete() + + @classmethod + def add(cls, token, user=None): + """ + Use and revoke the given token, for the given user. User + can be None if irrelevant. + + Returns True if the token is fine, False if it was used + already. + """ + cls.cleanup() + + t = cls(user=user, token=token) + try: + t.save() + except IntegrityError: + return False + return True + + class Meta: + unique_together = ('user', 'token') diff --git a/okupy/otp/models.py b/okupy/otp/models.py deleted file mode 100644 index a43fb4e..0000000 --- a/okupy/otp/models.py +++ /dev/null @@ -1,47 +0,0 @@ -# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python - -from django.contrib.auth.models import User -from django.db import models, IntegrityError - -from datetime import datetime, timedelta - - -class RevokedToken(models.Model): - """ A model that guarantees atomic token revocation. """ - - user = models.ForeignKey(User) - token = models.CharField(max_length=10) - ts = models.DateTimeField(auto_now_add=True) - - @classmethod - def cleanup(cls): - """ - Remove tokens old enough to be no longer valid. - """ - - # we use this just to enforce atomicity and prevent replay - # for SOTP, we can clean up old tokens quite fast - # (as soon as .delete() is effective) - # for TOTP, we should wait till the token drifts away - old = datetime.now() - timedelta(minutes=3) - cls.objects.filter(ts__lt=old).delete() - - @classmethod - def add(cls, user, token): - """ - Use and revoke the given token, for the given user. - - Returns True if the token is fine, False if it was used - already. - """ - cls.cleanup() - - t = cls(user=user, token=token) - try: - t.save() - except IntegrityError: - return False - return True - - class Meta: - unique_together = ('user', 'token') diff --git a/okupy/otp/totp/models.py b/okupy/otp/totp/models.py index a32baa6..72f5e3d 100644 --- a/okupy/otp/totp/models.py +++ b/okupy/otp/totp/models.py @@ -3,9 +3,8 @@ from django_otp import oath from django_otp.models import Device -from base64 import b32decode, b32encode - from ...accounts.models import LDAPUser +from ...crypto.codecs import ub32decode, ub32encode import Crypto.Random @@ -45,7 +44,7 @@ class TOTPDevice(Device): Returns 20-character base32 string (with padding stripped). """ rng = Crypto.Random.new() - return b32encode(rng.read(12)).rstrip('=') + return ub32encode(rng.read(12)) @staticmethod def get_uri(secret): @@ -67,10 +66,7 @@ class TOTPDevice(Device): return False secret = u.otp_secret - # add missing padding if necessary - secret += '=' * (-len(secret) % 8) - - key = b32decode(secret, casefold=True) + key = ub32decode(secret) try: token = int(token) except ValueError: diff --git a/okupy/settings/__init__.py b/okupy/settings/__init__.py index 9b59a2d..767bb22 100644 --- a/okupy/settings/__init__.py +++ b/okupy/settings/__init__.py @@ -50,7 +50,7 @@ INSTALLED_APPS = ( 'django.contrib.messages', 'django.contrib.staticfiles', 'okupy.accounts', - 'okupy.otp', + 'okupy.crypto', 'okupy.otp.sotp', 'okupy.otp.totp', ) diff --git a/okupy/tests/settings.py b/okupy/tests/settings.py index f4d0c07..ac4e9df 100644 --- a/okupy/tests/settings.py +++ b/okupy/tests/settings.py @@ -53,7 +53,7 @@ INSTALLED_APPS = ( 'django_otp', 'discover_runner', 'okupy.accounts', - 'okupy.otp', + 'okupy.crypto', 'okupy.otp.sotp', 'okupy.otp.totp', 'okupy.tests', diff --git a/okupy/tests/unit/test_cipher.py b/okupy/tests/unit/test_cipher.py index 31ac795..0589dfd 100644 --- a/okupy/tests/unit/test_cipher.py +++ b/okupy/tests/unit/test_cipher.py @@ -1,29 +1,89 @@ -#!/usr/bin/env python +# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python from Crypto import Random -from unittest import TestCase +from unittest import TestCase, SkipTest -from ...common.crypto import cipher +from django.contrib.sessions.backends.cache import SessionStore + +from ...crypto.ciphers import cipher, sessionrefcipher class OkupyCipherTests(TestCase): - def test_verify_password_less_than_8_chars(self): - hash = cipher.encrypt('test1') - self.assertEqual(cipher.decrypt(hash, 5), 'test1') + def setUp(self): + self._random_string = '123456abcdef' * int(cipher.block_size / 2) + + def test_verify_password_less_than_block_size(self): + data = self._random_string[:cipher.block_size-3] + hash = cipher.encrypt(data) + self.assertEqual(cipher.decrypt(hash, len(data)), data) - def test_verify_password_8_chars(self): - hash = cipher.encrypt('testtest') - self.assertEqual(cipher.decrypt(hash, 8), 'testtest') + def test_verify_password_exact_block_size(self): + data = self._random_string[:cipher.block_size] + hash = cipher.encrypt(data) + self.assertEqual(cipher.decrypt(hash, len(data)), data) - def test_verify_password_more_than_8_chars(self): - hash = cipher.encrypt('testtest123') - self.assertEqual(cipher.decrypt(hash, 11), 'testtest123') + def test_verify_password_more_than_block_size(self): + data = self._random_string[:cipher.block_size+3] + hash = cipher.encrypt(data) + self.assertEqual(cipher.decrypt(hash, len(data)), data) - def test_verify_password_more_than_16_chars(self): - hash = cipher.encrypt('testtest123456789012') - self.assertEqual(cipher.decrypt(hash, 20), 'testtest123456789012') + def test_verify_password_more_than_twice_block_size(self): + data = self._random_string[:cipher.block_size*2+3] + hash = cipher.encrypt(data) + self.assertEqual(cipher.decrypt(hash, len(data)), data) def test_encrypt_random_bytes(self): - password = Random.get_random_bytes(45) - hash = cipher.encrypt(password) - self.assertEqual(cipher.decrypt(hash, 45), password) + data = Random.get_random_bytes(45) + hash = cipher.encrypt(data) + self.assertEqual(cipher.decrypt(hash, len(data)), data) + + def test_ciphertext_shorter_than_req_output_raises_valueerror(self): + data = self._random_string[:cipher.block_size*2] + hash = cipher.encrypt(data)[:cipher.block_size] + self.assertRaises(ValueError, cipher.decrypt, hash, len(data)) + + def test_ciphertext_not_multiple_of_block_size_raises_valueerror(self): + data = self._random_string[:cipher.block_size/2] + hash = cipher.encrypt(data)[:cipher.block_size/2] + self.assertRaises(ValueError, cipher.decrypt, hash, len(data)) + + +class SessionRefCipherTest(TestCase): + def test_encrypt_decrypt(self): + session = SessionStore() + session['test'] = 'in-test' + session.save() + + eid = sessionrefcipher.encrypt(session) + sess = sessionrefcipher.decrypt(eid) + self.assertEqual(sess.get('test'), 'in-test') + + def test_invalid_base64_raises_valueerror(self): + data = 'Azcd^%' + self.assertRaises(ValueError, sessionrefcipher.decrypt, data) + + def test_invalid_ciphertext_raises_valueerror(self): + data = 'ZHVwYQo=' + self.assertRaises(ValueError, sessionrefcipher.decrypt, data) + + def test_unique_encrypted_are_generated_after_revocation(self): + session = SessionStore() + session['test'] = 'in-test' + session.save() + + eid1 = sessionrefcipher.encrypt(session) + session = sessionrefcipher.decrypt(eid1) + eid2 = sessionrefcipher.encrypt(session) + self.assertNotEqual(eid1, eid2) + + def test_revoked_encrypted_id_raises_valueerror(self): + session = SessionStore() + session['test'] = 'in-test' + session.save() + + eid1 = sessionrefcipher.encrypt(session) + session = sessionrefcipher.decrypt(eid1) + eid2 = sessionrefcipher.encrypt(session) + if eid1 == eid2: + raise SkipTest('Non-unique encrypted IDs generated') + self.assertRaises(ValueError, sessionrefcipher.decrypt, eid1) diff --git a/okupy/tests/unit/test_login.py b/okupy/tests/unit/test_login.py index 58c01ea..8aaf2c1 100644 --- a/okupy/tests/unit/test_login.py +++ b/okupy/tests/unit/test_login.py @@ -15,8 +15,8 @@ from mockldap import MockLdap from .. import vars from ...accounts.views import login, logout from ...accounts.forms import LoginForm -from ...common.crypto import cipher from ...common.test_helpers import OkupyTestCase, set_request, no_database, ldap_users, set_search_seed +from ...crypto.ciphers import cipher class LoginUnitTests(OkupyTestCase): diff --git a/okupy/tests/unit/test_secondary_password.py b/okupy/tests/unit/test_secondary_password.py index 8338827..baef415 100644 --- a/okupy/tests/unit/test_secondary_password.py +++ b/okupy/tests/unit/test_secondary_password.py @@ -9,9 +9,9 @@ from mockldap import MockLdap from passlib.hash import ldap_md5_crypt from .. import vars -from ...common.crypto import cipher from ...common.ldap_helpers import set_secondary_password, remove_secondary_password from ...common.test_helpers import set_request, set_search_seed, ldap_users +from ...crypto.ciphers import cipher class SecondaryPassword(TestCase): diff --git a/okupy/tests/unit/test_signup.py b/okupy/tests/unit/test_signup.py index c32869a..e1f4d2b 100644 --- a/okupy/tests/unit/test_signup.py +++ b/okupy/tests/unit/test_signup.py @@ -137,7 +137,7 @@ class SignupUnitTests(OkupyTestCase): self.assertEqual(vars.QUEUEDUSER.email, vars.SIGNUP_TESTUSER['email']) self.assertEqual(vars.QUEUEDUSER.password, vars.SIGNUP_TESTUSER['password_origin']) # note: this needs to be kept in line with used cipher - self.assertRegexpMatches(vars.QUEUEDUSER.encrypted_id, '^[a-f0-9]{16}$') + self.assertRegexpMatches(vars.QUEUEDUSER.encrypted_id, '^[a-zA-Z0-9_-]{22}$') @no_database() def test_no_database_connection_raises_error_in_signup(self): |