1 """Thread module emulating a subset of Java's threading model.""" 2 3 import sys as _sys 4 import _thread 5 6 from time import monotonic as _time 7 from traceback import format_exc as _format_exc 8 from _weakrefset import WeakSet 9 from itertools import islice as _islice, count as _count 10 try: 11 from _collections import deque as _deque 12 except ImportError: 13 from collections import deque as _deque 14 15 # Note regarding PEP 8 compliant names 16 # This threading model was originally inspired by Java, and inherited 17 # the convention of camelCase function and method names from that 18 # language. Those original names are not in any imminent danger of 19 # being deprecated (even for Py3k),so this module provides them as an 20 # alias for the PEP 8 compliant names 21 # Note that using the new PEP 8 compliant names facilitates substitution 22 # with the multiprocessing module, which doesn't provide the old 23 # Java inspired names. 24 25 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 26 'enumerate', 'main_thread', 'TIMEOUT_MAX', 27 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 28 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 29 'setprofile', 'settrace', 'local', 'stack_size'] 30 31 # Rename some stuff so "from threading import *" is safe 32 _start_new_thread = _thread.start_new_thread 33 _allocate_lock = _thread.allocate_lock 34 _set_sentinel = _thread._set_sentinel 35 get_ident = _thread.get_ident 36 ThreadError = _thread.error 37 try: 38 _CRLock = _thread.RLock 39 except AttributeError: 40 _CRLock = None 41 TIMEOUT_MAX = _thread.TIMEOUT_MAX 42 del _thread 43 44 45 # Support for profile and trace hooks 46 47 _profile_hook = None 48 _trace_hook = None 49 50 def setprofile(func): 51 """Set a profile function for all threads started from the threading module. 52 53 The func will be passed to sys.setprofile() for each thread, before its 54 run() method is called. 
55 56 """ 57 global _profile_hook 58 _profile_hook = func 59 60 def settrace(func): 61 """Set a trace function for all threads started from the threading module. 62 63 The func will be passed to sys.settrace() for each thread, before its run() 64 method is called. 65 66 """ 67 global _trace_hook 68 _trace_hook = func 69 70 # Synchronization classes 71 72 Lock = _allocate_lock 73 74 def RLock(*args, **kwargs): 75 """Factory function that returns a new reentrant lock. 76 77 A reentrant lock must be released by the thread that acquired it. Once a 78 thread has acquired a reentrant lock, the same thread may acquire it again 79 without blocking; the thread must release it once for each time it has 80 acquired it. 81 82 """ 83 if _CRLock is None: 84 return _PyRLock(*args, **kwargs) 85 return _CRLock(*args, **kwargs) 86 87 class _RLock: 88 """This class implements reentrant lock objects. 89 90 A reentrant lock must be released by the thread that acquired it. Once a 91 thread has acquired a reentrant lock, the same thread may acquire it 92 again without blocking; the thread must release it once for each time it 93 has acquired it. 94 95 """ 96 97 def __init__(self): 98 self._block = _allocate_lock() 99 self._owner = None 100 self._count = 0 101 102 def __repr__(self): 103 owner = self._owner 104 try: 105 owner = _active[owner].name 106 except KeyError: 107 pass 108 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 109 "locked" if self._block.locked() else "unlocked", 110 self.__class__.__module__, 111 self.__class__.__qualname__, 112 owner, 113 self._count, 114 hex(id(self)) 115 ) 116 117 def acquire(self, blocking=True, timeout=-1): 118 """Acquire a lock, blocking or non-blocking. 119 120 When invoked without arguments: if this thread already owns the lock, 121 increment the recursion level by one, and return immediately. Otherwise, 122 if another thread owns the lock, block until the lock is unlocked. 
Once 123 the lock is unlocked (not owned by any thread), then grab ownership, set 124 the recursion level to one, and return. If more than one thread is 125 blocked waiting until the lock is unlocked, only one at a time will be 126 able to grab ownership of the lock. There is no return value in this 127 case. 128 129 When invoked with the blocking argument set to true, do the same thing 130 as when called without arguments, and return true. 131 132 When invoked with the blocking argument set to false, do not block. If a 133 call without an argument would block, return false immediately; 134 otherwise, do the same thing as when called without arguments, and 135 return true. 136 137 When invoked with the floating-point timeout argument set to a positive 138 value, block for at most the number of seconds specified by timeout 139 and as long as the lock cannot be acquired. Return true if the lock has 140 been acquired, false if the timeout has elapsed. 141 142 """ 143 me = get_ident() 144 if self._owner == me: 145 self._count += 1 146 return 1 147 rc = self._block.acquire(blocking, timeout) 148 if rc: 149 self._owner = me 150 self._count = 1 151 return rc 152 153 __enter__ = acquire 154 155 def release(self): 156 """Release a lock, decrementing the recursion level. 157 158 If after the decrement it is zero, reset the lock to unlocked (not owned 159 by any thread), and if any other threads are blocked waiting for the 160 lock to become unlocked, allow exactly one of them to proceed. If after 161 the decrement the recursion level is still nonzero, the lock remains 162 locked and owned by the calling thread. 163 164 Only call this method when the calling thread owns the lock. A 165 RuntimeError is raised if this method is called when the lock is 166 unlocked. 167 168 There is no return value. 
169 170 """ 171 if self._owner != get_ident(): 172 raise RuntimeError("cannot release un-acquired lock") 173 self._count = count = self._count - 1 174 if not count: 175 self._owner = None 176 self._block.release() 177 178 def __exit__(self, t, v, tb): 179 self.release() 180 181 # Internal methods used by condition variables 182 183 def _acquire_restore(self, state): 184 self._block.acquire() 185 self._count, self._owner = state 186 187 def _release_save(self): 188 if self._count == 0: 189 raise RuntimeError("cannot release un-acquired lock") 190 count = self._count 191 self._count = 0 192 owner = self._owner 193 self._owner = None 194 self._block.release() 195 return (count, owner) 196 197 def _is_owned(self): 198 return self._owner == get_ident() 199 200 _PyRLock = _RLock 201 202 203 class Condition: 204 """Class that implements a condition variable. 205 206 A condition variable allows one or more threads to wait until they are 207 notified by another thread. 208 209 If the lock argument is given and not None, it must be a Lock or RLock 210 object, and it is used as the underlying lock. Otherwise, a new RLock object 211 is created and used as the underlying lock. 212 213 """ 214 215 def __init__(self, lock=None): 216 if lock is None: 217 lock = RLock() 218 self._lock = lock 219 # Export the lock's acquire() and release() methods 220 self.acquire = lock.acquire 221 self.release = lock.release 222 # If the lock defines _release_save() and/or _acquire_restore(), 223 # these override the default implementations (which just call 224 # release() and acquire() on the lock). Ditto for _is_owned(). 
225 try: 226 self._release_save = lock._release_save 227 except AttributeError: 228 pass 229 try: 230 self._acquire_restore = lock._acquire_restore 231 except AttributeError: 232 pass 233 try: 234 self._is_owned = lock._is_owned 235 except AttributeError: 236 pass 237 self._waiters = _deque() 238 239 def __enter__(self): 240 return self._lock.__enter__() 241 242 def __exit__(self, *args): 243 return self._lock.__exit__(*args) 244 245 def __repr__(self): 246 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 247 248 def _release_save(self): 249 self._lock.release() # No state to save 250 251 def _acquire_restore(self, x): 252 self._lock.acquire() # Ignore saved state 253 254 def _is_owned(self): 255 # Return True if lock is owned by current_thread. 256 # This method is called only if _lock doesn't have _is_owned(). 257 if self._lock.acquire(0): 258 self._lock.release() 259 return False 260 else: 261 return True 262 263 def wait(self, timeout=None): 264 """Wait until notified or until a timeout occurs. 265 266 If the calling thread has not acquired the lock when this method is 267 called, a RuntimeError is raised. 268 269 This method releases the underlying lock, and then blocks until it is 270 awakened by a notify() or notify_all() call for the same condition 271 variable in another thread, or until the optional timeout occurs. Once 272 awakened or timed out, it re-acquires the lock and returns. 273 274 When the timeout argument is present and not None, it should be a 275 floating point number specifying a timeout for the operation in seconds 276 (or fractions thereof). 277 278 When the underlying lock is an RLock, it is not released using its 279 release() method, since this may not actually unlock the lock when it 280 was acquired multiple times recursively. Instead, an internal interface 281 of the RLock class is used, which really unlocks it even when it has 282 been recursively acquired several times. 
Another internal interface is 283 then used to restore the recursion level when the lock is reacquired. 284 285 """ 286 if not self._is_owned(): 287 raise RuntimeError("cannot wait on un-acquired lock") 288 waiter = _allocate_lock() 289 waiter.acquire() 290 self._waiters.append(waiter) 291 saved_state = self._release_save() 292 gotit = False 293 try: # restore state no matter what (e.g., KeyboardInterrupt) 294 if timeout is None: 295 waiter.acquire() 296 gotit = True 297 else: 298 if timeout > 0: 299 gotit = waiter.acquire(True, timeout) 300 else: 301 gotit = waiter.acquire(False) 302 return gotit 303 finally: 304 self._acquire_restore(saved_state) 305 if not gotit: 306 try: 307 self._waiters.remove(waiter) 308 except ValueError: 309 pass 310 311 def wait_for(self, predicate, timeout=None): 312 """Wait until a condition evaluates to True. 313 314 predicate should be a callable which result will be interpreted as a 315 boolean value. A timeout may be provided giving the maximum time to 316 wait. 317 318 """ 319 endtime = None 320 waittime = timeout 321 result = predicate() 322 while not result: 323 if waittime is not None: 324 if endtime is None: 325 endtime = _time() + waittime 326 else: 327 waittime = endtime - _time() 328 if waittime <= 0: 329 break 330 self.wait(waittime) 331 result = predicate() 332 return result 333 334 def notify(self, n=1): 335 """Wake up one or more threads waiting on this condition, if any. 336 337 If the calling thread has not acquired the lock when this method is 338 called, a RuntimeError is raised. 339 340 This method wakes up at most n of the threads waiting for the condition 341 variable; it is a no-op if no threads are waiting. 
342 343 """ 344 if not self._is_owned(): 345 raise RuntimeError("cannot notify on un-acquired lock") 346 all_waiters = self._waiters 347 waiters_to_notify = _deque(_islice(all_waiters, n)) 348 if not waiters_to_notify: 349 return 350 for waiter in waiters_to_notify: 351 waiter.release() 352 try: 353 all_waiters.remove(waiter) 354 except ValueError: 355 pass 356 357 def notify_all(self): 358 """Wake up all threads waiting on this condition. 359 360 If the calling thread has not acquired the lock when this method 361 is called, a RuntimeError is raised. 362 363 """ 364 self.notify(len(self._waiters)) 365 366 notifyAll = notify_all 367 368 369 class Semaphore: 370 """This class implements semaphore objects. 371 372 Semaphores manage a counter representing the number of release() calls minus 373 the number of acquire() calls, plus an initial value. The acquire() method 374 blocks if necessary until it can return without making the counter 375 negative. If not given, value defaults to 1. 376 377 """ 378 379 # After Tim Peters' semaphore class, but not quite the same (no maximum) 380 381 def __init__(self, value=1): 382 if value < 0: 383 raise ValueError("semaphore initial value must be >= 0") 384 self._cond = Condition(Lock()) 385 self._value = value 386 387 def acquire(self, blocking=True, timeout=None): 388 """Acquire a semaphore, decrementing the internal counter by one. 389 390 When invoked without arguments: if the internal counter is larger than 391 zero on entry, decrement it by one and return immediately. If it is zero 392 on entry, block, waiting until some other thread has called release() to 393 make it larger than zero. This is done with proper interlocking so that 394 if multiple acquire() calls are blocked, release() will wake exactly one 395 of them up. The implementation may pick one at random, so the order in 396 which blocked threads are awakened should not be relied on. There is no 397 return value in this case. 
398 399 When invoked with blocking set to true, do the same thing as when called 400 without arguments, and return true. 401 402 When invoked with blocking set to false, do not block. If a call without 403 an argument would block, return false immediately; otherwise, do the 404 same thing as when called without arguments, and return true. 405 406 When invoked with a timeout other than None, it will block for at 407 most timeout seconds. If acquire does not complete successfully in 408 that interval, return false. Return true otherwise. 409 410 """ 411 if not blocking and timeout is not None: 412 raise ValueError("can't specify timeout for non-blocking acquire") 413 rc = False 414 endtime = None 415 with self._cond: 416 while self._value == 0: 417 if not blocking: 418 break 419 if timeout is not None: 420 if endtime is None: 421 endtime = _time() + timeout 422 else: 423 timeout = endtime - _time() 424 if timeout <= 0: 425 break 426 self._cond.wait(timeout) 427 else: 428 self._value -= 1 429 rc = True 430 return rc 431 432 __enter__ = acquire 433 434 def release(self): 435 """Release a semaphore, incrementing the internal counter by one. 436 437 When the counter is zero on entry and another thread is waiting for it 438 to become larger than zero again, wake up that thread. 439 440 """ 441 with self._cond: 442 self._value += 1 443 self._cond.notify() 444 445 def __exit__(self, t, v, tb): 446 self.release() 447 448 449 class BoundedSemaphore(Semaphore): 450 """Implements a bounded semaphore. 451 452 A bounded semaphore checks to make sure its current value doesn't exceed its 453 initial value. If it does, ValueError is raised. In most situations 454 semaphores are used to guard resources with limited capacity. 455 456 If the semaphore is released too many times it's a sign of a bug. If not 457 given, value defaults to 1. 
458 459 Like regular semaphores, bounded semaphores manage a counter representing 460 the number of release() calls minus the number of acquire() calls, plus an 461 initial value. The acquire() method blocks if necessary until it can return 462 without making the counter negative. If not given, value defaults to 1. 463 464 """ 465 466 def __init__(self, value=1): 467 Semaphore.__init__(self, value) 468 self._initial_value = value 469 470 def release(self): 471 """Release a semaphore, incrementing the internal counter by one. 472 473 When the counter is zero on entry and another thread is waiting for it 474 to become larger than zero again, wake up that thread. 475 476 If the number of releases exceeds the number of acquires, 477 raise a ValueError. 478 479 """ 480 with self._cond: 481 if self._value >= self._initial_value: 482 raise ValueError("Semaphore released too many times") 483 self._value += 1 484 self._cond.notify() 485 486 487 class Event: 488 """Class implementing event objects. 489 490 Events manage a flag that can be set to true with the set() method and reset 491 to false with the clear() method. The wait() method blocks until the flag is 492 true. The flag is initially false. 493 494 """ 495 496 # After Tim Peters' event class (without is_posted()) 497 498 def __init__(self): 499 self._cond = Condition(Lock()) 500 self._flag = False 501 502 def _reset_internal_locks(self): 503 # private! called by Thread._reset_internal_locks by _after_fork() 504 self._cond.__init__(Lock()) 505 506 def is_set(self): 507 """Return true if and only if the internal flag is true.""" 508 return self._flag 509 510 isSet = is_set 511 512 def set(self): 513 """Set the internal flag to true. 514 515 All threads waiting for it to become true are awakened. Threads 516 that call wait() once the flag is true will not block at all. 517 518 """ 519 with self._cond: 520 self._flag = True 521 self._cond.notify_all() 522 523 def clear(self): 524 """Reset the internal flag to false. 
525 526 Subsequently, threads calling wait() will block until set() is called to 527 set the internal flag to true again. 528 529 """ 530 with self._cond: 531 self._flag = False 532 533 def wait(self, timeout=None): 534 """Block until the internal flag is true. 535 536 If the internal flag is true on entry, return immediately. Otherwise, 537 block until another thread calls set() to set the flag to true, or until 538 the optional timeout occurs. 539 540 When the timeout argument is present and not None, it should be a 541 floating point number specifying a timeout for the operation in seconds 542 (or fractions thereof). 543 544 This method returns the internal flag on exit, so it will always return 545 True except if a timeout is given and the operation times out. 546 547 """ 548 with self._cond: 549 signaled = self._flag 550 if not signaled: 551 signaled = self._cond.wait(timeout) 552 return signaled 553 554 555 # A barrier class. Inspired in part by the pthread_barrier_* api and 556 # the CyclicBarrier class from Java. See 557 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 558 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 559 # CyclicBarrier.html 560 # for information. 561 # We maintain two main states, 'filling' and 'draining' enabling the barrier 562 # to be cyclic. Threads are not allowed into it until it has fully drained 563 # since the previous cycle. In addition, a 'resetting' state exists which is 564 # similar to 'draining' except that threads leave with a BrokenBarrierError, 565 # and a 'broken' state in which all threads get the exception. 566 class Barrier: 567 """Implements a Barrier. 568 569 Useful for synchronizing a fixed number of threads at known synchronization 570 points. Threads block on 'wait()' and are simultaneously once they have all 571 made that call. 572 573 """ 574 575 def __init__(self, parties, action=None, timeout=None): 576 """Create a barrier, initialised to 'parties' threads. 
577 578 'action' is a callable which, when supplied, will be called by one of 579 the threads after they have all entered the barrier and just prior to 580 releasing them all. If a 'timeout' is provided, it is uses as the 581 default for all subsequent 'wait()' calls. 582 583 """ 584 self._cond = Condition(Lock()) 585 self._action = action 586 self._timeout = timeout 587 self._parties = parties 588 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken 589 self._count = 0 590 591 def wait(self, timeout=None): 592 """Wait for the barrier. 593 594 When the specified number of threads have started waiting, they are all 595 simultaneously awoken. If an 'action' was provided for the barrier, one 596 of the threads will have executed that callback prior to returning. 597 Returns an individual index number from 0 to 'parties-1'. 598 599 """ 600 if timeout is None: 601 timeout = self._timeout 602 with self._cond: 603 self._enter() # Block while the barrier drains. 604 index = self._count 605 self._count += 1 606 try: 607 if index + 1 == self._parties: 608 # We release the barrier 609 self._release() 610 else: 611 # We wait until someone releases us 612 self._wait(timeout) 613 return index 614 finally: 615 self._count -= 1 616 # Wake up any threads waiting for barrier to drain. 617 self._exit() 618 619 # Block until the barrier is ready for us, or raise an exception 620 # if it is broken. 621 def _enter(self): 622 while self._state in (-1, 1): 623 # It is draining or resetting, wait until done 624 self._cond.wait() 625 #see if the barrier is in a broken state 626 if self._state < 0: 627 raise BrokenBarrierError 628 assert self._state == 0 629 630 # Optionally run the 'action' and release the threads waiting 631 # in the barrier. 632 def _release(self): 633 try: 634 if self._action: 635 self._action() 636 # enter draining state 637 self._state = 1 638 self._cond.notify_all() 639 except: 640 #an exception during the _action handler. 
Break and reraise 641 self._break() 642 raise 643 644 # Wait in the barrier until we are released. Raise an exception 645 # if the barrier is reset or broken. 646 def _wait(self, timeout): 647 if not self._cond.wait_for(lambda : self._state != 0, timeout): 648 #timed out. Break the barrier 649 self._break() 650 raise BrokenBarrierError 651 if self._state < 0: 652 raise BrokenBarrierError 653 assert self._state == 1 654 655 # If we are the last thread to exit the barrier, signal any threads 656 # waiting for the barrier to drain. 657 def _exit(self): 658 if self._count == 0: 659 if self._state in (-1, 1): 660 #resetting or draining 661 self._state = 0 662 self._cond.notify_all() 663 664 def reset(self): 665 """Reset the barrier to the initial state. 666 667 Any threads currently waiting will get the BrokenBarrier exception 668 raised. 669 670 """ 671 with self._cond: 672 if self._count > 0: 673 if self._state == 0: 674 #reset the barrier, waking up threads 675 self._state = -1 676 elif self._state == -2: 677 #was broken, set it to reset state 678 #which clears when the last thread exits 679 self._state = -1 680 else: 681 self._state = 0 682 self._cond.notify_all() 683 684 def abort(self): 685 """Place the barrier into a 'broken' state. 686 687 Useful in case of error. Any currently waiting threads and threads 688 attempting to 'wait()' will have BrokenBarrierError raised. 689 690 """ 691 with self._cond: 692 self._break() 693 694 def _break(self): 695 # An internal error was detected. The barrier is set to 696 # a broken state all parties awakened. 697 self._state = -2 698 self._cond.notify_all() 699 700 @property 701 def parties(self): 702 """Return the number of threads required to trip the barrier.""" 703 return self._parties 704 705 @property 706 def n_waiting(self): 707 """Return the number of threads currently waiting at the barrier.""" 708 # We don't need synchronization here since this is an ephemeral result 709 # anyway. 
It returns the correct value in the steady state. 710 if self._state == 0: 711 return self._count 712 return 0 713 714 @property 715 def broken(self): 716 """Return True if the barrier is in a broken state.""" 717 return self._state == -2 718 719 # exception raised by the Barrier class 720 class BrokenBarrierError(RuntimeError): 721 pass 722 723 724 # Helper to generate new thread names 725 _counter = _count().__next__ 726 _counter() # Consume 0 so first non-main thread has id 1. 727 def _newname(template="Thread-%d"): 728 return template % _counter() 729 730 # Active thread administration 731 _active_limbo_lock = _allocate_lock() 732 _active = {} # maps thread id to Thread object 733 _limbo = {} 734 _dangling = WeakSet() 735 736 # Main class for threads 737 738 class Thread: 739 """A class that represents a thread of control. 740 741 This class can be safely subclassed in a limited fashion. There are two ways 742 to specify the activity: by passing a callable object to the constructor, or 743 by overriding the run() method in a subclass. 744 745 """ 746 747 _initialized = False 748 # Need to store a reference to sys.exc_info for printing 749 # out exceptions when a thread tries to use a global var. during interp. 750 # shutdown and thus raises an exception about trying to perform some 751 # operation on/with a NoneType 752 _exc_info = _sys.exc_info 753 # Keep sys.exc_clear too to clear the exception just before 754 # allowing .join() to return. 755 #XXX __exc_clear = _sys.exc_clear 756 757 def __init__(self, group=None, target=None, name=None, 758 args=(), kwargs=None, *, daemon=None): 759 """This constructor should always be called with keyword arguments. Arguments are: 760 761 *group* should be None; reserved for future extension when a ThreadGroup 762 class is implemented. 763 764 *target* is the callable object to be invoked by the run() 765 method. Defaults to None, meaning nothing is called. 766 767 *name* is the thread name. 
By default, a unique name is constructed of 768 the form "Thread-N" where N is a small decimal number. 769 770 *args* is the argument tuple for the target invocation. Defaults to (). 771 772 *kwargs* is a dictionary of keyword arguments for the target 773 invocation. Defaults to {}. 774 775 If a subclass overrides the constructor, it must make sure to invoke 776 the base class constructor (Thread.__init__()) before doing anything 777 else to the thread. 778 779 """ 780 assert group is None, "group argument must be None for now" 781 if kwargs is None: 782 kwargs = {} 783 self._target = target 784 self._name = str(name or _newname()) 785 self._args = args 786 self._kwargs = kwargs 787 if daemon is not None: 788 self._daemonic = daemon 789 else: 790 self._daemonic = current_thread().daemon 791 self._ident = None 792 self._tstate_lock = None 793 self._started = Event() 794 self._is_stopped = False 795 self._initialized = True 796 # sys.stderr is not stored in the class like 797 # sys.exc_info since it can be changed between instances 798 self._stderr = _sys.stderr 799 # For debugging and _after_fork() 800 _dangling.add(self) 801 802 def _reset_internal_locks(self, is_alive): 803 # private! Called by _after_fork() to reset our internal locks as 804 # they may be in an invalid state leading to a deadlock or crash. 805 self._started._reset_internal_locks() 806 if is_alive: 807 self._set_tstate_lock() 808 else: 809 # The thread isn't alive after fork: it doesn't have a tstate 810 # anymore. 
811 self._is_stopped = True 812 self._tstate_lock = None 813 814 def __repr__(self): 815 assert self._initialized, "Thread.__init__() was not called" 816 status = "initial" 817 if self._started.is_set(): 818 status = "started" 819 self.is_alive() # easy way to get ._is_stopped set when appropriate 820 if self._is_stopped: 821 status = "stopped" 822 if self._daemonic: 823 status += " daemon" 824 if self._ident is not None: 825 status += " %s" % self._ident 826 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 827 828 def start(self): 829 """Start the thread's activity. 830 831 It must be called at most once per thread object. It arranges for the 832 object's run() method to be invoked in a separate thread of control. 833 834 This method will raise a RuntimeError if called more than once on the 835 same thread object. 836 837 """ 838 if not self._initialized: 839 raise RuntimeError("thread.__init__() not called") 840 841 if self._started.is_set(): 842 raise RuntimeError("threads can only be started once") 843 with _active_limbo_lock: 844 _limbo[self] = self 845 try: 846 _start_new_thread(self._bootstrap, ()) 847 except Exception: 848 with _active_limbo_lock: 849 del _limbo[self] 850 raise 851 self._started.wait() 852 853 def run(self): 854 """Method representing the thread's activity. 855 856 You may override this method in a subclass. The standard run() method 857 invokes the callable object passed to the object's constructor as the 858 target argument, if any, with sequential and keyword arguments taken 859 from the args and kwargs arguments, respectively. 860 861 """ 862 try: 863 if self._target: 864 self._target(*self._args, **self._kwargs) 865 finally: 866 # Avoid a refcycle if the thread is running a function with 867 # an argument that has a member that points to the thread. 
868 del self._target, self._args, self._kwargs 869 870 def _bootstrap(self): 871 # Wrapper around the real bootstrap code that ignores 872 # exceptions during interpreter cleanup. Those typically 873 # happen when a daemon thread wakes up at an unfortunate 874 # moment, finds the world around it destroyed, and raises some 875 # random exception *** while trying to report the exception in 876 # _bootstrap_inner() below ***. Those random exceptions 877 # don't help anybody, and they confuse users, so we suppress 878 # them. We suppress them only when it appears that the world 879 # indeed has already been destroyed, so that exceptions in 880 # _bootstrap_inner() during normal business hours are properly 881 # reported. Also, we only suppress them for daemonic threads; 882 # if a non-daemonic encounters this, something else is wrong. 883 try: 884 self._bootstrap_inner() 885 except: 886 if self._daemonic and _sys is None: 887 return 888 raise 889 890 def _set_ident(self): 891 self._ident = get_ident() 892 893 def _set_tstate_lock(self): 894 """ 895 Set a lock object which will be released by the interpreter when 896 the underlying thread state (see pystate.h) gets deleted. 897 """ 898 self._tstate_lock = _set_sentinel() 899 self._tstate_lock.acquire() 900 901 def _bootstrap_inner(self): 902 try: 903 self._set_ident() 904 self._set_tstate_lock() 905 self._started.set() 906 with _active_limbo_lock: 907 _active[self._ident] = self 908 del _limbo[self] 909 910 if _trace_hook: 911 _sys.settrace(_trace_hook) 912 if _profile_hook: 913 _sys.setprofile(_profile_hook) 914 915 try: 916 self.run() 917 except SystemExit: 918 pass 919 except: 920 # If sys.stderr is no more (most likely from interpreter 921 # shutdown) use self._stderr. Otherwise still use sys (as in 922 # _sys) in case sys.stderr was redefined since the creation of 923 # self. 
# NOTE(review): the statements down to the second `finally:` below are the tail
# of a method whose `def` line and enclosing `try:` statements lie before this
# chunk; their indentation was reconstructed from the visible nesting -- confirm.
                if _sys and _sys.stderr is not None:
                    print("Exception in thread %s:\n%s" %
                          (self.name, _format_exc()), file=_sys.stderr)
                elif self._stderr is not None:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self._exc_info()
                    try:
                        print((
                            "Exception in thread " + self.name +
                            " (most likely raised during interpreter shutdown):"), file=self._stderr)
                        print((
                            "Traceback (most recent call last):"), file=self._stderr)
                        # Walk the traceback chain, printing one line per frame.
                        while exc_tb:
                            # NOTE(review): whitespace runs in this dump were
                            # collapsed; the literal below may originally have
                            # had more leading spaces -- confirm.
                            print((
                                ' File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
                            exc_tb = exc_tb.tb_next
                        print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            finally:
                # Prevent a race in
                # test_threading.test_no_refcycle_through_target when
                # the exception keeps the target alive past when we
                # assert that it's dead.
                #XXX self._exc_clear()
                pass
        finally:
            with _active_limbo_lock:
                try:
                    # We don't call self._delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[get_ident()]
                except:
                    # Deliberate best effort: any failure to unregister is
                    # ignored here.
                    pass

    def _stop(self):
        """Mark the thread as stopped and drop the reference to ._tstate_lock."""
        # After calling ._stop(), .is_alive() returns False and .join() returns
        # immediately.  ._tstate_lock must be released before calling ._stop().
        #
        # Normal case:  C code at the end of the thread's life
        # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and
        # that's detected by our ._wait_for_tstate_lock(), called by .join()
        # and .is_alive().  Any number of threads _may_ call ._stop()
        # simultaneously (for example, if multiple threads are blocked in
        # .join() calls), and they're not serialized.  That's harmless -
        # they'll just make redundant rebindings of ._is_stopped and
        # ._tstate_lock.  Obscure:  we rebind ._tstate_lock last so that the
        # "assert self._is_stopped" in ._wait_for_tstate_lock() always works
        # (the assert is executed only if ._tstate_lock is None).
        #
        # Special case:  _main_thread releases ._tstate_lock via this
        # module's _shutdown() function.
        lock = self._tstate_lock
        if lock is not None:
            # Sanity check of the precondition stated above.
            assert not lock.locked()
        # Order matters: the flag is set before the lock reference is cleared.
        self._is_stopped = True
        self._tstate_lock = None

    def _delete(self):
        """Remove current thread from the dict of currently running threads.

        The entry is looked up with get_ident(), i.e. for the calling thread.
        A KeyError is re-raised unless 'dummy_threading' is in sys.modules.
        """

        # Notes about running with _dummy_thread:
        #
        # Must take care to not raise an exception if _dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  _dummy_thread.get_ident() always returns -1 since
        # there is only one thread if _dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from _dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        try:
            with _active_limbo_lock:
                del _active[get_ident()]
                # There must not be any python code between the previous line
                # and after the lock is released.  Otherwise a tracing function
                # could try to acquire the lock again in the same thread, (in
                # current_thread()), and would block.
        except KeyError:
            if 'dummy_threading' not in _sys.modules:
                raise

    def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out. A negative timeout
        is treated as 0.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started, or one whose Thread.__init__() was
        never called; both raise the same exception type.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        if timeout is None:
            self._wait_for_tstate_lock()
        else:
            # the behavior of a negative timeout isn't documented, but
            # historically .join(timeout=x) for x<0 has acted as if timeout=0
            self._wait_for_tstate_lock(timeout=max(timeout, 0))

    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        # Issue #18808: wait for the thread state to be gone.
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.acquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
        lock = self._tstate_lock
        if lock is None:  # already determined that the C code is done
            assert self._is_stopped
        elif lock.acquire(block, timeout):
            # Release first: ._stop() asserts that the lock is not held.
            lock.release()
            self._stop()

    @property
    def name(self):
        """A string used for identification purposes only.

        It has no semantics. Multiple threads may be given the same name. The
        initial name is set by the constructor.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._name

    @name.setter
    def name(self, name):
        assert self._initialized, "Thread.__init__() not called"
        # The value is stored as str(name), so non-string objects are accepted.
        self._name = str(name)

    @property
    def ident(self):
        """Thread identifier of this thread or None if it has not been started.

        This is a nonzero integer. See the get_ident() function. Thread
        identifiers may be recycled when a thread exits and another thread is
        created. The identifier is available even after the thread has exited.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._ident

    def is_alive(self):
        """Return whether the thread is alive.

        This method returns True just before the run() method starts until just
        after the run() method terminates. The module function enumerate()
        returns a list of all alive threads.

        """
        assert self._initialized, "Thread.__init__() not called"
        # Fast path: already known to be stopped, or never started.
        if self._is_stopped or not self._started.is_set():
            return False
        # Non-blocking probe; if the lock can be taken, ._stop() runs and
        # flips ._is_stopped.
        self._wait_for_tstate_lock(False)
        return not self._is_stopped

    # camelCase alias for is_alive().
    isAlive = is_alive

    @property
    def daemon(self):
        """A boolean value indicating whether this thread is a daemon thread.

        This must be set before start() is called, otherwise RuntimeError is
        raised. Its initial value is inherited from the creating thread; the
        main thread is not a daemon thread and therefore all threads created in
        the main thread default to daemon = False.

        The entire Python program exits when no alive non-daemon threads are
        left.

        """
        assert self._initialized, "Thread.__init__() not called"
        return self._daemonic

    @daemon.setter
    def daemon(self, daemonic):
        # Unlike the getter, the setter raises RuntimeError (not an assertion)
        # both for an uninitialized thread and for one already started.
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self._started.is_set():
            raise RuntimeError("cannot set daemon status of active thread")
        self._daemonic = daemonic

    def isDaemon(self):
        """Return the `daemon` property (camelCase accessor)."""
        return self.daemon

    def setDaemon(self, daemonic):
        """Assign the `daemon` property (camelCase accessor)."""
        self.daemon = daemonic

    def getName(self):
        """Return the `name` property (camelCase accessor)."""
        return self.name

    def setName(self, name):
        """Assign the `name` property (camelCase accessor)."""
        self.name = name

# The timer class was contributed by Itamar Shtull-Trauring

class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay, the callable and its arguments.

        args defaults to an empty list and kwargs to an empty dict when they
        are None.
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        # Set by cancel(), and by run() once it is done.
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        """Wait for the interval, then call the function unless cancelled."""
        # wait() returns after `interval` seconds or as soon as cancel() sets
        # the event, whichever comes first.
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):

    def __init__(self):
        """Create the "MainThread" object and register it in _active."""
        Thread.__init__(self, name="MainThread", daemon=False)
        self._set_tstate_lock()
        # Mark as started without going through start().
        self._started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[self._ident] = self


# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls current_thread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from current_thread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):

    def __init__(self):
        """Create a daemon "Dummy-%d" thread object and register it in _active."""
        Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True)

        # Mark as started without going through start().
        self._started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[self._ident] = self

    def _stop(self):
        """Do nothing; overrides Thread._stop()."""
        pass

    def is_alive(self):
        """Always return True (after asserting started and not stopped)."""
        assert not self._is_stopped and self._started.is_set()
        return True

    def join(self, timeout=None):
        """Always fail the assertion: a dummy thread cannot be joined."""
        assert False, "cannot join a dummy thread"


# Global API functions

def current_thread():
    """Return the current Thread object, corresponding to the caller's thread of control.

    If the caller's thread of control was not created through the threading
    module, a dummy thread object with limited functionality is returned.

    """
    try:
        return _active[get_ident()]
    except KeyError:
        # _DummyThread.__init__ registers the new object in _active, so later
        # calls from the same thread find it there.
        return _DummyThread()

# camelCase alias for current_thread().
currentThread = current_thread

def active_count():
    """Return the number of Thread objects currently alive.

    The returned count is equal to the length of the list returned by
    enumerate().

    """
    with _active_limbo_lock:
        return len(_active) + len(_limbo)

# camelCase alias for active_count().
activeCount = active_count

def _enumerate():
    # Same as enumerate(), but without the lock. Internal use only.
    return list(_active.values()) + list(_limbo.values())

def enumerate():
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread. It excludes terminated threads and
    threads that have not yet been started.

    """
    with _active_limbo_lock:
        return list(_active.values()) + list(_limbo.values())

from _thread import stack_size

# Create the main thread object,
# and make it available for the interpreter
# (Py_Main) as threading._shutdown.

_main_thread = _MainThread()

def _shutdown():
    # Obscure:  other threads may be waiting to join _main_thread.  That's
    # dubious, but some code does it.  We can't wait for C code to release
    # the main thread's tstate_lock - that won't happen until the interpreter
    # is nearly dead.  So we release it here.  Note that just calling _stop()
    # isn't enough:  other threads may already be waiting on _tstate_lock.
    tlock = _main_thread._tstate_lock
    # The main thread isn't finished yet, so its thread state lock can't have
    # been released.
    assert tlock is not None
    assert tlock.locked()
    tlock.release()
    _main_thread._stop()
    # Join alive non-daemon threads one at a time until none is left.
    t = _pickSomeNonDaemonThread()
    while t:
        t.join()
        t = _pickSomeNonDaemonThread()
    _main_thread._delete()

def _pickSomeNonDaemonThread():
    """Return one alive, non-daemon thread from enumerate(), or None."""
    for t in enumerate():
        if not t.daemon and t.is_alive():
            return t
    return None

def main_thread():
    """Return the main thread object.

    In normal conditions, the main thread is the thread from which the
    Python interpreter was started.
    """
    return _main_thread

# get thread-local implementation, either from the thread
# module, or from the python fallback

try:
    from _thread import _local as local
except ImportError:
    from _threading_local import local


def _after_fork():
    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
    # is called from PyOS_AfterFork.  Here we cleanup threading module state
    # that should not exist after a fork.

    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock, _main_thread
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    new_active = {}
    current = current_thread()
    # The thread that called fork() becomes the main thread of the child.
    _main_thread = current
    with _active_limbo_lock:
        # Dangling thread instances must still have their locks reset,
        # because someone may join() them.
        threads = set(_enumerate())
        threads.update(_dangling)
        for thread in threads:
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if thread is current:
                # There is only one active thread. We reset the ident to
                # its new value since it can have changed.
                thread._reset_internal_locks(True)
                ident = get_ident()
                thread._ident = ident
                new_active[ident] = thread
            else:
                # All the others are already stopped.
                thread._reset_internal_locks(False)
                thread._stop()

        # NOTE(review): nesting reconstructed from a flattened dump; the
        # resets below are assumed to run while _active_limbo_lock is still
        # held -- confirm.
        _limbo.clear()
        _active.clear()
        _active.update(new_active)
        assert len(_active) == 1