changeset: 76712:26bbff4562a7 branch: 2.7 parent: 76708:3b2aa777b725 user: Richard Oudkerk date: Wed May 02 16:36:26 2012 +0100 files: Lib/multiprocessing/pool.py Lib/test/test_multiprocessing.py description: Issue #9400: Partial backport of fix for #9244 In multiprocessing, a pool worker process would die if the result/error could not be pickled. This could cause pool methods to hang. In 3.x this was fixed by 0aa8af79359d (which also added an error_callback argument to some methods), but the fix was not back ported. diff -r 3b2aa777b725 -r 26bbff4562a7 Lib/multiprocessing/pool.py --- a/Lib/multiprocessing/pool.py Wed May 02 07:59:36 2012 +0800 +++ b/Lib/multiprocessing/pool.py Wed May 02 16:36:26 2012 +0100 @@ -68,6 +68,23 @@ # Code run by worker processes # +class MaybeEncodingError(Exception): + """Wraps possible unpickleable errors, so they can be + safely sent through the socket.""" + + def __init__(self, exc, value): + self.exc = repr(exc) + self.value = repr(value) + super(MaybeEncodingError, self).__init__(self.exc, self.value) + + def __str__(self): + return "Error sending result: '%s'. Reason: '%s'" % (self.value, + self.exc) + + def __repr__(self): + return "" % str(self) + + def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks> 0) put = outqueue.put @@ -96,7 +113,13 @@ result = (True, func(*args, **kwds)) except Exception, e: result = (False, e) - put((job, i, result)) + try: + put((job, i, result)) + except Exception as e: + wrapped = MaybeEncodingError(e, result[1]) + debug("Possible encoding error while sending result: %s" % ( + wrapped)) + put((job, i, (False, wrapped))) completed += 1 debug('worker exiting after %d tasks' % completed) diff -r 3b2aa777b725 -r 26bbff4562a7 Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Wed May 02 07:59:36 2012 +0800 +++ b/Lib/test/test_multiprocessing.py Wed May 02 16:36:26 2012 +0100 @@ -1152,6 +1152,24 @@ join() self.assertTrue(join.elapsed < 0.2) +def unpickleable_result(): + return lambda: 42 + +class _TestPoolWorkerErrors(BaseTestCase): + ALLOWED_TYPES = ('processes', ) + + def test_unpickleable_result(self): + from multiprocessing.pool import MaybeEncodingError + p = multiprocessing.Pool(2) + + # Make sure we don't lose pool processes because of encoding errors. + for iteration in range(20): + res = p.apply_async(unpickleable_result) + self.assertRaises(MaybeEncodingError, res.get) + + p.close() + p.join() + class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', )

AltStyle によって変換されたページ (->オリジナル) /