[Python-checkins] cpython (2.7): Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now handle exceptions raised by an iterator.

serhiy.storchaka python-checkins at python.org
Fri Mar 13 07:32:25 CET 2015


https://hg.python.org/cpython/rev/311d52878a65
changeset: 94973:311d52878a65
branch: 2.7
parent: 94951:1c45047c5102
user: Serhiy Storchaka <storchaka at gmail.com>
date: Fri Mar 13 08:31:34 2015 +0200
summary:
 Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
Potts.
files:
 Lib/multiprocessing/pool.py | 37 +++++++++-----
 Lib/test/test_multiprocessing.py | 47 ++++++++++++++++++++
 Misc/ACKS | 1 +
 Misc/NEWS | 4 +
 4 files changed, 75 insertions(+), 14 deletions(-)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -334,25 +334,34 @@
 thread = threading.current_thread()
 
 for taskseq, set_length in iter(taskqueue.get, None):
+ task = None
 i = -1
- for i, task in enumerate(taskseq):
- if thread._state:
- debug('task handler found thread._state != RUN')
- break
- try:
- put(task)
- except Exception as e:
- job, ind = task[:2]
+ try:
+ for i, task in enumerate(taskseq):
+ if thread._state:
+ debug('task handler found thread._state != RUN')
+ break
 try:
- cache[job]._set(ind, (False, e))
- except KeyError:
- pass
- else:
+ put(task)
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
+ else:
+ if set_length:
+ debug('doing set_length()')
+ set_length(i+1)
+ continue
+ break
+ except Exception as ex:
+ job, ind = task[:2] if task else (0, 0)
+ if job in cache:
+ cache[job]._set(ind + 1, (False, ex))
 if set_length:
 debug('doing set_length()')
 set_length(i+1)
- continue
- break
 else:
 debug('task handler got sentinel')
 
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1122,6 +1122,15 @@
 def sqr(x, wait=0.0):
 time.sleep(wait)
 return x*x
+
+class SayWhenError(ValueError): pass
+
+def exception_throwing_generator(total, when):
+ for i in range(total):
+ if i == when:
+ raise SayWhenError("Somebody said when")
+ yield i
+
 class _TestPool(BaseTestCase):
 
 def test_apply(self):
@@ -1177,6 +1186,25 @@
 self.assertEqual(it.next(), i*i)
 self.assertRaises(StopIteration, it.next)
 
+ def test_imap_handle_iterable_exception(self):
+ if self.TYPE == 'manager':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
+ for i in range(3):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.next)
+
+ # SayWhenError seen at start of problematic chunk's results
+ it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
+ for i in range(6):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.next)
+ it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
+ for i in range(4):
+ self.assertEqual(next(it), i*i)
+ self.assertRaises(SayWhenError, it.next)
+
 def test_imap_unordered(self):
 it = self.pool.imap_unordered(sqr, range(1000))
 self.assertEqual(sorted(it), map(sqr, range(1000)))
@@ -1184,6 +1212,25 @@
 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
 self.assertEqual(sorted(it), map(sqr, range(1000)))
 
+ def test_imap_unordered_handle_iterable_exception(self):
+ if self.TYPE == 'manager':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ it = self.pool.imap_unordered(sqr,
+ exception_throwing_generator(10, 3),
+ 1)
+ with self.assertRaises(SayWhenError):
+ # imap_unordered makes it difficult to anticipate the SayWhenError
+ for i in range(10):
+ self.assertEqual(next(it), i*i)
+
+ it = self.pool.imap_unordered(sqr,
+ exception_throwing_generator(20, 7),
+ 2)
+ with self.assertRaises(SayWhenError):
+ for i in range(20):
+ self.assertEqual(next(it), i*i)
+
 def test_make_pool(self):
 self.assertRaises(ValueError, multiprocessing.Pool, -1)
 self.assertRaises(ValueError, multiprocessing.Pool, 0)
diff --git a/Misc/ACKS b/Misc/ACKS
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -325,6 +325,7 @@
 Caleb Deveraux
 Catherine Devlin
 Scott Dial
+Alon Diamant
 Toby Dickenson
 Mark Dickinson
 Jack Diederich
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -21,6 +21,10 @@
 Library
 -------
 
+- Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
+ handle exceptions raised by an iterator. Patch by Alon Diamant and Davin
+ Potts.
+
 - Issue #22928: Disabled HTTP header injections in httplib.
 Original patch by Demian Brecht.
 
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /