1 """Thread module emulating a subset of Java's threading model.""" 2 3 import os as _os 4 import sys as _sys 5 import _thread 6 7 from time import monotonic as _time 8 from traceback import format_exc as _format_exc 9 from _weakrefset import WeakSet 10 from itertools import islice as _islice, count as _count 11 try: 12 from _collections import deque as _deque 13 except ImportError: 14 from collections import deque as _deque 15 16 # Note regarding PEP 8 compliant names 17 # This threading model was originally inspired by Java, and inherited 18 # the convention of camelCase function and method names from that 19 # language. Those original names are not in any imminent danger of 20 # being deprecated (even for Py3k),so this module provides them as an 21 # alias for the PEP 8 compliant names 22 # Note that using the new PEP 8 compliant names facilitates substitution 23 # with the multiprocessing module, which doesn't provide the old 24 # Java inspired names. 25 26 __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 27 'enumerate', 'main_thread', 'TIMEOUT_MAX', 28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 30 'setprofile', 'settrace', 'local', 'stack_size'] 31 32 # Rename some stuff so "from threading import *" is safe 33 _start_new_thread = _thread.start_new_thread 34 _allocate_lock = _thread.allocate_lock 35 _set_sentinel = _thread._set_sentinel 36 get_ident = _thread.get_ident 37 ThreadError = _thread.error 38 try: 39 _CRLock = _thread.RLock 40 except AttributeError: 41 _CRLock = None 42 TIMEOUT_MAX = _thread.TIMEOUT_MAX 43 del _thread 44 45 46 # Support for profile and trace hooks 47 48 _profile_hook = None 49 _trace_hook = None 50 51 def setprofile(func): 52 """Set a profile function for all threads started from the threading module. 53 54 The func will be passed to sys.setprofile() for each thread, before its 55 run() method is called. 56 57 """ 58 global _profile_hook 59 _profile_hook = func 60 61 def settrace(func): 62 """Set a trace function for all threads started from the threading module. 63 64 The func will be passed to sys.settrace() for each thread, before its run() 65 method is called. 66 67 """ 68 global _trace_hook 69 _trace_hook = func 70 71 # Synchronization classes 72 73 Lock = _allocate_lock 74 75 def RLock(*args, **kwargs): 76 """Factory function that returns a new reentrant lock. 77 78 A reentrant lock must be released by the thread that acquired it. Once a 79 thread has acquired a reentrant lock, the same thread may acquire it again 80 without blocking; the thread must release it once for each time it has 81 acquired it. 82 83 """ 84 if _CRLock is None: 85 return _PyRLock(*args, **kwargs) 86 return _CRLock(*args, **kwargs) 87 88 class _RLock: 89 """This class implements reentrant lock objects. 90 91 A reentrant lock must be released by the thread that acquired it. Once a 92 thread has acquired a reentrant lock, the same thread may acquire it 93 again without blocking; the thread must release it once for each time it 94 has acquired it. 95 96 """ 97 98 def __init__(self): 99 self._block = _allocate_lock() 100 self._owner = None 101 self._count = 0 102 103 def __repr__(self): 104 owner = self._owner 105 try: 106 owner = _active[owner].name 107 except KeyError: 108 pass 109 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 110 "locked" if self._block.locked() else "unlocked", 111 self.__class__.__module__, 112 self.__class__.__qualname__, 113 owner, 114 self._count, 115 hex(id(self)) 116 ) 117 118 def acquire(self, blocking=True, timeout=-1): 119 """Acquire a lock, blocking or non-blocking. 120 121 When invoked without arguments: if this thread already owns the lock, 122 increment the recursion level by one, and return immediately. Otherwise, 123 if another thread owns the lock, block until the lock is unlocked. Once 124 the lock is unlocked (not owned by any thread), then grab ownership, set 125 the recursion level to one, and return. If more than one thread is 126 blocked waiting until the lock is unlocked, only one at a time will be 127 able to grab ownership of the lock. There is no return value in this 128 case. 129 130 When invoked with the blocking argument set to true, do the same thing 131 as when called without arguments, and return true. 132 133 When invoked with the blocking argument set to false, do not block. If a 134 call without an argument would block, return false immediately; 135 otherwise, do the same thing as when called without arguments, and 136 return true. 137 138 When invoked with the floating-point timeout argument set to a positive 139 value, block for at most the number of seconds specified by timeout 140 and as long as the lock cannot be acquired. Return true if the lock has 141 been acquired, false if the timeout has elapsed. 142 143 """ 144 me = get_ident() 145 if self._owner == me: 146 self._count += 1 147 return 1 148 rc = self._block.acquire(blocking, timeout) 149 if rc: 150 self._owner = me 151 self._count = 1 152 return rc 153 154 __enter__ = acquire 155 156 def release(self): 157 """Release a lock, decrementing the recursion level. 158 159 If after the decrement it is zero, reset the lock to unlocked (not owned 160 by any thread), and if any other threads are blocked waiting for the 161 lock to become unlocked, allow exactly one of them to proceed. If after 162 the decrement the recursion level is still nonzero, the lock remains 163 locked and owned by the calling thread. 164 165 Only call this method when the calling thread owns the lock. A 166 RuntimeError is raised if this method is called when the lock is 167 unlocked. 168 169 There is no return value. 170 171 """ 172 if self._owner != get_ident(): 173 raise RuntimeError("cannot release un-acquired lock") 174 self._count = count = self._count - 1 175 if not count: 176 self._owner = None 177 self._block.release() 178 179 def __exit__(self, t, v, tb): 180 self.release() 181 182 # Internal methods used by condition variables 183 184 def _acquire_restore(self, state): 185 self._block.acquire() 186 self._count, self._owner = state 187 188 def _release_save(self): 189 if self._count == 0: 190 raise RuntimeError("cannot release un-acquired lock") 191 count = self._count 192 self._count = 0 193 owner = self._owner 194 self._owner = None 195 self._block.release() 196 return (count, owner) 197 198 def _is_owned(self): 199 return self._owner == get_ident() 200 201 _PyRLock = _RLock 202 203 204 class Condition: 205 """Class that implements a condition variable. 206 207 A condition variable allows one or more threads to wait until they are 208 notified by another thread. 209 210 If the lock argument is given and not None, it must be a Lock or RLock 211 object, and it is used as the underlying lock. Otherwise, a new RLock object 212 is created and used as the underlying lock. 213 214 """ 215 216 def __init__(self, lock=None): 217 if lock is None: 218 lock = RLock() 219 self._lock = lock 220 # Export the lock's acquire() and release() methods 221 self.acquire = lock.acquire 222 self.release = lock.release 223 # If the lock defines _release_save() and/or _acquire_restore(), 224 # these override the default implementations (which just call 225 # release() and acquire() on the lock). Ditto for _is_owned(). 226 try: 227 self._release_save = lock._release_save 228 except AttributeError: 229 pass 230 try: 231 self._acquire_restore = lock._acquire_restore 232 except AttributeError: 233 pass 234 try: 235 self._is_owned = lock._is_owned 236 except AttributeError: 237 pass 238 self._waiters = _deque() 239 240 def __enter__(self): 241 return self._lock.__enter__() 242 243 def __exit__(self, *args): 244 return self._lock.__exit__(*args) 245 246 def __repr__(self): 247 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 248 249 def _release_save(self): 250 self._lock.release() # No state to save 251 252 def _acquire_restore(self, x): 253 self._lock.acquire() # Ignore saved state 254 255 def _is_owned(self): 256 # Return True if lock is owned by current_thread. 257 # This method is called only if _lock doesn't have _is_owned(). 258 if self._lock.acquire(0): 259 self._lock.release() 260 return False 261 else: 262 return True 263 264 def wait(self, timeout=None): 265 """Wait until notified or until a timeout occurs. 266 267 If the calling thread has not acquired the lock when this method is 268 called, a RuntimeError is raised. 269 270 This method releases the underlying lock, and then blocks until it is 271 awakened by a notify() or notify_all() call for the same condition 272 variable in another thread, or until the optional timeout occurs. Once 273 awakened or timed out, it re-acquires the lock and returns. 274 275 When the timeout argument is present and not None, it should be a 276 floating point number specifying a timeout for the operation in seconds 277 (or fractions thereof). 278 279 When the underlying lock is an RLock, it is not released using its 280 release() method, since this may not actually unlock the lock when it 281 was acquired multiple times recursively. Instead, an internal interface 282 of the RLock class is used, which really unlocks it even when it has 283 been recursively acquired several times. Another internal interface is 284 then used to restore the recursion level when the lock is reacquired. 285 286 """ 287 if not self._is_owned(): 288 raise RuntimeError("cannot wait on un-acquired lock") 289 waiter = _allocate_lock() 290 waiter.acquire() 291 self._waiters.append(waiter) 292 saved_state = self._release_save() 293 gotit = False 294 try: # restore state no matter what (e.g., KeyboardInterrupt) 295 if timeout is None: 296 waiter.acquire() 297 gotit = True 298 else: 299 if timeout > 0: 300 gotit = waiter.acquire(True, timeout) 301 else: 302 gotit = waiter.acquire(False) 303 return gotit 304 finally: 305 self._acquire_restore(saved_state) 306 if not gotit: 307 try: 308 self._waiters.remove(waiter) 309 except ValueError: 310 pass 311 312 def wait_for(self, predicate, timeout=None): 313 """Wait until a condition evaluates to True. 314 315 predicate should be a callable which result will be interpreted as a 316 boolean value. A timeout may be provided giving the maximum time to 317 wait. 318 319 """ 320 endtime = None 321 waittime = timeout 322 result = predicate() 323 while not result: 324 if waittime is not None: 325 if endtime is None: 326 endtime = _time() + waittime 327 else: 328 waittime = endtime - _time() 329 if waittime <= 0: 330 break 331 self.wait(waittime) 332 result = predicate() 333 return result 334 335 def notify(self, n=1): 336 """Wake up one or more threads waiting on this condition, if any. 337 338 If the calling thread has not acquired the lock when this method is 339 called, a RuntimeError is raised. 340 341 This method wakes up at most n of the threads waiting for the condition 342 variable; it is a no-op if no threads are waiting. 343 344 """ 345 if not self._is_owned(): 346 raise RuntimeError("cannot notify on un-acquired lock") 347 all_waiters = self._waiters 348 waiters_to_notify = _deque(_islice(all_waiters, n)) 349 if not waiters_to_notify: 350 return 351 for waiter in waiters_to_notify: 352 waiter.release() 353 try: 354 all_waiters.remove(waiter) 355 except ValueError: 356 pass 357 358 def notify_all(self): 359 """Wake up all threads waiting on this condition. 360 361 If the calling thread has not acquired the lock when this method 362 is called, a RuntimeError is raised. 363 364 """ 365 self.notify(len(self._waiters)) 366 367 notifyAll = notify_all 368 369 370 class Semaphore: 371 """This class implements semaphore objects. 372 373 Semaphores manage a counter representing the number of release() calls minus 374 the number of acquire() calls, plus an initial value. The acquire() method 375 blocks if necessary until it can return without making the counter 376 negative. If not given, value defaults to 1. 377 378 """ 379 380 # After Tim Peters' semaphore class, but not quite the same (no maximum) 381 382 def __init__(self, value=1): 383 if value < 0: 384 raise ValueError("semaphore initial value must be >= 0") 385 self._cond = Condition(Lock()) 386 self._value = value 387 388 def acquire(self, blocking=True, timeout=None): 389 """Acquire a semaphore, decrementing the internal counter by one. 390 391 When invoked without arguments: if the internal counter is larger than 392 zero on entry, decrement it by one and return immediately. If it is zero 393 on entry, block, waiting until some other thread has called release() to 394 make it larger than zero. This is done with proper interlocking so that 395 if multiple acquire() calls are blocked, release() will wake exactly one 396 of them up. The implementation may pick one at random, so the order in 397 which blocked threads are awakened should not be relied on. There is no 398 return value in this case. 399 400 When invoked with blocking set to true, do the same thing as when called 401 without arguments, and return true. 402 403 When invoked with blocking set to false, do not block. If a call without 404 an argument would block, return false immediately; otherwise, do the 405 same thing as when called without arguments, and return true. 406 407 When invoked with a timeout other than None, it will block for at 408 most timeout seconds. If acquire does not complete successfully in 409 that interval, return false. Return true otherwise. 410 411 """ 412 if not blocking and timeout is not None: 413 raise ValueError("can't specify timeout for non-blocking acquire") 414 rc = False 415 endtime = None 416 with self._cond: 417 while self._value == 0: 418 if not blocking: 419 break 420 if timeout is not None: 421 if endtime is None: 422 endtime = _time() + timeout 423 else: 424 timeout = endtime - _time() 425 if timeout <= 0: 426 break 427 self._cond.wait(timeout) 428 else: 429 self._value -= 1 430 rc = True 431 return rc 432 433 __enter__ = acquire 434 435 def release(self): 436 """Release a semaphore, incrementing the internal counter by one. 437 438 When the counter is zero on entry and another thread is waiting for it 439 to become larger than zero again, wake up that thread. 440 441 """ 442 with self._cond: 443 self._value += 1 444 self._cond.notify() 445 446 def __exit__(self, t, v, tb): 447 self.release() 448 449 450 class BoundedSemaphore(Semaphore): 451 """Implements a bounded semaphore. 452 453 A bounded semaphore checks to make sure its current value doesn't exceed its 454 initial value. If it does, ValueError is raised. In most situations 455 semaphores are used to guard resources with limited capacity. 456 457 If the semaphore is released too many times it's a sign of a bug. If not 458 given, value defaults to 1. 459 460 Like regular semaphores, bounded semaphores manage a counter representing 461 the number of release() calls minus the number of acquire() calls, plus an 462 initial value. The acquire() method blocks if necessary until it can return 463 without making the counter negative. If not given, value defaults to 1. 464 465 """ 466 467 def __init__(self, value=1): 468 Semaphore.__init__(self, value) 469 self._initial_value = value 470 471 def release(self): 472 """Release a semaphore, incrementing the internal counter by one. 473 474 When the counter is zero on entry and another thread is waiting for it 475 to become larger than zero again, wake up that thread. 476 477 If the number of releases exceeds the number of acquires, 478 raise a ValueError. 479 480 """ 481 with self._cond: 482 if self._value >= self._initial_value: 483 raise ValueError("Semaphore released too many times") 484 self._value += 1 485 self._cond.notify() 486 487 488 class Event: 489 """Class implementing event objects. 490 491 Events manage a flag that can be set to true with the set() method and reset 492 to false with the clear() method. The wait() method blocks until the flag is 493 true. The flag is initially false. 494 495 """ 496 497 # After Tim Peters' event class (without is_posted()) 498 499 def __init__(self): 500 self._cond = Condition(Lock()) 501 self._flag = False 502 503 def _reset_internal_locks(self): 504 # private! called by Thread._reset_internal_locks by _after_fork() 505 self._cond.__init__(Lock()) 506 507 def is_set(self): 508 """Return true if and only if the internal flag is true.""" 509 return self._flag 510 511 isSet = is_set 512 513 def set(self): 514 """Set the internal flag to true. 515 516 All threads waiting for it to become true are awakened. Threads 517 that call wait() once the flag is true will not block at all. 518 519 """ 520 with self._cond: 521 self._flag = True 522 self._cond.notify_all() 523 524 def clear(self): 525 """Reset the internal flag to false. 526 527 Subsequently, threads calling wait() will block until set() is called to 528 set the internal flag to true again. 529 530 """ 531 with self._cond: 532 self._flag = False 533 534 def wait(self, timeout=None): 535 """Block until the internal flag is true. 536 537 If the internal flag is true on entry, return immediately. Otherwise, 538 block until another thread calls set() to set the flag to true, or until 539 the optional timeout occurs. 540 541 When the timeout argument is present and not None, it should be a 542 floating point number specifying a timeout for the operation in seconds 543 (or fractions thereof). 544 545 This method returns the internal flag on exit, so it will always return 546 True except if a timeout is given and the operation times out. 547 548 """ 549 with self._cond: 550 signaled = self._flag 551 if not signaled: 552 signaled = self._cond.wait(timeout) 553 return signaled 554 555 556 # A barrier class. Inspired in part by the pthread_barrier_* api and 557 # the CyclicBarrier class from Java. See 558 # http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 559 # http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 560 # CyclicBarrier.html 561 # for information. 562 # We maintain two main states, 'filling' and 'draining' enabling the barrier 563 # to be cyclic. Threads are not allowed into it until it has fully drained 564 # since the previous cycle. In addition, a 'resetting' state exists which is 565 # similar to 'draining' except that threads leave with a BrokenBarrierError, 566 # and a 'broken' state in which all threads get the exception. 567 class Barrier: 568 """Implements a Barrier. 569 570 Useful for synchronizing a fixed number of threads at known synchronization 571 points. Threads block on 'wait()' and are simultaneously awoken once they 572 have all made that call. 573 574 """ 575 576 def __init__(self, parties, action=None, timeout=None): 577 """Create a barrier, initialised to 'parties' threads. 578 579 'action' is a callable which, when supplied, will be called by one of 580 the threads after they have all entered the barrier and just prior to 581 releasing them all. If a 'timeout' is provided, it is used as the 582 default for all subsequent 'wait()' calls. 583 584 """ 585 self._cond = Condition(Lock()) 586 self._action = action 587 self._timeout = timeout 588 self._parties = parties 589 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken 590 self._count = 0 591 592 def wait(self, timeout=None): 593 """Wait for the barrier. 594 595 When the specified number of threads have started waiting, they are all 596 simultaneously awoken. If an 'action' was provided for the barrier, one 597 of the threads will have executed that callback prior to returning. 598 Returns an individual index number from 0 to 'parties-1'. 599 600 """ 601 if timeout is None: 602 timeout = self._timeout 603 with self._cond: 604 self._enter() # Block while the barrier drains. 605 index = self._count 606 self._count += 1 607 try: 608 if index + 1 == self._parties: 609 # We release the barrier 610 self._release() 611 else: 612 # We wait until someone releases us 613 self._wait(timeout) 614 return index 615 finally: 616 self._count -= 1 617 # Wake up any threads waiting for barrier to drain. 618 self._exit() 619 620 # Block until the barrier is ready for us, or raise an exception 621 # if it is broken. 622 def _enter(self): 623 while self._state in (-1, 1): 624 # It is draining or resetting, wait until done 625 self._cond.wait() 626 #see if the barrier is in a broken state 627 if self._state < 0: 628 raise BrokenBarrierError 629 assert self._state == 0 630 631 # Optionally run the 'action' and release the threads waiting 632 # in the barrier. 633 def _release(self): 634 try: 635 if self._action: 636 self._action() 637 # enter draining state 638 self._state = 1 639 self._cond.notify_all() 640 except: 641 #an exception during the _action handler. Break and reraise 642 self._break() 643 raise 644 645 # Wait in the barrier until we are released. Raise an exception 646 # if the barrier is reset or broken. 647 def _wait(self, timeout): 648 if not self._cond.wait_for(lambda : self._state != 0, timeout): 649 #timed out. Break the barrier 650 self._break() 651 raise BrokenBarrierError 652 if self._state < 0: 653 raise BrokenBarrierError 654 assert self._state == 1 655 656 # If we are the last thread to exit the barrier, signal any threads 657 # waiting for the barrier to drain. 658 def _exit(self): 659 if self._count == 0: 660 if self._state in (-1, 1): 661 #resetting or draining 662 self._state = 0 663 self._cond.notify_all() 664 665 def reset(self): 666 """Reset the barrier to the initial state. 667 668 Any threads currently waiting will get the BrokenBarrier exception 669 raised. 670 671 """ 672 with self._cond: 673 if self._count > 0: 674 if self._state == 0: 675 #reset the barrier, waking up threads 676 self._state = -1 677 elif self._state == -2: 678 #was broken, set it to reset state 679 #which clears when the last thread exits 680 self._state = -1 681 else: 682 self._state = 0 683 self._cond.notify_all() 684 685 def abort(self): 686 """Place the barrier into a 'broken' state. 687 688 Useful in case of error. Any currently waiting threads and threads 689 attempting to 'wait()' will have BrokenBarrierError raised. 690 691 """ 692 with self._cond: 693 self._break() 694 695 def _break(self): 696 # An internal error was detected. The barrier is set to 697 # a broken state all parties awakened. 698 self._state = -2 699 self._cond.notify_all() 700 701 @property 702 def parties(self): 703 """Return the number of threads required to trip the barrier.""" 704 return self._parties 705 706 @property 707 def n_waiting(self): 708 """Return the number of threads currently waiting at the barrier.""" 709 # We don't need synchronization here since this is an ephemeral result 710 # anyway. It returns the correct value in the steady state. 711 if self._state == 0: 712 return self._count 713 return 0 714 715 @property 716 def broken(self): 717 """Return True if the barrier is in a broken state.""" 718 return self._state == -2 719 720 # exception raised by the Barrier class 721 class BrokenBarrierError(RuntimeError): 722 pass 723 724 725 # Helper to generate new thread names 726 _counter = _count().__next__ 727 _counter() # Consume 0 so first non-main thread has id 1. 728 def _newname(template="Thread-%d"): 729 return template % _counter() 730 731 # Active thread administration 732 _active_limbo_lock = _allocate_lock() 733 _active = {} # maps thread id to Thread object 734 _limbo = {} 735 _dangling = WeakSet() 736 737 # Main class for threads 738 739 class Thread: 740 """A class that represents a thread of control. 741 742 This class can be safely subclassed in a limited fashion. There are two ways 743 to specify the activity: by passing a callable object to the constructor, or 744 by overriding the run() method in a subclass. 745 746 """ 747 748 _initialized = False 749 # Need to store a reference to sys.exc_info for printing 750 # out exceptions when a thread tries to use a global var. during interp. 751 # shutdown and thus raises an exception about trying to perform some 752 # operation on/with a NoneType 753 _exc_info = _sys.exc_info 754 # Keep sys.exc_clear too to clear the exception just before 755 # allowing .join() to return. 756 #XXX __exc_clear = _sys.exc_clear 757 758 def __init__(self, group=None, target=None, name=None, 759 args=(), kwargs=None, *, daemon=None): 760 """This constructor should always be called with keyword arguments. Arguments are: 761 762 *group* should be None; reserved for future extension when a ThreadGroup 763 class is implemented. 764 765 *target* is the callable object to be invoked by the run() 766 method. Defaults to None, meaning nothing is called. 767 768 *name* is the thread name. By default, a unique name is constructed of 769 the form "Thread-N" where N is a small decimal number. 770 771 *args* is the argument tuple for the target invocation. Defaults to (). 772 773 *kwargs* is a dictionary of keyword arguments for the target 774 invocation. Defaults to {}. 775 776 If a subclass overrides the constructor, it must make sure to invoke 777 the base class constructor (Thread.__init__()) before doing anything 778 else to the thread. 779 780 """ 781 assert group is None, "group argument must be None for now" 782 if kwargs is None: 783 kwargs = {} 784 self._target = target 785 self._name = str(name or _newname()) 786 self._args = args 787 self._kwargs = kwargs 788 if daemon is not None: 789 self._daemonic = daemon 790 else: 791 self._daemonic = current_thread().daemon 792 self._ident = None 793 self._tstate_lock = None 794 self._started = Event() 795 self._is_stopped = False 796 self._initialized = True 797 # sys.stderr is not stored in the class like 798 # sys.exc_info since it can be changed between instances 799 self._stderr = _sys.stderr 800 # For debugging and _after_fork() 801 _dangling.add(self) 802 803 def _reset_internal_locks(self, is_alive): 804 # private! Called by _after_fork() to reset our internal locks as 805 # they may be in an invalid state leading to a deadlock or crash. 806 self._started._reset_internal_locks() 807 if is_alive: 808 self._set_tstate_lock() 809 else: 810 # The thread isn't alive after fork: it doesn't have a tstate 811 # anymore. 812 self._is_stopped = True 813 self._tstate_lock = None 814 815 def __repr__(self): 816 assert self._initialized, "Thread.__init__() was not called" 817 status = "initial" 818 if self._started.is_set(): 819 status = "started" 820 self.is_alive() # easy way to get ._is_stopped set when appropriate 821 if self._is_stopped: 822 status = "stopped" 823 if self._daemonic: 824 status += " daemon" 825 if self._ident is not None: 826 status += " %s" % self._ident 827 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 828 829 def start(self): 830 """Start the thread's activity. 831 832 It must be called at most once per thread object. It arranges for the 833 object's run() method to be invoked in a separate thread of control. 834 835 This method will raise a RuntimeError if called more than once on the 836 same thread object. 837 838 """ 839 if not self._initialized: 840 raise RuntimeError("thread.__init__() not called") 841 842 if self._started.is_set(): 843 raise RuntimeError("threads can only be started once") 844 with _active_limbo_lock: 845 _limbo[self] = self 846 try: 847 _start_new_thread(self._bootstrap, ()) 848 except Exception: 849 with _active_limbo_lock: 850 del _limbo[self] 851 raise 852 self._started.wait() 853 854 def run(self): 855 """Method representing the thread's activity. 856 857 You may override this method in a subclass. The standard run() method 858 invokes the callable object passed to the object's constructor as the 859 target argument, if any, with sequential and keyword arguments taken 860 from the args and kwargs arguments, respectively. 861 862 """ 863 try: 864 if self._target: 865 self._target(*self._args, **self._kwargs) 866 finally: 867 # Avoid a refcycle if the thread is running a function with 868 # an argument that has a member that points to the thread. 869 del self._target, self._args, self._kwargs 870 871 def _bootstrap(self): 872 # Wrapper around the real bootstrap code that ignores 873 # exceptions during interpreter cleanup. Those typically 874 # happen when a daemon thread wakes up at an unfortunate 875 # moment, finds the world around it destroyed, and raises some 876 # random exception *** while trying to report the exception in 877 # _bootstrap_inner() below ***. Those random exceptions 878 # don't help anybody, and they confuse users, so we suppress 879 # them. We suppress them only when it appears that the world 880 # indeed has already been destroyed, so that exceptions in 881 # _bootstrap_inner() during normal business hours are properly 882 # reported. Also, we only suppress them for daemonic threads; 883 # if a non-daemonic encounters this, something else is wrong. 884 try: 885 self._bootstrap_inner() 886 except: 887 if self._daemonic and _sys is None: 888 return 889 raise 890 891 def _set_ident(self): 892 self._ident = get_ident() 893 894 def _set_tstate_lock(self): 895 """ 896 Set a lock object which will be released by the interpreter when 897 the underlying thread state (see pystate.h) gets deleted. 898 """ 899 self._tstate_lock = _set_sentinel() 900 self._tstate_lock.acquire() 901 902 def _bootstrap_inner(self): 903 try: 904 self._set_ident() 905 self._set_tstate_lock() 906 self._started.set() 907 with _active_limbo_lock: 908 _active[self._ident] = self 909 del _limbo[self] 910 911 if _trace_hook: 912 _sys.settrace(_trace_hook) 913 if _profile_hook: 914 _sys.setprofile(_profile_hook) 915 916 try: 917 self.run() 918 except SystemExit: 919 pass 920 except: 921 # If sys.stderr is no more (most likely from interpreter 922 # shutdown) use self._stderr. Otherwise still use sys (as in 923 # _sys) in case sys.stderr was redefined since the creation of 924 # self. 925 if _sys and _sys.stderr is not None: 926 print("Exception in thread %s:\n%s" % 927 (self.name, _format_exc()), file=_sys.stderr) 928 elif self._stderr is not None: 929 # Do the best job possible w/o a huge amt. of code to 930 # approximate a traceback (code ideas from 931 # Lib/traceback.py) 932 exc_type, exc_value, exc_tb = self._exc_info() 933 try: 934 print(( 935 "Exception in thread " + self.name + 936 " (most likely raised during interpreter shutdown):"), file=self._stderr) 937 print(( 938 "Traceback (most recent call last):"), file=self._stderr) 939 while exc_tb: 940 print(( 941 ' File "%s", line %s, in %s' % 942 (exc_tb.tb_frame.f_code.co_filename, 943 exc_tb.tb_lineno, 944 exc_tb.tb_frame.f_code.co_name)), file=self._stderr) 945 exc_tb = exc_tb.tb_next 946 print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) 947 self._stderr.flush() 948 # Make sure that exc_tb gets deleted since it is a memory 949 # hog; deleting everything else is just for thoroughness 950 finally: 951 del exc_type, exc_value, exc_tb 952 finally: 953 # Prevent a race in 954 # test_threading.test_no_refcycle_through_target when 955 # the exception keeps the target alive past when we 956 # assert that it's dead. 957 #XXX self._exc_clear() 958 pass 959 finally: 960 with _active_limbo_lock: 961 try: 962 # We don't call self._delete() because it also 963 # grabs _active_limbo_lock. 964 del _active[get_ident()] 965 except: 966 pass 967 968 def _stop(self): 969 # After calling ._stop(), .is_alive() returns False and .join() returns 970 # immediately. ._tstate_lock must be released before calling ._stop(). 971 # 972 # Normal case: C code at the end of the thread's life 973 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and 974 # that's detected by our ._wait_for_tstate_lock(), called by .join() 975 # and .is_alive(). Any number of threads _may_ call ._stop() 976 # simultaneously (for example, if multiple threads are blocked in 977 # .join() calls), and they're not serialized. That's harmless - 978 # they'll just make redundant rebindings of ._is_stopped and 979 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the 980 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works 981 # (the assert is executed only if ._tstate_lock is None). 982 # 983 # Special case: _main_thread releases ._tstate_lock via this 984 # module's _shutdown() function. 985 lock = self._tstate_lock 986 if lock is not None: 987 assert not lock.locked() 988 self._is_stopped = True 989 self._tstate_lock = None 990 991 def _delete(self): 992 "Remove current thread from the dict of currently running threads." 993 with _active_limbo_lock: 994 del _active[get_ident()] 995 # There must not be any python code between the previous line 996 # and after the lock is released. Otherwise a tracing function 997 # could try to acquire the lock again in the same thread, (in 998 # current_thread()), and would block. 999 1000 def join(self, timeout=None): 1001 """Wait until the thread terminates. 1002 1003 This blocks the calling thread until the thread whose join() method is 1004 called terminates -- either normally or through an unhandled exception 1005 or until the optional timeout occurs. 1006 1007 When the timeout argument is present and not None, it should be a 1008 floating point number specifying a timeout for the operation in seconds 1009 (or fractions thereof). As join() always returns None, you must call 1010 is_alive() after join() to decide whether a timeout happened -- if the 1011 thread is still alive, the join() call timed out. 1012 1013 When the timeout argument is not present or None, the operation will 1014 block until the thread terminates. 1015 1016 A thread can be join()ed many times. 1017 1018 join() raises a RuntimeError if an attempt is made to join the current 1019 thread as that would cause a deadlock. It is also an error to join() a 1020 thread before it has been started and attempts to do so raises the same 1021 exception. 1022 1023 """ 1024 if not self._initialized: 1025 raise RuntimeError("Thread.__init__() not called") 1026 if not self._started.is_set(): 1027 raise RuntimeError("cannot join thread before it is started") 1028 if self is current_thread(): 1029 raise RuntimeError("cannot join current thread") 1030 1031 if timeout is None: 1032 self._wait_for_tstate_lock() 1033 else: 1034 # the behavior of a negative timeout isn't documented, but 1035 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1036 self._wait_for_tstate_lock(timeout=max(timeout, 0)) 1037 1038 def _wait_for_tstate_lock(self, block=True, timeout=-1): 1039 # Issue #18808: wait for the thread state to be gone. 1040 # At the end of the thread's life, after all knowledge of the thread 1041 # is removed from C data structures, C code releases our _tstate_lock. 1042 # This method passes its arguments to _tstate_lock.acquire(). 1043 # If the lock is acquired, the C code is done, and self._stop() is 1044 # called. That sets ._is_stopped to True, and ._tstate_lock to None. 1045 lock = self._tstate_lock 1046 if lock is None: # already determined that the C code is done 1047 assert self._is_stopped 1048 elif lock.acquire(block, timeout): 1049 lock.release() 1050 self._stop() 1051 1052 @property 1053 def name(self): 1054 """A string used for identification purposes only. 1055 1056 It has no semantics. Multiple threads may be given the same name. The 1057 initial name is set by the constructor. 1058 1059 """ 1060 assert self._initialized, "Thread.__init__() not called" 1061 return self._name 1062 1063 @name.setter 1064 def name(self, name): 1065 assert self._initialized, "Thread.__init__() not called" 1066 self._name = str(name) 1067 1068 @property 1069 def ident(self): 1070 """Thread identifier of this thread or None if it has not been started. 1071 1072 This is a nonzero integer. See the get_ident() function. Thread 1073 identifiers may be recycled when a thread exits and another thread is 1074 created. The identifier is available even after the thread has exited. 1075 1076 """ 1077 assert self._initialized, "Thread.__init__() not called" 1078 return self._ident 1079 1080 def is_alive(self): 1081 """Return whether the thread is alive. 1082 1083 This method returns True just before the run() method starts until just 1084 after the run() method terminates. The module function enumerate() 1085 returns a list of all alive threads. 1086 1087 """ 1088 assert self._initialized, "Thread.__init__() not called" 1089 if self._is_stopped or not self._started.is_set(): 1090 return False 1091 self._wait_for_tstate_lock(False) 1092 return not self._is_stopped 1093 1094 def isAlive(self): 1095 """Return whether the thread is alive. 1096 1097 This method is deprecated, use is_alive() instead. 1098 """ 1099 import warnings 1100 warnings.warn('isAlive() is deprecated, use is_alive() instead', 1101 PendingDeprecationWarning, stacklevel=2) 1102 return self.is_alive() 1103 1104 @property 1105 def daemon(self): 1106 """A boolean value indicating whether this thread is a daemon thread. 1107 1108 This must be set before start() is called, otherwise RuntimeError is 1109 raised. Its initial value is inherited from the creating thread; the 1110 main thread is not a daemon thread and therefore all threads created in 1111 the main thread default to daemon = False. 1112 1113 The entire Python program exits when no alive non-daemon threads are 1114 left. 1115 1116 """ 1117 assert self._initialized, "Thread.__init__() not called" 1118 return self._daemonic 1119 1120 @daemon.setter 1121 def daemon(self, daemonic): 1122 if not self._initialized: 1123 raise RuntimeError("Thread.__init__() not called") 1124 if self._started.is_set(): 1125 raise RuntimeError("cannot set daemon status of active thread") 1126 self._daemonic = daemonic 1127 1128 def isDaemon(self): 1129 return self.daemon 1130 1131 def setDaemon(self, daemonic): 1132 self.daemon = daemonic 1133 1134 def getName(self): 1135 return self.name 1136 1137 def setName(self, name): 1138 self.name = name 1139 1140 # The timer class was contributed by Itamar Shtull-Trauring 1141 1142 class Timer(Thread): 1143 """Call a function after a specified number of seconds: 1144 1145 t = Timer(30.0, f, args=None, kwargs=None) 1146 t.start() 1147 t.cancel() # stop the timer's action if it's still waiting 1148 1149 """ 1150 1151 def __init__(self, interval, function, args=None, kwargs=None): 1152 Thread.__init__(self) 1153 self.interval = interval 1154 self.function = function 1155 self.args = args if args is not None else [] 1156 self.kwargs = kwargs if kwargs is not None else {} 1157 self.finished = Event() 1158 1159 def cancel(self): 1160 """Stop the timer if it hasn't finished yet.""" 1161 self.finished.set() 1162 1163 def run(self): 1164 self.finished.wait(self.interval) 1165 if not self.finished.is_set(): 1166 self.function(*self.args, **self.kwargs) 1167 self.finished.set() 1168 1169 1170 # Special thread class to represent the main thread 1171 1172 class _MainThread(Thread): 1173 1174 def __init__(self): 1175 Thread.__init__(self, name="MainThread", daemon=False) 1176 self._set_tstate_lock() 1177 self._started.set() 1178 self._set_ident() 1179 with _active_limbo_lock: 1180 _active[self._ident] = self 1181 1182 1183 # Dummy thread class to represent threads not started here. 1184 # These aren't garbage collected when they die, nor can they be waited for. 1185 # If they invoke anything in threading.py that calls current_thread(), they 1186 # leave an entry in the _active dict forever after. 1187 # Their purpose is to return *something* from current_thread(). 1188 # They are marked as daemon threads so we won't wait for them 1189 # when we exit (conform previous semantics). 1190 1191 class _DummyThread(Thread): 1192 1193 def __init__(self): 1194 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) 1195 1196 self._started.set() 1197 self._set_ident() 1198 with _active_limbo_lock: 1199 _active[self._ident] = self 1200 1201 def _stop(self): 1202 pass 1203 1204 def is_alive(self): 1205 assert not self._is_stopped and self._started.is_set() 1206 return True 1207 1208 def join(self, timeout=None): 1209 assert False, "cannot join a dummy thread" 1210 1211 1212 # Global API functions 1213 1214 def current_thread(): 1215 """Return the current Thread object, corresponding to the caller's thread of control. 1216 1217 If the caller's thread of control was not created through the threading 1218 module, a dummy thread object with limited functionality is returned. 1219 1220 """ 1221 try: 1222 return _active[get_ident()] 1223 except KeyError: 1224 return _DummyThread() 1225 1226 currentThread = current_thread 1227 1228 def active_count(): 1229 """Return the number of Thread objects currently alive. 1230 1231 The returned count is equal to the length of the list returned by 1232 enumerate(). 1233 1234 """ 1235 with _active_limbo_lock: 1236 return len(_active) + len(_limbo) 1237 1238 activeCount = active_count 1239 1240 def _enumerate(): 1241 # Same as enumerate(), but without the lock. Internal use only. 1242 return list(_active.values()) + list(_limbo.values()) 1243 1244 def enumerate(): 1245 """Return a list of all Thread objects currently alive. 1246 1247 The list includes daemonic threads, dummy thread objects created by 1248 current_thread(), and the main thread. It excludes terminated threads and 1249 threads that have not yet been started. 1250 1251 """ 1252 with _active_limbo_lock: 1253 return list(_active.values()) + list(_limbo.values()) 1254 1255 from _thread import stack_size 1256 1257 # Create the main thread object, 1258 # and make it available for the interpreter 1259 # (Py_Main) as threading._shutdown. 1260 1261 _main_thread = _MainThread() 1262 1263 def _shutdown(): 1264 # Obscure: other threads may be waiting to join _main_thread. That's 1265 # dubious, but some code does it. We can't wait for C code to release 1266 # the main thread's tstate_lock - that won't happen until the interpreter 1267 # is nearly dead. So we release it here. Note that just calling _stop() 1268 # isn't enough: other threads may already be waiting on _tstate_lock. 1269 if _main_thread._is_stopped: 1270 # _shutdown() was already called 1271 return 1272 tlock = _main_thread._tstate_lock 1273 # The main thread isn't finished yet, so its thread state lock can't have 1274 # been released. 1275 assert tlock is not None 1276 assert tlock.locked() 1277 tlock.release() 1278 _main_thread._stop() 1279 t = _pickSomeNonDaemonThread() 1280 while t: 1281 t.join() 1282 t = _pickSomeNonDaemonThread() 1283 1284 def _pickSomeNonDaemonThread(): 1285 for t in enumerate(): 1286 if not t.daemon and t.is_alive(): 1287 return t 1288 return None 1289 1290 def main_thread(): 1291 """Return the main thread object. 1292 1293 In normal conditions, the main thread is the thread from which the 1294 Python interpreter was started. 1295 """ 1296 return _main_thread 1297 1298 # get thread-local implementation, either from the thread 1299 # module, or from the python fallback 1300 1301 try: 1302 from _thread import _local as local 1303 except ImportError: 1304 from _threading_local import local 1305 1306 1307 def _after_fork(): 1308 """ 1309 Cleanup threading module state that should not exist after a fork. 1310 """ 1311 # Reset _active_limbo_lock, in case we forked while the lock was held 1312 # by another (non-forked) thread. http://bugs.python.org/issue874900 1313 global _active_limbo_lock, _main_thread 1314 _active_limbo_lock = _allocate_lock() 1315 1316 # fork() only copied the current thread; clear references to others. 1317 new_active = {} 1318 current = current_thread() 1319 _main_thread = current 1320 with _active_limbo_lock: 1321 # Dangling thread instances must still have their locks reset, 1322 # because someone may join() them. 1323 threads = set(_enumerate()) 1324 threads.update(_dangling) 1325 for thread in threads: 1326 # Any lock/condition variable may be currently locked or in an 1327 # invalid state, so we reinitialize them. 1328 if thread is current: 1329 # There is only one active thread. We reset the ident to 1330 # its new value since it can have changed. 1331 thread._reset_internal_locks(True) 1332 ident = get_ident() 1333 thread._ident = ident 1334 new_active[ident] = thread 1335 else: 1336 # All the others are already stopped. 1337 thread._reset_internal_locks(False) 1338 thread._stop() 1339 1340 _limbo.clear() 1341 _active.clear() 1342 _active.update(new_active) 1343 assert len(_active) == 1 1344 1345 1346 if hasattr(_os, "register_at_fork"): 1347 _os.register_at_fork(after_in_child=_after_fork) 1348