Home | History | Annotate | Download | only in test

Lines Matching refs:threading

2 Tests for the threading module.
13 threading = import_module('threading')
43 class TestThread(threading.Thread):
45 threading.Thread.__init__(self, name=name)
95 sema = threading.BoundedSemaphore(value=3)
96 mutex = threading.RLock()
122 self.assertIsNotNone(threading.currentThread().ident)
124 ident.append(threading.currentThread().ident)
126 done = threading.Event()
132 del threading._active[ident[0]]
139 threading.stack_size(262144)
144 threading.stack_size(0)
151 threading.stack_size(0x100000)
156 threading.stack_size(0)
159 # Check that a "foreign" thread can use the threading module.
162 # thread to get made in the threading._active map.
163 threading.current_thread()
166 mutex = threading.Lock()
171 self.assertIn(tid, threading._active)
172 self.assertIsInstance(threading._active[tid], threading._DummyThread)
174 self.assertTrue(threading._active[tid].is_alive())
175 self.assertRegex(repr(threading._active[tid]), '_DummyThread')
176 del threading._active[tid]
191 tid = threading.get_ident()
215 worker_started = threading.Event()
216 worker_saw_exception = threading.Event()
218 class Worker(threading.Thread):
220 self.id = threading.get_ident()
268 raise threading.ThreadError()
269 _start_new_thread = threading._start_new_thread
270 threading._start_new_thread = fail_new_thread
272 t = threading.Thread(target=lambda: None)
273 self.assertRaises(threading.ThreadError, t.start)
275 t in threading._limbo,
278 threading._start_new_thread = _start_new_thread
315 # Avoid a deadlock when sys.settrace steps into threading._shutdown
317 import sys, threading
326 t = threading.Thread(target=killer)
332 threading.current_thread()
340 # Raising SystemExit skipped threading._shutdown
342 import threading
351 threading.Thread(target=child).start()
360 # threading.enumerate() after it has been join()ed.
361 enum = threading.enumerate
366 t = threading.Thread(target=lambda: None)
381 self.thread = threading.Thread(target=self._run,
409 t = threading.Thread()
415 e = threading.Event()
417 threading.activeCount()
420 t = threading.Thread()
426 t = threading.Thread()
428 t = threading.Thread(daemon=False)
430 t = threading.Thread(daemon=True)
438 import _thread, threading, os, time
442 threading.current_thread()
446 evt = threading.Event()
449 assert threading.active_count() == 2, threading.active_count()
451 assert threading.active_count() == 1, threading.active_count()
471 t = threading.Thread(target=lambda: None)
482 main = threading.main_thread()
484 self.assertEqual(main.ident, threading.current_thread().ident)
485 self.assertEqual(main.ident, threading.get_ident())
488 self.assertNotEqual(threading.main_thread().ident,
489 threading.current_thread().ident)
490 th = threading.Thread(target=f)
498 import os, threading
502 main = threading.main_thread()
504 print(main.ident == threading.current_thread().ident)
505 print(main.ident == threading.get_ident())
519 import os, threading, sys
524 main = threading.main_thread()
526 print(main.ident == threading.current_thread().ident)
527 print(main.ident == threading.get_ident())
534 th = threading.Thread(target=f)
554 t = threading.Thread(target=f)
586 t = threading.Thread(target=f)
606 bs = threading.BoundedSemaphore(limit)
607 threads = [threading.Thread(target=bs.acquire)
613 threads = [threading.Thread(target=bs.release)
647 threading.settrace(noop_trace)
666 import sys, os, time, threading
685 t = threading.Thread(target=joiningfunc,
686 args=(threading.current_thread(),))
703 t = threading.Thread(target=joiningfunc,
704 args=(threading.current_thread(),))
717 main_thread = threading.current_thread()
724 t = threading.Thread(target=joiningfunc,
730 w = threading.Thread(target=worker)
745 import threading
759 thread_has_run.add(threading.current_thread())
764 new_thread = threading.Thread(target=random_io)
795 t = threading.Thread(target=do_fork_and_wait)
809 t = threading.Thread(target=lambda : time.sleep(0.3))
838 import threading
846 threading.Thread(target=f).start()
863 import threading
870 tls = threading.local()
878 threading.Thread(target=f).start()
889 import threading
896 threading.Thread(target=f, daemon=True).start()
913 thread = threading.Thread()
918 current_thread = threading.current_thread()
922 thread = threading.Thread()
926 thread = threading.Thread()
931 lock = threading.Lock()
943 import threading
954 w = threading.Thread(target=outer)
969 import threading
979 t = threading.Thread(target=run)
998 import threading
1008 t = threading.Thread(target=run)
1027 import threading
1038 t = threading.Thread(target=run)
1053 class Issue27558(threading.Thread):
1073 self.callback_event = threading.Event()
1078 timer1 = threading.Timer(0.01, self._callback_spy)
1084 timer2 = threading.Timer(0.01, self._callback_spy)
1095 locktype = staticmethod(threading.Lock)
1098 locktype = staticmethod(threading._PyRLock)
1100 @unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1102 locktype = staticmethod(threading._CRLock)
1105 eventtype = staticmethod(threading.Event)
1109 locktype = staticmethod(threading.Condition)
1112 condtype = staticmethod(threading.Condition)
1115 semtype = staticmethod(threading.Semaphore)
1118 semtype = staticmethod(threading.BoundedSemaphore)
1121 barriertype = staticmethod(threading.Barrier)
1127 support.check__all__(self, threading, ('threading', '_thread'),