[Python-checkins] bpo-32355: Optimize asyncio.gather() (#4913)

Yury Selivanov webhook-mailer at python.org
Tue Dec 19 07:19:56 EST 2017


https://github.com/python/cpython/commit/36c2c044782997520df7fc5604742a615ccf6b17
commit: 36c2c044782997520df7fc5604742a615ccf6b17
branch: master
author: Yury Selivanov <yury at magic.io>
committer: GitHub <noreply at github.com>
date: 2017年12月19日T07:19:53-05:00
summary:
bpo-32355: Optimize asyncio.gather() (#4913)
files:
A Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst
M Lib/asyncio/base_events.py
M Lib/asyncio/tasks.py
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index bd5bb32302a..a7f8edd8cfd 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -139,11 +139,12 @@ def _ipaddr_info(host, port, family, type, proto):
 
 
 def _run_until_complete_cb(fut):
- exc = fut._exception
- if isinstance(exc, BaseException) and not isinstance(exc, Exception):
- # Issue #22429: run_forever() already finished, no need to
- # stop it.
- return
+ if not fut.cancelled():
+ exc = fut.exception()
+ if isinstance(exc, BaseException) and not isinstance(exc, Exception):
+ # Issue #22429: run_forever() already finished, no need to
+ # stop it.
+ return
 fut._loop.stop()
 
 
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 275141c65e7..ff8a486b544 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -575,8 +575,7 @@ def cancel(self):
 
 
 def gather(*coros_or_futures, loop=None, return_exceptions=False):
- """Return a future aggregating results from the given coroutines
- or futures.
+ """Return a future aggregating results from the given coroutines/futures.
 
 Coroutines will be wrapped in a future and scheduled in the event
 loop. They will not necessarily be scheduled in the same order as
@@ -605,56 +604,76 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
 outer.set_result([])
 return outer
 
- arg_to_fut = {}
- for arg in set(coros_or_futures):
- if not futures.isfuture(arg):
- fut = ensure_future(arg, loop=loop)
- if loop is None:
- loop = fut._loop
- # The caller cannot control this future, the "destroy pending task"
- # warning should not be emitted.
- fut._log_destroy_pending = False
- else:
- fut = arg
- if loop is None:
- loop = fut._loop
- elif fut._loop is not loop:
- raise ValueError("futures are tied to different event loops")
- arg_to_fut[arg] = fut
-
- children = [arg_to_fut[arg] for arg in coros_or_futures]
- nchildren = len(children)
- outer = _GatheringFuture(children, loop=loop)
- nfinished = 0
- results = [None] * nchildren
-
- def _done_callback(i, fut):
+ def _done_callback(fut):
 nonlocal nfinished
+ nfinished += 1
+
 if outer.done():
 if not fut.cancelled():
 # Mark exception retrieved.
 fut.exception()
 return
 
- if fut.cancelled():
- res = futures.CancelledError()
- if not return_exceptions:
- outer.set_exception(res)
- return
- elif fut._exception is not None:
- res = fut.exception() # Mark exception retrieved.
- if not return_exceptions:
- outer.set_exception(res)
+ if not return_exceptions:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ exc = futures.CancelledError()
+ outer.set_exception(exc)
 return
- else:
- res = fut._result
- results[i] = res
- nfinished += 1
- if nfinished == nchildren:
+ else:
+ exc = fut.exception()
+ if exc is not None:
+ outer.set_exception(exc)
+ return
+
+ if nfinished == nfuts:
+ # All futures are done; create a list of results
+ # and set it to the 'outer' future.
+ results = []
+
+ for fut in children:
+ if fut.cancelled():
+ # Check if 'fut' is cancelled first, as
+ # 'fut.exception()' will *raise* a CancelledError
+ # instead of returning it.
+ res = futures.CancelledError()
+ else:
+ res = fut.exception()
+ if res is None:
+ res = fut.result()
+ results.append(res)
+
 outer.set_result(results)
 
- for i, fut in enumerate(children):
- fut.add_done_callback(functools.partial(_done_callback, i))
+ arg_to_fut = {}
+ children = []
+ nfuts = 0
+ nfinished = 0
+ for arg in coros_or_futures:
+ if arg not in arg_to_fut:
+ fut = ensure_future(arg, loop=loop)
+ if loop is None:
+ loop = fut._loop
+ if fut is not arg:
+ # 'arg' was not a Future, therefore, 'fut' is a new
+ # Future created specifically for 'arg'. Since the caller
+ # can't control it, disable the "destroy pending task"
+ # warning.
+ fut._log_destroy_pending = False
+
+ nfuts += 1
+ arg_to_fut[arg] = fut
+ fut.add_done_callback(_done_callback)
+
+ else:
+ # There's a duplicate Future object in coros_or_futures.
+ fut = arg_to_fut[arg]
+
+ children.append(fut)
+
+ outer = _GatheringFuture(children, loop=loop)
 return outer
 
 
diff --git a/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst b/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst
new file mode 100644
index 00000000000..ca908e97803
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-12-17-21-42-24.bpo-32355.tbaTWA.rst
@@ -0,0 +1 @@
+Optimize asyncio.gather(); now up to 15% faster.


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /