diff options
author | Tim Harder <radhermit@gmail.com> | 2018-04-11 05:42:50 -0400 |
---|---|---|
committer | Tim Harder <radhermit@gmail.com> | 2018-04-11 06:00:12 -0400 |
commit | 4c2ba4079bbd542523baf37970e4c583e29537a8 (patch) | |
tree | ac2c099f3576f580420a6591287bc1af9bdb1779 /tests/test_plugin.py | |
parent | move to >=py3.6 only (diff) | |
download | pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.gz pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.bz2 pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.zip |
move tests to root dir
Diffstat (limited to 'tests/test_plugin.py')
-rw-r--r-- | tests/test_plugin.py | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/tests/test_plugin.py b/tests/test_plugin.py new file mode 100644 index 000000000..b667c0dd3 --- /dev/null +++ b/tests/test_plugin.py @@ -0,0 +1,297 @@ +# Copyright: 2011 Brian Harring <ferringb@gmail.com> +# Copyright: 2006 Marien Zwart <marienz@gentoo.org> +# License: BSD/GPL2 + +import logging +import os +import shutil +import sys +import tempfile + +from pkgcore import plugin +from pkgcore.test import silence_logging +from snakeoil.osutils import pjoin +from snakeoil.sequences import stable_unique +from snakeoil.test import TestCase + + +class LowPlug(object): + priority = 1 + + +class ModulesTest(TestCase): + + def setUp(self): + # Set up some test modules for our use. + self.dir = tempfile.mkdtemp() + self.dir2 = tempfile.mkdtemp() + self.packdir = pjoin(self.dir, 'mod_testplug') + self.packdir2 = pjoin(self.dir2, 'mod_testplug') + os.mkdir(self.packdir) + os.mkdir(self.packdir2) + with open(pjoin(self.packdir, '__init__.py'), 'w') as init: + init.write(''' +from pkgcore.plugins import extend_path + +extend_path(__path__, __name__) +''') + filename = pjoin(self.packdir, 'plug.py') + with open(filename, 'w') as plug: + plug.write(''' +class DisabledPlug(object): + disabled = True + +class HighPlug(object): + priority = 7 + +class LowPlug(object): + priority = 1 + +low_plug = LowPlug() +high_plug = HighPlug() + +pkgcore_plugins = { + 'plugtest': [ + DisabledPlug, + high_plug, + 'pkgcore.test.test_plugin.LowPlug', + ] +} +''') + # Move the mtime 2 seconds into the past so the .pyc file has + # a different mtime. + st = os.stat(filename) + os.utime(filename, (st.st_atime, st.st_mtime - 2)) + with open(pjoin(self.packdir, 'plug2.py'), 'w') as plug2: + plug2.write('# I do not have any pkgcore_plugins for you!\n') + with open(pjoin(self.packdir2, 'plug.py'), 'w') as plug: + plug.write(''' +# This file is later on sys.path than the plug.py in packdir, so it should +# not have any effect on the tests. + +class HiddenPlug(object): + priority = 8 + +pkgcore_plugins = {'plugtest': [HiddenPlug]} +''') + # Append it to the path + sys.path.insert(0, self.dir2) + sys.path.insert(0, self.dir) + + def tearDown(self): + # pop the test module dir from path + sys.path.pop(0) + sys.path.pop(0) + # and kill it + shutil.rmtree(self.dir) + shutil.rmtree(self.dir2) + # make sure we don't keep the sys.modules entries around + sys.modules.pop('mod_testplug', None) + sys.modules.pop('mod_testplug.plug', None) + sys.modules.pop('mod_testplug.plug2', None) + + def test_extend_path(self): + import mod_testplug + expected = stable_unique( + pjoin(p, 'mod_testplug') + for p in sys.path if os.path.isdir(p)) + self.assertEqual( + expected, mod_testplug.__path__, + set(expected) ^ set(mod_testplug.__path__)) + + def _runit(self, method): + plugin._global_cache.clear() + method() + mtime = os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME)) + method() + plugin._global_cache.clear() + method() + method() + self.assertEqual( + mtime, + os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME))) + # We cannot write this since it contains an unimportable plugin. + self.assertFalse( + os.path.exists(pjoin(self.packdir2, plugin.CACHE_FILENAME))) + + def _test_plug(self): + import mod_testplug + self.assertIdentical(None, plugin.get_plugin('spork', mod_testplug)) + plugins = list(plugin.get_plugins('plugtest', mod_testplug)) + self.assertEqual(2, len(plugins), plugins) + plugin.get_plugin('plugtest', mod_testplug) + self.assertEqual( + 'HighPlug', + plugin.get_plugin('plugtest', mod_testplug).__class__.__name__) + with open(pjoin(self.packdir, plugin.CACHE_FILENAME)) as f: + lines = f.readlines() + self.assertEqual(3, len(lines)) + self.assertEqual(plugin.CACHE_HEADER + "\n", lines[0]) + lines.pop(0) + lines.sort() + mtime = int(os.path.getmtime(pjoin(self.packdir, 'plug2.py'))) + self.assertEqual('plug2:%s:\n' % (mtime,), lines[0]) + mtime = int(os.path.getmtime(pjoin(self.packdir, 'plug.py'))) + self.assertEqual( + 'plug:%s:plugtest,7,1:plugtest,1,pkgcore.test.test_plugin.LowPlug:' + 'plugtest,0,0\n' % (mtime,), + lines[1]) + + def test_plug(self): + self._runit(self._test_plug) + + def _test_no_unneeded_import(self): + import mod_testplug + list(plugin.get_plugins('spork', mod_testplug)) + sys.modules.pop('mod_testplug.plug') + # This one is not loaded if we are testing with a good cache. + sys.modules.pop('mod_testplug.plug2', None) + list(plugin.get_plugins('plugtest', mod_testplug)) + # Extra messages since getting all of sys.modules printed is annoying. + self.assertIn('mod_testplug.plug', sys.modules, 'plug not loaded') + self.assertNotIn('mod_testplug.plug2', sys.modules, 'plug2 loaded') + + def test_no_unneeded_import(self): + self._runit(self._test_no_unneeded_import) + + @silence_logging(logging.root) + def test_cache_corruption(self): + import mod_testplug + list(plugin.get_plugins('spork', mod_testplug)) + filename = pjoin(self.packdir, plugin.CACHE_FILENAME) + cachefile = open(filename, 'a') + try: + cachefile.write('corruption\n') + finally: + cachefile.close() + # Shift the file into the past a little or the rewritten file + # will occasionally have the same mtime as the corrupt one. + st = os.stat(filename) + corrupt_mtime = st.st_mtime - 2 + os.utime(filename, (st.st_atime, corrupt_mtime)) + plugin._global_cache.clear() + self._test_plug() + good_mtime = os.path.getmtime( + pjoin(self.packdir, plugin.CACHE_FILENAME)) + plugin._global_cache.clear() + self._test_plug() + self.assertEqual(good_mtime, os.path.getmtime( + pjoin(self.packdir, plugin.CACHE_FILENAME))) + self.assertNotEqual(good_mtime, corrupt_mtime) + + def test_rewrite_on_remove(self): + filename = pjoin(self.packdir, 'extra.py') + plug = open(filename, 'w') + try: + plug.write('pkgcore_plugins = {"plugtest": [object()]}\n') + finally: + plug.close() + + plugin._global_cache.clear() + import mod_testplug + self.assertEqual( + 3, len(list(plugin.get_plugins('plugtest', mod_testplug)))) + + os.unlink(filename) + + plugin._global_cache.clear() + self._test_plug() + + @silence_logging(logging.root) + def test_priority_caching(self): + plug3 = open(pjoin(self.packdir, 'plug3.py'), 'w') + try: + plug3.write(''' +class LowPlug(object): + priority = 6 + +pkgcore_plugins = { + 'plugtest': [LowPlug()], +} +''') + finally: + plug3.close() + plug4 = open(pjoin(self.packdir, 'plug4.py'), 'w') + try: + plug4.write(''' +# First file tried, only a disabled plugin. +class HighDisabledPlug(object): + priority = 15 + disabled = True + +pkgcore_plugins = { + 'plugtest': [HighDisabledPlug()], +} +''') + finally: + plug4.close() + plug5 = open(pjoin(self.packdir, 'plug5.py'), 'w') + try: + plug5.write(''' +# Second file tried, with a skipped low priority plugin. +class HighDisabledPlug(object): + priority = 12 + disabled = True + +class LowPlug(object): + priority = 6 + +pkgcore_plugins = { + 'plugtest': [HighDisabledPlug(), LowPlug()], +} +''') + finally: + plug5.close() + plug6 = open(pjoin(self.packdir, 'plug6.py'), 'w') + try: + plug6.write(''' +# Not tried, bogus priority. +class BogusPlug(object): + priority = 'spoon' + +pkgcore_plugins = { + 'plugtest': [BogusPlug()], +} +''') + finally: + plug6.close() + self._runit(self._test_priority_caching) + + def _test_priority_caching(self): + import mod_testplug + list(plugin.get_plugins('spork', mod_testplug)) + sys.modules.pop('mod_testplug.plug', None) + sys.modules.pop('mod_testplug.plug2', None) + sys.modules.pop('mod_testplug.plug3', None) + sys.modules.pop('mod_testplug.plug4', None) + sys.modules.pop('mod_testplug.plug5', None) + sys.modules.pop('mod_testplug.plug6', None) + best_plug = plugin.get_plugin('plugtest', mod_testplug) + from mod_testplug import plug + self.assertEqual(plug.high_plug, best_plug) + # Extra messages since getting all of sys.modules printed is annoying. + self.assertIn('mod_testplug.plug', sys.modules, 'plug not loaded') + self.assertNotIn('mod_testplug.plug2', sys.modules, 'plug2 loaded') + self.assertNotIn('mod_testplug.plug3', sys.modules, 'plug3 loaded') + self.assertIn('mod_testplug.plug4', sys.modules, 'plug4 not loaded') + self.assertIn('mod_testplug.plug5', sys.modules, 'plug4 not loaded') + self.assertNotIn('mod_testplug.plug6', sys.modules, 'plug6 loaded') + + @silence_logging(logging.root) + def test_header_change_invalidates_cache(self): + # Write the cache + plugin._global_cache.clear() + import mod_testplug + list(plugin.get_plugins('testplug', mod_testplug)) + + # Modify the cache. + filename = pjoin(self.packdir, plugin.CACHE_FILENAME) + with open(filename) as f: + cache = f.readlines() + cache[0] = 'not really a pkgcore plugin cache\n' + with open(filename, 'w') as f: + f.write(''.join(cache)) + + # And test if it is properly rewritten. + plugin._global_cache.clear() + self._test_plug() |