How can I handle KeyboardInterrupt events with python's multiprocessing Pools? Here is a simple example:
from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()
When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.
I want to be able to press ^C at any time and cause all of the processes to exit gracefully.
11 Answers
This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.
Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace
results = pool.map(slowly_square, range(40))
with
results = pool.map_async(slowly_square, range(40)).get(9999999)
or similar.
From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.
import signal
...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
...

def main():
    pool = multiprocessing.Pool(size, init_worker)
    ...
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
For some reason, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:
from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()
Normally you would get the following output:
starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
So if you hit ^C, you will get:
starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
The voted answer does not tackle the core issue but a similar side effect.
Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.
import signal
from multiprocessing import Pool

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
Many of these answers are old and/or do not seem to work with later versions of Python (I am running 3.8.5) on Windows if you are executing a method such as Pool.map, which blocks until all the submitted tasks have completed. The following is my solution.
- Issue a call to signal.signal(signal.SIGINT, signal.SIG_IGN) in the main process to ignore Ctrl-C altogether.
- The processing pool will be initialized with a pool initializer that initializes each process thusly: the global variable ctrl_c_entered will be set to False, and a call to signal.signal(signal.SIGINT, signal.SIG_IGN) will be issued to initially ignore Ctrl-C. The return value from this call will be saved; this is the original, default handler that, when re-established, allows handling of KeyboardInterrupt exceptions.
- A decorator, handle_ctrl_c, can be used to decorate multiprocessing functions and methods that should exit immediately on Ctrl-C being entered. This decorator tests whether the global ctrl_c_entered flag is set, and if so does not even bother to run the function/method, instead returning a KeyboardInterrupt exception instance. Otherwise a try/except handler for KeyboardInterrupt is established and the decorated function/method is invoked. If Ctrl-C is entered, the global ctrl_c_entered is set to True and a KeyboardInterrupt exception instance is returned. In any event, before returning, the decorator re-establishes the SIG_IGN handler.
In essence all submitted tasks will be allowed to start but will immediately terminate with a return value of a KeyboardInterrupt exception once a Ctrl-C has been entered. The main process can test the return values for the presence of such instances to detect whether a Ctrl-C was entered.
from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler)  # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variables for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Prints:
Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
Usually this simple structure works for Ctrl-C on Pool:
def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)
This was also suggested in a few similar posts.
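The snippet above only prints a message; to actually stop the pool, the handler needs a reference to it. A hedged sketch of that wiring (Python 3; the work function and pool setup are my own illustration, not part of the original answer):

```python
import multiprocessing
import signal
import sys
import time

def work(i):
    # Hypothetical job; stands in for real work.
    time.sleep(0.01)
    return i

def main():
    pool = multiprocessing.Pool(2)

    def signal_handle(_signal, frame):
        # Runs in the main process on Ctrl-C: stop everything and exit.
        print("Stopping the Jobs.")
        pool.terminate()
        pool.join()
        sys.exit(1)

    signal.signal(signal.SIGINT, signal_handle)
    results = pool.map_async(work, range(6)).get(60)
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    print(main())
```

Note that map_async().get(timeout) is still needed here, for the reason given in the accepted answer: a plain map() blocks in an uninterruptible condition wait.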
It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don't finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn't catch exceptions that don't inherit from Exception (e.g., SystemExit). So here's my solution that deals with both of these:
import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise  # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout. This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
I'm a newbie in Python. I was looking everywhere for an answer and stumbled upon this and a few other blogs and YouTube videos. I tried to copy-paste the author's code above and reproduce it with Python 2.7.13 on Windows 7 64-bit. It's close to what I want to achieve.
I made my child processes ignore the Ctrl-C and let the parent process terminate. Bypassing the child processes seems to avoid this problem for me.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results

if __name__ == '__main__':
    go()
The part starting at pool.terminate() never seems to execute.
I found that, for the time being, the best solution is not to use the multiprocessing.Pool feature but rather to roll your own pool functionality. I provided an example demonstrating the error with apply_async, as well as an example showing how to avoid using the pool functionality altogether.
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
You can try using the apply_async method of a Pool object, like this:
import multiprocessing
import time
from datetime import datetime

def test_func(x):
    time.sleep(2)
    return x**2

def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])
        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    iterations = range(100)

    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)
Output:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
An advantage of this method is that results processed before interruption will be returned in the results dictionary:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
Strangely enough, it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written... try changing slowly_square to:
def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0
That should work as you expected.