# vim:fileencoding=utf-8 # (c) 2011-2012 Michał Górny # Released under the terms of the 2-clause BSD license. from __future__ import print_function import os, os.path, shlex import gobject from traceback import format_exc from optparse import OptionParser from . import PV from .dbus_handler import DBusHandler from .library import load_library from .repository import NewEbuildRepository from .repository.pms_eclass import get_common_eclass_files from .output import get_output_modules, TestResult from .pm import get_package_managers class PMSTestSuiteCLI(object): """ A class encapsulating the CLI routines used by the PMS Test Suite. >>> cli = PMSTestSuiteCLI() @ivar optparser: the option parser @type optparser: optparse.OptionParser @ivar library: the test library @type library: L{TestLibrary} @ivar pm: the active package manager @type pm: L{PackageManager} @ivar repository: the active ebuild repository @type repository: L{EbuildRepository} """ _defpm = 'portage' def __init__(self): """ Initialize the class. Set the option parser up. """ opt = OptionParser(version = PV) def add_pmopts(opt, optstr, optval, parser): try: p = parser.values.pm[-1] except IndexError: p = self._defpm if not hasattr(parser.values, 'pmopts'): setattr(parser.values, 'pmopts', {}) parser.values.pmopts[p] = optval opt.add_option('-C', '--create-repo-only', dest='create_repo_only', help='Create ebuild repository only', action='store_true', default=False) opt.add_option('-l', '--library', dest='library_name', help='Test library to use (default: standard)', default='standard') opt.add_option('-L', '--limit-packages', dest='limit_pkgs', type='int', help='Limit the number of packages merged at once') opt.add_option('-M', '--no-manifests', dest='no_manifests', help='Disable Manifest generation/updates', action='store_true', default=False) opt.add_option('-o', '--output-module', dest='outputmod', help='Output module to use', default='cli') opt.add_option('-O', '--output-file', dest='outputfile', help='File to write output to (may not be used)') opt.add_option('-p', '--package-manager', dest='pm', help='Package manager to use (can be specified multiple times)', action='append', default=[]) opt.add_option('-P', '--pm-options', callback=add_pmopts, help='Additional options to pass to the Package Manager (shell-quoted,' ' apply to the parser specified before it)', action='callback', type='string') opt.add_option('-r', '--repository-name', dest='repo_name', help='Name of the repository to store ebuilds in') opt.add_option('-R', '--repository-path', dest='repo_path', help='Path to the repository to store ebuilds in') opt.add_option('-t', '--tests', dest='tests', help='Limit the tests to be run (may be specified multiple times)', action='append', default=[]) opt.add_option('-T', '--thorough', dest='thorough', help='Run tests thoroughly (i.e. in all possible variants)', action='store_true', default=False) opt.add_option('-U', '--unsupported-eapis', dest='undefined', help='Run tests in undefined-behavior EAPIs as well', action='store_true', default=False) opt.add_option('-v', '--verbose', dest='verbose', help='Report test results verbosely (even if they succeed)', action='store_true', default=False) self.optparser = opt def _start(self, prog, *args): """ Initialize the program. Parse command-line args. Instantiate classes encapsulating the Package Manager, test library and repository. @param prog: the program name (C{argv[0]}) @type prog: string @param args: command line arguments (C{argv[1:]}) @type args: strings """ opt = self.optparser opt.prog = os.path.basename(prog) (opts, args) = opt.parse_args(list(args)) if not opts.pm: opts.pm = [self._defpm] if not hasattr(opts, 'pmopts'): setattr(opts, 'pmopts', {}) if opts.repo_path and opts.repo_name: opt.error('--repository-path and --repository-name are mutually exclusive') if not opts.create_repo_only: for x in get_output_modules(): if x.name == opts.outputmod: self.output = x(opts.outputfile) break else: opt.error('Output module not available: %s' % opts.outputmod) pmset = set(opts.pm) pms = [] for x in get_package_managers(): if x.name in pmset: try: pm = x.inst() except Exception as e: opt.error(e) pm.pm_options = shlex.split(opts.pmopts[x.name]) \ if x.name in opts.pmopts else [] pms.append(pm) pmset.remove(x.name) if pmset: opt.error('PM(s) not supported: %s' % ', '.join(pmset)) self.pms = sorted(pms, key = lambda x: opts.pm.index(x.name)) os.umask(0o22) try: if opts.repo_path: self.repository = NewEbuildRepository(opts.repo_path, 'pms-test-suite') for pm in self.pms: pm.append_repository(self.repository) else: if not opts.repo_name: opts.repo_name = 'pms-test-suite' for pm in self.pms: # XXX: all PMs should match it... self.repository = pm.get_repository(opts.repo_name) break except (EnvironmentError, KeyError, ValueError) as e: opt.error('Repository open failed: %s' % e) dbus_uid = pm.config.userpriv_uid if pm.config.userpriv_enabled \ else None self.dbus_hdlr = DBusHandler(uid = dbus_uid) try: try: self.test_library = load_library(opts.library_name, thorough = opts.thorough, undefined = opts.undefined, dbus_hdlr = self.dbus_hdlr) except (ImportError, TypeError) as e: opt.error('Test library load failed: %s' % e) for pm in self.pms: pm.package_limit = opts.limit_pkgs self.create_repo_only = opts.create_repo_only self.update_manifests = not opts.no_manifests self.verbose = opts.verbose limit_tests = set() for t in opts.tests: limit_tests.update(t.split()) if limit_tests: self.test_library.limit_tests(limit_tests) except Exception: dbus_hdlr.terminate() def _print_stage(self, text): print('-> [%s] %s...' % (self.pm.name, text)) def tests_done(self): try: self.pm.reload_config() self._print_stage('Checking test results') results = {} for t in self.test_library: tr = TestResult(t, self.pm) if tr: outc = '.' elif tr.exception: outc = 'E' self.exception = tr.exception self.loop.quit() return else: outc = 'F' print(outc, end='') results[t] = tr t.clean(self.pm) self.results[self.pm] = results print('') if self.pm.has_pending_actions: self._print_stage('Unmerging test ebuilds') self.pm.commit(self.prepare) else: self.prepare() except Exception as e: self.exception = format_exc() self.loop.quit() def all_done(self): ret = self.output(self.results, verbose = self.verbose) self.ret = 0 if ret else 1 self.loop.quit() def start_pm(self): self._print_stage('Running PM') for t in self.test_library: t.start(self.pm) self.pm.commit(self.tests_done) def pre_unmerge_done(self): try: self.pm.reload_config() for t in self.test_library: t.clean(self.pm) if self.pm.has_pending_actions: print('Failed to unmerge the following test ebuilds:') print(' '.join(self.pm.pkg_queue)) print('Refusing to proceed.') self.loop.quit() return self.start_pm() except Exception as e: self.exception = format_exc() self.loop.quit() def prepare(self, first = False): try: try: self.pm = next(self.pm_iter) except StopIteration: self.all_done() else: if not first: self.pm.reload_config() for t in self.test_library: t.clean(self.pm) if self.pm.has_pending_actions: print('-> Unmerging already-merged test ebuilds...') self.pm.commit(self.pre_unmerge_done) else: self.start_pm() except Exception as e: self.exception = format_exc() self.loop.quit() def generate_and_start(self): try: print('-> Generating ebuilds...') files = {} for t in self.test_library: files.update(t.get_output_files()) if len(self.test_library) == 0: print('No tests found (?!), refusing to proceed.') return 1 files.update(get_common_eclass_files()) files.update(self.test_library.get_common_files()) self.repository.write_files(files) if self.update_manifests: needs_manifests = False for pm in self.pms: needs_manifests |= pm.requires_manifests try: self.repository.remanifest(files, pm) except NotImplementedError: pass else: break else: if needs_manifests: print('No PM was able to do the Manifests, failing.') return 1 if self.create_repo_only: return 0 self.pm_iter = iter(self.pms) self.results = {} self.prepare(first = True) except Exception as e: self.exception = format_exc() self.loop.quit() return False def main(self, argv): self._start(*argv) self.exception = None try: gobject.idle_add(self.generate_and_start) self.ret = 1 self.loop = gobject.MainLoop() self.loop.run() finally: # Ensure to terminate the spawned D-Bus. self.dbus_hdlr.terminate() if self.exception is not None: return self.exception return self.ret