aboutsummaryrefslogtreecommitdiff
blob: e6c1136ffaf24d1fb5978bfe56532ed6a2f53b7b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import os
import shutil
import sys
import tempfile
from unittest import mock

from pkgcore import plugin
from snakeoil.osutils import pjoin
from snakeoil.sequences import stable_unique


class LowPlug:
    priority = 1


class TestModules:
    def setup_method(self, method):
        self.dir = tempfile.mkdtemp()
        self.dir2 = tempfile.mkdtemp()

        # force plugin module to use package dir for cache dir by setting
        # system/user cache dirs to nonexistent paths
        self.patcher = mock.patch("pkgcore.plugin.const")
        const = self.patcher.start()
        const.SYSTEM_CACHE_PATH = pjoin(self.dir, "nonexistent")
        const.USER_CACHE_PATH = pjoin(self.dir, "nonexistent")

        # Set up some test modules for our use.
        self.packdir = pjoin(self.dir, "mod_testplug")
        self.packdir2 = pjoin(self.dir2, "mod_testplug")
        os.mkdir(self.packdir)
        os.mkdir(self.packdir2)
        with open(pjoin(self.packdir, "__init__.py"), "w") as init:
            init.write(
                """
from pkgcore.plugins import extend_path

extend_path(__path__, __name__)
"""
            )
        filename = pjoin(self.packdir, "plug.py")
        with open(filename, "w") as plug:
            plug.write(
                """
class DisabledPlug:
    disabled = True

class HighPlug:
    priority = 7

class LowPlug:
    priority = 1

low_plug = LowPlug()
high_plug = HighPlug()

pkgcore_plugins = {
    'plugtest': [
        DisabledPlug,
        high_plug,
        'tests.test_plugin.LowPlug',
    ]
}
"""
            )
        # Move the mtime 2 seconds into the past so the .pyc file has
        # a different mtime.
        st = os.stat(filename)
        os.utime(filename, (st.st_atime, st.st_mtime - 2))
        with open(pjoin(self.packdir, "plug2.py"), "w") as plug2:
            plug2.write("# I do not have any pkgcore_plugins for you!\n")
        with open(pjoin(self.packdir2, "plug.py"), "w") as plug:
            plug.write(
                """
# This file is later on sys.path than the plug.py in packdir, so it should
# not have any effect on the tests.

class HiddenPlug:
    priority = 8

pkgcore_plugins = {'plugtest': [HiddenPlug]}
"""
            )
        # Append it to the path
        sys.path.insert(0, self.dir2)
        sys.path.insert(0, self.dir)

    def teardown_method(self):
        # stop mocked patcher
        self.patcher.stop()
        # pop the test module dir from path
        sys.path.pop(0)
        sys.path.pop(0)
        # and kill it
        shutil.rmtree(self.dir)
        shutil.rmtree(self.dir2)
        # make sure we don't keep the sys.modules entries around
        sys.modules.pop("mod_testplug", None)
        sys.modules.pop("mod_testplug.plug", None)
        sys.modules.pop("mod_testplug.plug2", None)

    def test_extend_path(self):
        import mod_testplug

        expected = stable_unique(
            pjoin(p, "mod_testplug") for p in sys.path if os.path.isdir(p)
        )
        assert expected == mod_testplug.__path__, set(expected) ^ set(
            mod_testplug.__path__
        )

    def _runit(self, method):
        plugin._global_cache.clear()
        method()
        mtime = os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME))
        method()
        plugin._global_cache.clear()
        method()
        method()
        assert mtime == os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME))
        # We cannot write this since it contains an unimportable plugin.
        assert not os.path.exists(pjoin(self.packdir2, plugin.CACHE_FILENAME))

    def _test_plug(self):
        import mod_testplug

        assert plugin.get_plugin("spork", mod_testplug) is None
        plugins = list(plugin.get_plugins("plugtest", mod_testplug))
        assert len(plugins) == 2, plugins
        plugin.get_plugin("plugtest", mod_testplug)
        assert (
            "HighPlug" == plugin.get_plugin("plugtest", mod_testplug).__class__.__name__
        )
        with open(pjoin(self.packdir, plugin.CACHE_FILENAME)) as f:
            lines = f.readlines()
        assert len(lines) == 3
        assert plugin.CACHE_HEADER + "\n" == lines[0]
        lines.pop(0)
        lines.sort()
        mtime = int(os.path.getmtime(pjoin(self.packdir, "plug2.py")))
        assert f"plug2:{mtime}:\n" == lines[0]
        mtime = int(os.path.getmtime(pjoin(self.packdir, "plug.py")))
        assert (
            f"plug:{mtime}:plugtest,7,1:plugtest,1,tests.test_plugin.LowPlug:plugtest,0,0\n"
            == lines[1]
        )

    def test_plug(self):
        self._runit(self._test_plug)

    def _test_no_unneeded_import(self):
        import mod_testplug

        list(plugin.get_plugins("spork", mod_testplug))
        sys.modules.pop("mod_testplug.plug")
        # This one is not loaded if we are testing with a good cache.
        sys.modules.pop("mod_testplug.plug2", None)
        list(plugin.get_plugins("plugtest", mod_testplug))
        # Extra messages since getting all of sys.modules printed is annoying.
        assert "mod_testplug.plug" in sys.modules, "plug not loaded"
        assert "mod_testplug.plug2" not in sys.modules, "plug2 loaded"

    def test_no_unneeded_import(self):
        self._runit(self._test_no_unneeded_import)

    def test_cache_corruption(self):
        print(plugin.const)
        print("wheeeeee")
        import mod_testplug

        list(plugin.get_plugins("spork", mod_testplug))
        filename = pjoin(self.packdir, plugin.CACHE_FILENAME)
        cachefile = open(filename, "a")
        try:
            cachefile.write("corruption\n")
        finally:
            cachefile.close()
        # Shift the file into the past a little or the rewritten file
        # will occasionally have the same mtime as the corrupt one.
        st = os.stat(filename)
        corrupt_mtime = st.st_mtime - 2
        os.utime(filename, (st.st_atime, corrupt_mtime))
        plugin._global_cache.clear()
        self._test_plug()
        good_mtime = os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME))
        plugin._global_cache.clear()
        self._test_plug()
        assert good_mtime == os.path.getmtime(
            pjoin(self.packdir, plugin.CACHE_FILENAME)
        )
        assert good_mtime != corrupt_mtime

    def test_rewrite_on_remove(self):
        filename = pjoin(self.packdir, "extra.py")
        plug = open(filename, "w")
        try:
            plug.write('pkgcore_plugins = {"plugtest": [object()]}\n')
        finally:
            plug.close()

        plugin._global_cache.clear()
        import mod_testplug

        assert len(list(plugin.get_plugins("plugtest", mod_testplug))) == 3

        os.unlink(filename)

        plugin._global_cache.clear()
        self._test_plug()

    def test_priority_caching(self):
        plug3 = open(pjoin(self.packdir, "plug3.py"), "w")
        try:
            plug3.write(
                """
class LowPlug:
    priority = 6

pkgcore_plugins = {
    'plugtest': [LowPlug()],
}
"""
            )
        finally:
            plug3.close()
        plug4 = open(pjoin(self.packdir, "plug4.py"), "w")
        try:
            plug4.write(
                """
# First file tried, only a disabled plugin.
class HighDisabledPlug:
    priority = 15
    disabled = True

pkgcore_plugins = {
    'plugtest': [HighDisabledPlug()],
}
"""
            )
        finally:
            plug4.close()
        plug5 = open(pjoin(self.packdir, "plug5.py"), "w")
        try:
            plug5.write(
                """
# Second file tried, with a skipped low priority plugin.
class HighDisabledPlug:
    priority = 12
    disabled = True

class LowPlug:
    priority = 6

pkgcore_plugins = {
    'plugtest': [HighDisabledPlug(), LowPlug()],
}
"""
            )
        finally:
            plug5.close()
        plug6 = open(pjoin(self.packdir, "plug6.py"), "w")
        try:
            plug6.write(
                """
# Not tried, bogus priority.
class BogusPlug:
    priority = 'spoon'

pkgcore_plugins = {
    'plugtest': [BogusPlug()],
}
"""
            )
        finally:
            plug6.close()
        self._runit(self._test_priority_caching)

    def _test_priority_caching(self):
        import mod_testplug

        list(plugin.get_plugins("spork", mod_testplug))
        sys.modules.pop("mod_testplug.plug", None)
        sys.modules.pop("mod_testplug.plug2", None)
        sys.modules.pop("mod_testplug.plug3", None)
        sys.modules.pop("mod_testplug.plug4", None)
        sys.modules.pop("mod_testplug.plug5", None)
        sys.modules.pop("mod_testplug.plug6", None)
        best_plug = plugin.get_plugin("plugtest", mod_testplug)
        from mod_testplug import plug

        assert plug.high_plug == best_plug
        # Extra messages since getting all of sys.modules printed is annoying.
        assert "mod_testplug.plug" in sys.modules, "plug not loaded"
        assert "mod_testplug.plug2" not in sys.modules, "plug2 loaded"
        assert "mod_testplug.plug3" not in sys.modules, "plug3 loaded"
        assert "mod_testplug.plug4" in sys.modules, "plug4 not loaded"
        assert "mod_testplug.plug5" in sys.modules, "plug4 not loaded"
        assert "mod_testplug.plug6" not in sys.modules, "plug6 loaded"

    def test_header_change_invalidates_cache(self):
        # Write the cache
        plugin._global_cache.clear()
        import mod_testplug

        list(plugin.get_plugins("testplug", mod_testplug))

        # Modify the cache.
        filename = pjoin(self.packdir, plugin.CACHE_FILENAME)
        with open(filename) as f:
            cache = f.readlines()
        cache[0] = "not really a pkgcore plugin cache\n"
        with open(filename, "w") as f:
            f.write("".join(cache))

        # And test if it is properly rewritten.
        plugin._global_cache.clear()
        self._test_plug()