[Python-checkins] cpython (2.7): Issue #20556: Used specific assert methods in threading tests.

serhiy.storchaka python-checkins at python.org
Mon Mar 14 04:40:43 EDT 2016


https://hg.python.org/cpython/rev/9f8db4d1e149
changeset: 100522:9f8db4d1e149
branch: 2.7
parent: 100517:e3833106f612
user: Serhiy Storchaka <storchaka at gmail.com>
date: Mon Mar 14 10:28:59 2016 +0200
summary:
 Issue #20556: Used specific assert methods in threading tests.
files:
 Lib/test/test_dummy_thread.py | 16 ++++++++--------
 Lib/test/test_threading.py | 20 ++++++++++----------
 Lib/test/test_threading_local.py | 2 +-
 3 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
--- a/Lib/test/test_dummy_thread.py
+++ b/Lib/test/test_dummy_thread.py
@@ -24,14 +24,14 @@
 
 def test_initlock(self):
 #Make sure locks start locked
- self.assertTrue(not self.lock.locked(),
+ self.assertFalse(self.lock.locked(),
 "Lock object is not initialized unlocked.")
 
 def test_release(self):
 # Test self.lock.release()
 self.lock.acquire()
 self.lock.release()
- self.assertTrue(not self.lock.locked(),
+ self.assertFalse(self.lock.locked(),
 "Lock object did not release properly.")
 
 def test_improper_release(self):
@@ -46,7 +46,7 @@
 def test_cond_acquire_fail(self):
 #Test acquiring locked lock returns False
 self.lock.acquire(0)
- self.assertTrue(not self.lock.acquire(0),
+ self.assertFalse(self.lock.acquire(0),
 "Conditional acquiring of a locked lock incorrectly "
 "succeeded.")
 
@@ -58,9 +58,9 @@
 
 def test_uncond_acquire_return_val(self):
 #Make sure that an unconditional locking returns True.
- self.assertTrue(self.lock.acquire(1) is True,
+ self.assertIs(self.lock.acquire(1), True,
 "Unconditional locking did not return True.")
- self.assertTrue(self.lock.acquire() is True)
+ self.assertIs(self.lock.acquire(), True)
 
 def test_uncond_acquire_blocking(self):
 #Make sure that unconditional acquiring of a locked lock blocks.
@@ -80,7 +80,7 @@
 end_time = int(time.time())
 if test_support.verbose:
 print "done"
- self.assertTrue((end_time - start_time) >= DELAY,
+ self.assertGreaterEqual(end_time - start_time, DELAY,
 "Blocking by unconditional acquiring failed.")
 
 class MiscTests(unittest.TestCase):
@@ -94,7 +94,7 @@
 #Test sanity of _thread.get_ident()
 self.assertIsInstance(_thread.get_ident(), int,
 "_thread.get_ident() returned a non-integer")
- self.assertTrue(_thread.get_ident() != 0,
+ self.assertNotEqual(_thread.get_ident(), 0,
 "_thread.get_ident() returned 0")
 
 def test_LockType(self):
@@ -164,7 +164,7 @@
 time.sleep(DELAY)
 if test_support.verbose:
 print 'done'
- self.assertTrue(testing_queue.qsize() == thread_count,
+ self.assertEqual(testing_queue.qsize(), thread_count,
 "Not all %s threads executed properly after %s sec." %
 (thread_count, DELAY))
 
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -51,7 +51,7 @@
 self.nrunning.inc()
 if verbose:
 print self.nrunning.get(), 'tasks are running'
- self.testcase.assertTrue(self.nrunning.get() <= 3)
+ self.testcase.assertLessEqual(self.nrunning.get(), 3)
 
 time.sleep(delay)
 if verbose:
@@ -59,7 +59,7 @@
 
 with self.mutex:
 self.nrunning.dec()
- self.testcase.assertTrue(self.nrunning.get() >= 0)
+ self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
 if verbose:
 print '%s is finished. %d tasks are running' % (
 self.name, self.nrunning.get())
@@ -92,25 +92,25 @@
 for i in range(NUMTASKS):
 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
 threads.append(t)
- self.assertEqual(t.ident, None)
- self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
+ self.assertIsNone(t.ident)
+ self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
 t.start()
 
 if verbose:
 print 'waiting for all tasks to complete'
 for t in threads:
 t.join(NUMTASKS)
- self.assertTrue(not t.is_alive())
+ self.assertFalse(t.is_alive())
 self.assertNotEqual(t.ident, 0)
- self.assertFalse(t.ident is None)
- self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+ self.assertIsNotNone(t.ident)
+ self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
 if verbose:
 print 'all tasks done'
 self.assertEqual(numrunning.get(), 0)
 
 def test_ident_of_no_threading_threads(self):
 # The ident still must work for the main thread and dummy threads.
- self.assertFalse(threading.currentThread().ident is None)
+ self.assertIsNotNone(threading.currentThread().ident)
 def f():
 ident.append(threading.currentThread().ident)
 done.set()
@@ -118,7 +118,7 @@
 ident = []
 thread.start_new_thread(f, ())
 done.wait()
- self.assertFalse(ident[0] is None)
+ self.assertIsNotNone(ident[0])
 # Kill the "immortal" _DummyThread
 del threading._active[ident[0]]
 
@@ -236,7 +236,7 @@
 self.assertTrue(ret)
 if verbose:
 print " verifying worker hasn't exited"
- self.assertTrue(not t.finished)
+ self.assertFalse(t.finished)
 if verbose:
 print " attempting to raise asynch exception in worker"
 result = set_async_exc(ctypes.c_long(t.id), exception)
diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py
--- a/Lib/test/test_threading_local.py
+++ b/Lib/test/test_threading_local.py
@@ -196,7 +196,7 @@
 wr = weakref.ref(x)
 del x
 gc.collect()
- self.assertIs(wr(), None)
+ self.assertIsNone(wr())
 
 class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
 _local = _threading_local.local
-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list

AltStyle によって変換されたページ (->オリジナル) /