aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Harder <radhermit@gmail.com>2018-04-11 05:42:50 -0400
committerTim Harder <radhermit@gmail.com>2018-04-11 06:00:12 -0400
commit4c2ba4079bbd542523baf37970e4c583e29537a8 (patch)
treeac2c099f3576f580420a6591287bc1af9bdb1779 /tests/test_plugin.py
parentmove to >=py3.6 only (diff)
downloadpkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.gz
pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.bz2
pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.zip
move tests to root dir
Diffstat (limited to 'tests/test_plugin.py')
-rw-r--r--tests/test_plugin.py297
1 files changed, 297 insertions, 0 deletions
diff --git a/tests/test_plugin.py b/tests/test_plugin.py
new file mode 100644
index 000000000..b667c0dd3
--- /dev/null
+++ b/tests/test_plugin.py
@@ -0,0 +1,297 @@
+# Copyright: 2011 Brian Harring <ferringb@gmail.com>
+# Copyright: 2006 Marien Zwart <marienz@gentoo.org>
+# License: BSD/GPL2
+
+import logging
+import os
+import shutil
+import sys
+import tempfile
+
+from pkgcore import plugin
+from pkgcore.test import silence_logging
+from snakeoil.osutils import pjoin
+from snakeoil.sequences import stable_unique
+from snakeoil.test import TestCase
+
+
+class LowPlug(object):
+ priority = 1
+
+
+class ModulesTest(TestCase):
+
+ def setUp(self):
+ # Set up some test modules for our use.
+ self.dir = tempfile.mkdtemp()
+ self.dir2 = tempfile.mkdtemp()
+ self.packdir = pjoin(self.dir, 'mod_testplug')
+ self.packdir2 = pjoin(self.dir2, 'mod_testplug')
+ os.mkdir(self.packdir)
+ os.mkdir(self.packdir2)
+ with open(pjoin(self.packdir, '__init__.py'), 'w') as init:
+ init.write('''
+from pkgcore.plugins import extend_path
+
+extend_path(__path__, __name__)
+''')
+ filename = pjoin(self.packdir, 'plug.py')
+ with open(filename, 'w') as plug:
+ plug.write('''
+class DisabledPlug(object):
+ disabled = True
+
+class HighPlug(object):
+ priority = 7
+
+class LowPlug(object):
+ priority = 1
+
+low_plug = LowPlug()
+high_plug = HighPlug()
+
+pkgcore_plugins = {
+ 'plugtest': [
+ DisabledPlug,
+ high_plug,
+ 'pkgcore.test.test_plugin.LowPlug',
+ ]
+}
+''')
+ # Move the mtime 2 seconds into the past so the .pyc file has
+ # a different mtime.
+ st = os.stat(filename)
+ os.utime(filename, (st.st_atime, st.st_mtime - 2))
+ with open(pjoin(self.packdir, 'plug2.py'), 'w') as plug2:
+ plug2.write('# I do not have any pkgcore_plugins for you!\n')
+ with open(pjoin(self.packdir2, 'plug.py'), 'w') as plug:
+ plug.write('''
+# This file is later on sys.path than the plug.py in packdir, so it should
+# not have any effect on the tests.
+
+class HiddenPlug(object):
+ priority = 8
+
+pkgcore_plugins = {'plugtest': [HiddenPlug]}
+''')
+ # Append it to the path
+ sys.path.insert(0, self.dir2)
+ sys.path.insert(0, self.dir)
+
+ def tearDown(self):
+ # pop the test module dir from path
+ sys.path.pop(0)
+ sys.path.pop(0)
+ # and kill it
+ shutil.rmtree(self.dir)
+ shutil.rmtree(self.dir2)
+ # make sure we don't keep the sys.modules entries around
+ sys.modules.pop('mod_testplug', None)
+ sys.modules.pop('mod_testplug.plug', None)
+ sys.modules.pop('mod_testplug.plug2', None)
+
+ def test_extend_path(self):
+ import mod_testplug
+ expected = stable_unique(
+ pjoin(p, 'mod_testplug')
+ for p in sys.path if os.path.isdir(p))
+ self.assertEqual(
+ expected, mod_testplug.__path__,
+ set(expected) ^ set(mod_testplug.__path__))
+
+ def _runit(self, method):
+ plugin._global_cache.clear()
+ method()
+ mtime = os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME))
+ method()
+ plugin._global_cache.clear()
+ method()
+ method()
+ self.assertEqual(
+ mtime,
+ os.path.getmtime(pjoin(self.packdir, plugin.CACHE_FILENAME)))
+ # We cannot write this since it contains an unimportable plugin.
+ self.assertFalse(
+ os.path.exists(pjoin(self.packdir2, plugin.CACHE_FILENAME)))
+
+ def _test_plug(self):
+ import mod_testplug
+ self.assertIdentical(None, plugin.get_plugin('spork', mod_testplug))
+ plugins = list(plugin.get_plugins('plugtest', mod_testplug))
+ self.assertEqual(2, len(plugins), plugins)
+ plugin.get_plugin('plugtest', mod_testplug)
+ self.assertEqual(
+ 'HighPlug',
+ plugin.get_plugin('plugtest', mod_testplug).__class__.__name__)
+ with open(pjoin(self.packdir, plugin.CACHE_FILENAME)) as f:
+ lines = f.readlines()
+ self.assertEqual(3, len(lines))
+ self.assertEqual(plugin.CACHE_HEADER + "\n", lines[0])
+ lines.pop(0)
+ lines.sort()
+ mtime = int(os.path.getmtime(pjoin(self.packdir, 'plug2.py')))
+ self.assertEqual('plug2:%s:\n' % (mtime,), lines[0])
+ mtime = int(os.path.getmtime(pjoin(self.packdir, 'plug.py')))
+ self.assertEqual(
+ 'plug:%s:plugtest,7,1:plugtest,1,pkgcore.test.test_plugin.LowPlug:'
+ 'plugtest,0,0\n' % (mtime,),
+ lines[1])
+
+ def test_plug(self):
+ self._runit(self._test_plug)
+
+ def _test_no_unneeded_import(self):
+ import mod_testplug
+ list(plugin.get_plugins('spork', mod_testplug))
+ sys.modules.pop('mod_testplug.plug')
+ # This one is not loaded if we are testing with a good cache.
+ sys.modules.pop('mod_testplug.plug2', None)
+ list(plugin.get_plugins('plugtest', mod_testplug))
+ # Extra messages since getting all of sys.modules printed is annoying.
+ self.assertIn('mod_testplug.plug', sys.modules, 'plug not loaded')
+ self.assertNotIn('mod_testplug.plug2', sys.modules, 'plug2 loaded')
+
+ def test_no_unneeded_import(self):
+ self._runit(self._test_no_unneeded_import)
+
+ @silence_logging(logging.root)
+ def test_cache_corruption(self):
+ import mod_testplug
+ list(plugin.get_plugins('spork', mod_testplug))
+ filename = pjoin(self.packdir, plugin.CACHE_FILENAME)
+ cachefile = open(filename, 'a')
+ try:
+ cachefile.write('corruption\n')
+ finally:
+ cachefile.close()
+ # Shift the file into the past a little or the rewritten file
+ # will occasionally have the same mtime as the corrupt one.
+ st = os.stat(filename)
+ corrupt_mtime = st.st_mtime - 2
+ os.utime(filename, (st.st_atime, corrupt_mtime))
+ plugin._global_cache.clear()
+ self._test_plug()
+ good_mtime = os.path.getmtime(
+ pjoin(self.packdir, plugin.CACHE_FILENAME))
+ plugin._global_cache.clear()
+ self._test_plug()
+ self.assertEqual(good_mtime, os.path.getmtime(
+ pjoin(self.packdir, plugin.CACHE_FILENAME)))
+ self.assertNotEqual(good_mtime, corrupt_mtime)
+
+ def test_rewrite_on_remove(self):
+ filename = pjoin(self.packdir, 'extra.py')
+ plug = open(filename, 'w')
+ try:
+ plug.write('pkgcore_plugins = {"plugtest": [object()]}\n')
+ finally:
+ plug.close()
+
+ plugin._global_cache.clear()
+ import mod_testplug
+ self.assertEqual(
+ 3, len(list(plugin.get_plugins('plugtest', mod_testplug))))
+
+ os.unlink(filename)
+
+ plugin._global_cache.clear()
+ self._test_plug()
+
+ @silence_logging(logging.root)
+ def test_priority_caching(self):
+ plug3 = open(pjoin(self.packdir, 'plug3.py'), 'w')
+ try:
+ plug3.write('''
+class LowPlug(object):
+ priority = 6
+
+pkgcore_plugins = {
+ 'plugtest': [LowPlug()],
+}
+''')
+ finally:
+ plug3.close()
+ plug4 = open(pjoin(self.packdir, 'plug4.py'), 'w')
+ try:
+ plug4.write('''
+# First file tried, only a disabled plugin.
+class HighDisabledPlug(object):
+ priority = 15
+ disabled = True
+
+pkgcore_plugins = {
+ 'plugtest': [HighDisabledPlug()],
+}
+''')
+ finally:
+ plug4.close()
+ plug5 = open(pjoin(self.packdir, 'plug5.py'), 'w')
+ try:
+ plug5.write('''
+# Second file tried, with a skipped low priority plugin.
+class HighDisabledPlug(object):
+ priority = 12
+ disabled = True
+
+class LowPlug(object):
+ priority = 6
+
+pkgcore_plugins = {
+ 'plugtest': [HighDisabledPlug(), LowPlug()],
+}
+''')
+ finally:
+ plug5.close()
+ plug6 = open(pjoin(self.packdir, 'plug6.py'), 'w')
+ try:
+ plug6.write('''
+# Not tried, bogus priority.
+class BogusPlug(object):
+ priority = 'spoon'
+
+pkgcore_plugins = {
+ 'plugtest': [BogusPlug()],
+}
+''')
+ finally:
+ plug6.close()
+ self._runit(self._test_priority_caching)
+
+ def _test_priority_caching(self):
+ import mod_testplug
+ list(plugin.get_plugins('spork', mod_testplug))
+ sys.modules.pop('mod_testplug.plug', None)
+ sys.modules.pop('mod_testplug.plug2', None)
+ sys.modules.pop('mod_testplug.plug3', None)
+ sys.modules.pop('mod_testplug.plug4', None)
+ sys.modules.pop('mod_testplug.plug5', None)
+ sys.modules.pop('mod_testplug.plug6', None)
+ best_plug = plugin.get_plugin('plugtest', mod_testplug)
+ from mod_testplug import plug
+ self.assertEqual(plug.high_plug, best_plug)
+ # Extra messages since getting all of sys.modules printed is annoying.
+ self.assertIn('mod_testplug.plug', sys.modules, 'plug not loaded')
+ self.assertNotIn('mod_testplug.plug2', sys.modules, 'plug2 loaded')
+ self.assertNotIn('mod_testplug.plug3', sys.modules, 'plug3 loaded')
+ self.assertIn('mod_testplug.plug4', sys.modules, 'plug4 not loaded')
+ self.assertIn('mod_testplug.plug5', sys.modules, 'plug4 not loaded')
+ self.assertNotIn('mod_testplug.plug6', sys.modules, 'plug6 loaded')
+
+ @silence_logging(logging.root)
+ def test_header_change_invalidates_cache(self):
+ # Write the cache
+ plugin._global_cache.clear()
+ import mod_testplug
+ list(plugin.get_plugins('testplug', mod_testplug))
+
+ # Modify the cache.
+ filename = pjoin(self.packdir, plugin.CACHE_FILENAME)
+ with open(filename) as f:
+ cache = f.readlines()
+ cache[0] = 'not really a pkgcore plugin cache\n'
+ with open(filename, 'w') as f:
+ f.write(''.join(cache))
+
+ # And test if it is properly rewritten.
+ plugin._global_cache.clear()
+ self._test_plug()