[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)

victor.stinner python-checkins at python.org
Fri Dec 26 21:46:38 CET 2014


https://hg.python.org/cpython/rev/fc6afd14c026
changeset: 93974:fc6afd14c026
parent: 93970:e4bbd78beee9
parent: 93973:4e8ac4173b3c
user: Victor Stinner <victor.stinner at gmail.com>
date: Fri Dec 26 21:29:45 2014 +0100
summary:
 Merge 3.4 (asyncio)
files:
 Lib/asyncio/base_events.py | 38 ++--
 Lib/asyncio/proactor_events.py | 6 +-
 Lib/asyncio/selector_events.py | 2 +-
 Lib/test/test_asyncio/test_base_events.py | 78 +++++++--
 Lib/test/test_asyncio/test_events.py | 2 +-
 Lib/test/test_asyncio/test_futures.py | 2 +-
 Lib/test/test_asyncio/test_proactor_events.py | 7 +-
 Lib/test/test_asyncio/test_selector_events.py | 3 -
 Lib/test/test_asyncio/test_subprocess.py | 30 +--
 Lib/test/test_asyncio/test_tasks.py | 10 +-
 Lib/test/test_asyncio/test_unix_events.py | 2 -
 Lib/test/test_asyncio/test_windows_utils.py | 11 +-
 12 files changed, 106 insertions(+), 85 deletions(-)
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -22,6 +22,7 @@
 import os
 import socket
 import subprocess
+import threading
 import time
 import traceback
 import sys
@@ -168,7 +169,9 @@
 self._scheduled = []
 self._default_executor = None
 self._internal_fds = 0
- self._running = False
+ # Identifier of the thread running the event loop, or None if the
+ # event loop is not running
+ self._owner = None
 self._clock_resolution = time.get_clock_info('monotonic').resolution
 self._exception_handler = None
 self._debug = (not sys.flags.ignore_environment
@@ -246,9 +249,9 @@
 def run_forever(self):
 """Run until stop() is called."""
 self._check_closed()
- if self._running:
+ if self.is_running():
 raise RuntimeError('Event loop is running.')
- self._running = True
+ self._owner = threading.get_ident()
 try:
 while True:
 try:
@@ -256,7 +259,7 @@
 except _StopError:
 break
 finally:
- self._running = False
+ self._owner = None
 
 def run_until_complete(self, future):
 """Run until the Future is done.
@@ -311,7 +314,7 @@
 
 The event loop must not be running.
 """
- if self._running:
+ if self.is_running():
 raise RuntimeError("Cannot close a running event loop")
 if self._closed:
 return
@@ -331,7 +334,7 @@
 
 def is_running(self):
 """Returns True if the event loop is running."""
- return self._running
+ return (self._owner is not None)
 
 def time(self):
 """Return the time according to the event loop's clock.
@@ -373,7 +376,7 @@
 raise TypeError("coroutines cannot be used with call_at()")
 self._check_closed()
 if self._debug:
- self._assert_is_current_event_loop()
+ self._check_thread()
 timer = events.TimerHandle(when, callback, args, self)
 if timer._source_traceback:
 del timer._source_traceback[-1]
@@ -391,17 +394,17 @@
 Any positional arguments after the callback will be passed to
 the callback when it is called.
 """
- handle = self._call_soon(callback, args, check_loop=True)
+ if self._debug:
+ self._check_thread()
+ handle = self._call_soon(callback, args)
 if handle._source_traceback:
 del handle._source_traceback[-1]
 return handle
 
- def _call_soon(self, callback, args, check_loop):
+ def _call_soon(self, callback, args):
 if (coroutines.iscoroutine(callback)
 or coroutines.iscoroutinefunction(callback)):
 raise TypeError("coroutines cannot be used with call_soon()")
- if self._debug and check_loop:
- self._assert_is_current_event_loop()
 self._check_closed()
 handle = events.Handle(callback, args, self)
 if handle._source_traceback:
@@ -409,8 +412,8 @@
 self._ready.append(handle)
 return handle
 
- def _assert_is_current_event_loop(self):
- """Asserts that this event loop is the current event loop.
+ def _check_thread(self):
+ """Check that the current thread is the thread running the event loop.
 
 Non-thread-safe methods of this class make this assumption and will
 likely behave incorrectly when the assumption is violated.
@@ -418,18 +421,17 @@
 Should only be called when (self._debug == True). The caller is
 responsible for checking this condition for performance reasons.
 """
- try:
- current = events.get_event_loop()
- except RuntimeError:
+ if self._owner is None:
 return
- if current is not self:
+ thread_id = threading.get_ident()
+ if thread_id != self._owner:
 raise RuntimeError(
 "Non-thread-safe operation invoked on an event loop other "
 "than the current one")
 
 def call_soon_threadsafe(self, callback, *args):
 """Like call_soon(), but thread-safe."""
- handle = self._call_soon(callback, args, check_loop=False)
+ handle = self._call_soon(callback, args)
 if handle._source_traceback:
 del handle._source_traceback[-1]
 self._write_to_self()
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -383,7 +383,7 @@
 sock, protocol, waiter, extra)
 
 def close(self):
- if self._running:
+ if self.is_running():
 raise RuntimeError("Cannot close a running event loop")
 if self.is_closed():
 return
@@ -432,9 +432,7 @@
 self._ssock.setblocking(False)
 self._csock.setblocking(False)
 self._internal_fds += 1
- # don't check the current loop because _make_self_pipe() is called
- # from the event loop constructor
- self._call_soon(self._loop_self_reading, (), check_loop=False)
+ self.call_soon(self._loop_self_reading)
 
 def _loop_self_reading(self, f=None):
 try:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -68,7 +68,7 @@
 address, waiter, extra)
 
 def close(self):
- if self._running:
+ if self.is_running():
 raise RuntimeError("Cannot close a running event loop")
 if self.is_closed():
 return
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -5,6 +5,7 @@
 import math
 import socket
 import sys
+import threading
 import time
 import unittest
 from unittest import mock
@@ -14,8 +15,8 @@
 from asyncio import constants
 from asyncio import test_utils
 try:
+ from test import support
 from test.script_helper import assert_python_ok
- from test import support
 except ImportError:
 from asyncio import test_support as support
 from asyncio.test_support import assert_python_ok
@@ -148,28 +149,71 @@
 # are really slow
 self.assertLessEqual(dt, 0.9, dt)
 
- def test_assert_is_current_event_loop(self):
+ def check_thread(self, loop, debug):
 def cb():
 pass
 
- other_loop = base_events.BaseEventLoop()
- other_loop._selector = mock.Mock()
- asyncio.set_event_loop(other_loop)
+ loop.set_debug(debug)
+ if debug:
+ msg = ("Non-thread-safe operation invoked on an event loop other "
+ "than the current one")
+ with self.assertRaisesRegex(RuntimeError, msg):
+ loop.call_soon(cb)
+ with self.assertRaisesRegex(RuntimeError, msg):
+ loop.call_later(60, cb)
+ with self.assertRaisesRegex(RuntimeError, msg):
+ loop.call_at(loop.time() + 60, cb)
+ else:
+ loop.call_soon(cb)
+ loop.call_later(60, cb)
+ loop.call_at(loop.time() + 60, cb)
 
- # raise RuntimeError if the event loop is different in debug mode
- self.loop.set_debug(True)
- with self.assertRaises(RuntimeError):
- self.loop.call_soon(cb)
- with self.assertRaises(RuntimeError):
- self.loop.call_later(60, cb)
- with self.assertRaises(RuntimeError):
- self.loop.call_at(self.loop.time() + 60, cb)
+ def test_check_thread(self):
+ def check_in_thread(loop, event, debug, create_loop, fut):
+ # wait until the event loop is running
+ event.wait()
+
+ try:
+ if create_loop:
+ loop2 = base_events.BaseEventLoop()
+ try:
+ asyncio.set_event_loop(loop2)
+ self.check_thread(loop, debug)
+ finally:
+ asyncio.set_event_loop(None)
+ loop2.close()
+ else:
+ self.check_thread(loop, debug)
+ except Exception as exc:
+ loop.call_soon_threadsafe(fut.set_exception, exc)
+ else:
+ loop.call_soon_threadsafe(fut.set_result, None)
+
+ def test_thread(loop, debug, create_loop=False):
+ event = threading.Event()
+ fut = asyncio.Future(loop=loop)
+ loop.call_soon(event.set)
+ args = (loop, event, debug, create_loop, fut)
+ thread = threading.Thread(target=check_in_thread, args=args)
+ thread.start()
+ loop.run_until_complete(fut)
+ thread.join()
+
+ self.loop._process_events = mock.Mock()
+ self.loop._write_to_self = mock.Mock()
+
+ # raise RuntimeError if the thread has no event loop
+ test_thread(self.loop, True)
 
 # check disabled if debug mode is disabled
- self.loop.set_debug(False)
- self.loop.call_soon(cb)
- self.loop.call_later(60, cb)
- self.loop.call_at(self.loop.time() + 60, cb)
+ test_thread(self.loop, False)
+
+ # raise RuntimeError if the event loop of the thread is not the called
+ # event loop
+ test_thread(self.loop, True, create_loop=True)
+
+ # check disabled if debug mode is disabled
+ test_thread(self.loop, False, create_loop=True)
 
 def test_run_once_in_executor_handle(self):
 def cb():
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -27,7 +27,7 @@
 from asyncio import selector_events
 from asyncio import test_utils
 try:
- from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
+ from test import support
 except ImportError:
 from asyncio import test_support as support
 
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -10,7 +10,7 @@
 import asyncio
 from asyncio import test_utils
 try:
- from test import support # gc_collect
+ from test import support
 except ImportError:
 from asyncio import test_support as support
 
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -440,17 +440,16 @@
 self.loop = EventLoop(self.proactor)
 self.set_event_loop(self.loop, cleanup=False)
 
- @mock.patch.object(BaseProactorEventLoop, '_call_soon')
+ @mock.patch.object(BaseProactorEventLoop, 'call_soon')
 @mock.patch.object(BaseProactorEventLoop, '_socketpair')
- def test_ctor(self, socketpair, _call_soon):
+ def test_ctor(self, socketpair, call_soon):
 ssock, csock = socketpair.return_value = (
 mock.Mock(), mock.Mock())
 loop = BaseProactorEventLoop(self.proactor)
 self.assertIs(loop._ssock, ssock)
 self.assertIs(loop._csock, csock)
 self.assertEqual(loop._internal_fds, 1)
- _call_soon.assert_called_with(loop._loop_self_reading, (),
- check_loop=False)
+ call_soon.assert_called_with(loop._loop_self_reading)
 
 def test_close_self_pipe(self):
 self.loop._close_self_pipe()
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -1,10 +1,7 @@
 """Tests for selector_events.py"""
 
 import errno
-import gc
-import pprint
 import socket
-import sys
 import unittest
 from unittest import mock
 try:
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -6,12 +6,12 @@
 import asyncio
 from asyncio import subprocess
 from asyncio import test_utils
+try:
+ from test import support
+except ImportError:
+ from asyncio import test_support as support
 if sys.platform != 'win32':
 from asyncio import unix_events
-try:
- from test import support # PIPE_MAX_SIZE
-except ImportError:
- from asyncio import test_support as support
 
 # Program blocking
 PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
@@ -233,19 +233,12 @@
 def setUp(self):
 policy = asyncio.get_event_loop_policy()
 self.loop = policy.new_event_loop()
-
- # ensure that the event loop is passed explicitly in asyncio
- policy.set_event_loop(None)
+ self.set_event_loop(self.loop)
 
 watcher = self.Watcher()
 watcher.attach_loop(self.loop)
 policy.set_child_watcher(watcher)
-
- def tearDown(self):
- policy = asyncio.get_event_loop_policy()
- policy.set_child_watcher(None)
- self.loop.close()
- super().tearDown()
+ self.addCleanup(policy.set_child_watcher, None)
 
 class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
 test_utils.TestCase):
@@ -262,17 +255,8 @@
 class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
 
 def setUp(self):
- policy = asyncio.get_event_loop_policy()
 self.loop = asyncio.ProactorEventLoop()
-
- # ensure that the event loop is passed explicitly in asyncio
- policy.set_event_loop(None)
-
- def tearDown(self):
- policy = asyncio.get_event_loop_policy()
- self.loop.close()
- policy.set_event_loop(None)
- super().tearDown()
+ self.set_event_loop(self.loop)
 
 
 if __name__ == '__main__':
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -7,17 +7,17 @@
 import unittest
 import weakref
 from unittest import mock
+
+import asyncio
+from asyncio import coroutines
+from asyncio import test_utils
 try:
- from test import support # gc_collect
+ from test import support
 from test.script_helper import assert_python_ok
 except ImportError:
 from asyncio import test_support as support
 from asyncio.test_support import assert_python_ok
 
-import asyncio
-from asyncio import coroutines
-from asyncio import test_utils
-
 
 PY34 = (sys.version_info >= (3, 4))
 PY35 = (sys.version_info >= (3, 5))
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -1,11 +1,9 @@
 """Tests for unix_events.py."""
 
 import collections
-import gc
 import errno
 import io
 import os
-import pprint
 import signal
 import socket
 import stat
diff --git a/Lib/test/test_asyncio/test_windows_utils.py b/Lib/test/test_asyncio/test_windows_utils.py
--- a/Lib/test/test_asyncio/test_windows_utils.py
+++ b/Lib/test/test_asyncio/test_windows_utils.py
@@ -5,18 +5,17 @@
 import unittest
 from unittest import mock
 
-try:
- from test import support # gc_collect, IPV6_ENABLED
-except ImportError:
- from asyncio import test_support as support
-
 if sys.platform != 'win32':
 raise unittest.SkipTest('Windows only')
 
 import _winapi
 
+from asyncio import _overlapped
 from asyncio import windows_utils
-from asyncio import _overlapped
+try:
+ from test import support
+except ImportError:
+ from asyncio import test_support as support
 
 
 class WinsocketpairTests(unittest.TestCase):
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /