Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Feb 13, 2025. It is now read-only.

Commit 13ed079

Browse files
bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (python#13630)
1 parent c529967 commit 13ed079

File tree

4 files changed

+262
-54
lines changed

4 files changed

+262
-54
lines changed

‎Lib/asyncio/unix_events.py

Lines changed: 212 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import errno
44
import io
5+
import itertools
56
import os
67
import selectors
78
import signal
@@ -29,7 +30,9 @@
2930
__all__ = (
3031
'SelectorEventLoop',
3132
'AbstractChildWatcher', 'SafeChildWatcher',
32-
'FastChildWatcher', 'DefaultEventLoopPolicy',
33+
'FastChildWatcher',
34+
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
35+
'DefaultEventLoopPolicy',
3336
)
3437

3538

@@ -184,6 +187,13 @@ async def _make_subprocess_transport(self, protocol, args, shell,
184187
stdin, stdout, stderr, bufsize,
185188
extra=None, **kwargs):
186189
with events.get_child_watcher() as watcher:
190+
if not watcher.is_active():
191+
# Check early.
192+
# Raising exception before process creation
193+
# prevents subprocess execution if the watcher
194+
# is not ready to handle it.
195+
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
196+
"subproccess support is not installed.")
187197
waiter = self.create_future()
188198
transp = _UnixSubprocessTransport(self, protocol, args, shell,
189199
stdin, stdout, stderr, bufsize,
@@ -838,6 +848,15 @@ def close(self):
838848
"""
839849
raise NotImplementedError()
840850

851+
def is_active(self):
852+
"""Watcher status.
853+
854+
Return True if the watcher is installed and ready to handle process exit
855+
notifications.
856+
857+
"""
858+
raise NotImplementedError()
859+
841860
def __enter__(self):
842861
"""Enter the watcher's context and allow starting new processes
843862
@@ -849,6 +868,20 @@ def __exit__(self, a, b, c):
849868
raise NotImplementedError()
850869

851870

871+
def _compute_returncode(status):
872+
if os.WIFSIGNALED(status):
873+
# The child process died because of a signal.
874+
return -os.WTERMSIG(status)
875+
elif os.WIFEXITED(status):
876+
# The child process exited (e.g sys.exit()).
877+
return os.WEXITSTATUS(status)
878+
else:
879+
# The child exited, but we don't understand its status.
880+
# This shouldn't happen, but if it does, let's just
881+
# return that status; perhaps that helps debug it.
882+
return status
883+
884+
852885
class BaseChildWatcher(AbstractChildWatcher):
853886

854887
def __init__(self):
@@ -858,6 +891,9 @@ def __init__(self):
858891
def close(self):
859892
self.attach_loop(None)
860893

894+
def is_active(self):
895+
return self._loop is not None and self._loop.is_running()
896+
861897
def _do_waitpid(self, expected_pid):
862898
raise NotImplementedError()
863899

@@ -898,19 +934,6 @@ def _sig_chld(self):
898934
'exception': exc,
899935
})
900936

901-
def _compute_returncode(self, status):
902-
if os.WIFSIGNALED(status):
903-
# The child process died because of a signal.
904-
return -os.WTERMSIG(status)
905-
elif os.WIFEXITED(status):
906-
# The child process exited (e.g sys.exit()).
907-
return os.WEXITSTATUS(status)
908-
else:
909-
# The child exited, but we don't understand its status.
910-
# This shouldn't happen, but if it does, let's just
911-
# return that status; perhaps that helps debug it.
912-
return status
913-
914937

915938
class SafeChildWatcher(BaseChildWatcher):
916939
"""'Safe' child watcher implementation.
@@ -934,11 +957,6 @@ def __exit__(self, a, b, c):
934957
pass
935958

936959
def add_child_handler(self, pid, callback, *args):
937-
if self._loop is None:
938-
raise RuntimeError(
939-
"Cannot add child handler, "
940-
"the child watcher does not have a loop attached")
941-
942960
self._callbacks[pid] = (callback, args)
943961

944962
# Prevent a race condition in case the child is already terminated.
@@ -974,7 +992,7 @@ def _do_waitpid(self, expected_pid):
974992
# The child process is still alive.
975993
return
976994

977-
returncode = self._compute_returncode(status)
995+
returncode = _compute_returncode(status)
978996
if self._loop.get_debug():
979997
logger.debug('process %s exited with returncode %s',
980998
expected_pid, returncode)
@@ -1035,11 +1053,6 @@ def __exit__(self, a, b, c):
10351053
def add_child_handler(self, pid, callback, *args):
10361054
assert self._forks, "Must use the context manager"
10371055

1038-
if self._loop is None:
1039-
raise RuntimeError(
1040-
"Cannot add child handler, "
1041-
"the child watcher does not have a loop attached")
1042-
10431056
with self._lock:
10441057
try:
10451058
returncode = self._zombies.pop(pid)
@@ -1072,7 +1085,7 @@ def _do_waitpid_all(self):
10721085
# A child process is still alive.
10731086
return
10741087

1075-
returncode = self._compute_returncode(status)
1088+
returncode = _compute_returncode(status)
10761089

10771090
with self._lock:
10781091
try:
@@ -1101,6 +1114,177 @@ def _do_waitpid_all(self):
11011114
callback(pid, returncode, *args)
11021115

11031116

1117+
class MultiLoopChildWatcher(AbstractChildWatcher):
1118+
# The class keeps compatibility with AbstractChildWatcher ABC
1119+
# To achieve this it has empty attach_loop() method
1120+
# and doesn't accept explicit loop argument
1121+
# for add_child_handler()/remove_child_handler()
1122+
# but retrieves the current loop by get_running_loop()
1123+
1124+
def __init__(self):
1125+
self._callbacks = {}
1126+
self._saved_sighandler = None
1127+
1128+
def is_active(self):
1129+
return self._saved_sighandler is not None
1130+
1131+
def close(self):
1132+
self._callbacks.clear()
1133+
if self._saved_sighandler is not None:
1134+
handler = signal.getsignal(signal.SIGCHLD)
1135+
if handler != self._sig_chld:
1136+
logger.warning("SIGCHLD handler was changed by outside code")
1137+
else:
1138+
signal.signal(signal.SIGCHLD, self._saved_sighandler)
1139+
self._saved_sighandler = None
1140+
1141+
def __enter__(self):
1142+
return self
1143+
1144+
def __exit__(self, exc_type, exc_val, exc_tb):
1145+
pass
1146+
1147+
def add_child_handler(self, pid, callback, *args):
1148+
loop = events.get_running_loop()
1149+
self._callbacks[pid] = (loop, callback, args)
1150+
1151+
# Prevent a race condition in case the child is already terminated.
1152+
self._do_waitpid(pid)
1153+
1154+
def remove_child_handler(self, pid):
1155+
try:
1156+
del self._callbacks[pid]
1157+
return True
1158+
except KeyError:
1159+
return False
1160+
1161+
def attach_loop(self, loop):
1162+
# Don't save the loop but initialize itself if called first time
1163+
# The reason to do it here is that attach_loop() is called from
1164+
# unix policy only for the main thread.
1165+
# Main thread is required for subscription on SIGCHLD signal
1166+
if self._saved_sighandler is None:
1167+
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1168+
if self._saved_sighandler is None:
1169+
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
1170+
"restore to default handler on watcher close.")
1171+
self._saved_sighandler = signal.SIG_DFL
1172+
1173+
# Set SA_RESTART to limit EINTR occurrences.
1174+
signal.siginterrupt(signal.SIGCHLD, False)
1175+
1176+
def _do_waitpid_all(self):
1177+
for pid in list(self._callbacks):
1178+
self._do_waitpid(pid)
1179+
1180+
def _do_waitpid(self, expected_pid):
1181+
assert expected_pid > 0
1182+
1183+
try:
1184+
pid, status = os.waitpid(expected_pid, os.WNOHANG)
1185+
except ChildProcessError:
1186+
# The child process is already reaped
1187+
# (may happen if waitpid() is called elsewhere).
1188+
pid = expected_pid
1189+
returncode = 255
1190+
logger.warning(
1191+
"Unknown child process pid %d, will report returncode 255",
1192+
pid)
1193+
debug_log = False
1194+
else:
1195+
if pid == 0:
1196+
# The child process is still alive.
1197+
return
1198+
1199+
returncode = _compute_returncode(status)
1200+
debug_log = True
1201+
try:
1202+
loop, callback, args = self._callbacks.pop(pid)
1203+
except KeyError: # pragma: no cover
1204+
# May happen if .remove_child_handler() is called
1205+
# after os.waitpid() returns.
1206+
logger.warning("Child watcher got an unexpected pid: %r",
1207+
pid, exc_info=True)
1208+
else:
1209+
if loop.is_closed():
1210+
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1211+
else:
1212+
if debug_log and loop.get_debug():
1213+
logger.debug('process %s exited with returncode %s',
1214+
expected_pid, returncode)
1215+
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1216+
1217+
def _sig_chld(self, signum, frame):
1218+
try:
1219+
self._do_waitpid_all()
1220+
except (SystemExit, KeyboardInterrupt):
1221+
raise
1222+
except BaseException:
1223+
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1224+
1225+
1226+
class ThreadedChildWatcher(AbstractChildWatcher):
1227+
# The watcher uses a thread per process
1228+
# for waiting for the process finish.
1229+
# It doesn't require subscription on POSIX signal
1230+
1231+
def __init__(self):
1232+
self._pid_counter = itertools.count(0)
1233+
1234+
def is_active(self):
1235+
return True
1236+
1237+
def close(self):
1238+
pass
1239+
1240+
def __enter__(self):
1241+
return self
1242+
1243+
def __exit__(self, exc_type, exc_val, exc_tb):
1244+
pass
1245+
1246+
def add_child_handler(self, pid, callback, *args):
1247+
loop = events.get_running_loop()
1248+
thread = threading.Thread(target=self._do_waitpid,
1249+
name=f"waitpid-{next(self._pid_counter)}",
1250+
args=(loop, pid, callback, args),
1251+
daemon=True)
1252+
thread.start()
1253+
1254+
def remove_child_handler(self, pid):
1255+
# asyncio never calls remove_child_handler() !!!
1256+
# The method is no-op but is implemented because
1257+
# abstract base classe requires it
1258+
return True
1259+
1260+
def attach_loop(self, loop):
1261+
pass
1262+
1263+
def _do_waitpid(self, loop, expected_pid, callback, args):
1264+
assert expected_pid > 0
1265+
1266+
try:
1267+
pid, status = os.waitpid(expected_pid, 0)
1268+
except ChildProcessError:
1269+
# The child process is already reaped
1270+
# (may happen if waitpid() is called elsewhere).
1271+
pid = expected_pid
1272+
returncode = 255
1273+
logger.warning(
1274+
"Unknown child process pid %d, will report returncode 255",
1275+
pid)
1276+
else:
1277+
returncode = _compute_returncode(status)
1278+
if loop.get_debug():
1279+
logger.debug('process %s exited with returncode %s',
1280+
expected_pid, returncode)
1281+
1282+
if loop.is_closed():
1283+
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1284+
else:
1285+
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1286+
1287+
11041288
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
11051289
"""UNIX event loop policy with a watcher for child processes."""
11061290
_loop_factory = _UnixSelectorEventLoop
@@ -1112,7 +1296,7 @@ def __init__(self):
11121296
def _init_watcher(self):
11131297
with events._lock:
11141298
if self._watcher is None: # pragma: no branch
1115-
self._watcher = SafeChildWatcher()
1299+
self._watcher = ThreadedChildWatcher()
11161300
if isinstance(threading.current_thread(),
11171301
threading._MainThread):
11181302
self._watcher.attach_loop(self._local._loop)
@@ -1134,7 +1318,7 @@ def set_event_loop(self, loop):
11341318
def get_child_watcher(self):
11351319
"""Get the watcher for child processes.
11361320
1137-
If not yet set, a SafeChildWatcher object is automatically created.
1321+
If not yet set, a ThreadedChildWatcher object is automatically created.
11381322
"""
11391323
if self._watcher is None:
11401324
self._init_watcher()

‎Lib/test/test_asyncio/test_subprocess.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ async def execute():
633633

634634
self.assertIsNone(self.loop.run_until_complete(execute()))
635635

636+
636637
if sys.platform != 'win32':
637638
# Unix
638639
class SubprocessWatcherMixin(SubprocessMixin):
@@ -648,7 +649,24 @@ def setUp(self):
648649
watcher = self.Watcher()
649650
watcher.attach_loop(self.loop)
650651
policy.set_child_watcher(watcher)
651-
self.addCleanup(policy.set_child_watcher, None)
652+
653+
def tearDown(self):
654+
super().setUp()
655+
policy = asyncio.get_event_loop_policy()
656+
watcher = policy.get_child_watcher()
657+
policy.set_child_watcher(None)
658+
watcher.attach_loop(None)
659+
watcher.close()
660+
661+
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
662+
test_utils.TestCase):
663+
664+
Watcher = unix_events.ThreadedChildWatcher
665+
666+
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
667+
test_utils.TestCase):
668+
669+
Watcher = unix_events.MultiLoopChildWatcher
652670

653671
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
654672
test_utils.TestCase):
@@ -670,5 +688,25 @@ def setUp(self):
670688
self.set_event_loop(self.loop)
671689

672690

691+
class GenericWatcherTests:
692+
693+
def test_create_subprocess_fails_with_inactive_watcher(self):
694+
695+
async def execute():
696+
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
697+
watcher.is_active.return_value = False
698+
asyncio.set_child_watcher(watcher)
699+
700+
with self.assertRaises(RuntimeError):
701+
await subprocess.create_subprocess_exec(
702+
support.FakePath(sys.executable), '-c', 'pass')
703+
704+
watcher.add_child_handler.assert_not_called()
705+
706+
self.assertIsNone(self.loop.run_until_complete(execute()))
707+
708+
709+
710+
673711
if __name__ == '__main__':
674712
unittest.main()

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /