"""Thread module emulating a subset of Java's threading model."""

import sys as _sys

try:
    import thread
except ImportError:
    # The low-level thread module is unavailable: drop this module's own
    # entry from sys.modules before re-raising the ImportError.
    del _sys.modules[__name__]
    raise

import warnings

from collections import deque as _deque
from time import time as _time, sleep as _sleep
from traceback import format_exc as _format_exc

# Note regarding PEP 8 compliant aliases
#  This threading model was originally inspired by Java, and inherited
# the convention of camelCase function and method names from that
# language. While those names are not in any imminent danger of being
# deprecated, starting with Python 2.6, the module now provides a
# PEP 8 compliant alias for any such method name.
# Using the new PEP 8 compliant names also facilitates substitution
# with the multiprocessing module, which doesn't provide the old
# Java inspired names.


# Rename some stuff so "from threading import *" is safe
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

# Bind the low-level primitives to private module names, then delete the
# `thread` name itself so it is not left in this module's namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread


# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
warnings.filterwarnings('ignore', category=DeprecationWarning,
                        module='threading', message='sys.exc_clear')

# Debug support (adapted from ihooks.py).
# All the major classes here derive from _Verbose.  We force that to
# be a new-style class so that all the major classes here are new-style.
# This helps debugging (type(instance) is more revealing for instances
# of new-style classes).
51 52 _VERBOSE = False 53 54 if __debug__: 55 56 class _Verbose(object): 57 58 def __init__(self, verbose=None): 59 if verbose is None: 60 verbose = _VERBOSE 61 self.__verbose = verbose 62 63 def _note(self, format, *args): 64 if self.__verbose: 65 format = format % args 66 # Issue #4188: calling current_thread() can incur an infinite 67 # recursion if it has to create a DummyThread on the fly. 68 ident = _get_ident() 69 try: 70 name = _active[ident].name 71 except KeyError: 72 name = "<OS thread %d>" % ident 73 format = "%s: %s\n" % (name, format) 74 _sys.stderr.write(format) 75 76 else: 77 # Disable this when using "python -O" 78 class _Verbose(object): 79 def __init__(self, verbose=None): 80 pass 81 def _note(self, *args): 82 pass 83 84 # Support for profile and trace hooks 85 86 _profile_hook = None 87 _trace_hook = None 88 89 def setprofile(func): 90 """Set a profile function for all threads started from the threading module. 91 92 The func will be passed to sys.setprofile() for each thread, before its 93 run() method is called. 94 95 """ 96 global _profile_hook 97 _profile_hook = func 98 99 def settrace(func): 100 """Set a trace function for all threads started from the threading module. 101 102 The func will be passed to sys.settrace() for each thread, before its run() 103 method is called. 104 105 """ 106 global _trace_hook 107 _trace_hook = func 108 109 # Synchronization classes 110 111 Lock = _allocate_lock 112 113 def RLock(*args, **kwargs): 114 """Factory function that returns a new reentrant lock. 115 116 A reentrant lock must be released by the thread that acquired it. Once a 117 thread has acquired a reentrant lock, the same thread may acquire it again 118 without blocking; the thread must release it once for each time it has 119 acquired it. 120 121 """ 122 return _RLock(*args, **kwargs) 123 124 class _RLock(_Verbose): 125 """A reentrant lock must be released by the thread that acquired it. 
Once a 126 thread has acquired a reentrant lock, the same thread may acquire it 127 again without blocking; the thread must release it once for each time it 128 has acquired it. 129 """ 130 131 def __init__(self, verbose=None): 132 _Verbose.__init__(self, verbose) 133 self.__block = _allocate_lock() 134 self.__owner = None 135 self.__count = 0 136 137 def __repr__(self): 138 owner = self.__owner 139 try: 140 owner = _active[owner].name 141 except KeyError: 142 pass 143 return "<%s owner=%r count=%d>" % ( 144 self.__class__.__name__, owner, self.__count) 145 146 def acquire(self, blocking=1): 147 """Acquire a lock, blocking or non-blocking. 148 149 When invoked without arguments: if this thread already owns the lock, 150 increment the recursion level by one, and return immediately. Otherwise, 151 if another thread owns the lock, block until the lock is unlocked. Once 152 the lock is unlocked (not owned by any thread), then grab ownership, set 153 the recursion level to one, and return. If more than one thread is 154 blocked waiting until the lock is unlocked, only one at a time will be 155 able to grab ownership of the lock. There is no return value in this 156 case. 157 158 When invoked with the blocking argument set to true, do the same thing 159 as when called without arguments, and return true. 160 161 When invoked with the blocking argument set to false, do not block. If a 162 call without an argument would block, return false immediately; 163 otherwise, do the same thing as when called without arguments, and 164 return true. 
165 166 """ 167 me = _get_ident() 168 if self.__owner == me: 169 self.__count = self.__count + 1 170 if __debug__: 171 self._note("%s.acquire(%s): recursive success", self, blocking) 172 return 1 173 rc = self.__block.acquire(blocking) 174 if rc: 175 self.__owner = me 176 self.__count = 1 177 if __debug__: 178 self._note("%s.acquire(%s): initial success", self, blocking) 179 else: 180 if __debug__: 181 self._note("%s.acquire(%s): failure", self, blocking) 182 return rc 183 184 __enter__ = acquire 185 186 def release(self): 187 """Release a lock, decrementing the recursion level. 188 189 If after the decrement it is zero, reset the lock to unlocked (not owned 190 by any thread), and if any other threads are blocked waiting for the 191 lock to become unlocked, allow exactly one of them to proceed. If after 192 the decrement the recursion level is still nonzero, the lock remains 193 locked and owned by the calling thread. 194 195 Only call this method when the calling thread owns the lock. A 196 RuntimeError is raised if this method is called when the lock is 197 unlocked. 198 199 There is no return value. 
200 201 """ 202 if self.__owner != _get_ident(): 203 raise RuntimeError("cannot release un-acquired lock") 204 self.__count = count = self.__count - 1 205 if not count: 206 self.__owner = None 207 self.__block.release() 208 if __debug__: 209 self._note("%s.release(): final release", self) 210 else: 211 if __debug__: 212 self._note("%s.release(): non-final release", self) 213 214 def __exit__(self, t, v, tb): 215 self.release() 216 217 # Internal methods used by condition variables 218 219 def _acquire_restore(self, count_owner): 220 count, owner = count_owner 221 self.__block.acquire() 222 self.__count = count 223 self.__owner = owner 224 if __debug__: 225 self._note("%s._acquire_restore()", self) 226 227 def _release_save(self): 228 if __debug__: 229 self._note("%s._release_save()", self) 230 count = self.__count 231 self.__count = 0 232 owner = self.__owner 233 self.__owner = None 234 self.__block.release() 235 return (count, owner) 236 237 def _is_owned(self): 238 return self.__owner == _get_ident() 239 240 241 def Condition(*args, **kwargs): 242 """Factory function that returns a new condition variable object. 243 244 A condition variable allows one or more threads to wait until they are 245 notified by another thread. 246 247 If the lock argument is given and not None, it must be a Lock or RLock 248 object, and it is used as the underlying lock. Otherwise, a new RLock object 249 is created and used as the underlying lock. 250 251 """ 252 return _Condition(*args, **kwargs) 253 254 class _Condition(_Verbose): 255 """Condition variables allow one or more threads to wait until they are 256 notified by another thread. 
257 """ 258 259 def __init__(self, lock=None, verbose=None): 260 _Verbose.__init__(self, verbose) 261 if lock is None: 262 lock = RLock() 263 self.__lock = lock 264 # Export the lock's acquire() and release() methods 265 self.acquire = lock.acquire 266 self.release = lock.release 267 # If the lock defines _release_save() and/or _acquire_restore(), 268 # these override the default implementations (which just call 269 # release() and acquire() on the lock). Ditto for _is_owned(). 270 try: 271 self._release_save = lock._release_save 272 except AttributeError: 273 pass 274 try: 275 self._acquire_restore = lock._acquire_restore 276 except AttributeError: 277 pass 278 try: 279 self._is_owned = lock._is_owned 280 except AttributeError: 281 pass 282 self.__waiters = [] 283 284 def __enter__(self): 285 return self.__lock.__enter__() 286 287 def __exit__(self, *args): 288 return self.__lock.__exit__(*args) 289 290 def __repr__(self): 291 return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) 292 293 def _release_save(self): 294 self.__lock.release() # No state to save 295 296 def _acquire_restore(self, x): 297 self.__lock.acquire() # Ignore saved state 298 299 def _is_owned(self): 300 # Return True if lock is owned by current_thread. 301 # This method is called only if __lock doesn't have _is_owned(). 302 if self.__lock.acquire(0): 303 self.__lock.release() 304 return False 305 else: 306 return True 307 308 def wait(self, timeout=None): 309 """Wait until notified or until a timeout occurs. 310 311 If the calling thread has not acquired the lock when this method is 312 called, a RuntimeError is raised. 313 314 This method releases the underlying lock, and then blocks until it is 315 awakened by a notify() or notifyAll() call for the same condition 316 variable in another thread, or until the optional timeout occurs. Once 317 awakened or timed out, it re-acquires the lock and returns. 
318 319 When the timeout argument is present and not None, it should be a 320 floating point number specifying a timeout for the operation in seconds 321 (or fractions thereof). 322 323 When the underlying lock is an RLock, it is not released using its 324 release() method, since this may not actually unlock the lock when it 325 was acquired multiple times recursively. Instead, an internal interface 326 of the RLock class is used, which really unlocks it even when it has 327 been recursively acquired several times. Another internal interface is 328 then used to restore the recursion level when the lock is reacquired. 329 330 """ 331 if not self._is_owned(): 332 raise RuntimeError("cannot wait on un-acquired lock") 333 waiter = _allocate_lock() 334 waiter.acquire() 335 self.__waiters.append(waiter) 336 saved_state = self._release_save() 337 try: # restore state no matter what (e.g., KeyboardInterrupt) 338 if timeout is None: 339 waiter.acquire() 340 if __debug__: 341 self._note("%s.wait(): got it", self) 342 else: 343 # Balancing act: We can't afford a pure busy loop, so we 344 # have to sleep; but if we sleep the whole timeout time, 345 # we'll be unresponsive. The scheme here sleeps very 346 # little at first, longer as time goes on, but never longer 347 # than 20 times per second (or the timeout time remaining). 
348 endtime = _time() + timeout 349 delay = 0.0005 # 500 us -> initial delay of 1 ms 350 while True: 351 gotit = waiter.acquire(0) 352 if gotit: 353 break 354 remaining = endtime - _time() 355 if remaining <= 0: 356 break 357 delay = min(delay * 2, remaining, .05) 358 _sleep(delay) 359 if not gotit: 360 if __debug__: 361 self._note("%s.wait(%s): timed out", self, timeout) 362 try: 363 self.__waiters.remove(waiter) 364 except ValueError: 365 pass 366 else: 367 if __debug__: 368 self._note("%s.wait(%s): got it", self, timeout) 369 finally: 370 self._acquire_restore(saved_state) 371 372 def notify(self, n=1): 373 """Wake up one or more threads waiting on this condition, if any. 374 375 If the calling thread has not acquired the lock when this method is 376 called, a RuntimeError is raised. 377 378 This method wakes up at most n of the threads waiting for the condition 379 variable; it is a no-op if no threads are waiting. 380 381 """ 382 if not self._is_owned(): 383 raise RuntimeError("cannot notify on un-acquired lock") 384 __waiters = self.__waiters 385 waiters = __waiters[:n] 386 if not waiters: 387 if __debug__: 388 self._note("%s.notify(): no waiters", self) 389 return 390 self._note("%s.notify(): notifying %d waiter%s", self, n, 391 n!=1 and "s" or "") 392 for waiter in waiters: 393 waiter.release() 394 try: 395 __waiters.remove(waiter) 396 except ValueError: 397 pass 398 399 def notifyAll(self): 400 """Wake up all threads waiting on this condition. 401 402 If the calling thread has not acquired the lock when this method 403 is called, a RuntimeError is raised. 404 405 """ 406 self.notify(len(self.__waiters)) 407 408 notify_all = notifyAll 409 410 411 def Semaphore(*args, **kwargs): 412 """A factory function that returns a new semaphore. 413 414 Semaphores manage a counter representing the number of release() calls minus 415 the number of acquire() calls, plus an initial value. 
The acquire() method 416 blocks if necessary until it can return without making the counter 417 negative. If not given, value defaults to 1. 418 419 """ 420 return _Semaphore(*args, **kwargs) 421 422 class _Semaphore(_Verbose): 423 """Semaphores manage a counter representing the number of release() calls 424 minus the number of acquire() calls, plus an initial value. The acquire() 425 method blocks if necessary until it can return without making the counter 426 negative. If not given, value defaults to 1. 427 428 """ 429 430 # After Tim Peters' semaphore class, but not quite the same (no maximum) 431 432 def __init__(self, value=1, verbose=None): 433 if value < 0: 434 raise ValueError("semaphore initial value must be >= 0") 435 _Verbose.__init__(self, verbose) 436 self.__cond = Condition(Lock()) 437 self.__value = value 438 439 def acquire(self, blocking=1): 440 """Acquire a semaphore, decrementing the internal counter by one. 441 442 When invoked without arguments: if the internal counter is larger than 443 zero on entry, decrement it by one and return immediately. If it is zero 444 on entry, block, waiting until some other thread has called release() to 445 make it larger than zero. This is done with proper interlocking so that 446 if multiple acquire() calls are blocked, release() will wake exactly one 447 of them up. The implementation may pick one at random, so the order in 448 which blocked threads are awakened should not be relied on. There is no 449 return value in this case. 450 451 When invoked with blocking set to true, do the same thing as when called 452 without arguments, and return true. 453 454 When invoked with blocking set to false, do not block. If a call without 455 an argument would block, return false immediately; otherwise, do the 456 same thing as when called without arguments, and return true. 
457 458 """ 459 rc = False 460 with self.__cond: 461 while self.__value == 0: 462 if not blocking: 463 break 464 if __debug__: 465 self._note("%s.acquire(%s): blocked waiting, value=%s", 466 self, blocking, self.__value) 467 self.__cond.wait() 468 else: 469 self.__value = self.__value - 1 470 if __debug__: 471 self._note("%s.acquire: success, value=%s", 472 self, self.__value) 473 rc = True 474 return rc 475 476 __enter__ = acquire 477 478 def release(self): 479 """Release a semaphore, incrementing the internal counter by one. 480 481 When the counter is zero on entry and another thread is waiting for it 482 to become larger than zero again, wake up that thread. 483 484 """ 485 with self.__cond: 486 self.__value = self.__value + 1 487 if __debug__: 488 self._note("%s.release: success, value=%s", 489 self, self.__value) 490 self.__cond.notify() 491 492 def __exit__(self, t, v, tb): 493 self.release() 494 495 496 def BoundedSemaphore(*args, **kwargs): 497 """A factory function that returns a new bounded semaphore. 498 499 A bounded semaphore checks to make sure its current value doesn't exceed its 500 initial value. If it does, ValueError is raised. In most situations 501 semaphores are used to guard resources with limited capacity. 502 503 If the semaphore is released too many times it's a sign of a bug. If not 504 given, value defaults to 1. 505 506 Like regular semaphores, bounded semaphores manage a counter representing 507 the number of release() calls minus the number of acquire() calls, plus an 508 initial value. The acquire() method blocks if necessary until it can return 509 without making the counter negative. If not given, value defaults to 1. 510 511 """ 512 return _BoundedSemaphore(*args, **kwargs) 513 514 class _BoundedSemaphore(_Semaphore): 515 """A bounded semaphore checks to make sure its current value doesn't exceed 516 its initial value. If it does, ValueError is raised. 
In most situations 517 semaphores are used to guard resources with limited capacity. 518 """ 519 520 def __init__(self, value=1, verbose=None): 521 _Semaphore.__init__(self, value, verbose) 522 self._initial_value = value 523 524 def release(self): 525 """Release a semaphore, incrementing the internal counter by one. 526 527 When the counter is zero on entry and another thread is waiting for it 528 to become larger than zero again, wake up that thread. 529 530 If the number of releases exceeds the number of acquires, 531 raise a ValueError. 532 533 """ 534 if self._Semaphore__value >= self._initial_value: 535 raise ValueError("Semaphore released too many times") 536 return _Semaphore.release(self) 537 538 539 def Event(*args, **kwargs): 540 """A factory function that returns a new event. 541 542 Events manage a flag that can be set to true with the set() method and reset 543 to false with the clear() method. The wait() method blocks until the flag is 544 true. 545 546 """ 547 return _Event(*args, **kwargs) 548 549 class _Event(_Verbose): 550 """A factory function that returns a new event object. An event manages a 551 flag that can be set to true with the set() method and reset to false 552 with the clear() method. The wait() method blocks until the flag is true. 553 554 """ 555 556 # After Tim Peters' event class (without is_posted()) 557 558 def __init__(self, verbose=None): 559 _Verbose.__init__(self, verbose) 560 self.__cond = Condition(Lock()) 561 self.__flag = False 562 563 def _reset_internal_locks(self): 564 # private! called by Thread._reset_internal_locks by _after_fork() 565 self.__cond.__init__() 566 567 def isSet(self): 568 'Return true if and only if the internal flag is true.' 569 return self.__flag 570 571 is_set = isSet 572 573 def set(self): 574 """Set the internal flag to true. 575 576 All threads waiting for the flag to become true are awakened. Threads 577 that call wait() once the flag is true will not block at all. 
578 579 """ 580 self.__cond.acquire() 581 try: 582 self.__flag = True 583 self.__cond.notify_all() 584 finally: 585 self.__cond.release() 586 587 def clear(self): 588 """Reset the internal flag to false. 589 590 Subsequently, threads calling wait() will block until set() is called to 591 set the internal flag to true again. 592 593 """ 594 self.__cond.acquire() 595 try: 596 self.__flag = False 597 finally: 598 self.__cond.release() 599 600 def wait(self, timeout=None): 601 """Block until the internal flag is true. 602 603 If the internal flag is true on entry, return immediately. Otherwise, 604 block until another thread calls set() to set the flag to true, or until 605 the optional timeout occurs. 606 607 When the timeout argument is present and not None, it should be a 608 floating point number specifying a timeout for the operation in seconds 609 (or fractions thereof). 610 611 This method returns the internal flag on exit, so it will always return 612 True except if a timeout is given and the operation times out. 613 614 """ 615 self.__cond.acquire() 616 try: 617 if not self.__flag: 618 self.__cond.wait(timeout) 619 return self.__flag 620 finally: 621 self.__cond.release() 622 623 # Helper to generate new thread names 624 _counter = 0 625 def _newname(template="Thread-%d"): 626 global _counter 627 _counter = _counter + 1 628 return template % _counter 629 630 # Active thread administration 631 _active_limbo_lock = _allocate_lock() 632 _active = {} # maps thread id to Thread object 633 _limbo = {} 634 635 636 # Main class for threads 637 638 class Thread(_Verbose): 639 """A class that represents a thread of control. 640 641 This class can be safely subclassed in a limited fashion. 642 643 """ 644 __initialized = False 645 # Need to store a reference to sys.exc_info for printing 646 # out exceptions when a thread tries to use a global var. during interp. 
647 # shutdown and thus raises an exception about trying to perform some 648 # operation on/with a NoneType 649 __exc_info = _sys.exc_info 650 # Keep sys.exc_clear too to clear the exception just before 651 # allowing .join() to return. 652 __exc_clear = _sys.exc_clear 653 654 def __init__(self, group=None, target=None, name=None, 655 args=(), kwargs=None, verbose=None): 656 """This constructor should always be called with keyword arguments. Arguments are: 657 658 *group* should be None; reserved for future extension when a ThreadGroup 659 class is implemented. 660 661 *target* is the callable object to be invoked by the run() 662 method. Defaults to None, meaning nothing is called. 663 664 *name* is the thread name. By default, a unique name is constructed of 665 the form "Thread-N" where N is a small decimal number. 666 667 *args* is the argument tuple for the target invocation. Defaults to (). 668 669 *kwargs* is a dictionary of keyword arguments for the target 670 invocation. Defaults to {}. 671 672 If a subclass overrides the constructor, it must make sure to invoke 673 the base class constructor (Thread.__init__()) before doing anything 674 else to the thread. 675 676 """ 677 assert group is None, "group argument must be None for now" 678 _Verbose.__init__(self, verbose) 679 if kwargs is None: 680 kwargs = {} 681 self.__target = target 682 self.__name = str(name or _newname()) 683 self.__args = args 684 self.__kwargs = kwargs 685 self.__daemonic = self._set_daemon() 686 self.__ident = None 687 self.__started = Event() 688 self.__stopped = False 689 self.__block = Condition(Lock()) 690 self.__initialized = True 691 # sys.stderr is not stored in the class like 692 # sys.exc_info since it can be changed between instances 693 self.__stderr = _sys.stderr 694 695 def _reset_internal_locks(self): 696 # private! Called by _after_fork() to reset our internal locks as 697 # they may be in an invalid state leading to a deadlock or crash. 
698 if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block 699 self.__block.__init__() 700 self.__started._reset_internal_locks() 701 702 @property 703 def _block(self): 704 # used by a unittest 705 return self.__block 706 707 def _set_daemon(self): 708 # Overridden in _MainThread and _DummyThread 709 return current_thread().daemon 710 711 def __repr__(self): 712 assert self.__initialized, "Thread.__init__() was not called" 713 status = "initial" 714 if self.__started.is_set(): 715 status = "started" 716 if self.__stopped: 717 status = "stopped" 718 if self.__daemonic: 719 status += " daemon" 720 if self.__ident is not None: 721 status += " %s" % self.__ident 722 return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) 723 724 def start(self): 725 """Start the thread's activity. 726 727 It must be called at most once per thread object. It arranges for the 728 object's run() method to be invoked in a separate thread of control. 729 730 This method will raise a RuntimeError if called more than once on the 731 same thread object. 732 733 """ 734 if not self.__initialized: 735 raise RuntimeError("thread.__init__() not called") 736 if self.__started.is_set(): 737 raise RuntimeError("threads can only be started once") 738 if __debug__: 739 self._note("%s.start(): starting thread", self) 740 with _active_limbo_lock: 741 _limbo[self] = self 742 try: 743 _start_new_thread(self.__bootstrap, ()) 744 except Exception: 745 with _active_limbo_lock: 746 del _limbo[self] 747 raise 748 self.__started.wait() 749 750 def run(self): 751 """Method representing the thread's activity. 752 753 You may override this method in a subclass. The standard run() method 754 invokes the callable object passed to the object's constructor as the 755 target argument, if any, with sequential and keyword arguments taken 756 from the args and kwargs arguments, respectively. 
757 758 """ 759 try: 760 if self.__target: 761 self.__target(*self.__args, **self.__kwargs) 762 finally: 763 # Avoid a refcycle if the thread is running a function with 764 # an argument that has a member that points to the thread. 765 del self.__target, self.__args, self.__kwargs 766 767 def __bootstrap(self): 768 # Wrapper around the real bootstrap code that ignores 769 # exceptions during interpreter cleanup. Those typically 770 # happen when a daemon thread wakes up at an unfortunate 771 # moment, finds the world around it destroyed, and raises some 772 # random exception *** while trying to report the exception in 773 # __bootstrap_inner() below ***. Those random exceptions 774 # don't help anybody, and they confuse users, so we suppress 775 # them. We suppress them only when it appears that the world 776 # indeed has already been destroyed, so that exceptions in 777 # __bootstrap_inner() during normal business hours are properly 778 # reported. Also, we only suppress them for daemonic threads; 779 # if a non-daemonic encounters this, something else is wrong. 
780 try: 781 self.__bootstrap_inner() 782 except: 783 if self.__daemonic and _sys is None: 784 return 785 raise 786 787 def _set_ident(self): 788 self.__ident = _get_ident() 789 790 def __bootstrap_inner(self): 791 try: 792 self._set_ident() 793 self.__started.set() 794 with _active_limbo_lock: 795 _active[self.__ident] = self 796 del _limbo[self] 797 if __debug__: 798 self._note("%s.__bootstrap(): thread started", self) 799 800 if _trace_hook: 801 self._note("%s.__bootstrap(): registering trace hook", self) 802 _sys.settrace(_trace_hook) 803 if _profile_hook: 804 self._note("%s.__bootstrap(): registering profile hook", self) 805 _sys.setprofile(_profile_hook) 806 807 try: 808 self.run() 809 except SystemExit: 810 if __debug__: 811 self._note("%s.__bootstrap(): raised SystemExit", self) 812 except: 813 if __debug__: 814 self._note("%s.__bootstrap(): unhandled exception", self) 815 # If sys.stderr is no more (most likely from interpreter 816 # shutdown) use self.__stderr. Otherwise still use sys (as in 817 # _sys) in case sys.stderr was redefined since the creation of 818 # self. 819 if _sys: 820 _sys.stderr.write("Exception in thread %s:\n%s\n" % 821 (self.name, _format_exc())) 822 else: 823 # Do the best job possible w/o a huge amt. 
of code to 824 # approximate a traceback (code ideas from 825 # Lib/traceback.py) 826 exc_type, exc_value, exc_tb = self.__exc_info() 827 try: 828 print>>self.__stderr, ( 829 "Exception in thread " + self.name + 830 " (most likely raised during interpreter shutdown):") 831 print>>self.__stderr, ( 832 "Traceback (most recent call last):") 833 while exc_tb: 834 print>>self.__stderr, ( 835 ' File "%s", line %s, in %s' % 836 (exc_tb.tb_frame.f_code.co_filename, 837 exc_tb.tb_lineno, 838 exc_tb.tb_frame.f_code.co_name)) 839 exc_tb = exc_tb.tb_next 840 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) 841 # Make sure that exc_tb gets deleted since it is a memory 842 # hog; deleting everything else is just for thoroughness 843 finally: 844 del exc_type, exc_value, exc_tb 845 else: 846 if __debug__: 847 self._note("%s.__bootstrap(): normal return", self) 848 finally: 849 # Prevent a race in 850 # test_threading.test_no_refcycle_through_target when 851 # the exception keeps the target alive past when we 852 # assert that it's dead. 853 self.__exc_clear() 854 finally: 855 with _active_limbo_lock: 856 self.__stop() 857 try: 858 # We don't call self.__delete() because it also 859 # grabs _active_limbo_lock. 860 del _active[_get_ident()] 861 except: 862 pass 863 864 def __stop(self): 865 # DummyThreads delete self.__block, but they have no waiters to 866 # notify anyway (join() is forbidden on them). 867 if not hasattr(self, '_Thread__block'): 868 return 869 self.__block.acquire() 870 self.__stopped = True 871 self.__block.notify_all() 872 self.__block.release() 873 874 def __delete(self): 875 "Remove current thread from the dict of currently running threads." 876 877 # Notes about running with dummy_thread: 878 # 879 # Must take care to not raise an exception if dummy_thread is being 880 # used (and thus this module is being used as an instance of 881 # dummy_threading). 
dummy_thread.get_ident() always returns -1 since 882 # there is only one thread if dummy_thread is being used. Thus 883 # len(_active) is always <= 1 here, and any Thread instance created 884 # overwrites the (if any) thread currently registered in _active. 885 # 886 # An instance of _MainThread is always created by 'threading'. This 887 # gets overwritten the instant an instance of Thread is created; both 888 # threads return -1 from dummy_thread.get_ident() and thus have the 889 # same key in the dict. So when the _MainThread instance created by 890 # 'threading' tries to clean itself up when atexit calls this method 891 # it gets a KeyError if another Thread instance was created. 892 # 893 # This all means that KeyError from trying to delete something from 894 # _active if dummy_threading is being used is a red herring. But 895 # since it isn't if dummy_threading is *not* being used then don't 896 # hide the exception. 897 898 try: 899 with _active_limbo_lock: 900 del _active[_get_ident()] 901 # There must not be any python code between the previous line 902 # and after the lock is released. Otherwise a tracing function 903 # could try to acquire the lock again in the same thread, (in 904 # current_thread()), and would block. 905 except KeyError: 906 if 'dummy_threading' not in _sys.modules: 907 raise 908 909 def join(self, timeout=None): 910 """Wait until the thread terminates. 911 912 This blocks the calling thread until the thread whose join() method is 913 called terminates -- either normally or through an unhandled exception 914 or until the optional timeout occurs. 915 916 When the timeout argument is present and not None, it should be a 917 floating point number specifying a timeout for the operation in seconds 918 (or fractions thereof). As join() always returns None, you must call 919 isAlive() after join() to decide whether a timeout happened -- if the 920 thread is still alive, the join() call timed out. 
921 922 When the timeout argument is not present or None, the operation will 923 block until the thread terminates. 924 925 A thread can be join()ed many times. 926 927 join() raises a RuntimeError if an attempt is made to join the current 928 thread as that would cause a deadlock. It is also an error to join() a 929 thread before it has been started and attempts to do so raises the same 930 exception. 931 932 """ 933 if not self.__initialized: 934 raise RuntimeError("Thread.__init__() not called") 935 if not self.__started.is_set(): 936 raise RuntimeError("cannot join thread before it is started") 937 if self is current_thread(): 938 raise RuntimeError("cannot join current thread") 939 940 if __debug__: 941 if not self.__stopped: 942 self._note("%s.join(): waiting until thread stops", self) 943 self.__block.acquire() 944 try: 945 if timeout is None: 946 while not self.__stopped: 947 self.__block.wait() 948 if __debug__: 949 self._note("%s.join(): thread stopped", self) 950 else: 951 deadline = _time() + timeout 952 while not self.__stopped: 953 delay = deadline - _time() 954 if delay <= 0: 955 if __debug__: 956 self._note("%s.join(): timed out", self) 957 break 958 self.__block.wait(delay) 959 else: 960 if __debug__: 961 self._note("%s.join(): thread stopped", self) 962 finally: 963 self.__block.release() 964 965 @property 966 def name(self): 967 """A string used for identification purposes only. 968 969 It has no semantics. Multiple threads may be given the same name. The 970 initial name is set by the constructor. 971 972 """ 973 assert self.__initialized, "Thread.__init__() not called" 974 return self.__name 975 976 @name.setter 977 def name(self, name): 978 assert self.__initialized, "Thread.__init__() not called" 979 self.__name = str(name) 980 981 @property 982 def ident(self): 983 """Thread identifier of this thread or None if it has not been started. 984 985 This is a nonzero integer. See the thread.get_ident() function. 
@property
def ident(self):
    """Thread identifier of this thread, or None if it was never started.

    The identifier is a nonzero integer; see the thread.get_ident()
    function. Identifiers may be recycled once a thread has exited and
    another thread is created. The value stays available after the thread
    has exited.

    """
    assert self.__initialized, "Thread.__init__() not called"
    return self.__ident

def isAlive(self):
    """Return whether the thread is alive.

    The result is True from just before run() starts until just after
    run() terminates. The module function enumerate() returns a list of
    all alive threads.

    """
    assert self.__initialized, "Thread.__init__() not called"
    if not self.__started.is_set():
        return False
    return not self.__stopped

is_alive = isAlive

@property
def daemon(self):
    """Boolean flag: True for a daemon thread, False otherwise.

    It must be set before start() is called, otherwise RuntimeError is
    raised. The initial value is inherited from the creating thread; the
    main thread is not a daemon thread, so threads created in the main
    thread default to daemon = False.

    The entire Python program exits when no alive non-daemon threads are
    left.

    """
    assert self.__initialized, "Thread.__init__() not called"
    return self.__daemonic

@daemon.setter
def daemon(self, daemonic):
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if self.__started.is_set():
        raise RuntimeError("cannot set daemon status of active thread")
    self.__daemonic = daemonic

def isDaemon(self):
    """Java-style accessor: return the value of the daemon property."""
    return self.daemon

def setDaemon(self, daemonic):
    """Java-style accessor: assign to the daemon property."""
    self.daemon = daemonic

def getName(self):
    """Java-style accessor: return the value of the name property."""
    return self.name

def setName(self, name):
    """Java-style accessor: assign to the name property."""
    self.name = name
# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay, the callable and its arguments.

        interval is the number of seconds to wait before calling function;
        args and kwargs are the positional and keyword arguments passed to
        it. Omitting args or kwargs (or passing None) gives this timer its
        own fresh empty list / dict.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Build the empty containers per instance: a literal [] / {} in the
        # signature would be one object shared by every timer created
        # without explicit arguments.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):
    """Thread object standing for the thread that imported this module."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # The main thread is already running: mark it started and register
        # it under the caller's identifier.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return False

    def _exitfunc(self):
        """Join every remaining non-daemon thread, then unregister self."""
        self._Thread__stop()
        t = _pickSomeNonDaemonThread()
        if t:
            if __debug__:
                self._note("%s: waiting for other threads", self)
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()

def _pickSomeNonDaemonThread():
    """Return some alive non-daemon thread, or None if there is none."""
    # enumerate here is this module's enumerate(), not the builtin.
    for t in enumerate():
        if not t.daemon and t.is_alive():
            return t
    return None
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread object for a thread this module did not start."""

    def __init__(self):
        Thread.__init__(self, name=_newname("Dummy-%d"))

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread. Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # Mark the thread as started and register it under the caller's
        # identifier.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"


# Global API functions

def currentThread():
    """Return the Thread object for the caller's thread of control.

    When the calling thread was not created through the threading module,
    a dummy thread object with limited functionality is returned instead.

    """
    try:
        me = _active[_get_ident()]
    except KeyError:
        # No entry for this identifier: create a _DummyThread, whose
        # constructor registers itself in _active.
        me = _DummyThread()
    return me

current_thread = currentThread

def activeCount():
    """Return the number of Thread objects currently alive.

    The count equals the length of the list returned by enumerate().

    """
    with _active_limbo_lock:
        n_active = len(_active)
        n_limbo = len(_limbo)
        return n_active + n_limbo

active_count = activeCount
def _enumerate():
    # Lock-free twin of enumerate(), for internal use only.
    threads = list(_active.values())
    threads.extend(_limbo.values())
    return threads

def enumerate():
    """Return a list of all Thread objects currently alive.

    Daemonic threads, dummy thread objects created by current_thread() and
    the main thread are all included. Terminated threads and threads that
    have not yet been started are not.

    """
    with _active_limbo_lock:
        threads = list(_active.values())
        threads.extend(_limbo.values())
        return threads

from thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_shutdown = _MainThread()._exitfunc

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from thread import _local as local
except ImportError:
    from _threading_local import local
def _after_fork():
    # Called by Python/ceval.c:PyEval_ReInitThreads, which is itself called
    # from PyOS_AfterFork. Cleans up threading module state that should not
    # exist after a fork.

    # Replace _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread. http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    survivors = {}
    me = current_thread()
    with _active_limbo_lock:
        for t in _active.itervalues():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if hasattr(t, '_reset_internal_locks'):
                t._reset_internal_locks()
            if t is not me:
                # All the others are already stopped.
                t._Thread__stop()
                continue
            # There is only one active thread. We reset the ident to
            # its new value since it can have changed.
            ident = _get_ident()
            t._Thread__ident = ident
            survivors[ident] = t

        _limbo.clear()
        _active.clear()
        _active.update(survivors)
        assert len(_active) == 1
# Self-test code

def _test():
    """Run a small producer/consumer self-test.

    Three producer threads each push five items through a bounded queue of
    capacity four, while one consumer thread takes them out and prints
    them. Returns once every thread has been joined.
    """

    class BoundedQueue(_Verbose):
        """FIFO queue with a capacity limit, guarded by a single RLock."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            # Both conditions share the monitor lock: rc is notified by
            # put(), wc is notified by get().
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = _deque()

        def put(self, item):
            """Append item, waiting while the queue is at its limit."""
            # 'with' releases the monitor even if wait() raises, which the
            # bare acquire()/release() pair did not.
            with self.mon:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()

        def get(self):
            """Remove and return the oldest item, waiting while empty."""
            with self.mon:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
            return item

    class ProducerThread(Thread):
        """Puts `quota` numbered items on the queue, pausing between them."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter = counter + 1
                self.queue.put("%s.%d" % (self.name, counter))
                _sleep(random() * 0.00001)


    class ConsumerThread(Thread):
        """Takes `count` items off the queue and prints each one."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                # Parenthesised so the line also parses where print is a
                # function; the output for a single value is unchanged.
                print(item)
                self.count = self.count - 1

    NP = 3    # number of producer threads
    QL = 4    # queue limit
    NI = 5    # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()

if __name__ == '__main__':
    _test()