[Python-checkins] cpython: Issue #28053: Applying refactorings, docs and other cleanup to follow.

davin.potts python-checkins at python.org
Fri Sep 9 19:03:27 EDT 2016


https://hg.python.org/cpython/rev/7381b1b50e00
changeset: 103497:7381b1b50e00
user: Davin Potts <python at discontinuity.net>
date: Fri Sep 09 18:03:10 2016 -0500
summary:
 Issue #28053: Applying refactorings, docs and other cleanup to follow.
files:
 Lib/multiprocessing/connection.py | 8 +-
 Lib/multiprocessing/context.py | 13 +++-
 Lib/multiprocessing/forkserver.py | 2 +-
 Lib/multiprocessing/heap.py | 5 +-
 Lib/multiprocessing/managers.py | 5 +-
 Lib/multiprocessing/popen_forkserver.py | 7 +-
 Lib/multiprocessing/popen_spawn_posix.py | 7 +-
 Lib/multiprocessing/popen_spawn_win32.py | 9 +-
 Lib/multiprocessing/queues.py | 10 +-
 Lib/multiprocessing/reduction.py | 34 ++++++++++++
 Lib/multiprocessing/resource_sharer.py | 2 +-
 Lib/multiprocessing/sharedctypes.py | 6 +-
 Lib/multiprocessing/spawn.py | 9 +-
 13 files changed, 77 insertions(+), 40 deletions(-)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -20,11 +20,11 @@
 
 import _multiprocessing
 
-from . import reduction
 from . import util
 
 from . import AuthenticationError, BufferTooShort
-from .reduction import ForkingPickler
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
 
 try:
 import _winapi
@@ -203,7 +203,7 @@
 """Send a (picklable) object"""
 self._check_closed()
 self._check_writable()
- self._send_bytes(ForkingPickler.dumps(obj))
+ self._send_bytes(_ForkingPickler.dumps(obj))
 
 def recv_bytes(self, maxlength=None):
 """
@@ -248,7 +248,7 @@
 self._check_closed()
 self._check_readable()
 buf = self._recv_bytes()
- return ForkingPickler.loads(buf.getbuffer())
+ return _ForkingPickler.loads(buf.getbuffer())
 
 def poll(self, timeout=0.0):
 """Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py
--- a/Lib/multiprocessing/context.py
+++ b/Lib/multiprocessing/context.py
@@ -3,6 +3,7 @@
 import threading
 
 from . import process
+from . import reduction
 
 __all__ = [] # things are copied from here to __init__.py
 
@@ -198,6 +199,16 @@
 def set_start_method(self, method=None):
 raise ValueError('cannot set start method of concrete context')
 
+ @property
+ def reducer(self):
+ '''Controls how objects will be reduced to a form that can be
+ shared with other processes.'''
+ return globals().get('reduction')
+
+ @reducer.setter
+ def reducer(self, reduction):
+ globals()['reduction'] = reduction
+
 def _check_available(self):
 pass
 
@@ -245,7 +256,6 @@
 if sys.platform == 'win32':
 return ['spawn']
 else:
- from . import reduction
 if reduction.HAVE_SEND_HANDLE:
 return ['fork', 'spawn', 'forkserver']
 else:
@@ -292,7 +302,6 @@
 _name = 'forkserver'
 Process = ForkServerProcess
 def _check_available(self):
- from . import reduction
 if not reduction.HAVE_SEND_HANDLE:
 raise ValueError('forkserver start method not available')
 
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -9,7 +9,7 @@
 
 from . import connection
 from . import process
-from . import reduction
+from .context import reduction
 from . import semaphore_tracker
 from . import spawn
 from . import util
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -14,8 +14,7 @@
 import tempfile
 import threading
 
-from . import context
-from . import reduction
+from .context import reduction, assert_spawning
 from . import util
 
 __all__ = ['BufferWrapper']
@@ -48,7 +47,7 @@
 self._state = (self.size, self.name)
 
 def __getstate__(self):
- context.assert_spawning(self)
+ assert_spawning(self)
 return self._state
 
 def __setstate__(self, state):
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -23,10 +23,9 @@
 from traceback import format_exc
 
 from . import connection
-from . import context
+from .context import reduction, get_spawning_popen
 from . import pool
 from . import process
-from . import reduction
 from . import util
 from . import get_context
 
@@ -833,7 +832,7 @@
 
 def __reduce__(self):
 kwds = {}
- if context.get_spawning_popen() is not None:
+ if get_spawning_popen() is not None:
 kwds['authkey'] = self._authkey
 
 if getattr(self, '_isauto', False):
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -1,10 +1,9 @@
 import io
 import os
 
-from . import reduction
+from .context import reduction, set_spawning_popen
 if not reduction.HAVE_SEND_HANDLE:
 raise ImportError('No support for sending fds between processes')
-from . import context
 from . import forkserver
 from . import popen_fork
 from . import spawn
@@ -42,12 +41,12 @@
 def _launch(self, process_obj):
 prep_data = spawn.get_preparation_data(process_obj._name)
 buf = io.BytesIO()
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
 try:
 reduction.dump(prep_data, buf)
 reduction.dump(process_obj, buf)
 finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
 
 self.sentinel, w = forkserver.connect_to_new_process(self._fds)
 util.Finalize(self, os.close, (self.sentinel,))
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -1,9 +1,8 @@
 import io
 import os
 
-from . import context
+from .context import reduction, set_spawning_popen
 from . import popen_fork
-from . import reduction
 from . import spawn
 from . import util
 
@@ -42,12 +41,12 @@
 self._fds.append(tracker_fd)
 prep_data = spawn.get_preparation_data(process_obj._name)
 fp = io.BytesIO()
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
 try:
 reduction.dump(prep_data, fp)
 reduction.dump(process_obj, fp)
 finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
 
 parent_r = child_w = child_r = parent_w = None
 try:
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -4,9 +4,8 @@
 import sys
 import _winapi
 
-from . import context
+from .context import reduction, get_spawning_popen, set_spawning_popen
 from . import spawn
-from . import reduction
 from . import util
 
 __all__ = ['Popen']
@@ -60,15 +59,15 @@
 util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
 # send information to child
- context.set_spawning_popen(self)
+ set_spawning_popen(self)
 try:
 reduction.dump(prep_data, to_child)
 reduction.dump(process_obj, to_child)
 finally:
- context.set_spawning_popen(None)
+ set_spawning_popen(None)
 
 def duplicate_for_child(self, handle):
- assert self is context.get_spawning_popen()
+ assert self is get_spawning_popen()
 return reduction.duplicate(handle, self.sentinel)
 
 def wait(self, timeout=None):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -23,9 +23,9 @@
 
 from . import connection
 from . import context
+_ForkingPickler = context.reduction.ForkingPickler
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
-from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -110,7 +110,7 @@
 finally:
 self._rlock.release()
 # unserialize the data after having released the lock
- return ForkingPickler.loads(res)
+ return _ForkingPickler.loads(res)
 
 def qsize(self):
 # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -238,7 +238,7 @@
 return
 
 # serialize the data before acquiring the lock
- obj = ForkingPickler.dumps(obj)
+ obj = _ForkingPickler.dumps(obj)
 if wacquire is None:
 send_bytes(obj)
 else:
@@ -342,11 +342,11 @@
 with self._rlock:
 res = self._reader.recv_bytes()
 # unserialize the data after having released the lock
- return ForkingPickler.loads(res)
+ return _ForkingPickler.loads(res)
 
 def put(self, obj):
 # serialize the data before acquiring the lock
- obj = ForkingPickler.dumps(obj)
+ obj = _ForkingPickler.dumps(obj)
 if self._wlock is None:
 # writes to a message oriented win32 pipe are atomic
 self._writer.send_bytes(obj)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -7,6 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+from abc import ABCMeta, abstractmethod
 import copyreg
 import functools
 import io
@@ -238,3 +239,36 @@
 fd = df.detach()
 return socket.socket(family, type, proto, fileno=fd)
 register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+ '''Abstract base class for use in implementing a Reduction class
+ suitable for use in replacing the standard reduction mechanism
+ used in multiprocessing.'''
+ ForkingPickler = ForkingPickler
+ register = register
+ dump = dump
+ send_handle = send_handle
+ recv_handle = recv_handle
+
+ if sys.platform == 'win32':
+ steal_handle = steal_handle
+ duplicate = duplicate
+ DupHandle = DupHandle
+ else:
+ sendfds = sendfds
+ recvfds = recvfds
+ DupFd = DupFd
+
+ _reduce_method = _reduce_method
+ _reduce_method_descriptor = _reduce_method_descriptor
+ _rebuild_partial = _rebuild_partial
+ _reduce_socket = _reduce_socket
+ _rebuild_socket = _rebuild_socket
+
+ def __init__(self, *args):
+ register(type(_C().f), _reduce_method)
+ register(type(list.append), _reduce_method_descriptor)
+ register(type(int.__add__), _reduce_method_descriptor)
+ register(functools.partial, _reduce_partial)
+ register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
--- a/Lib/multiprocessing/resource_sharer.py
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -15,7 +15,7 @@
 import threading
 
 from . import process
-from . import reduction
+from .context import reduction
 from . import util
 
 __all__ = ['stop']
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -13,8 +13,8 @@
 from . import heap
 from . import get_context
 
-from .context import assert_spawning
-from .reduction import ForkingPickler
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
@@ -134,7 +134,7 @@
 def rebuild_ctype(type_, wrapper, length):
 if length is not None:
 type_ = type_ * length
- ForkingPickler.register(type_, reduce_ctype)
+ _ForkingPickler.register(type_, reduce_ctype)
 buf = wrapper.create_memoryview()
 obj = type_.from_buffer(buf)
 obj._wrapper = wrapper
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -9,13 +9,13 @@
 #
 
 import os
-import pickle
 import sys
 import runpy
 import types
 
 from . import get_start_method, set_start_method
 from . import process
+from .context import reduction
 from . import util
 
 __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
@@ -96,8 +96,7 @@
 assert is_forking(sys.argv)
 if sys.platform == 'win32':
 import msvcrt
- from .reduction import steal_handle
- new_handle = steal_handle(parent_pid, pipe_handle)
+ new_handle = reduction.steal_handle(parent_pid, pipe_handle)
 fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
 else:
 from . import semaphore_tracker
@@ -111,9 +110,9 @@
 with os.fdopen(fd, 'rb', closefd=True) as from_parent:
 process.current_process()._inheriting = True
 try:
- preparation_data = pickle.load(from_parent)
+ preparation_data = reduction.pickle.load(from_parent)
 prepare(preparation_data)
- self = pickle.load(from_parent)
+ self = reduction.pickle.load(from_parent)
 finally:
 del process.current_process()._inheriting
 return self._bootstrap()
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

Page converted by AltStyle (-> original) /