aboutsummaryrefslogtreecommitdiff
blob: cdafba60f83efab027c8a9ebff509a37cb703393 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# vim:fileencoding=utf8:et:ts=4:sts=4:sw=4:ft=python

from django.conf import settings
from django.contrib.sessions.backends.cache import SessionStore

from Crypto.Cipher.AES import AESCipher
from Crypto.Hash.SHA256 import SHA256Hash

import Crypto.Random

import struct

from okupy.crypto.codecs import ub64encode, ub64decode


class OkupyCipher(object):
    """ Symmetric cipher using django's SECRET_KEY. """

    _hasher_algo = SHA256Hash
    _cipher_algo = AESCipher

    def __init__(self):
        hasher = self._hasher_algo()
        hasher.update(settings.SECRET_KEY)
        key_hash = hasher.digest()
        self.cipher = self._cipher_algo(key_hash)
        self.rng = Crypto.Random.new()

    @property
    def block_size(self):
        """
        Cipher's block size.
        """
        return self.cipher.block_size

    def encrypt(self, data):
        """
        Encrypt random-length data block padding it with random data
        if necessary.
        """

        # ensure it's bytestring before we append random bits
        data = bytes(data)
        # minus is intentional. (-X % S) == S - (X % S)
        padding = -len(data) % self.block_size
        if padding:
            data += self.rng.read(padding)
        return self.cipher.encrypt(data)

    def decrypt(self, data, length):
        """
        Decrypt the data block of given length. Removes padding if any.
        """

        if len(data) < length:
            raise ValueError('Ciphertext too short for requested length')
        return self.cipher.decrypt(data)[:length]


class IDCipher(object):
    """
    A cipher to create 'encrypted database IDs'. It is specifically fit
    to encrypt an integer into constant-length hexstring.
    """

    def encrypt(self, id):
        byte_id = struct.pack('!I', id)
        byte_eid = cipher.encrypt(byte_id)
        return ub64encode(byte_eid)

    def decrypt(self, eid):
        byte_eid = ub64decode(eid)
        byte_id = cipher.decrypt(byte_eid, 4)
        id = struct.unpack('!I', byte_id)[0]
        return id


class SessionRefCipher(object):
    """
    A cipher to provide encrypted identifiers to sessions.

    The encrypted session ID is stored in session for additional
    security. Only previous encryption result may be used in decrypt().
    """

    cache_key_prefix = 'django.contrib.sessions.cache'
    session_id_length = 32
    random_prefix_bytes = 4
    ciphertext_length = session_id_length*3/4 + random_prefix_bytes

    def encrypt(self, session):
        """
        Return an encrypted reference to the session. The encrypted
        identifier will be stored in the session for verification
        and caching. Therefore, further calls to this method will reuse
        the previously cached identifier.
        """

        if 'encrypted_id' not in session:
            # .cache_key is a very good property since it ensures
            # that the cache is actually created, and works from first
            # request
            session_id = session.cache_key

            # since it always starts with the backend module name
            # and __init__() expects pure id, we can strip that
            assert(session_id.startswith(self.cache_key_prefix))
            session_id = session_id[len(self.cache_key_prefix):]
            assert(len(session_id) == self.session_id_length)

            # now's another curious trick: session id consists
            # of [a-z][0-9]. it's basically base36 but since decoding
            # that is harsh, let's just treat it as base64. that's
            # going to pack it into 3/4 original size, that is 24 bytes.
            # then, with random prefix prepended we will fit into one
            # block of ciphertext less.
            session_id = ub64decode(session_id)

            data = (cipher.rng.read(self.random_prefix_bytes)
                    + session_id)
            assert(len(data) == self.ciphertext_length)
            session['encrypted_id'] = ub64encode(
                cipher.encrypt(data))
            session.save()
        return session['encrypted_id']

    def decrypt(self, eid):
        """
        Return the SessionStore to which the encrypted identifier is
        pointing. Raises ValueError if the identifier is invalid.
        """

        try:
            session_id = cipher.decrypt(ub64decode(eid),
                                        self.ciphertext_length)
        except (TypeError, ValueError):
            pass
        else:
            session_id = session_id[self.random_prefix_bytes:]
            session_id = ub64encode(session_id)
            session = SessionStore(session_key=session_id)
            if session.get('encrypted_id') == eid:
                # circular import
                from .models import RevokedToken

                # revoke to prevent replay attacks
                if RevokedToken.add(eid):
                    del session['encrypted_id']
                    session.save()
                    return session
        raise ValueError('Invalid session id')


cipher = OkupyCipher()
idcipher = IDCipher()
sessionrefcipher = SessionRefCipher()