
How can I handle KeyboardInterrupt events with Python's multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and cause all of the processes to exit gracefully.

asked Sep 10, 2009 at 23:59

11 Answers


This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

 results = pool.map(slowly_square, range(40))

with

 results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.
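Applied to the question's example, that is a one-line change in go() (a sketch; 9999999 is just an arbitrarily large timeout so the get() effectively never expires on its own):

def go():
    pool = Pool(8)
    try:
        # .get() with any timeout keeps the wait interruptible by ^C
        results = pool.map_async(slowly_square, range(40)).get(9999999)
    except KeyboardInterrupt:
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results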

answered Sep 11, 2009 at 0:45

11 Comments

Is this bug in the official python tracker anywhere? I'm having trouble finding it but I'm probably just not using the best search terms.
This bug has been filed as Issue 8296: bugs.python.org/issue8296
This doesn't quite fix things. Sometimes I get the expected behavior when I press Control+C, other times not. I'm not sure why, but it looks like the KeyboardInterrupt is received by one of the processes at random, and I only get the correct behavior if the parent process is the one that catches it.
This doesn't work for me with Python 3.6.1 on Windows. I get tons of stack traces and other garbage when I do Ctrl-C, i.e. same as without such workaround. In fact none of the solutions I've tried from this thread seem to work...
Jehejj, it's still not fixed in 2019. Like doing IO in parallel is a novel idea :/

From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import signal
...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
...

def main():
    pool = multiprocessing.Pool(size, init_worker)
    try:
        ...
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
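As a self-contained sketch (my reconstruction, not the blog's exact code, reusing the question's slowly_square and combining this pattern with the map_async timeout trick from the accepted answer so the parent's wait stays interruptible):

import signal
import multiprocessing
from time import sleep

def init_worker():
    # Workers inherit this handler and ignore SIGINT entirely,
    # leaving the parent as the only process that sees ^C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def slowly_square(i):
    sleep(1)
    return i * i

def main():
    pool = multiprocessing.Pool(8, init_worker)
    try:
        results = pool.map_async(slowly_square, range(40)).get(9999999)
        pool.close()
        print "Results:", results
    except KeyboardInterrupt:
        print "Caught ^C, terminating workers"
        pool.terminate()
    finally:
        pool.join()

if __name__ == '__main__':
    main()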

answered May 31, 2011 at 18:39

8 Comments

Hi John. Your solution doesn't accomplish the same thing as my (yes, unfortunately complicated) solution. It hides behind the time.sleep(10) in the main process. If you were to remove that sleep, or if you wait until the process attempts to join on the pool, which you have to do in order to guarantee the jobs are complete, then you still suffer from the same problem: the main process doesn't receive the KeyboardInterrupt while it is waiting on the pool join operation.
In the case where I used this code in production, the time.sleep() was part of a loop that would check the status of each child process, and then restart certain processes on a delay if necessary. Rather than a join() that would wait on all processes to complete, it would check on them individually, ensuring that the master process stayed responsive.
So it was more a busy wait (maybe with small sleeps between checks) that polled for process completion via another method rather than join? If that's the case, perhaps it would be better to include this code in your blog post, since you can then guarantee that all the workers have completed before attempting to join.
This doesn't work. Only the children are sent the signal. The parent never receives it, so pool.terminate() never gets executed. Having the children ignore the signal accomplishes nothing. @Glenn's answer solves the problem.
My version of this is at gist.github.com/admackin/003dd646e5fadee8b8d6 ; it doesn't call .join() except on interrupt - it simply manually checks the result of .apply_async() using AsyncResult.ready() to see if it is ready, meaning we've cleanly finished.

For some reason, only exceptions that inherit from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
answered Apr 1, 2010 at 16:06

2 Comments

It seems that this is not a complete solution. If a KeyboardInterrupt arrives while multiprocessing is performing its own IPC data exchange, then the try/except will not be activated (obviously).
You could replace raise KeyboardInterruptError with a return. You just have to make sure that the child process ends as soon as the KeyboardInterrupt is received. The return value seems to be ignored; main still receives the KeyboardInterrupt.

The top-voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.

import signal
from multiprocessing import Pool

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

pool = Pool(initializer=initializer)

try:
    # perform_download and downloads are placeholders for your own job
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
answered Jul 2, 2017 at 9:43

5 Comments

I've found that ProcessPoolExecutor also has the same issue. The only fix I was able to find was to call os.setpgrp() from inside the future.
Sure, the only difference is that ProcessPoolExecutor does not support initializer functions. On Unix, you could leverage the fork strategy by disabling the sighandler on the main process before creating the Pool and re-enabling it afterwards. In pebble, I silence SIGINT on the child processes by default. I am not aware of the reason they don't do the same with the Python Pools. In the end, the user could re-set the SIGINT handler in case he/she wants to hurt himself/herself.
This solution seems to prevent Ctrl-C from interrupting the main process as well.
I just tested on Python 3.5 and it works, what version of Python are you using? What OS?
This should probably be the accepted answer; it's the simplest answer in my eyes, and modifies the behavior to be what the OP (and apparently many others) expect.

Many of these answers are old and/or they do not seem to work with later versions of Python (I am running 3.8.5) on Windows if you are executing a method such as Pool.map, which blocks until all the submitted tasks have completed. The following is my solution.

  1. Issue a call to signal.signal(signal.SIGINT, signal.SIG_IGN) in the main process to ignore Ctrl-C altogether.
  2. The processing pool is initialized with a pool initializer that initializes each pool process thusly: global variable ctrl_c_entered is set to False, and a call to signal.signal(signal.SIGINT, pool_ctrl_c_handler) is issued so that, initially, Ctrl-C merely sets this flag rather than raising an exception. The return value from this call is saved; this is the original, default handler that, when re-established, allows handling of KeyboardInterrupt exceptions.
  3. A decorator, handle_ctrl_c, can be used to decorate multiprocessing functions and methods that should exit immediately on Ctrl-C being entered. This decorator tests whether the global ctrl_c_entered flag is set and, if so, does not even bother to run the function/method and instead returns a KeyboardInterrupt exception instance. Otherwise, a try/except handler for KeyboardInterrupt is established and the decorated function/method is invoked. If Ctrl-C is entered, global ctrl_c_entered is set to True and a KeyboardInterrupt exception instance is returned. In any event, before returning, the decorator re-establishes the SIG_IGN handler.

In essence, all submitted tasks will be allowed to start but will immediately terminate with a return value of a KeyboardInterrupt exception instance once Ctrl-C has been entered. The main process can test the return values for the presence of such instances to detect whether Ctrl-C was entered.

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variables for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Prints:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
answered Aug 7, 2021 at 19:14

3 Comments

Confirmed this works as expected on Python 3.7.7 on Windows. Thanks for posting!
As shown above, this returns KeyboardInterrupt() for all unprocessed members of the list, which will evaluate to True and print nothing. I found it helpful to return None instead.
@JonathanRys And what if the worker function being used (which is slowly_square in my example) happens to return None? Then it is more useful to be able to distinguish the worker function completing normally vs. not being run at all due to a previous keyboard interrupt, as is done in function main. It doesn't really matter what is returned by handle_ctrl_c when there has been a keyboard interrupt as long as it can be distinguished from any possible value a worker function might return. Perhaps this should be an instance of a specially defined empty class, e.g. UnProcessed.
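For illustration, such a sentinel (UnProcessed is just the hypothetical name floated in the comment above, not part of the answer's code) might look like this, with handle_ctrl_c returning UnProcessed() instead of KeyboardInterrupt():

class UnProcessed(object):
    """Returned in place of a real result for tasks skipped after Ctrl-C."""
    pass

# in main(), instead of testing for KeyboardInterrupt instances:
if any(isinstance(x, UnProcessed) for x in results):
    print('Ctrl-C was entered.')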

Usually this simple structure works for Ctrl-C on Pool:

import signal

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

As was stated in a few similar posts:

Capture keyboardinterrupt in Python without try-except

answered Oct 31, 2012 at 13:44

1 Comment

This would have to be done on each of the worker processes as well, and may still fail if the KeyboardInterrupt is raised while the multiprocessing library is initializing.

It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don't finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn't catch exceptions that don't inherit from Exception (e.g., SystemExit). So here's my solution that deals with both of these:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout. This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
answered May 15, 2014 at 15:23

2 Comments

I've not noticed any performance penalty, but in my case the function is fairly long-lived (hundreds of seconds).
This actually isn't the case anymore, at least from my eyes and experience. If you catch the keyboard exception in the individual child processes and catch it once more in the main process, then you can continue using map and all is good. @Linux Cli Aik provided a solution below that produces this behavior. Using map_async is not always desired if the main thread depends on the results from the child processes.

I'm a newbie in Python. I was looking everywhere for an answer and stumbled upon this and a few other blogs and YouTube videos. I tried to copy-paste the author's code above and reproduce it on Python 2.7.13 on Windows 7 64-bit. It's close to what I want to achieve.

I made my child processes ignore the Ctrl-C and made the parent process terminate. It looks like bypassing the child processes avoids this problem for me.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results

if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.

answered May 15, 2017 at 9:02

1 Comment

I just figured this out as well! I honestly think this is the best solution for a problem like this. The accepted solution forces map_async onto the user, which I don't particularly like. In many situations, like mine, the main thread needs to wait for the individual processes to finish. This is one of the reasons why map exists!

I found, for the time being, the best solution is to not use the multiprocessing.pool feature but rather roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
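As a rough sketch of the roll-your-own idea (my illustration, not the code from the linked post): workers pull jobs from a queue, and the parent polls for results with a short timeout so it stays responsive to ^C and can terminate() the workers itself.

import multiprocessing
import Queue  # Python 2 module; provides the Empty exception raised on timeout

def worker(jobs, results):
    for i in iter(jobs.get, None):  # None acts as a shutdown sentinel
        results.put(i * i)

def main():
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(jobs, results))
               for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(40):
        jobs.put(i)
    for _ in workers:
        jobs.put(None)  # one shutdown sentinel per worker
    collected = []
    try:
        while len(collected) < 40:
            try:
                # A short timeout keeps the main process responsive to ^C.
                collected.append(results.get(timeout=0.1))
            except Queue.Empty:
                pass
        print collected
        for w in workers:
            w.join()
    except KeyboardInterrupt:
        print "Cancelled; terminating workers."
        for w in workers:
            w.terminate()

if __name__ == '__main__':
    main()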

answered Aug 26, 2010 at 17:16

2 Comments

Works like a charm. It's a clean solution and not some kind of hack (/me thinks). Btw, the trick with .get(99999) as proposed by others hurts performance badly.
I've not noticed any performance penalty from using a timeout, though I have been using 9999 instead of 999999. The exception is when an exception that doesn't inherit from the Exception class is raised: then you have to wait until the timeout is hit. The solution to that is to catch all exceptions (see my solution).

You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime

def test_func(x):
    time.sleep(2)
    return x**2

def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])
        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Output:

100
Multi: 0:00:41.131000
100
Non-multi: 0:03:20.688000

An advantage of this method is that results processed before interruption will be returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
answered Aug 14, 2018 at 5:30

4 Comments

Glorious and complete example
Excellent example.
Thank you. I'm trying to figure out how this generalizes to multiple arguments. In particular, why do you pass [value] rather than value in jobs[value] = pool.apply_async(input_function, [value])?
Would it be possible to have interrupted processes return an intermediate result instead?

Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written... try changing slowly_square to:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.

answered Sep 11, 2009 at 0:26

2 Comments

I tried this, and it doesn't actually terminate the entire set of jobs. It terminates the currently-running jobs, but the script still assigns the remaining jobs in the pool.map call as if everything is normal.
This is OK, but you may lose track of errors that occur. Returning the error with a stack trace might work so the parent process can tell that an error occurred, but it still doesn't exit immediately when the error occurs.
