aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2024-03-02 22:30:50 -0800
committerZac Medico <zmedico@gentoo.org>2024-03-02 22:30:50 -0800
commitc3ebdbb42e72335ca65335c855a82b99537c7606 (patch)
treed21e59a3bfa5ba38fb294b04438302a268063a94 /lib
parentbinarytree._populate_remote: Fix UnboundLocalError for binpkg-request-signature (diff)
downloadportage-c3ebdbb42e72335ca65335c855a82b99537c7606.tar.gz
portage-c3ebdbb42e72335ca65335c855a82b99537c7606.tar.bz2
portage-c3ebdbb42e72335ca65335c855a82b99537c7606.zip
elog/mod_custom: Spawn processes in background
Since elog_process is typically called while the event loop is running, hold references to spawned processes and wait for them asynchronously, ultimately waiting for them if necessary when the AsyncioEventLoop _close_main method calls _async_finalize via portage.process.run_coroutine_exitfuncs(). ConfigProtectTestCase is useful for exercising this code, and this little make.globals patch can be used to test failure during finalize with this error message: !!! PORTAGE_ELOG_COMMAND failed with exitcode 1 --- a/cnf/make.globals +++ b/cnf/make.globals @@ -144 +144,2 @@ PORTAGE_ELOG_CLASSES="log warn error" -PORTAGE_ELOG_SYSTEM="save_summary:log,warn,error,qa echo" +PORTAGE_ELOG_SYSTEM="save_summary:log,warn,error,qa echo custom" +PORTAGE_ELOG_COMMAND="/bin/false" Bug: https://bugs.gentoo.org/925907 Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib')
-rw-r--r--lib/portage/elog/mod_custom.py73
-rw-r--r--lib/portage/process.py29
-rw-r--r--lib/portage/util/_eventloop/asyncio_event_loop.py8
3 files changed, 105 insertions, 5 deletions
diff --git a/lib/portage/elog/mod_custom.py b/lib/portage/elog/mod_custom.py
index e0ae77e10..a3e199bcb 100644
--- a/lib/portage/elog/mod_custom.py
+++ b/lib/portage/elog/mod_custom.py
@@ -1,10 +1,33 @@
# elog/mod_custom.py - elog dispatch module
-# Copyright 2006-2020 Gentoo Authors
+# Copyright 2006-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import types
+
+import portage
import portage.elog.mod_save
import portage.exception
import portage.process
+from portage.util.futures import asyncio
+
+# Since elog_process is typically called while the event loop is
+# running, hold references to spawned processes and wait for them
+# asynchronously, ultimately waiting for them if necessary when
+# the AsyncioEventLoop _close_main method calls _async_finalize
+# via portage.process.run_coroutine_exitfuncs().
+_proc_refs = None
+
+
+def _get_procs() -> list[tuple[portage.process.MultiprocessingProcess, asyncio.Future]]:
+ """
+ Return list of (proc, asyncio.ensure_future(proc.wait())) which is not
+ inherited from the parent after fork.
+ """
+ global _proc_refs
+ if _proc_refs is None or _proc_refs.pid != portage.getpid():
+ _proc_refs = types.SimpleNamespace(pid=portage.getpid(), procs=[])
+ portage.process.atexit_register(_async_finalize)
+ return _proc_refs.procs
def process(mysettings, key, logentries, fulltext):
@@ -18,8 +41,50 @@ def process(mysettings, key, logentries, fulltext):
mylogcmd = mysettings["PORTAGE_ELOG_COMMAND"]
mylogcmd = mylogcmd.replace("${LOGFILE}", elogfilename)
mylogcmd = mylogcmd.replace("${PACKAGE}", key)
- retval = portage.process.spawn_bash(mylogcmd)
- if retval != 0:
+ loop = asyncio.get_event_loop()
+ proc = portage.process.spawn_bash(mylogcmd, returnproc=True)
+ procs = _get_procs()
+ procs.append((proc, asyncio.ensure_future(proc.wait(), loop=loop)))
+ for index, (proc, waiter) in reversed(list(enumerate(procs))):
+ if not waiter.done():
+ continue
+ del procs[index]
+ if waiter.result() != 0:
+ raise portage.exception.PortageException(
+ f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}"
+ )
+
+
+async def _async_finalize():
+ """
+ Async finalize is preferred, since we can wait for process exit status.
+ """
+ procs = _get_procs()
+ while procs:
+ proc, waiter = procs.pop()
+ if (await waiter) != 0:
+ raise portage.exception.PortageException(
+ f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}"
+ )
+
+
+def finalize():
+ """
+ NOTE: This raises PortageException if there are any processes
+ still running, so it's better to use _async_finalize instead
+ (invoked via portage.process.run_coroutine_exitfuncs() in
+ the AsyncioEventLoop _close_main method).
+ """
+ procs = _get_procs()
+ while procs:
+ proc, waiter = procs.pop()
+ if not waiter.done():
+ waiter.cancel()
+ proc.terminate()
+ raise portage.exception.PortageException(
+ f"!!! PORTAGE_ELOG_COMMAND was killed after it was found running in the background (pid {proc.pid})"
+ )
+ elif waiter.result() != 0:
raise portage.exception.PortageException(
- "!!! PORTAGE_ELOG_COMMAND failed with exitcode %d" % retval
+ f"!!! PORTAGE_ELOG_COMMAND failed with exitcode {waiter.result()}"
)
diff --git a/lib/portage/process.py b/lib/portage/process.py
index b652e3294..1bc0c507c 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -6,6 +6,7 @@
import atexit
import errno
import fcntl
+import io
import logging
import multiprocessing
import platform
@@ -226,6 +227,34 @@ def run_exitfuncs():
raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
+async def run_coroutine_exitfuncs():
+ """
+ This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction
+ to check which functions to run. It is called by the AsyncioEventLoop
+ _close_main method just before the loop is closed.
+ """
+ tasks = []
+ for index, (func, targs, kargs) in reversed(list(enumerate(_exithandlers))):
+ if asyncio.iscoroutinefunction(func):
+ del _exithandlers[index]
+ tasks.append(asyncio.ensure_future(func(*targs, **kargs)))
+ tracebacks = []
+ exc_info = None
+ for task in tasks:
+ try:
+ await task
+ except Exception:
+ file = io.StringIO()
+ traceback.print_exc(file=file)
+ tracebacks.append(file.getvalue())
+ exc_info = sys.exc_info()
+ if len(tracebacks) > 1:
+ for tb in tracebacks[:-1]:
+ print(tb, file=sys.stderr, flush=True)
+ if exc_info is not None:
+ raise exc_info[1].with_traceback(exc_info[2])
+
+
atexit.register(run_exitfuncs)
# It used to be necessary for API consumers to remove pids from spawned_pids,
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index ee9e4c60e..a598b1b51 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -79,8 +79,14 @@ class AsyncioEventLoop(_AbstractEventLoop):
# we can properly wait for it and avoid messages like this:
# [ERROR] Task was destroyed but it is pending!
if socks5.proxy.is_running():
- await socks5.proxy.stop()
+ # TODO: Convert socks5.proxy.stop() to a regular coroutine
+ # function so that it doesn't need to be wrapped like this.
+ async def stop_socks5_proxy():
+ await socks5.proxy.stop()
+ portage.process.atexit_register(stop_socks5_proxy)
+
+ await portage.process.run_coroutine_exitfuncs()
portage.process.run_exitfuncs()
@staticmethod