[Python-checkins] cpython: Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.

charles-francois.natali python-checkins at python.org
Mon Mar 25 18:21:20 CET 2013


http://hg.python.org/cpython/rev/5022ee7e13a2
changeset: 82954:5022ee7e13a2
user: Charles-François Natali <cf.natali at gmail.com>
date: Mon Mar 25 18:20:40 2013 +0100
summary:
 Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.
files:
 Lib/multiprocessing/queues.py | 69 ++++++++--------------
 Misc/NEWS | 2 +
 2 files changed, 28 insertions(+), 43 deletions(-)
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,7 +22,7 @@
 from multiprocessing.connection import Pipe
 from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
 from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from multiprocessing.forking import assert_spawning, ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -69,8 +69,8 @@
 self._joincancelled = False
 self._closed = False
 self._close = None
- self._send = self._writer.send
- self._recv = self._reader.recv
+ self._send_bytes = self._writer.send_bytes
+ self._recv_bytes = self._reader.recv_bytes
 self._poll = self._reader.poll
 
 def put(self, obj, block=True, timeout=None):
@@ -89,14 +89,9 @@
 
 def get(self, block=True, timeout=None):
 if block and timeout is None:
- self._rlock.acquire()
- try:
- res = self._recv()
- self._sem.release()
- return res
- finally:
- self._rlock.release()
-
+ with self._rlock:
+ res = self._recv_bytes()
+ self._sem.release()
 else:
 if block:
 deadline = time.time() + timeout
@@ -109,11 +104,12 @@
 raise Empty
 elif not self._poll():
 raise Empty
- res = self._recv()
+ res = self._recv_bytes()
 self._sem.release()
- return res
 finally:
 self._rlock.release()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
 
 def qsize(self):
 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -158,7 +154,7 @@
 self._buffer.clear()
 self._thread = threading.Thread(
 target=Queue._feed,
- args=(self._buffer, self._notempty, self._send,
+ args=(self._buffer, self._notempty, self._send_bytes,
 self._wlock, self._writer.close, self._ignore_epipe),
 name='QueueFeederThread'
 )
@@ -210,7 +206,7 @@
 notempty.release()
 
 @staticmethod
- def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
+ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
 debug('starting thread to feed data to pipe')
 from .util import is_exiting
 
@@ -241,16 +237,14 @@
 close()
 return
 
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
 if wacquire is None:
- send(obj)
- # Delete references to object. See issue16284
- del obj
+ send_bytes(obj)
 else:
 wacquire()
 try:
- send(obj)
- # Delete references to object. See issue16284
- del obj
+ send_bytes(obj)
 finally:
 wrelease()
 except IndexError:
@@ -344,7 +338,6 @@
 self._wlock = None
 else:
 self._wlock = Lock()
- self._make_methods()
 
 def empty(self):
 return not self._poll()
@@ -355,29 +348,19 @@
 
 def __setstate__(self, state):
 (self._reader, self._writer, self._rlock, self._wlock) = state
- self._make_methods()
 
- def _make_methods(self):
- recv = self._reader.recv
- racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get():
- racquire()
- try:
- return recv()
- finally:
- rrelease()
- self.get = get
+ def get(self):
+ with self._rlock:
+ res = self._reader.recv_bytes()
+ # unserialize the data after having released the lock
+ return ForkingPickler.loads(res)
 
+ def put(self, obj):
+ # serialize the data before acquiring the lock
+ obj = ForkingPickler.dumps(obj)
 if self._wlock is None:
 # writes to a message oriented win32 pipe are atomic
- self.put = self._writer.send
+ self._writer.send_bytes(obj)
 else:
- send = self._writer.send
- wacquire, wrelease = self._wlock.acquire, self._wlock.release
- def put(obj):
- wacquire()
- try:
- return send(obj)
- finally:
- wrelease()
- self.put = put
+ with self._wlock:
+ self._writer.send_bytes(obj)
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -294,6 +294,8 @@
 Library
 -------
 
+- Issue #17025: multiprocessing: Reduce Queue and SimpleQueue contention.
+
 - Issue #17536: Add to webbrowser's browser list: www-browser, x-www-browser,
 iceweasel, iceape.
 
-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /