[Python-checkins] python/dist/src/Lib/test test_dummy_thread.py,NONE,1.1 test_dummy_threading.py,NONE,1.1

gvanrossum@users.sourceforge.net gvanrossum@users.sourceforge.net
2002年12月30日 14:30:24 -0800


Update of /cvsroot/python/python/dist/src/Lib/test
In directory sc8-pr-cvs1:/tmp/cvs-serv13682/Lib/test
Added Files:
	test_dummy_thread.py test_dummy_threading.py 
Log Message:
Brett Cannon's dummy_thread and dummy_threading modules (SF patch
622537), with some nitpicking editorial changes.
--- NEW FILE: test_dummy_thread.py ---
"""Generic thread tests.
Meant to be used by dummy_thread and thread. To allow for different modules
to be used, test_main() can be called with the module to use as the thread
implementation as its sole argument.
"""
import dummy_thread as _thread
import time
import Queue
import random
import unittest
from test import test_support
class LockTests(unittest.TestCase):
 """Test lock objects."""
 def setUp(self):
 # Create a lock
 self.lock = _thread.allocate_lock()
 def test_initlock(self):
 #Make sure locks start locked
 self.failUnless(not self.lock.locked(),
 "Lock object is not initialized unlocked.")
 def test_release(self):
 # Test self.lock.release()
 self.lock.acquire()
 self.lock.release()
 self.failUnless(not self.lock.locked(),
 "Lock object did not release properly.")
 
 def test_improper_release(self):
 #Make sure release of an unlocked thread raises _thread.error
 self.failUnlessRaises(_thread.error, self.lock.release)
 def test_cond_acquire_success(self):
 #Make sure the conditional acquiring of the lock works.
 self.failUnless(self.lock.acquire(0),
 "Conditional acquiring of the lock failed.")
 def test_cond_acquire_fail(self):
 #Test acquiring locked lock returns False
 self.lock.acquire(0)
 self.failUnless(not self.lock.acquire(0),
 "Conditional acquiring of a locked lock incorrectly "
 "succeeded.")
 def test_uncond_acquire_success(self):
 #Make sure unconditional acquiring of a lock works.
 self.lock.acquire()
 self.failUnless(self.lock.locked(),
 "Uncondional locking failed.")
 def test_uncond_acquire_return_val(self):
 #Make sure that an unconditional locking returns True.
 self.failUnless(self.lock.acquire(1) is True,
 "Unconditional locking did not return True.")
 
 def test_uncond_acquire_blocking(self):
 #Make sure that unconditional acquiring of a locked lock blocks.
 def delay_unlock(to_unlock, delay):
 """Hold on to lock for a set amount of time before unlocking."""
 time.sleep(delay)
 to_unlock.release()
 self.lock.acquire()
 delay = 1 #In seconds
 start_time = int(time.time())
 _thread.start_new_thread(delay_unlock,(self.lock, delay))
 if test_support.verbose:
 print
 print "*** Waiting for thread to release the lock "\
 "(approx. %s sec.) ***" % delay
 self.lock.acquire()
 end_time = int(time.time())
 if test_support.verbose:
 print "done"
 self.failUnless((end_time - start_time) >= delay,
 "Blocking by unconditional acquiring failed.")
class MiscTests(unittest.TestCase):
    """Miscellaneous tests of module-level functions and attributes."""

    def test_exit(self):
        # Make sure _thread.exit() raises SystemExit.
        self.failUnlessRaises(SystemExit, _thread.exit)

    def test_ident(self):
        # Test sanity of _thread.get_ident(): it must be a non-zero integer.
        self.failUnless(isinstance(_thread.get_ident(), int),
                        "_thread.get_ident() returned a non-integer")
        self.failUnless(_thread.get_ident() != 0,
                        "_thread.get_ident() returned 0")

    def test_LockType(self):
        # Make sure _thread.allocate_lock() returns an instance of
        # _thread.LockType.
        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
                        "_thread.allocate_lock() did not return an instance "
                        "of _thread.LockType")
class ThreadTests(unittest.TestCase):
 """Test thread creation."""
 def test_arg_passing(self):
 #Make sure that parameter passing works.
 def arg_tester(queue, arg1=False, arg2=False):
 """Use to test _thread.start_new_thread() passes args properly."""
 queue.put((arg1, arg2))
 testing_queue = Queue.Queue(1)
 _thread.start_new_thread(arg_tester, (testing_queue, True, True))
 result = testing_queue.get()
 self.failUnless(result[0] and result[1],
 "Argument passing for thread creation using tuple failed")
 _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
 'arg1':True, 'arg2':True})
 result = testing_queue.get()
 self.failUnless(result[0] and result[1],
 "Argument passing for thread creation using kwargs failed")
 _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
 result = testing_queue.get()
 self.failUnless(result[0] and result[1],
 "Argument passing for thread creation using both tuple"
 " and kwargs failed")
 
 def test_multi_creation(self):
 #Make sure multiple threads can be created.
 def queue_mark(queue, delay):
 """Wait for ``delay`` seconds and then put something into ``queue``"""
 time.sleep(delay)
 queue.put(_thread.get_ident())
 
 thread_count = 5
 delay = 1.5
 testing_queue = Queue.Queue(thread_count)
 if test_support.verbose:
 print
 print "*** Testing multiple thread creation "\
 "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
 for count in xrange(thread_count):
 _thread.start_new_thread(queue_mark,
 (testing_queue, round(random.random(), 1)))
 time.sleep(delay)
 if test_support.verbose:
 print 'done'
 self.failUnless(testing_queue.qsize() == thread_count,
 "Not all %s threads executed properly after %s sec." % 
 (thread_count, delay))
def test_main(imported_module=None):
 global _thread
 if imported_module:
 _thread = imported_module
 if test_support.verbose:
 print
 print "*** Using %s as _thread module ***" % _thread
 suite = unittest.TestSuite()
 suite.addTest(unittest.makeSuite(LockTests))
 suite.addTest(unittest.makeSuite(MiscTests))
 suite.addTest(unittest.makeSuite(ThreadTests))
 test_support.run_suite(suite)
# Script entry point: run the suite without overriding the _thread binding.
if __name__ == '__main__':
 test_main()
--- NEW FILE: test_dummy_threading.py ---
# Very rudimentary test of threading module
# Create a bunch of threads, let each do some work, wait until all are done
from test.test_support import verbose
import random
import dummy_threading as _threading
import time
class TestThread(_threading.Thread):
 
 def run(self):
 global running
 delay = random.random() * 2
 if verbose:
 print 'task', self.getName(), 'will run for', delay, 'sec'
 sema.acquire()
 mutex.acquire()
 running = running + 1
 if verbose:
 print running, 'tasks are running'
 mutex.release()
 time.sleep(delay)
 if verbose:
 print 'task', self.getName(), 'done'
 mutex.acquire()
 running = running - 1
 if verbose:
 print self.getName(), 'is finished.', running, 'tasks are running'
 mutex.release()
 sema.release()
def starttasks():
    """Create ``numtasks`` TestThread objects, record each in ``threads``, start it."""
    for index in range(numtasks):
        task = TestThread(name="<thread %d>" % index)
        threads.append(task)
        task.start()
def test_main():
 # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
 # about 1 second per clump).
 global numtasks
 numtasks = 10
 # no more than 3 of the 10 can run at once
 global sema
 sema = _threading.BoundedSemaphore(value=3)
 global mutex
 mutex = _threading.RLock()
 global running
 running = 0
 global threads
 threads = []
 
 starttasks()
 if verbose:
 print 'waiting for all tasks to complete'
 for t in threads:
 t.join()
 if verbose:
 print 'all tasks done'
# Script entry point: run the threading smoke test directly.
if __name__ == '__main__':
 test_main()

AltStyle によって変換されたページ (->オリジナル) /