aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTheo Chatzimichos <tampakrap@gmail.com>2013-08-21 15:00:10 -0700
committerTheo Chatzimichos <tampakrap@gmail.com>2013-08-21 15:00:10 -0700
commit891760c38dcb9241e22cea9b9e2a79e089d203a7 (patch)
tree89219895b92d8bc428ecaee2b71524c5bfaf7c9c
parentMerge pull request #71 from tampakrap/tests (diff)
parentSwitch ciphers to output urlsafe base64. (diff)
downloadidentity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.tar.gz
identity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.tar.bz2
identity.gentoo.org-891760c38dcb9241e22cea9b9e2a79e089d203a7.zip
Merge pull request #74 from mgorny/cipher-cleanup
Cipher cleanup
-rw-r--r--okupy/accounts/forms.py10
-rw-r--r--okupy/accounts/models.py2
-rw-r--r--okupy/accounts/urls.py2
-rw-r--r--okupy/accounts/views.py31
-rw-r--r--okupy/common/crypto.py71
-rw-r--r--okupy/common/ldap_helpers.py2
-rw-r--r--okupy/common/models.py36
-rw-r--r--okupy/crypto/__init__.py0
-rw-r--r--okupy/crypto/ciphers.py156
-rw-r--r--okupy/crypto/codecs.py27
-rw-r--r--okupy/crypto/models.py86
-rw-r--r--okupy/otp/models.py47
-rw-r--r--okupy/otp/totp/models.py10
-rw-r--r--okupy/settings/__init__.py2
-rw-r--r--okupy/tests/settings.py2
-rw-r--r--okupy/tests/unit/test_cipher.py96
-rw-r--r--okupy/tests/unit/test_login.py2
-rw-r--r--okupy/tests/unit/test_secondary_password.py2
-rw-r--r--okupy/tests/unit/test_signup.py2
19 files changed, 374 insertions, 212 deletions
diff --git a/okupy/accounts/forms.py b/okupy/accounts/forms.py
index ae235f1..be6e1e8 100644
--- a/okupy/accounts/forms.py
+++ b/okupy/accounts/forms.py
@@ -3,6 +3,7 @@
from django import forms
from .models import OpenID_Attributes
+from ..crypto.ciphers import sessionrefcipher
class LoginForm(forms.Form):
@@ -22,10 +23,17 @@ class OpenIDLoginForm(LoginForm):
class SSLCertLoginForm(forms.Form):
- session_id = forms.CharField(max_length=200, widget=forms.HiddenInput())
+ session = forms.CharField(max_length=200, widget=forms.HiddenInput())
next = forms.CharField(max_length=254, widget=forms.HiddenInput())
login_uri = forms.CharField(max_length=254, widget=forms.HiddenInput())
+ def clean_session(self):
+ try:
+ return sessionrefcipher.decrypt(
+ self.cleaned_data['session'])
+ except ValueError:
+ raise forms.ValidationError('Invalid session id')
+
class OTPForm(forms.Form):
otp_token = forms.CharField(max_length=10, label='OTP token:')
diff --git a/okupy/accounts/models.py b/okupy/accounts/models.py
index db5c0c7..3f41705 100644
--- a/okupy/accounts/models.py
+++ b/okupy/accounts/models.py
@@ -6,7 +6,7 @@ from ldapdb.models.fields import (CharField, IntegerField, ListField,
FloatField, ACLField, DateField)
import ldapdb.models
-from ..common.models import EncryptedPKModel
+from ..crypto.models import EncryptedPKModel
class Queue(EncryptedPKModel):
diff --git a/okupy/accounts/urls.py b/okupy/accounts/urls.py
index 7b86357..56e56a6 100644
--- a/okupy/accounts/urls.py
+++ b/okupy/accounts/urls.py
@@ -12,7 +12,7 @@ accounts_urlpatterns = patterns('',
url(r'^former-devlist/$', v.lists, {'acc_list': 'former-devlist'}, name="former_developers"),
url(r'^foundation-members/$', v.lists, {'acc_list': 'foundation-members'}, name="foundation_members"),
url(r'^signup/$', v.signup),
- url(r'^activate/(?P<token>[a-zA-Z0-9]+)/$', v.activate),
+ url(r'^activate/(?P<token>[a-zA-Z0-9-_]+)/$', v.activate),
url(r'^otp-setup/$', v.otp_setup),
url(r'^otp-qrcode.png$', v.otp_qrcode),
url(r'^endpoint/$', v.openid_endpoint),
diff --git a/okupy/accounts/views.py b/okupy/accounts/views.py
index d3b0793..103b267 100644
--- a/okupy/accounts/views.py
+++ b/okupy/accounts/views.py
@@ -4,7 +4,6 @@ from django.conf import settings
from django.contrib import messages
from django.contrib.auth import (login as _login, logout as _logout,
authenticate)
-from django.contrib.sessions.backends.cache import SessionStore
from django.core.mail import send_mail
from django.core.urlresolvers import reverse
from django.db import IntegrityError
@@ -35,19 +34,18 @@ from .openid_store import DjangoDBOpenIDStore
from ..common.ldap_helpers import (get_bound_ldapuser,
set_secondary_password,
remove_secondary_password)
-from ..common.crypto import cipher
from ..common.decorators import strong_auth_required, anonymous_required
from ..common.exceptions import OkupyError
from ..common.log import log_extra_data
+from ..crypto.ciphers import sessionrefcipher
+from ..crypto.models import RevokedToken
from ..otp import init_otp
-from ..otp.models import RevokedToken
from ..otp.sotp.models import SOTPDevice
from ..otp.totp.models import TOTPDevice
# the following two are for exceptions
import openid.yadis.discover
import openid.fetchers
-import base64
import django_otp
import io
import ldap
@@ -117,7 +115,7 @@ def login(request):
raise OkupyError('OTP verification failed')
# prevent replay attacks and race conditions
- if not RevokedToken.add(request.user, token):
+ if not RevokedToken.add(token, request.user):
raise OkupyError('OTP verification failed')
dev = django_otp.match_token(request.user, token)
if not dev:
@@ -176,24 +174,13 @@ def login(request):
ssl_auth_form = None
ssl_auth_uri = None
else:
- if 'encrypted_id' not in request.session:
- # .cache_key is a very good property since it ensures
- # that the cache is actually created, and works from first
- # request
- session_id = request.session.cache_key
-
- # since it always starts with the backend module name
- # and __init__() expects pure id, we can strip that
- assert(session_id.startswith('django.contrib.sessions.cache'))
- session_id = session_id[29:]
- request.session['encrypted_id'] = base64.b64encode(
- cipher.encrypt(session_id))
+ encrypted_id = sessionrefcipher.encrypt(request.session)
# TODO: it fails when:
# 1. site is accessed via IP (auth.127.0.0.1),
# 2. HTTP used on non-standard port (https://...:8000).
ssl_auth_form = SSLCertLoginForm({
- 'session_id': request.session['encrypted_id'],
+ 'session': encrypted_id,
'next': request.build_absolute_uri(next),
'login_uri': request.build_absolute_uri(request.get_full_path()),
})
@@ -221,10 +208,7 @@ def ssl_auth(request):
if not ssl_auth_form.is_valid():
return HttpResponseBadRequest('400 Bad Request')
- session_id = cipher.decrypt(
- base64.b64decode(ssl_auth_form.cleaned_data['session_id']),
- 32)
-
+ session = ssl_auth_form.cleaned_data['session']
next_uri = ssl_auth_form.cleaned_data['login_uri']
user = authenticate(request=request)
@@ -238,7 +222,6 @@ def ssl_auth(request):
# so, django will always start a new session for us. we need to copy
# the data to the original session and preferably flush the new one.
- session = SessionStore(session_key=session_id)
session.update(request.session)
# always logout automatically from SSL-based auth
@@ -393,7 +376,7 @@ def otp_setup(request):
token = conf_form.cleaned_data['otp_token']
# prevent reusing the same token to login
- if not RevokedToken.add(request.user, token):
+ if not RevokedToken.add(token, request.user):
raise OkupyError()
if not dev.verify_token(token, secret):
raise OkupyError()
diff --git a/okupy/common/crypto.py b/okupy/common/crypto.py
deleted file mode 100644
index 2596680..0000000
--- a/okupy/common/crypto.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
-
-from django.conf import settings
-
-from Crypto.Cipher.Blowfish import BlowfishCipher
-from Crypto.Hash.SHA384 import SHA384Hash
-
-import Crypto.Random
-
-import binascii
-import struct
-
-
-class OkupyCipher(object):
- """ Symmetric cipher using django's SECRET_KEY. """
-
- _hasher_algo = SHA384Hash
- _cipher_algo = BlowfishCipher
- _cipher_block_size = 8
-
- def __init__(self):
- hasher = self._hasher_algo()
- hasher.update(settings.SECRET_KEY)
- key_hash = hasher.digest()
- self.cipher = self._cipher_algo(key_hash)
- self.rng = Crypto.Random.new()
-
- def encrypt(self, data):
- """
- Encrypt random-length data block padding it with random data
- if necessary.
- """
-
- # ensure it's bytestring before we append random bits
- data = bytes(data)
- # minus is intentional. (-X % S) == S - (X % S)
- padding = -len(data) % self._cipher_block_size
- if padding:
- data += self.rng.read(padding)
- return self.cipher.encrypt(data)
-
- def decrypt(self, data, length):
- """
- Decrypt the data block of given length. Removes padding if any.
- """
-
- return self.cipher.decrypt(data)[:length]
-
-
-cipher = OkupyCipher()
-
-
-class IDCipher(object):
- """
- A cipher to create 'encrypted database IDs'. It is specifically fit
- to encrypt an integer into constant-length hexstring.
- """
-
- def encrypt(self, id):
- byte_id = struct.pack('!I', id)
- byte_eid = cipher.encrypt(byte_id)
- return binascii.b2a_hex(byte_eid)
-
- def decrypt(self, eid):
- byte_eid = binascii.a2b_hex(eid)
- byte_id = cipher.decrypt(byte_eid, 4)
- id = struct.unpack('!I', byte_id)[0]
- return id
-
-
-idcipher = IDCipher()
diff --git a/okupy/common/ldap_helpers.py b/okupy/common/ldap_helpers.py
index 1bcfa69..4970e6a 100644
--- a/okupy/common/ldap_helpers.py
+++ b/okupy/common/ldap_helpers.py
@@ -4,7 +4,7 @@ from base64 import b64encode
from Crypto import Random
from passlib.hash import ldap_md5_crypt
-from .crypto import cipher
+from ..crypto.ciphers import cipher
from ..accounts.models import LDAPUser
diff --git a/okupy/common/models.py b/okupy/common/models.py
deleted file mode 100644
index 4a7ee73..0000000
--- a/okupy/common/models.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
-
-from django.conf import settings
-from django.db import models
-
-from .crypto import idcipher
-
-
-# based on https://gist.github.com/treyhunner/735861
-
-class EncryptedPKModelManager(models.Manager):
- def get(self, *args, **kwargs):
- eid = kwargs.pop('encrypted_id', None)
- if eid is not None:
- kwargs['id'] = idcipher.decrypt(eid)
- return super(EncryptedPKModelManager, self).get(*args, **kwargs)
-
-
-class EncryptedPKModel(models.Model):
- """
- A model with built-in identifier encryption (for secure tokens).
- """
-
- objects = EncryptedPKModelManager()
-
- @property
- def encrypted_id(self):
- """
- The object identifier encrypted using IDCipher, as a hex-string.
- """
- if self.id is None:
- return None
- return idcipher.encrypt(self.id)
-
- class Meta:
- abstract = True
diff --git a/okupy/crypto/__init__.py b/okupy/crypto/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/okupy/crypto/__init__.py
diff --git a/okupy/crypto/ciphers.py b/okupy/crypto/ciphers.py
new file mode 100644
index 0000000..667d8d1
--- /dev/null
+++ b/okupy/crypto/ciphers.py
@@ -0,0 +1,156 @@
+# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
+
+from django.conf import settings
+from django.contrib.sessions.backends.cache import SessionStore
+
+from Crypto.Cipher.AES import AESCipher
+from Crypto.Hash.SHA256 import SHA256Hash
+
+import Crypto.Random
+
+import struct
+
+from .codecs import ub64encode, ub64decode
+
+
+class OkupyCipher(object):
+ """ Symmetric cipher using django's SECRET_KEY. """
+
+ _hasher_algo = SHA256Hash
+ _cipher_algo = AESCipher
+
+ def __init__(self):
+ hasher = self._hasher_algo()
+ hasher.update(settings.SECRET_KEY)
+ key_hash = hasher.digest()
+ self.cipher = self._cipher_algo(key_hash)
+ self.rng = Crypto.Random.new()
+
+ @property
+ def block_size(self):
+ """
+ Cipher's block size.
+ """
+ return self.cipher.block_size
+
+ def encrypt(self, data):
+ """
+ Encrypt random-length data block padding it with random data
+ if necessary.
+ """
+
+ # ensure it's bytestring before we append random bits
+ data = bytes(data)
+ # minus is intentional. (-X % S) == S - (X % S)
+ padding = -len(data) % self.block_size
+ if padding:
+ data += self.rng.read(padding)
+ return self.cipher.encrypt(data)
+
+ def decrypt(self, data, length):
+ """
+ Decrypt the data block of given length. Removes padding if any.
+ """
+
+ if len(data) < length:
+ raise ValueError('Ciphertext too short for requested length')
+ return self.cipher.decrypt(data)[:length]
+
+
+class IDCipher(object):
+ """
+ A cipher to create 'encrypted database IDs'. It is specifically fit
+ to encrypt an integer into constant-length hexstring.
+ """
+
+ def encrypt(self, id):
+ byte_id = struct.pack('!I', id)
+ byte_eid = cipher.encrypt(byte_id)
+ return ub64encode(byte_eid)
+
+ def decrypt(self, eid):
+ byte_eid = ub64decode(eid)
+ byte_id = cipher.decrypt(byte_eid, 4)
+ id = struct.unpack('!I', byte_id)[0]
+ return id
+
+
+class SessionRefCipher(object):
+ """
+ A cipher to provide encrypted identifiers to sessions.
+
+ The encrypted session ID is stored in session for additional
+ security. Only previous encryption result may be used in decrypt().
+ """
+
+ cache_key_prefix = 'django.contrib.sessions.cache'
+ session_id_length = 32
+ random_prefix_bytes = 4
+ ciphertext_length = session_id_length*3/4 + random_prefix_bytes
+
+ def encrypt(self, session):
+ """
+ Return an encrypted reference to the session. The encrypted
+ identifier will be stored in the session for verification
+ and caching. Therefore, further calls to this method will reuse
+ the previously cached identifier.
+ """
+
+ if 'encrypted_id' not in session:
+ # .cache_key is a very good property since it ensures
+ # that the cache is actually created, and works from first
+ # request
+ session_id = session.cache_key
+
+ # since it always starts with the backend module name
+ # and __init__() expects pure id, we can strip that
+ assert(session_id.startswith(self.cache_key_prefix))
+ session_id = session_id[len(self.cache_key_prefix):]
+ assert(len(session_id) == self.session_id_length)
+
+ # now's another curious trick: session id consists
+ # of [a-z][0-9]. it's basically base36 but since decoding
+ # that is harsh, let's just treat it as base64. that's
+ # going to pack it into 3/4 original size, that is 24 bytes.
+ # then, with random prefix prepended we will fit into one
+ # block of ciphertext less.
+ session_id = ub64decode(session_id)
+
+ data = (cipher.rng.read(self.random_prefix_bytes)
+ + session_id)
+ assert(len(data) == self.ciphertext_length)
+ session['encrypted_id'] = ub64encode(
+ cipher.encrypt(data))
+ session.save()
+ return session['encrypted_id']
+
+ def decrypt(self, eid):
+ """
+ Return the SessionStore to which the encrypted identifier is
+ pointing. Raises ValueError if the identifier is invalid.
+ """
+
+ try:
+ session_id = cipher.decrypt(ub64decode(eid),
+ self.ciphertext_length)
+ except (TypeError, ValueError):
+ pass
+ else:
+ session_id = session_id[self.random_prefix_bytes:]
+ session_id = ub64encode(session_id)
+ session = SessionStore(session_key=session_id)
+ if session.get('encrypted_id') == eid:
+ # circular import
+ from .models import RevokedToken
+
+ # revoke to prevent replay attacks
+ if RevokedToken.add(eid):
+ del session['encrypted_id']
+ session.save()
+ return session
+ raise ValueError('Invalid session id')
+
+
+cipher = OkupyCipher()
+idcipher = IDCipher()
+sessionrefcipher = SessionRefCipher()
diff --git a/okupy/crypto/codecs.py b/okupy/crypto/codecs.py
new file mode 100644
index 0000000..7095e34
--- /dev/null
+++ b/okupy/crypto/codecs.py
@@ -0,0 +1,27 @@
+# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
+
+import base64
+
+
+def ub32encode(text):
+ """ Encode text as unpadded base32. """
+ return base64.b32encode(text).rstrip('=')
+
+
+def ub32decode(text):
+ """ Decode text from unpadded base32. """
+ # add missing padding if necessary
+ text += '=' * (-len(text) % 8)
+ return base64.b32decode(text, casefold=True)
+
+
+def ub64encode(text):
+ """ Encode text as unpadded, url-safe base64. """
+ return base64.urlsafe_b64encode(text).rstrip('=')
+
+
+def ub64decode(text):
+ """ decode text from unpadded, url-safe base64. """
+ # add missing padding if necessary
+ text += '=' * (-len(text) % 4)
+ return base64.urlsafe_b64decode(bytes(text))
diff --git a/okupy/crypto/models.py b/okupy/crypto/models.py
new file mode 100644
index 0000000..b2eaa08
--- /dev/null
+++ b/okupy/crypto/models.py
@@ -0,0 +1,86 @@
+# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
+
+from django.contrib.auth.models import User
+from django.db import models, IntegrityError
+from django.utils.timezone import now
+
+from .ciphers import idcipher
+
+from datetime import timedelta
+
+
+# based on https://gist.github.com/treyhunner/735861
+
+class EncryptedPKModelManager(models.Manager):
+ def get(self, *args, **kwargs):
+ eid = kwargs.pop('encrypted_id', None)
+ if eid is not None:
+ kwargs['id'] = idcipher.decrypt(eid)
+ return super(EncryptedPKModelManager, self).get(*args, **kwargs)
+
+
+class EncryptedPKModel(models.Model):
+ """
+ A model with built-in identifier encryption (for secure tokens).
+ """
+
+ objects = EncryptedPKModelManager()
+
+ @property
+ def encrypted_id(self):
+ """
+ The object identifier encrypted using IDCipher, as a hex-string.
+ """
+ if self.id is None:
+ return None
+ return idcipher.encrypt(self.id)
+
+ class Meta:
+ abstract = True
+
+
+class RevokedToken(models.Model):
+ """
+ A model that guarantees atomic token revocation.
+
+ We can use a single table for various kinds of tokens as long
+ as they don't interfere (e.g. are of different length).
+ """
+
+ user = models.ForeignKey(User, db_index=False, null=True)
+ token = models.CharField(max_length=64)
+ ts = models.DateTimeField(auto_now_add=True)
+
+ @classmethod
+ def cleanup(cls):
+ """
+ Remove tokens old enough to be no longer valid.
+ """
+
+ # we use this just to enforce atomicity and prevent replay
+ # for SOTP, we can clean up old tokens quite fast
+ # (as soon as .delete() is effective)
+ # for TOTP, we should wait till the token drifts away
+ old = now() - timedelta(minutes=3)
+ cls.objects.filter(ts__lt=old).delete()
+
+ @classmethod
+ def add(cls, token, user=None):
+ """
+ Use and revoke the given token, for the given user. User
+ can be None if irrelevant.
+
+ Returns True if the token is fine, False if it was used
+ already.
+ """
+ cls.cleanup()
+
+ t = cls(user=user, token=token)
+ try:
+ t.save()
+ except IntegrityError:
+ return False
+ return True
+
+ class Meta:
+ unique_together = ('user', 'token')
diff --git a/okupy/otp/models.py b/okupy/otp/models.py
deleted file mode 100644
index a43fb4e..0000000
--- a/okupy/otp/models.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
-
-from django.contrib.auth.models import User
-from django.db import models, IntegrityError
-
-from datetime import datetime, timedelta
-
-
-class RevokedToken(models.Model):
- """ A model that guarantees atomic token revocation. """
-
- user = models.ForeignKey(User)
- token = models.CharField(max_length=10)
- ts = models.DateTimeField(auto_now_add=True)
-
- @classmethod
- def cleanup(cls):
- """
- Remove tokens old enough to be no longer valid.
- """
-
- # we use this just to enforce atomicity and prevent replay
- # for SOTP, we can clean up old tokens quite fast
- # (as soon as .delete() is effective)
- # for TOTP, we should wait till the token drifts away
- old = datetime.now() - timedelta(minutes=3)
- cls.objects.filter(ts__lt=old).delete()
-
- @classmethod
- def add(cls, user, token):
- """
- Use and revoke the given token, for the given user.
-
- Returns True if the token is fine, False if it was used
- already.
- """
- cls.cleanup()
-
- t = cls(user=user, token=token)
- try:
- t.save()
- except IntegrityError:
- return False
- return True
-
- class Meta:
- unique_together = ('user', 'token')
diff --git a/okupy/otp/totp/models.py b/okupy/otp/totp/models.py
index a32baa6..72f5e3d 100644
--- a/okupy/otp/totp/models.py
+++ b/okupy/otp/totp/models.py
@@ -3,9 +3,8 @@
from django_otp import oath
from django_otp.models import Device
-from base64 import b32decode, b32encode
-
from ...accounts.models import LDAPUser
+from ...crypto.codecs import ub32decode, ub32encode
import Crypto.Random
@@ -45,7 +44,7 @@ class TOTPDevice(Device):
Returns 20-character base32 string (with padding stripped).
"""
rng = Crypto.Random.new()
- return b32encode(rng.read(12)).rstrip('=')
+ return ub32encode(rng.read(12))
@staticmethod
def get_uri(secret):
@@ -67,10 +66,7 @@ class TOTPDevice(Device):
return False
secret = u.otp_secret
- # add missing padding if necessary
- secret += '=' * (-len(secret) % 8)
-
- key = b32decode(secret, casefold=True)
+ key = ub32decode(secret)
try:
token = int(token)
except ValueError:
diff --git a/okupy/settings/__init__.py b/okupy/settings/__init__.py
index 9b59a2d..767bb22 100644
--- a/okupy/settings/__init__.py
+++ b/okupy/settings/__init__.py
@@ -50,7 +50,7 @@ INSTALLED_APPS = (
'django.contrib.messages',
'django.contrib.staticfiles',
'okupy.accounts',
- 'okupy.otp',
+ 'okupy.crypto',
'okupy.otp.sotp',
'okupy.otp.totp',
)
diff --git a/okupy/tests/settings.py b/okupy/tests/settings.py
index f4d0c07..ac4e9df 100644
--- a/okupy/tests/settings.py
+++ b/okupy/tests/settings.py
@@ -53,7 +53,7 @@ INSTALLED_APPS = (
'django_otp',
'discover_runner',
'okupy.accounts',
- 'okupy.otp',
+ 'okupy.crypto',
'okupy.otp.sotp',
'okupy.otp.totp',
'okupy.tests',
diff --git a/okupy/tests/unit/test_cipher.py b/okupy/tests/unit/test_cipher.py
index 31ac795..0589dfd 100644
--- a/okupy/tests/unit/test_cipher.py
+++ b/okupy/tests/unit/test_cipher.py
@@ -1,29 +1,89 @@
-#!/usr/bin/env python
+# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python
from Crypto import Random
-from unittest import TestCase
+from unittest import TestCase, SkipTest
-from ...common.crypto import cipher
+from django.contrib.sessions.backends.cache import SessionStore
+
+from ...crypto.ciphers import cipher, sessionrefcipher
class OkupyCipherTests(TestCase):
- def test_verify_password_less_than_8_chars(self):
- hash = cipher.encrypt('test1')
- self.assertEqual(cipher.decrypt(hash, 5), 'test1')
+ def setUp(self):
+ self._random_string = '123456abcdef' * int(cipher.block_size / 2)
+
+ def test_verify_password_less_than_block_size(self):
+ data = self._random_string[:cipher.block_size-3]
+ hash = cipher.encrypt(data)
+ self.assertEqual(cipher.decrypt(hash, len(data)), data)
- def test_verify_password_8_chars(self):
- hash = cipher.encrypt('testtest')
- self.assertEqual(cipher.decrypt(hash, 8), 'testtest')
+ def test_verify_password_exact_block_size(self):
+ data = self._random_string[:cipher.block_size]
+ hash = cipher.encrypt(data)
+ self.assertEqual(cipher.decrypt(hash, len(data)), data)
- def test_verify_password_more_than_8_chars(self):
- hash = cipher.encrypt('testtest123')
- self.assertEqual(cipher.decrypt(hash, 11), 'testtest123')
+ def test_verify_password_more_than_block_size(self):
+ data = self._random_string[:cipher.block_size+3]
+ hash = cipher.encrypt(data)
+ self.assertEqual(cipher.decrypt(hash, len(data)), data)
- def test_verify_password_more_than_16_chars(self):
- hash = cipher.encrypt('testtest123456789012')
- self.assertEqual(cipher.decrypt(hash, 20), 'testtest123456789012')
+ def test_verify_password_more_than_twice_block_size(self):
+ data = self._random_string[:cipher.block_size*2+3]
+ hash = cipher.encrypt(data)
+ self.assertEqual(cipher.decrypt(hash, len(data)), data)
def test_encrypt_random_bytes(self):
- password = Random.get_random_bytes(45)
- hash = cipher.encrypt(password)
- self.assertEqual(cipher.decrypt(hash, 45), password)
+ data = Random.get_random_bytes(45)
+ hash = cipher.encrypt(data)
+ self.assertEqual(cipher.decrypt(hash, len(data)), data)
+
+ def test_ciphertext_shorter_than_req_output_raises_valueerror(self):
+ data = self._random_string[:cipher.block_size*2]
+ hash = cipher.encrypt(data)[:cipher.block_size]
+ self.assertRaises(ValueError, cipher.decrypt, hash, len(data))
+
+ def test_ciphertext_not_multiple_of_block_size_raises_valueerror(self):
+ data = self._random_string[:cipher.block_size/2]
+ hash = cipher.encrypt(data)[:cipher.block_size/2]
+ self.assertRaises(ValueError, cipher.decrypt, hash, len(data))
+
+
+class SessionRefCipherTest(TestCase):
+ def test_encrypt_decrypt(self):
+ session = SessionStore()
+ session['test'] = 'in-test'
+ session.save()
+
+ eid = sessionrefcipher.encrypt(session)
+ sess = sessionrefcipher.decrypt(eid)
+ self.assertEqual(sess.get('test'), 'in-test')
+
+ def test_invalid_base64_raises_valueerror(self):
+ data = 'Azcd^%'
+ self.assertRaises(ValueError, sessionrefcipher.decrypt, data)
+
+ def test_invalid_ciphertext_raises_valueerror(self):
+ data = 'ZHVwYQo='
+ self.assertRaises(ValueError, sessionrefcipher.decrypt, data)
+
+ def test_unique_encrypted_are_generated_after_revocation(self):
+ session = SessionStore()
+ session['test'] = 'in-test'
+ session.save()
+
+ eid1 = sessionrefcipher.encrypt(session)
+ session = sessionrefcipher.decrypt(eid1)
+ eid2 = sessionrefcipher.encrypt(session)
+ self.assertNotEqual(eid1, eid2)
+
+ def test_revoked_encrypted_id_raises_valueerror(self):
+ session = SessionStore()
+ session['test'] = 'in-test'
+ session.save()
+
+ eid1 = sessionrefcipher.encrypt(session)
+ session = sessionrefcipher.decrypt(eid1)
+ eid2 = sessionrefcipher.encrypt(session)
+ if eid1 == eid2:
+ raise SkipTest('Non-unique encrypted IDs generated')
+ self.assertRaises(ValueError, sessionrefcipher.decrypt, eid1)
diff --git a/okupy/tests/unit/test_login.py b/okupy/tests/unit/test_login.py
index 58c01ea..8aaf2c1 100644
--- a/okupy/tests/unit/test_login.py
+++ b/okupy/tests/unit/test_login.py
@@ -15,8 +15,8 @@ from mockldap import MockLdap
from .. import vars
from ...accounts.views import login, logout
from ...accounts.forms import LoginForm
-from ...common.crypto import cipher
from ...common.test_helpers import OkupyTestCase, set_request, no_database, ldap_users, set_search_seed
+from ...crypto.ciphers import cipher
class LoginUnitTests(OkupyTestCase):
diff --git a/okupy/tests/unit/test_secondary_password.py b/okupy/tests/unit/test_secondary_password.py
index 8338827..baef415 100644
--- a/okupy/tests/unit/test_secondary_password.py
+++ b/okupy/tests/unit/test_secondary_password.py
@@ -9,9 +9,9 @@ from mockldap import MockLdap
from passlib.hash import ldap_md5_crypt
from .. import vars
-from ...common.crypto import cipher
from ...common.ldap_helpers import set_secondary_password, remove_secondary_password
from ...common.test_helpers import set_request, set_search_seed, ldap_users
+from ...crypto.ciphers import cipher
class SecondaryPassword(TestCase):
diff --git a/okupy/tests/unit/test_signup.py b/okupy/tests/unit/test_signup.py
index c32869a..e1f4d2b 100644
--- a/okupy/tests/unit/test_signup.py
+++ b/okupy/tests/unit/test_signup.py
@@ -137,7 +137,7 @@ class SignupUnitTests(OkupyTestCase):
self.assertEqual(vars.QUEUEDUSER.email, vars.SIGNUP_TESTUSER['email'])
self.assertEqual(vars.QUEUEDUSER.password, vars.SIGNUP_TESTUSER['password_origin'])
# note: this needs to be kept in line with used cipher
- self.assertRegexpMatches(vars.QUEUEDUSER.encrypted_id, '^[a-f0-9]{16}$')
+ self.assertRegexpMatches(vars.QUEUEDUSER.encrypted_id, '^[a-zA-Z0-9_-]{22}$')
@no_database()
def test_no_database_connection_raises_error_in_signup(self):