[Python-checkins] cpython: Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*

antoine.pitrou python-checkins at python.org
Sat Oct 4 20:21:28 CEST 2014


https://hg.python.org/cpython/rev/f87c2c4f03da
changeset: 92808:f87c2c4f03da
user: Antoine Pitrou <solipsis at pitrou.net>
date: Sat Oct 04 20:20:10 2014 +0200
summary:
 Issue #11271: concurrent.futures.Executor.map() now takes a *chunksize*
argument to allow batching of tasks in child processes and improve
performance of ProcessPoolExecutor. Patch by Dan O'Reilly.
files:
 Doc/library/concurrent.futures.rst | 13 +++-
 Lib/concurrent/futures/_base.py | 6 +-
 Lib/concurrent/futures/process.py | 51 +++++++++++++++++
 Lib/test/test_concurrent_futures.py | 16 +++++
 4 files changed, 83 insertions(+), 3 deletions(-)
diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -38,7 +38,7 @@
 future = executor.submit(pow, 323, 1235)
 print(future.result())
 
- .. method:: map(func, *iterables, timeout=None)
+ .. method:: map(func, *iterables, timeout=None, chunksize=1)
 
 Equivalent to :func:`map(func, *iterables) <map>` except *func* is executed
 asynchronously and several calls to *func* may be made concurrently. The
@@ -48,7 +48,16 @@
 *timeout* can be an int or a float. If *timeout* is not specified or
 ``None``, there is no limit to the wait time. If a call raises an
 exception, then that exception will be raised when its value is
- retrieved from the iterator.
+ retrieved from the iterator. When using :class:`ProcessPoolExecutor`, this
+ method chops *iterables* into a number of chunks which it submits to the
+ pool as separate tasks. The (approximate) size of these chunks can be
+ specified by setting *chunksize* to a positive integer. For very long
+ iterables, using a large value for *chunksize* can significantly improve
+ performance compared to the default size of 1. With :class:`ThreadPoolExecutor`,
+ *chunksize* has no effect.
+
+ .. versionchanged:: 3.5
+ Added the *chunksize* argument.
 
 .. method:: shutdown(wait=True)
 
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -520,7 +520,7 @@
 """
 raise NotImplementedError()
 
- def map(self, fn, *iterables, timeout=None):
+ def map(self, fn, *iterables, timeout=None, chunksize=1):
 """Returns a iterator equivalent to map(fn, iter).
 
 Args:
@@ -528,6 +528,10 @@
 passed iterables.
 timeout: The maximum number of seconds to wait. If None, then there
 is no limit on the wait time.
+ chunksize: The size of the chunks the iterable will be broken into
+ before being passed to a child process. This argument is only
+ used by ProcessPoolExecutor; it is ignored by
+ ThreadPoolExecutor.
 
 Returns:
 An iterator equivalent to: map(func, *iterables) but the calls may
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -55,6 +55,8 @@
 from multiprocessing.connection import wait
 import threading
 import weakref
+from functools import partial
+import itertools
 
 # Workers are created as daemon threads and processes. This is done to allow the
 # interpreter to exit when there are still idle processes in a
@@ -108,6 +110,26 @@
 self.args = args
 self.kwargs = kwargs
 
+def _get_chunks(*iterables, chunksize):
+ """ Iterates over zip()ed iterables in chunks. """
+ it = zip(*iterables)
+ while True:
+ chunk = tuple(itertools.islice(it, chunksize))
+ if not chunk:
+ return
+ yield chunk
+
+def _process_chunk(fn, chunk):
+ """ Processes a chunk of an iterable passed to map.
+
+ Runs the function passed to map() on a chunk of the
+ iterable passed to map.
+
+ This function is run in a separate process.
+
+ """
+ return [fn(*args) for args in chunk]
+
 def _process_worker(call_queue, result_queue):
 """Evaluates calls from call_queue and places the results in result_queue.
 
@@ -411,6 +433,35 @@
 return f
 submit.__doc__ = _base.Executor.submit.__doc__
 
+ def map(self, fn, *iterables, timeout=None, chunksize=1):
+ """Returns an iterator equivalent to map(fn, iter).
+
+ Args:
+ fn: A callable that will take as many arguments as there are
+ passed iterables.
+ timeout: The maximum number of seconds to wait. If None, then there
+ is no limit on the wait time.
+ chunksize: If greater than one, the iterables will be chopped into
+ chunks of size chunksize and submitted to the process pool.
+ If set to one, the items in the iterables will be sent one at a time.
+
+ Returns:
+ An iterator equivalent to: map(func, *iterables) but the calls may
+ be evaluated out-of-order.
+
+ Raises:
+ TimeoutError: If the entire result iterator could not be generated
+ before the given timeout.
+ Exception: If fn(*args) raises for any values.
+ """
+ if chunksize < 1:
+ raise ValueError("chunksize must be >= 1.")
+
+ results = super().map(partial(_process_chunk, fn),
+ _get_chunks(*iterables, chunksize=chunksize),
+ timeout=timeout)
+ return itertools.chain.from_iterable(results)
+
 def shutdown(self, wait=True):
 with self._shutdown_lock:
 self._shutdown_thread = True
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -464,6 +464,22 @@
 # Submitting other jobs fails as well.
 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
 
+ def test_map_chunksize(self):
+ def bad_map():
+ list(self.executor.map(pow, range(40), range(40), chunksize=-1))
+
+ ref = list(map(pow, range(40), range(40)))
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=6)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=50)),
+ ref)
+ self.assertEqual(
+ list(self.executor.map(pow, range(40), range(40), chunksize=40)),
+ ref)
+ self.assertRaises(ValueError, bad_map)
+
 
 class FutureTests(unittest.TestCase):
 def test_done_callback_with_result(self):
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /